Describe implication of upstream ICU-22610
[Samba.git] / python / samba / tests / ldap_raw.py
blob548039f120c122fc931a5c332137a4b80d639853
1 # Integration tests for the ldap server, using raw socket IO
3 # Tests for handling of malformed or large packets.
5 # Copyright (C) Catalyst.Net Ltd 2020
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import socket
22 import ssl
24 import samba.tests
25 from samba.tests import TestCase
29 # LDAP Operations
31 DELETE = b'\x4a'
32 DELETE_RES = b'\x6b'
34 # Bind
35 BIND = b'\x60'
36 BIND_RES = b'\x61'
37 SIMPLE_AUTH = b'\x80'
38 SASL_AUTH = b'\xa3'
40 # Search
41 SEARCH = b'\x63'
42 SEARCH_RES = b'\x64'
43 EQUALS = b'\xa3'
47 # LDAP response codes.
49 SUCCESS = b'\x00'
50 OPERATIONS_ERROR = b'\x01'
51 INVALID_CREDENTIALS = b'\x31'
52 INVALID_DN_SYNTAX = b'\x22'
55 # ASN.1 Element types
57 BOOLEAN = b'\x01'
58 INTEGER = b'\x02'
59 OCTET_STRING = b'\x04'
60 NULL = b'\x05'
61 ENUMERATED = b'\x0a'
62 SEQUENCE = b'\x30'
63 SET = b'\x31'
67 # ASN.1 Helper functions.
69 def encode_element(ber_type, data):
70 """ Encode an ASN.1 BER element. """
71 if data is None:
72 return ber_type + encode_length(0)
73 return ber_type + encode_length(len(data)) + data
76 def encode_length(length):
77 """ Encode the length of an ASN.1 BER element. """
79 if length > 0xFFFFFF:
80 return b'\x84' + length.to_bytes(4, "big")
81 if length > 0xFFFF:
82 return b'\x83' + length.to_bytes(3, "big")
83 if length > 0xFF:
84 return b'\x82' + length.to_bytes(2, "big")
85 if length > 0x7F:
86 return b'\x81' + length.to_bytes(1, "big")
87 return length.to_bytes(1, "big")
90 def encode_string(string):
91 """ Encode an octet string """
92 return encode_element(OCTET_STRING, string)
95 def encode_boolean(boolean):
96 """ Encode a boolean value """
97 if boolean:
98 return encode_element(BOOLEAN, b'\xFF')
99 return encode_element(BOOLEAN, b'\x00')
102 def encode_integer(integer):
103 """ Encode an integer value """
104 bit_len = integer.bit_length()
105 byte_len = (bit_len // 8) + 1
106 return encode_element(INTEGER, integer.to_bytes(byte_len, "big"))
109 def encode_enumerated(enum):
110 """ Encode an enumerated value """
111 return encode_element(ENUMERATED, enum.to_bytes(1, "big"))
114 def encode_sequence(sequence):
115 """ Encode a sequence """
116 return encode_element(SEQUENCE, sequence)
119 def decode_element(data):
121 decode an ASN.1 element
123 if data is None:
124 return None
126 if len(data) < 2:
127 return None
129 ber_type = data[0:1]
130 enc = int.from_bytes(data[1:2], byteorder='big')
131 if enc & 0x80:
132 l_end = 2 + (enc & ~0x80)
133 length = int.from_bytes(data[2:l_end], byteorder='big')
134 element = data[l_end:l_end + length]
135 rest = data[l_end + length:]
136 else:
137 length = enc
138 element = data[2:2 + length]
139 rest = data[2 + length:]
141 return (ber_type, length, element, rest)
144 class RawLdapTest(TestCase):
146 A raw Ldap Test case.
147 The ldap connections are made over https on port 636
149 Uses the following environment variables:
150 SERVER
151 USERNAME
152 PASSWORD
153 DNSNAME
156 def setUp(self):
157 super().setUp()
159 self.host = samba.tests.env_get_var_value('SERVER')
160 self.port = 636
161 self.socket = None
162 self.user = samba.tests.env_get_var_value('USERNAME')
163 self.password = samba.tests.env_get_var_value('PASSWORD')
164 self.dns_name = samba.tests.env_get_var_value('DNSNAME')
165 self.connect()
167 def tearDown(self):
168 self.disconnect()
169 super().tearDown()
171 def disconnect(self):
172 """ Disconnect from and clean up the connection to the server """
173 if self.socket is None:
174 return
175 self.socket.close()
176 self.socket = None
178 def connect(self):
179 """ Establish an ldaps connection to the test server """
181 # Disable host name and certificate verification
182 context = ssl.create_default_context()
183 context.check_hostname = False
184 context.verify_mode = ssl.CERT_NONE
186 sock = None
187 try:
188 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
189 sock.settimeout(10)
190 sock.connect((self.host, self.port))
191 self.socket = context.wrap_socket(sock, server_hostname=self.host)
192 except socket.error:
193 sock.close()
194 if self.socket is not None:
195 self.socket.close()
196 raise
198 def send(self, req):
199 """ Send the request to the server """
200 try:
201 self.socket.sendall(req)
202 except socket.error:
203 self.disconnect()
204 raise
206 def recv(self, num_recv=0xffff, timeout=None):
207 """ receive an array of bytes from the server """
208 data = None
209 try:
210 if timeout is not None:
211 self.socket.settimeout(timeout)
212 data = self.socket.recv(num_recv, 0)
213 self.socket.settimeout(10)
214 if len(data) == 0:
215 self.disconnect()
216 return None
217 except socket.timeout:
218 # We ignore timeout's as the ldap server will drop the connection
219 # on the errors we're testing. So returning None on a timeout is
220 # the desired behaviour.
221 self.socket.settimeout(10)
222 except socket.error:
223 self.disconnect()
224 raise
225 return data
227 def bind(self):
229 Perform a simple bind
232 user = self.user.encode('UTF8')
233 ou = self.dns_name.replace('.', ',dc=').encode('UTF8')
234 dn = b'cn=' + user + b',cn=users,dc=' + ou
236 password = self.password.encode('UTF8')
238 # Lets build an simple bind request
239 bind = encode_integer(3) # ldap version
240 bind += encode_string(dn)
241 bind += encode_element(SIMPLE_AUTH, password)
243 bind_op = encode_element(BIND, bind)
245 msg_no = encode_integer(1)
246 packet = encode_sequence(msg_no + bind_op)
248 self.send(packet)
249 data = self.recv()
250 self.assertIsNotNone(data)
253 # Decode and validate the response
255 # Should be a sequence
256 (ber_type, length, element, rest) = decode_element(data)
257 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
258 self.assertTrue(length > 0)
259 self.assertEqual(0, len(rest))
261 # message id should be 1
262 (ber_type, length, element, rest) = decode_element(element)
263 self.assertEqual(INTEGER.hex(), ber_type.hex())
264 msg_no = int.from_bytes(element, byteorder='big')
265 self.assertEqual(1, msg_no)
266 self.assertGreater(len(rest), 0)
268 # Should have a Bind response element
269 (ber_type, length, element, rest) = decode_element(rest)
270 self.assertEqual(BIND_RES.hex(), ber_type.hex())
271 self.assertEqual(0, len(rest))
273 # Check the response code
274 (ber_type, length, element, rest) = decode_element(element)
275 self.assertEqual(ENUMERATED.hex(), ber_type.hex())
276 self.assertEqual(SUCCESS.hex(), element.hex())
277 self.assertGreater(len(rest), 0)
279 def test_decode_element(self):
280 """ Tests for the decode_element method """
282 # Boolean true value
283 data = b'\x01\x01\xff'
284 (ber_type, length, element, rest) = decode_element(data)
285 self.assertEqual(BOOLEAN.hex(), ber_type.hex())
286 self.assertEqual(1, length)
287 self.assertEqual(b'\xff'.hex(), element.hex())
288 self.assertEqual(0, len(rest))
290 # Boolean false value
291 data = b'\x01\x01\x00'
292 (ber_type, length, element, rest) = decode_element(data)
293 self.assertEqual(BOOLEAN.hex(), ber_type.hex())
294 self.assertEqual(1, length)
295 self.assertEqual(b'\x00'.hex(), element.hex())
296 self.assertEqual(0, len(rest))
298 # Boolean true value with trailing data
299 data = b'\x01\x01\xff\x05\x00'
300 (ber_type, length, element, rest) = decode_element(data)
301 self.assertEqual(BOOLEAN.hex(), ber_type.hex())
302 self.assertEqual(1, length)
303 self.assertEqual(b'\xff'.hex(), element.hex())
304 self.assertEqual(b'\x05\x00'.hex(), rest.hex())
306 # Octet string byte length encoding
307 data = b'\x04\x02\xca\xfe\x05\x00'
308 (ber_type, length, element, rest) = decode_element(data)
309 self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
310 self.assertEqual(2, length)
311 self.assertEqual(b'\xca\xfe'.hex(), element.hex())
312 self.assertEqual(b'\x05\x00'.hex(), rest.hex())
314 # Octet string 81 byte length encoding
315 data = b'\x04\x81\x02\xca\xfe\x05\x00'
316 (ber_type, length, element, rest) = decode_element(data)
317 self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
318 self.assertEqual(2, length)
319 self.assertEqual(b'\xca\xfe'.hex(), element.hex())
320 self.assertEqual(b'\x05\x00'.hex(), rest.hex())
322 # Octet string 82 byte length encoding
323 data = b'\x04\x82\x00\x02\xca\xfe\x05\x00'
324 (ber_type, length, element, rest) = decode_element(data)
325 self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
326 self.assertEqual(2, length)
327 self.assertEqual(b'\xca\xfe'.hex(), element.hex())
328 self.assertEqual(b'\x05\x00'.hex(), rest.hex())
330 # Octet string 85 byte length encoding
331 # For Samba we limit the length encoding to 4 bytes, but it's useful
332 # to be able to decode longer lengths in a test.
333 data = b'\x04\x85\x00\x00\x00\x00\x02\xca\xfe\x05\x00'
334 (ber_type, length, element, rest) = decode_element(data)
335 self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
336 self.assertEqual(2, length)
337 self.assertEqual(b'\xca\xfe'.hex(), element.hex())
338 self.assertEqual(b'\x05\x00'.hex(), rest.hex())
340 def test_search_equals_maximum_permitted_size(self):
342 Check that an LDAP search request equal to the maximum size is accepted
343 This test is done on a authenticated connection so that the maximum
344 non search request is 16MiB.
346 self.bind()
348 # Lets build an ldap search packet to query the RootDSE
349 header = encode_string(None) # Base DN, ""
350 header += encode_enumerated(0) # Enumeration scope
351 header += encode_enumerated(0) # Enumeration dereference
352 header += encode_integer(0) # Integer size limit
353 header += encode_integer(0) # Integer time limit
354 header += encode_boolean(False) # Boolean attributes only
357 # build an equality search of the form x...x=y...y
358 # With the length of x...x and y...y chosen to generate an
359 # ldap request of 256000 bytes.
360 x = encode_string(b'x' * 127974)
361 y = encode_string(b'y' * 127979)
362 equals = encode_element(EQUALS, x + y)
363 trailer = encode_sequence(None)
364 search = encode_element(SEARCH, header + equals + trailer)
366 msg_no = encode_integer(2)
367 packet = encode_sequence(msg_no + search)
369 # The length of the packet should be equal to the
370 # Maximum length of a search query
371 self.assertEqual(256000, len(packet))
373 self.send(packet)
374 data = self.recv()
375 self.assertIsNotNone(data)
378 # Decode and validate the response
380 # Should be a sequence
381 (ber_type, length, element, rest) = decode_element(data)
382 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
383 self.assertTrue(length > 0)
384 self.assertEqual(0, len(rest))
386 # message id should be 2
387 (ber_type, length, element, rest) = decode_element(element)
388 self.assertEqual(INTEGER.hex(), ber_type.hex())
389 msg_no = int.from_bytes(element, byteorder='big')
390 self.assertEqual(2, msg_no)
391 self.assertGreater(len(rest), 0)
393 # Should have a Search response element
394 (ber_type, length, element, rest) = decode_element(rest)
395 self.assertEqual(SEARCH_RES.hex(), ber_type.hex())
396 self.assertEqual(0, len(rest))
398 # Should have an empty matching DN
399 (ber_type, length, element, rest) = decode_element(element)
400 self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
401 self.assertEqual(0, len(element))
402 self.assertGreater(len(rest), 0)
404 # Then a sequence of attribute sequences
405 (ber_type, length, element, rest) = decode_element(rest)
406 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
407 self.assertEqual(0, len(rest))
409 # Check the first attribute sequence, it should be
410 # "configurationNamingContext"
411 # The remaining attribute sequences will be ignored but
412 # check that they exist.
413 (ber_type, length, element, rest) = decode_element(element)
414 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
415 # Check that there are remaining attribute sequences.
416 self.assertGreater(len(rest), 0)
418 # Check the name of the first attribute
419 (ber_type, length, element, rest) = decode_element(element)
420 self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
421 self.assertGreater(len(rest), 0)
422 self.assertEqual(b'configurationNamingContext', element)
424 # And check that there is an attribute value set
425 (ber_type, length, element, rest) = decode_element(rest)
426 self.assertEqual(SET.hex(), ber_type.hex())
427 self.assertGreater(len(element), 0)
428 self.assertEqual(0, len(rest))
430 def test_search_exceeds_maximum_permitted_size(self):
432 Test that a search query longer than the maximum permitted
433 size is rejected.
434 This test is done on a authenticated connection so that the maximum
435 non search request is 16MiB.
438 self.bind()
440 # Lets build an ldap search packet to query the RootDSE
441 header = encode_string(None) # Base DN, ""
442 header += encode_enumerated(0) # Enumeration scope
443 header += encode_enumerated(0) # Enumeration dereference
444 header += encode_integer(0) # Integer size limit
445 header += encode_integer(0) # Integer time limit
446 header += encode_boolean(False) # Boolean attributes only
449 # build an equality search of the form x...x=y...y
450 # With the length of x...x and y...y chosen to generate an
451 # ldap request of 256001 bytes.
452 x = encode_string(b'x' * 127979)
453 y = encode_string(b'y' * 127975)
454 equals = encode_element(EQUALS, x + y)
455 trailer = encode_sequence(None)
456 search = encode_element(SEARCH, header + equals + trailer)
458 msg_no = encode_integer(2)
459 packet = encode_sequence(msg_no + search)
461 # The length of the sequence data should be one greater than the
462 # Maximum length of a search query
463 self.assertEqual(256001, len(packet))
465 self.send(packet)
466 data = self.recv()
468 # The connection should be closed by the server and we should not
469 # see any data.
470 self.assertIsNone(data)
472 def test_simple_anonymous_bind(self):
474 Test a simple anonymous bind
477 # Lets build an anonymous simple bind request
478 bind = encode_integer(3) # ldap version
479 bind += encode_string(b'') # Empty name
480 bind += encode_element(SIMPLE_AUTH, b'') # Empty password
482 bind_op = encode_element(BIND, bind)
484 msg_no = encode_integer(1)
485 packet = encode_sequence(msg_no + bind_op)
487 self.send(packet)
488 data = self.recv()
489 self.assertIsNotNone(data)
492 # Decode and validate the response
494 # Should be a sequence
495 (ber_type, length, element, rest) = decode_element(data)
496 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
497 self.assertTrue(length > 0)
498 self.assertEqual(0, len(rest))
500 # message id should be 1
501 (ber_type, length, element, rest) = decode_element(element)
502 self.assertEqual(INTEGER.hex(), ber_type.hex())
503 msg_no = int.from_bytes(element, byteorder='big')
504 self.assertEqual(1, msg_no)
505 self.assertGreater(len(rest), 0)
507 # Should have a Bind response element
508 (ber_type, length, element, rest) = decode_element(rest)
509 self.assertEqual(BIND_RES.hex(), ber_type.hex())
510 self.assertEqual(0, len(rest))
512 # Check the response code
513 (ber_type, length, element, rest) = decode_element(element)
514 self.assertEqual(ENUMERATED.hex(), ber_type.hex())
515 self.assertEqual(SUCCESS.hex(), element.hex())
516 self.assertGreater(len(rest), 0)
518 def test_simple_bind_at_limit(self):
520 Test a simple bind, with a large invalid
521 user name. As the resulting packet is equal
522 to the maximum unauthenticated packet size we should see
523 an INVALID_CREDENTIALS response
526 # Lets build a simple bind request
527 bind = encode_integer(3) # ldap version
528 bind += encode_string(b' ' * 255977) # large name
529 bind += encode_element(SIMPLE_AUTH, b'') # Empty password
531 bind_op = encode_element(BIND, bind)
533 msg_no = encode_integer(1)
534 packet = encode_sequence(msg_no + bind_op)
536 # The length of the sequence data should be equal to the maximum
537 # Unauthenticated packet length
538 self.assertEqual(256000, len(packet))
540 self.send(packet)
541 data = self.recv()
542 self.assertIsNotNone(data)
545 # Decode and validate the response
547 # Should be a sequence
548 (ber_type, length, element, rest) = decode_element(data)
549 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
550 self.assertTrue(length > 0)
551 self.assertEqual(0, len(rest))
553 # message id should be 1
554 (ber_type, length, element, rest) = decode_element(element)
555 self.assertEqual(INTEGER.hex(), ber_type.hex())
556 msg_no = int.from_bytes(element, byteorder='big')
557 self.assertEqual(1, msg_no)
558 self.assertGreater(len(rest), 0)
560 # Should have a Bind response element
561 (ber_type, length, element, rest) = decode_element(rest)
562 self.assertEqual(BIND_RES.hex(), ber_type.hex())
563 self.assertEqual(0, len(rest))
565 # Check the response code
566 (ber_type, length, element, rest) = decode_element(element)
567 self.assertEqual(ENUMERATED.hex(), ber_type.hex())
568 self.assertEqual(INVALID_CREDENTIALS.hex(), element.hex())
569 self.assertGreater(len(rest), 0)
571 def test_simple_bind_gt_limit(self):
573 Test a simple bind, with a large invalid
574 user name. As the resulting packet is one greater than
575 the maximum unauthenticated packet size we should see
576 the connection reset.
579 # Lets build a simple bind request
580 bind = encode_integer(3) # ldap version
581 bind += encode_string(b' ' * 255978) # large name
582 bind += encode_element(SIMPLE_AUTH, b'') # Empty password
584 bind_op = encode_element(BIND, bind)
586 msg_no = encode_integer(1)
587 packet = encode_sequence(msg_no + bind_op)
589 # The length of the sequence data should be equal to the maximum
590 # Unauthenticated packet length
591 self.assertEqual(256001, len(packet))
593 self.send(packet)
594 data = self.recv()
595 self.assertIsNone(data)
597 def test_unauthenticated_delete_at_limit(self):
599 Test a delete, with a large invalid DN
600 As the resulting packet is equal to the maximum unauthenticated
601 packet size we should see an INVALID_DN_SYNTAX response
604 # Lets build a delete request, with a large invalid DN
605 dn = b' ' * 255987
606 del_op = encode_element(DELETE, dn)
608 msg_no = encode_integer(1)
609 packet = encode_sequence(msg_no + del_op)
611 # The length of the sequence data should be equal to the maximum
612 # Unauthenticated packet length
613 self.assertEqual(256000, len(packet))
615 self.send(packet)
616 data = self.recv()
617 self.assertIsNotNone(data)
620 # Decode and validate the response
622 # Should be a sequence
623 (ber_type, length, element, rest) = decode_element(data)
624 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
625 self.assertTrue(length > 0)
626 self.assertEqual(0, len(rest))
628 # message id should be 1
629 (ber_type, length, element, rest) = decode_element(element)
630 self.assertEqual(INTEGER.hex(), ber_type.hex())
631 msg_no = int.from_bytes(element, byteorder='big')
632 self.assertEqual(1, msg_no)
633 self.assertGreater(len(rest), 0)
635 # Should have a delete response element
636 (ber_type, length, element, rest) = decode_element(rest)
637 self.assertEqual(DELETE_RES.hex(), ber_type.hex())
638 self.assertEqual(0, len(rest))
640 # Check the response code
641 (ber_type, length, element, rest) = decode_element(element)
642 self.assertEqual(ENUMERATED.hex(), ber_type.hex())
643 self.assertEqual(INVALID_DN_SYNTAX.hex(), element.hex())
644 self.assertGreater(len(rest), 0)
646 def test_unauthenticated_delete_gt_limit(self):
648 Test a delete, with a large invalid DN
649 As the resulting packet is greater than the maximum unauthenticated
650 packet size we should see a connection reset
653 # Lets build a delete request, with a large invalid DN
654 dn = b' ' * 255988
655 del_op = encode_element(DELETE, dn)
657 msg_no = encode_integer(1)
658 packet = encode_sequence(msg_no + del_op)
660 # The length of the sequence data should one greater than the maximum
661 # unauthenticated packet length
662 self.assertEqual(256001, len(packet))
664 self.send(packet)
665 data = self.recv()
666 self.assertIsNone(data)
668 def test_authenticated_delete_at_limit(self):
670 Test a delete, with a large invalid DN
671 As the resulting packet is equal to the maximum authenticated
672 packet size we should see an INVALID_DN_SYNTAX response
675 # Lets build a delete request, with a large invalid DN
676 dn = b' ' * 16777203
677 del_op = encode_element(DELETE, dn)
679 self.bind()
681 msg_no = encode_integer(2)
682 packet = encode_sequence(msg_no + del_op)
684 # The length of the sequence data should be equal to the maximum
685 # authenticated packet length currently 16MiB
686 self.assertEqual(16 * 1024 * 1024, len(packet))
688 self.send(packet)
689 data = self.recv()
690 self.assertIsNotNone(data)
693 # Decode and validate the response
695 # Should be a sequence
696 (ber_type, length, element, rest) = decode_element(data)
697 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
698 self.assertTrue(length > 0)
699 self.assertEqual(0, len(rest))
701 # message id should be 2
702 (ber_type, length, element, rest) = decode_element(element)
703 self.assertEqual(INTEGER.hex(), ber_type.hex())
704 msg_no = int.from_bytes(element, byteorder='big')
705 self.assertEqual(2, msg_no)
706 self.assertGreater(len(rest), 0)
708 # Should have a delete response element
709 (ber_type, length, element, rest) = decode_element(rest)
710 self.assertEqual(DELETE_RES.hex(), ber_type.hex())
711 self.assertEqual(0, len(rest))
713 # Check the response code
714 (ber_type, length, element, rest) = decode_element(element)
715 self.assertEqual(ENUMERATED.hex(), ber_type.hex())
716 self.assertEqual(INVALID_DN_SYNTAX.hex(), element.hex())
717 self.assertGreater(len(rest), 0)
719 def test_authenticated_delete_gt_limit(self):
721 Test a delete, with a large invalid DN
722 As the resulting packet is one greater than the maximum
723 authenticated packet size we should see a connection reset
726 # Lets build a delete request, with a large invalid DN
727 dn = b' ' * 16777204
728 del_op = encode_element(DELETE, dn)
730 self.bind()
732 msg_no = encode_integer(2)
733 packet = encode_sequence(msg_no + del_op)
735 # The length of the sequence data should be one greater than the
736 # maximum authenticated packet length currently 16MiB
737 self.assertEqual(16 * 1024 * 1024 + 1, len(packet))
739 self.send(packet)
740 data = self.recv()
741 self.assertIsNone(data)
744 class RawCldapTest(TestCase):
746 A raw cldap Test case.
747 The ldap connections are made over UDP port 389
749 Uses the following environment variables:
750 SERVER
753 def setUp(self):
754 super().setUp()
756 self.host = samba.tests.env_get_var_value('SERVER')
757 self.port = 389
758 self.socket = None
759 self.connect()
761 def tearDown(self):
762 self.disconnect()
763 super().tearDown()
765 def disconnect(self):
766 """ Disconnect from and clean up the connection to the server """
767 if self.socket is None:
768 return
769 self.socket.close()
770 self.socket = None
772 def connect(self):
773 """ Establish an UDP connection to the test server """
775 try:
776 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
777 self.socket.settimeout(10)
778 self.socket.connect((self.host, self.port))
779 except socket.error:
780 if self.socket is not None:
781 self.socket.close()
782 raise
784 def send(self, req):
785 """ Send the request to the server """
786 try:
787 self.socket.sendall(req)
788 except socket.error:
789 self.disconnect()
790 raise
792 def recv(self, num_recv=0xffff, timeout=None):
793 """ receive an array of bytes from the server """
794 data = None
795 try:
796 if timeout is not None:
797 self.socket.settimeout(timeout)
798 data = self.socket.recv(num_recv, 0)
799 self.socket.settimeout(10)
800 if len(data) == 0:
801 self.disconnect()
802 return None
803 except socket.timeout:
804 # We ignore timeout's as the ldap server will drop the connection
805 # on the errors we're testing. So returning None on a timeout is
806 # the desired behaviour.
807 self.socket.settimeout(10)
808 except socket.error:
809 self.disconnect()
810 raise
811 return data
813 def test_search_equals_maximum_permitted_size(self):
815 Check that an CLDAP search request equal to the maximum size is
816 accepted
819 # Lets build an ldap search packet to query the RootDSE
820 header = encode_string(None) # Base DN, ""
821 header += encode_enumerated(0) # Enumeration scope
822 header += encode_enumerated(0) # Enumeration dereference
823 header += encode_integer(0) # Integer size limit
824 header += encode_integer(0) # Integer time limit
825 header += encode_boolean(False) # Boolean attributes only
828 # build an equality search of the form x...x=y...y
829 # With the length of x...x and y...y chosen to generate an
830 # cldap request of 4096 bytes.
831 x = encode_string(b'x' * 2027)
832 y = encode_string(b'y' * 2027)
833 equals = encode_element(EQUALS, x + y)
834 trailer = encode_sequence(None)
835 search = encode_element(SEARCH, header + equals + trailer)
837 msg_no = encode_integer(2)
838 packet = encode_sequence(msg_no + search)
840 # The length of the packet should be equal to the
841 # Maximum length of a cldap packet
842 self.assertEqual(4096, len(packet))
844 self.send(packet)
845 data = self.recv()
846 self.assertIsNotNone(data)
849 # Decode and validate the response
851 # Should be a sequence
852 (ber_type, length, element, rest) = decode_element(data)
853 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
854 self.assertTrue(length > 0)
855 self.assertGreater(len(rest), 0)
856 # rest should contain a Search request done element, but it's
857 # not validated in this test.
859 # message id should be 2
860 (ber_type, length, element, rest) = decode_element(element)
861 self.assertEqual(INTEGER.hex(), ber_type.hex())
862 msg_no = int.from_bytes(element, byteorder='big')
863 self.assertEqual(2, msg_no)
864 self.assertGreater(len(rest), 0)
866 # Should have a Search response element
867 (ber_type, length, element, rest) = decode_element(rest)
868 self.assertEqual(SEARCH_RES.hex(), ber_type.hex())
869 self.assertEqual(0, len(rest))
871 # Should have an empty matching DN
872 (ber_type, length, element, rest) = decode_element(element)
873 self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
874 self.assertEqual(0, len(element))
875 self.assertGreater(len(rest), 0)
877 # Then a sequence of attribute sequences
878 (ber_type, length, element, rest) = decode_element(rest)
879 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
880 self.assertEqual(0, len(rest))
882 # Check the first attribute sequence, it should be
883 # "configurationNamingContext"
884 # The remaining attribute sequences will be ignored but
885 # check that they exist.
886 (ber_type, length, element, rest) = decode_element(element)
887 self.assertEqual(SEQUENCE.hex(), ber_type.hex())
888 # Check that there are remaining attribute sequences.
889 self.assertGreater(len(rest), 0)
891 # Check the name of the first attribute
892 (ber_type, length, element, rest) = decode_element(element)
893 self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
894 self.assertGreater(len(rest), 0)
895 self.assertEqual(b'configurationNamingContext', element)
897 # And check that there is an attribute value set
898 (ber_type, length, element, rest) = decode_element(rest)
899 self.assertEqual(SET.hex(), ber_type.hex())
900 self.assertGreater(len(element), 0)
901 self.assertEqual(0, len(rest))
903 def test_search_exceeds_maximum_permitted_size(self):
905 Test that a cldap request longer than the maximum permitted
906 size is rejected.
909 # Lets build an ldap search packet to query the RootDSE
910 header = encode_string(None) # Base DN, ""
911 header += encode_enumerated(0) # Enumeration scope
912 header += encode_enumerated(0) # Enumeration dereference
913 header += encode_integer(0) # Integer size limit
914 header += encode_integer(0) # Integer time limit
915 header += encode_boolean(False) # Boolean attributes only
918 # build an equality search of the form x...x=y...y
919 # With the length of x...x and y...y chosen to generate an
920 # cldap request of 4097 bytes.
921 x = encode_string(b'x' * 2027)
922 y = encode_string(b'y' * 2028)
923 equals = encode_element(EQUALS, x + y)
924 trailer = encode_sequence(None)
925 search = encode_element(SEARCH, header + equals + trailer)
927 msg_no = encode_integer(2)
928 packet = encode_sequence(msg_no + search)
930 # The length of the sequence data should be one greater than the
931 # Maximum length of a cldap packet
932 self.assertEqual(4097, len(packet))
934 self.send(packet)
935 data = self.recv()
937 # The connection should be closed by the server and we should not
938 # see any data.
939 self.assertIsNone(data)