1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Samba Python tests."""
25 from samba
import param
26 from samba
.samdb
import SamDB
27 from samba
import credentials
29 import samba
.dcerpc
.dcerpc
30 import samba
.dcerpc
.base
31 import samba
.dcerpc
.epmapper
32 from samba
.credentials
import Credentials
33 from samba
import gensec
42 from unittest
import SkipTest
44 class SkipTest(Exception):
47 HEXDUMP_FILTER
=''.join([(len(repr(chr(x
)))==3) and chr(x
) or '.' for x
in range(256)])
49 class TestCase(unittest
.TestCase
):
50 """A Samba test case."""
53 super(TestCase
, self
).setUp()
54 test_debug_level
= os
.getenv("TEST_DEBUG_LEVEL")
55 if test_debug_level
is not None:
56 test_debug_level
= int(test_debug_level
)
57 self
._old
_debug
_level
= samba
.get_debug_level()
58 samba
.set_debug_level(test_debug_level
)
59 self
.addCleanup(samba
.set_debug_level
, test_debug_level
)
61 def get_loadparm(self
):
64 def get_credentials(self
):
65 return cmdline_credentials
67 def hexdump(self
, src
):
74 hl
= ' '.join(["%02X" % ord(x
) for x
in ll
])
75 hr
= ' '.join(["%02X" % ord(x
) for x
in lr
])
76 ll
= ll
.translate(HEXDUMP_FILTER
)
77 lr
= lr
.translate(HEXDUMP_FILTER
)
78 result
+= "[%04X] %-*s %-*s %s %s\n" % (N
, 8*3, hl
, 8*3, hr
, ll
, lr
)
82 # These functions didn't exist before Python2.7:
83 if sys
.version_info
< (2, 7):
86 def skipTest(self
, reason
):
87 raise SkipTest(reason
)
89 def assertIn(self
, member
, container
, msg
=None):
90 self
.assertTrue(member
in container
, msg
)
92 def assertIs(self
, a
, b
, msg
=None):
93 self
.assertTrue(a
is b
, msg
)
95 def assertIsNot(self
, a
, b
, msg
=None):
96 self
.assertTrue(a
is not b
, msg
)
98 def assertIsNotNone(self
, a
, msg
=None):
99 self
.assertTrue(a
is not None)
101 def assertIsInstance(self
, a
, b
, msg
=None):
102 self
.assertTrue(isinstance(a
, b
), msg
)
104 def assertIsNone(self
, a
, msg
=None):
105 self
.assertTrue(a
is None, msg
)
107 def assertGreater(self
, a
, b
, msg
=None):
108 self
.assertTrue(a
> b
, msg
)
110 def assertGreaterEqual(self
, a
, b
, msg
=None):
111 self
.assertTrue(a
>= b
, msg
)
113 def assertLess(self
, a
, b
, msg
=None):
114 self
.assertTrue(a
< b
, msg
)
116 def assertLessEqual(self
, a
, b
, msg
=None):
117 self
.assertTrue(a
<= b
, msg
)
119 def addCleanup(self
, fn
, *args
, **kwargs
):
120 self
._cleanups
= getattr(self
, "_cleanups", []) + [
123 def _addSkip(self
, result
, reason
):
124 addSkip
= getattr(result
, 'addSkip', None)
125 if addSkip
is not None:
126 addSkip(self
, reason
)
128 warnings
.warn("TestResult has no addSkip method, skips not reported",
130 result
.addSuccess(self
)
132 def run(self
, result
=None):
133 if result
is None: result
= self
.defaultTestResult()
134 result
.startTest(self
)
135 testMethod
= getattr(self
, self
._testMethodName
)
140 self
._addSkip
(result
, str(e
))
142 except KeyboardInterrupt:
145 result
.addError(self
, self
._exc
_info
())
153 self
._addSkip
(result
, str(e
))
155 except self
.failureException
:
156 result
.addFailure(self
, self
._exc
_info
())
157 except KeyboardInterrupt:
160 result
.addError(self
, self
._exc
_info
())
165 self
._addSkip
(result
, str(e
))
166 except KeyboardInterrupt:
169 result
.addError(self
, self
._exc
_info
())
172 for (fn
, args
, kwargs
) in reversed(getattr(self
, "_cleanups", [])):
174 if ok
: result
.addSuccess(self
)
176 result
.stopTest(self
)
179 class LdbTestCase(TestCase
):
180 """Trivial test case for running tests against a LDB."""
183 super(LdbTestCase
, self
).setUp()
184 self
.filename
= os
.tempnam()
185 self
.ldb
= samba
.Ldb(self
.filename
)
187 def set_modules(self
, modules
=[]):
188 """Change the modules for this Ldb."""
190 m
.dn
= ldb
.Dn(self
.ldb
, "@MODULES")
191 m
["@LIST"] = ",".join(modules
)
193 self
.ldb
= samba
.Ldb(self
.filename
)
196 class TestCaseInTempDir(TestCase
):
199 super(TestCaseInTempDir
, self
).setUp()
200 self
.tempdir
= tempfile
.mkdtemp()
201 self
.addCleanup(self
._remove
_tempdir
)
203 def _remove_tempdir(self
):
204 self
.assertEquals([], os
.listdir(self
.tempdir
))
205 os
.rmdir(self
.tempdir
)
210 lp
= param
.LoadParm()
212 lp
.load(os
.environ
["SMB_CONF_PATH"])
214 raise KeyError("SMB_CONF_PATH not set")
218 def env_get_var_value(var_name
, allow_missing
=False):
219 """Returns value for variable in os.environ
221 Function throws AssertionError if variable is defined.
222 Unit-test based python tests require certain input params
223 to be set in environment, otherwise they can't be run
226 if var_name
not in os
.environ
.keys():
228 assert var_name
in os
.environ
.keys(), "Please supply %s in environment" % var_name
229 return os
.environ
[var_name
]
232 cmdline_credentials
= None
234 class RpcInterfaceTestCase(TestCase
):
235 """DCE/RPC Test case."""
237 class RawDCERPCTest(TestCase
):
238 """A raw DCE/RPC Test case."""
240 def _disconnect(self
, reason
):
246 sys
.stderr
.write("disconnect[%s]\n" % reason
)
250 self
.a
= socket
.getaddrinfo(self
.host
, self
.tcp_port
, socket
.AF_UNSPEC
,
251 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
253 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
254 self
.s
.settimeout(10)
255 self
.s
.connect(self
.a
[0][4])
256 except socket
.error
as e
:
262 except Exception as e
:
268 super(RawDCERPCTest
, self
).setUp()
269 self
.do_ndr_print
= False
270 self
.do_hexdump
= False
272 self
.host
= samba
.tests
.env_get_var_value('SERVER')
273 self
.target_hostname
= samba
.tests
.env_get_var_value('TARGET_HOSTNAME', allow_missing
=True)
274 if self
.target_hostname
is None:
275 self
.target_hostname
= self
.host
279 self
.settings
["lp_ctx"] = self
.lp_ctx
= samba
.tests
.env_loadparm()
280 self
.settings
["target_hostname"] = self
.target_hostname
287 def second_connection(self
, tcp_port
=None):
288 c
= RawDCERPCTest(methodName
='noop')
289 c
.do_ndr_print
= self
.do_ndr_print
290 c
.do_hexdump
= self
.do_hexdump
293 c
.target_hostname
= self
.target_hostname
294 if tcp_port
is not None:
295 c
.tcp_port
= tcp_port
297 c
.tcp_port
= self
.tcp_port
299 c
.settings
= self
.settings
304 def get_user_creds(self
):
307 username
= samba
.tests
.env_get_var_value('USERNAME')
308 password
= samba
.tests
.env_get_var_value('PASSWORD')
309 c
.set_username(username
)
310 c
.set_password(password
)
313 def get_anon_creds(self
):
318 def get_auth_context_creds(self
, creds
, auth_type
, auth_level
,
322 if g_auth_level
is None:
323 g_auth_level
= auth_level
325 g
= gensec
.Security
.start_client(self
.settings
)
326 g
.set_credentials(creds
)
327 g
.want_feature(gensec
.FEATURE_DCE_STYLE
)
328 g
.start_mech_by_authtype(auth_type
, g_auth_level
)
331 auth_context
["auth_type"] = auth_type
332 auth_context
["auth_level"] = auth_level
333 auth_context
["auth_context_id"] = auth_context_id
334 auth_context
["g_auth_level"] = g_auth_level
335 auth_context
["gensec"] = g
339 def do_generic_bind(self
, ctx
, auth_context
=None,
340 pfc_flags
=samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
341 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
342 assoc_group_id
=0, call_id
=0,
343 nak_reason
=None, alter_fault
=None):
346 if auth_context
is not None:
348 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
349 self
.assertFalse(finished
)
351 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
352 auth_level
=auth_context
["auth_level"],
353 auth_context_id
=auth_context
["auth_context_id"],
358 req
= self
.generate_bind(call_id
=call_id
,
361 assoc_group_id
=assoc_group_id
,
364 rep
= self
.recv_pdu()
365 if nak_reason
is not None:
366 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND_NAK
, req
.call_id
,
368 self
.assertEquals(rep
.u
.reject_reason
, nak_reason
)
369 self
.assertEquals(rep
.u
.num_versions
, 1)
370 self
.assertEquals(rep
.u
.versions
[0].rpc_vers
, req
.rpc_vers
)
371 self
.assertEquals(rep
.u
.versions
[0].rpc_vers_minor
, req
.rpc_vers_minor
)
372 self
.assertEquals(len(rep
.u
._pad
), 3)
373 self
.assertEquals(rep
.u
._pad
, '\0' * 3)
375 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND_ACK
, req
.call_id
,
377 self
.assertEquals(rep
.u
.max_xmit_frag
, req
.u
.max_xmit_frag
)
378 self
.assertEquals(rep
.u
.max_recv_frag
, req
.u
.max_recv_frag
)
379 if assoc_group_id
!= 0:
380 self
.assertEquals(rep
.u
.assoc_group_id
, assoc_group_id
)
382 self
.assertNotEquals(rep
.u
.assoc_group_id
, 0)
383 assoc_group_id
= rep
.u
.assoc_group_id
384 port_str
= "%d" % self
.tcp_port
385 port_len
= len(port_str
) + 1
386 mod_len
= (2 + port_len
) % 4
388 port_pad
= 4 - mod_len
391 self
.assertEquals(rep
.u
.secondary_address_size
, port_len
)
392 self
.assertEquals(rep
.u
.secondary_address
, port_str
)
393 self
.assertEquals(len(rep
.u
._pad
1), port_pad
)
394 # sometimes windows sends random bytes
395 # self.assertEquals(rep.u._pad1, '\0' * port_pad)
396 self
.assertEquals(rep
.u
.num_results
, 1)
397 self
.assertEquals(rep
.u
.ctx_list
[0].result
,
398 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_RESULT_ACCEPTANCE
)
399 self
.assertEquals(rep
.u
.ctx_list
[0].reason
,
400 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED
)
401 self
.assertNDRSyntaxEquals(rep
.u
.ctx_list
[0].syntax
, ctx
.transfer_syntaxes
[0])
403 if auth_context
is None:
404 self
.assertEquals(rep
.auth_length
, 0)
405 self
.assertEquals(len(rep
.u
.auth_info
), 0)
407 self
.assertNotEquals(rep
.auth_length
, 0)
408 self
.assertGreater(len(rep
.u
.auth_info
), samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
409 self
.assertEquals(rep
.auth_length
, len(rep
.u
.auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
411 a
= self
.parse_auth(rep
.u
.auth_info
)
413 from_server
= a
.credentials
414 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
415 self
.assertFalse(finished
)
417 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
418 auth_level
=auth_context
["auth_level"],
419 auth_context_id
=auth_context
["auth_context_id"],
421 req
= self
.generate_alter(call_id
=call_id
,
423 assoc_group_id
=0xffffffff-assoc_group_id
,
426 rep
= self
.recv_pdu()
427 if alter_fault
is not None:
428 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_FAULT
, req
.call_id
,
429 pfc_flags
=req
.pfc_flags |
430 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
,
432 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
433 self
.assertEquals(rep
.u
.context_id
, 0)
434 self
.assertEquals(rep
.u
.cancel_count
, 0)
435 self
.assertEquals(rep
.u
.flags
, 0)
436 self
.assertEquals(rep
.u
.status
, alter_fault
)
437 self
.assertEquals(rep
.u
.reserved
, 0)
438 self
.assertEquals(len(rep
.u
.error_and_verifier
), 0)
440 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_ALTER_RESP
, req
.call_id
)
441 self
.assertEquals(rep
.u
.max_xmit_frag
, req
.u
.max_xmit_frag
)
442 self
.assertEquals(rep
.u
.max_recv_frag
, req
.u
.max_recv_frag
)
443 self
.assertEquals(rep
.u
.assoc_group_id
, assoc_group_id
)
444 self
.assertEquals(rep
.u
.secondary_address_size
, 0)
445 self
.assertEquals(rep
.u
.secondary_address
, '')
446 self
.assertEquals(len(rep
.u
._pad
1), 2)
447 # sometimes windows sends random bytes
448 # self.assertEquals(rep.u._pad1, '\0' * 2)
449 self
.assertEquals(rep
.u
.num_results
, 1)
450 self
.assertEquals(rep
.u
.ctx_list
[0].result
,
451 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_RESULT_ACCEPTANCE
)
452 self
.assertEquals(rep
.u
.ctx_list
[0].reason
,
453 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED
)
454 self
.assertNDRSyntaxEquals(rep
.u
.ctx_list
[0].syntax
, ctx
.transfer_syntaxes
[0])
455 self
.assertNotEquals(rep
.auth_length
, 0)
456 self
.assertGreater(len(rep
.u
.auth_info
), samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
457 self
.assertEquals(rep
.auth_length
, len(rep
.u
.auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
459 a
= self
.parse_auth(rep
.u
.auth_info
)
461 from_server
= a
.credentials
462 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
463 self
.assertTrue(finished
)
467 def prepare_presentation(self
, abstract
, transfer
, object=None,
468 context_id
=0xffff, epmap
=False, auth_context
=None,
469 pfc_flags
=samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
470 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
474 self
.epmap_reconnect(abstract
, transfer
=transfer
, object=object)
476 tsf1_list
= [transfer
]
477 ctx
= samba
.dcerpc
.dcerpc
.ctx_list()
478 ctx
.context_id
= context_id
479 ctx
.num_transfer_syntaxes
= len(tsf1_list
)
480 ctx
.abstract_syntax
= abstract
481 ctx
.transfer_syntaxes
= tsf1_list
483 ack
= self
.do_generic_bind(ctx
=ctx
,
484 auth_context
=auth_context
,
486 assoc_group_id
=assoc_group_id
)
494 def do_single_request(self
, call_id
, ctx
, io
,
497 bigendian
=False, ndr64
=False,
498 allow_remaining
=False,
501 fault_pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
502 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
504 fault_context_id
=None,
509 if fault_context_id
is None:
510 fault_context_id
= ctx
.context_id
512 if ndr_print
is None:
513 ndr_print
= self
.do_ndr_print
515 hexdump
= self
.do_hexdump
519 sys
.stderr
.write("in: %s" % samba
.ndr
.ndr_print_in(io
))
520 stub_in
= samba
.ndr
.ndr_pack_in(io
, bigendian
=bigendian
, ndr64
=ndr64
)
522 sys
.stderr
.write("stub_in: %d\n%s" % (len(stub_in
), self
.hexdump(stub_in
)))
524 # only used for sig_size calculation
525 stub_in
= '\xff' * samba
.dcerpc
.dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
528 if auth_context
is not None:
529 mod_len
= len(stub_in
) % samba
.dcerpc
.dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
532 auth_pad_length
= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
- mod_len
533 stub_in
+= '\x00' * auth_pad_length
535 if auth_context
["g_auth_level"] >= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_LEVEL_PACKET
:
536 sig_size
= auth_context
["gensec"].sig_size(len(stub_in
))
540 zero_sig
= "\x00"*sig_size
541 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
542 auth_level
=auth_context
["auth_level"],
543 auth_pad_length
=auth_pad_length
,
544 auth_context_id
=auth_context
["auth_context_id"],
549 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST
550 pfc_flags |
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
551 if object is not None:
552 pfc_flags |
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_OBJECT_UUID
554 req
= self
.generate_request(call_id
=call_id
,
555 context_id
=ctx
.context_id
,
563 if sig_size
!= 0 and auth_context
["auth_level"] >= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_LEVEL_PACKET
:
564 req_blob
= samba
.ndr
.ndr_pack(req
)
565 ofs_stub
= samba
.dcerpc
.dcerpc
.DCERPC_REQUEST_LENGTH
566 ofs_sig
= len(req_blob
) - req
.auth_length
567 ofs_trailer
= ofs_sig
- samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
568 req_data
= req_blob
[ofs_stub
:ofs_trailer
]
569 req_whole
= req_blob
[0:ofs_sig
]
570 sig
= auth_context
["gensec"].sign_packet(req_data
, req_whole
)
571 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
572 auth_level
=auth_context
["auth_level"],
573 auth_pad_length
=auth_pad_length
,
574 auth_context_id
=auth_context
["auth_context_id"],
576 req
= self
.generate_request(call_id
=call_id
,
577 context_id
=ctx
.context_id
,
583 self
.send_pdu(req
, ndr_print
=ndr_print
, hexdump
=hexdump
)
585 (rep
, rep_blob
) = self
.recv_pdu_raw(timeout
=timeout
,
589 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_FAULT
, req
.call_id
,
590 pfc_flags
=fault_pfc_flags
, auth_length
=0)
591 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
592 self
.assertEquals(rep
.u
.context_id
, fault_context_id
)
593 self
.assertEquals(rep
.u
.cancel_count
, 0)
594 self
.assertEquals(rep
.u
.flags
, 0)
595 self
.assertEquals(rep
.u
.status
, fault_status
)
596 self
.assertEquals(rep
.u
.reserved
, 0)
597 self
.assertEquals(len(rep
.u
.error_and_verifier
), 0)
600 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_RESPONSE
, req
.call_id
,
601 auth_length
=sig_size
)
602 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
603 self
.assertEquals(rep
.u
.context_id
, req
.u
.context_id
& 0xff)
604 self
.assertEquals(rep
.u
.cancel_count
, 0)
605 self
.assertGreaterEqual(len(rep
.u
.stub_and_verifier
), rep
.u
.alloc_hint
)
608 ofs_stub
= samba
.dcerpc
.dcerpc
.DCERPC_REQUEST_LENGTH
609 ofs_sig
= rep
.frag_length
- rep
.auth_length
610 ofs_trailer
= ofs_sig
- samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
611 rep_data
= rep_blob
[ofs_stub
:ofs_trailer
]
612 rep_whole
= rep_blob
[0:ofs_sig
]
613 rep_sig
= rep_blob
[ofs_sig
:]
614 rep_auth_info_blob
= rep_blob
[ofs_trailer
:]
616 rep_auth_info
= self
.parse_auth(rep_auth_info_blob
)
617 self
.assertEquals(rep_auth_info
.auth_type
, auth_context
["auth_type"])
618 self
.assertEquals(rep_auth_info
.auth_level
, auth_context
["auth_level"])
619 self
.assertLessEqual(rep_auth_info
.auth_pad_length
, len(rep_data
))
620 self
.assertEquals(rep_auth_info
.auth_reserved
, 0)
621 self
.assertEquals(rep_auth_info
.auth_context_id
, auth_context
["auth_context_id"])
622 self
.assertEquals(rep_auth_info
.credentials
, rep_sig
)
624 if auth_context
["auth_level"] >= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_LEVEL_PACKET
:
625 auth_context
["gensec"].check_packet(rep_data
, rep_whole
, rep_sig
)
627 stub_out
= rep_data
[0:-rep_auth_info
.auth_pad_length
]
629 stub_out
= rep
.u
.stub_and_verifier
632 sys
.stderr
.write("stub_out: %d\n%s" % (len(stub_out
), self
.hexdump(stub_out
)))
633 samba
.ndr
.ndr_unpack_out(io
, stub_out
, bigendian
=bigendian
, ndr64
=ndr64
,
634 allow_remaining
=allow_remaining
)
636 sys
.stderr
.write("out: %s" % samba
.ndr
.ndr_print_out(io
))
638 def epmap_reconnect(self
, abstract
, transfer
=None, object=None):
639 ndr32
= samba
.dcerpc
.base
.transfer_syntax_ndr()
645 object = samba
.dcerpc
.misc
.GUID()
647 ctx
= self
.prepare_presentation(samba
.dcerpc
.epmapper
.abstract_syntax(),
648 transfer
, context_id
=0)
650 data1
= samba
.ndr
.ndr_pack(abstract
)
651 lhs1
= samba
.dcerpc
.epmapper
.epm_lhs()
652 lhs1
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_UUID
653 lhs1
.lhs_data
= data1
[:18]
654 rhs1
= samba
.dcerpc
.epmapper
.epm_rhs_uuid()
655 rhs1
.unknown
= data1
[18:]
656 floor1
= samba
.dcerpc
.epmapper
.epm_floor()
659 data2
= samba
.ndr
.ndr_pack(transfer
)
660 lhs2
= samba
.dcerpc
.epmapper
.epm_lhs()
661 lhs2
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_UUID
662 lhs2
.lhs_data
= data2
[:18]
663 rhs2
= samba
.dcerpc
.epmapper
.epm_rhs_uuid()
664 rhs2
.unknown
= data1
[18:]
665 floor2
= samba
.dcerpc
.epmapper
.epm_floor()
668 lhs3
= samba
.dcerpc
.epmapper
.epm_lhs()
669 lhs3
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_NCACN
671 floor3
= samba
.dcerpc
.epmapper
.epm_floor()
673 floor3
.rhs
.minor_version
= 0
674 lhs4
= samba
.dcerpc
.epmapper
.epm_lhs()
675 lhs4
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
677 floor4
= samba
.dcerpc
.epmapper
.epm_floor()
679 floor4
.rhs
.port
= self
.tcp_port
680 lhs5
= samba
.dcerpc
.epmapper
.epm_lhs()
681 lhs5
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_IP
683 floor5
= samba
.dcerpc
.epmapper
.epm_floor()
685 floor5
.rhs
.ipaddr
= "0.0.0.0"
687 floors
= [floor1
,floor2
,floor3
,floor4
,floor5
]
688 req_tower
= samba
.dcerpc
.epmapper
.epm_tower()
689 req_tower
.num_floors
= len(floors
)
690 req_tower
.floors
= floors
691 req_twr
= samba
.dcerpc
.epmapper
.epm_twr_t()
692 req_twr
.tower
= req_tower
694 epm_map
= samba
.dcerpc
.epmapper
.epm_Map()
695 epm_map
.in_object
= object
696 epm_map
.in_map_tower
= req_twr
697 epm_map
.in_entry_handle
= samba
.dcerpc
.misc
.policy_handle()
698 epm_map
.in_max_towers
= 4
700 self
.do_single_request(call_id
=2, ctx
=ctx
, io
=epm_map
)
702 self
.assertGreaterEqual(epm_map
.out_num_towers
, 1)
703 rep_twr
= epm_map
.out_towers
[0].twr
704 self
.assertIsNotNone(rep_twr
)
705 self
.assertEqual(rep_twr
.tower_length
, 75)
706 self
.assertEqual(rep_twr
.tower
.num_floors
, 5)
707 self
.assertEqual(len(rep_twr
.tower
.floors
), 5)
708 self
.assertEqual(rep_twr
.tower
.floors
[3].lhs
.protocol
,
709 samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
)
710 self
.assertEqual(rep_twr
.tower
.floors
[3].lhs
.protocol
,
711 samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
)
713 # reconnect to the given port
714 self
._disconnect
("epmap_reconnect")
715 self
.tcp_port
= rep_twr
.tower
.floors
[3].rhs
.port
718 def send_pdu(self
, req
, ndr_print
=None, hexdump
=None):
719 if ndr_print
is None:
720 ndr_print
= self
.do_ndr_print
722 hexdump
= self
.do_hexdump
724 req_pdu
= samba
.ndr
.ndr_pack(req
)
726 sys
.stderr
.write("send_pdu: %s" % samba
.ndr
.ndr_print(req
))
728 sys
.stderr
.write("send_pdu: %d\n%s" % (len(req_pdu
), self
.hexdump(req_pdu
)))
730 sent
= self
.s
.send(req_pdu
, 0)
731 if sent
== len(req_pdu
):
733 req_pdu
= req_pdu
[sent
:]
734 except socket
.error
as e
:
735 self
._disconnect
("send_pdu: %s" % e
)
738 self
._disconnect
("send_pdu: %s" % e
)
743 def recv_raw(self
, hexdump
=None, timeout
=None):
746 hexdump
= self
.do_hexdump
748 if timeout
is not None:
749 self
.s
.settimeout(timeout
)
750 rep_pdu
= self
.s
.recv(0xffff, 0)
751 self
.s
.settimeout(10)
752 if len(rep_pdu
) == 0:
753 self
._disconnect
("recv_raw: EOF")
756 sys
.stderr
.write("recv_raw: %d\n%s" % (len(rep_pdu
), self
.hexdump(rep_pdu
)))
757 except socket
.timeout
as e
:
758 self
.s
.settimeout(10)
759 sys
.stderr
.write("recv_raw: TIMEOUT\n")
761 except socket
.error
as e
:
762 self
._disconnect
("recv_raw: %s" % e
)
765 self
._disconnect
("recv_raw: %s" % e
)
771 def recv_pdu_raw(self
, ndr_print
=None, hexdump
=None, timeout
=None):
774 if ndr_print
is None:
775 ndr_print
= self
.do_ndr_print
777 hexdump
= self
.do_hexdump
779 rep_pdu
= self
.recv_raw(hexdump
=hexdump
, timeout
=timeout
)
782 rep
= samba
.ndr
.ndr_unpack(samba
.dcerpc
.dcerpc
.ncacn_packet
, rep_pdu
, allow_remaining
=True)
784 sys
.stderr
.write("recv_pdu: %s" % samba
.ndr
.ndr_print(rep
))
785 self
.assertEqual(rep
.frag_length
, len(rep_pdu
))
788 return (rep
, rep_pdu
)
790 def recv_pdu(self
, ndr_print
=None, hexdump
=None, timeout
=None):
791 (rep
, rep_pdu
) = self
.recv_pdu_raw(ndr_print
=ndr_print
,
796 def generate_auth(self
,
800 auth_context_id
=None,
802 ndr_print
=None, hexdump
=None):
803 if ndr_print
is None:
804 ndr_print
= self
.do_ndr_print
806 hexdump
= self
.do_hexdump
808 if auth_type
is not None:
809 a
= samba
.dcerpc
.dcerpc
.auth()
810 a
.auth_type
= auth_type
811 a
.auth_level
= auth_level
812 a
.auth_pad_length
= auth_pad_length
813 a
.auth_context_id
= auth_context_id
814 a
.credentials
= auth_blob
816 ai
= samba
.ndr
.ndr_pack(a
)
818 sys
.stderr
.write("generate_auth: %s" % samba
.ndr
.ndr_print(a
))
820 sys
.stderr
.write("generate_auth: %d\n%s" % (len(ai
), self
.hexdump(ai
)))
826 def parse_auth(self
, auth_info
, ndr_print
=None, hexdump
=None):
827 if ndr_print
is None:
828 ndr_print
= self
.do_ndr_print
830 hexdump
= self
.do_hexdump
832 if (len(auth_info
) <= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
):
836 sys
.stderr
.write("parse_auth: %d\n%s" % (len(auth_info
), self
.hexdump(auth_info
)))
837 a
= samba
.ndr
.ndr_unpack(samba
.dcerpc
.dcerpc
.auth
, auth_info
, allow_remaining
=True)
839 sys
.stderr
.write("parse_auth: %s" % samba
.ndr
.ndr_print(a
))
843 def generate_pdu(self
, ptype
, call_id
, payload
,
846 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
847 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
848 drep
= [samba
.dcerpc
.dcerpc
.DCERPC_DREP_LE
, 0, 0, 0],
849 ndr_print
=None, hexdump
=None):
851 if getattr(payload
, 'auth_info', None):
852 ai
= payload
.auth_info
856 p
= samba
.dcerpc
.dcerpc
.ncacn_packet()
857 p
.rpc_vers
= rpc_vers
858 p
.rpc_vers_minor
= rpc_vers_minor
860 p
.pfc_flags
= pfc_flags
863 if len(ai
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
864 p
.auth_length
= len(ai
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
870 pdu
= samba
.ndr
.ndr_pack(p
)
871 p
.frag_length
= len(pdu
)
875 def verify_pdu(self
, p
, ptype
, call_id
,
878 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
879 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
880 drep
= [samba
.dcerpc
.dcerpc
.DCERPC_DREP_LE
, 0, 0, 0],
883 self
.assertIsNotNone(p
, "No valid pdu")
885 if getattr(p
.u
, 'auth_info', None):
890 self
.assertEqual(p
.rpc_vers
, rpc_vers
)
891 self
.assertEqual(p
.rpc_vers_minor
, rpc_vers_minor
)
892 self
.assertEqual(p
.ptype
, ptype
)
893 self
.assertEqual(p
.pfc_flags
, pfc_flags
)
894 self
.assertEqual(p
.drep
, drep
)
895 self
.assertGreaterEqual(p
.frag_length
,
896 samba
.dcerpc
.dcerpc
.DCERPC_NCACN_PAYLOAD_OFFSET
)
897 if len(ai
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
898 self
.assertEqual(p
.auth_length
,
899 len(ai
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
900 elif auth_length
is not None:
901 self
.assertEqual(p
.auth_length
, auth_length
)
903 self
.assertEqual(p
.auth_length
, 0)
904 self
.assertEqual(p
.call_id
, call_id
)
908 def generate_bind(self
, call_id
,
909 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
910 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
916 ndr_print
=None, hexdump
=None):
918 b
= samba
.dcerpc
.dcerpc
.bind()
919 b
.max_xmit_frag
= max_xmit_frag
920 b
.max_recv_frag
= max_recv_frag
921 b
.assoc_group_id
= assoc_group_id
922 b
.num_contexts
= len(ctx_list
)
923 b
.ctx_list
= ctx_list
924 b
.auth_info
= auth_info
926 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND
,
930 ndr_print
=ndr_print
, hexdump
=hexdump
)
934 def generate_alter(self
, call_id
,
935 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
936 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
942 ndr_print
=None, hexdump
=None):
944 a
= samba
.dcerpc
.dcerpc
.bind()
945 a
.max_xmit_frag
= max_xmit_frag
946 a
.max_recv_frag
= max_recv_frag
947 a
.assoc_group_id
= assoc_group_id
948 a
.num_contexts
= len(ctx_list
)
949 a
.ctx_list
= ctx_list
950 a
.auth_info
= auth_info
952 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_ALTER
,
956 ndr_print
=ndr_print
, hexdump
=hexdump
)
960 def generate_auth3(self
, call_id
,
961 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
962 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
964 ndr_print
=None, hexdump
=None):
966 a
= samba
.dcerpc
.dcerpc
.auth3()
967 a
.auth_info
= auth_info
969 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_AUTH3
,
973 ndr_print
=ndr_print
, hexdump
=hexdump
)
977 def generate_request(self
, call_id
,
978 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
979 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
986 ndr_print
=None, hexdump
=None):
988 if alloc_hint
is None:
989 alloc_hint
= len(stub
)
991 r
= samba
.dcerpc
.dcerpc
.request()
992 r
.alloc_hint
= alloc_hint
993 r
.context_id
= context_id
995 if object is not None:
997 r
.stub_and_verifier
= stub
+ auth_info
999 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_REQUEST
,
1000 pfc_flags
=pfc_flags
,
1003 ndr_print
=ndr_print
, hexdump
=hexdump
)
1005 if len(auth_info
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
1006 p
.auth_length
= len(auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
1010 def generate_co_cancel(self
, call_id
,
1011 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
1012 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
1014 ndr_print
=None, hexdump
=None):
1016 c
= samba
.dcerpc
.dcerpc
.co_cancel()
1017 c
.auth_info
= auth_info
1019 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_CO_CANCEL
,
1020 pfc_flags
=pfc_flags
,
1023 ndr_print
=ndr_print
, hexdump
=hexdump
)
1027 def generate_orphaned(self
, call_id
,
1028 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
1029 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
1031 ndr_print
=None, hexdump
=None):
1033 o
= samba
.dcerpc
.dcerpc
.orphaned()
1034 o
.auth_info
= auth_info
1036 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_ORPHANED
,
1037 pfc_flags
=pfc_flags
,
1040 ndr_print
=ndr_print
, hexdump
=hexdump
)
1044 def generate_shutdown(self
, call_id
,
1045 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
1046 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
1047 ndr_print
=None, hexdump
=None):
1049 s
= samba
.dcerpc
.dcerpc
.shutdown()
1051 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_SHUTDOWN
,
1052 pfc_flags
=pfc_flags
,
1055 ndr_print
=ndr_print
, hexdump
=hexdump
)
1059 def assertIsConnected(self
):
1060 self
.assertIsNotNone(self
.s
, msg
="Not connected")
1063 def assertNotConnected(self
):
1064 self
.assertIsNone(self
.s
, msg
="Is connected")
1067 def assertNDRSyntaxEquals(self
, s1
, s2
):
1068 self
.assertEqual(s1
.uuid
, s2
.uuid
)
1069 self
.assertEqual(s1
.if_version
, s2
.if_version
)
1072 class ValidNetbiosNameTests(TestCase
):
1074 def test_valid(self
):
1075 self
.assertTrue(samba
.valid_netbios_name("FOO"))
1077 def test_too_long(self
):
1078 self
.assertFalse(samba
.valid_netbios_name("FOO"*10))
1080 def test_invalid_characters(self
):
1081 self
.assertFalse(samba
.valid_netbios_name("*BLA"))
1084 class BlackboxProcessError(Exception):
1085 """This is raised when check_output() process returns a non-zero exit status
1087 Exception instance should contain the exact exit code (S.returncode),
1088 command line (S.cmd), process output (S.stdout) and process error stream
1092 def __init__(self
, returncode
, cmd
, stdout
, stderr
):
1093 self
.returncode
= returncode
1095 self
.stdout
= stdout
1096 self
.stderr
= stderr
1099 return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self
.cmd
, self
.returncode
,
1100 self
.stdout
, self
.stderr
)
1102 class BlackboxTestCase(TestCaseInTempDir
):
1103 """Base test case for blackbox tests."""
1105 def _make_cmdline(self
, line
):
1106 bindir
= os
.path
.abspath(os
.path
.join(os
.path
.dirname(__file__
), "../../../../bin"))
1107 parts
= line
.split(" ")
1108 if os
.path
.exists(os
.path
.join(bindir
, parts
[0])):
1109 parts
[0] = os
.path
.join(bindir
, parts
[0])
1110 line
= " ".join(parts
)
1113 def check_run(self
, line
):
1114 line
= self
._make
_cmdline
(line
)
1115 p
= subprocess
.Popen(line
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
, shell
=True)
1118 raise BlackboxProcessError(retcode
, line
, p
.stdout
.read(), p
.stderr
.read())
1120 def check_output(self
, line
):
1121 line
= self
._make
_cmdline
(line
)
1122 p
= subprocess
.Popen(line
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
, shell
=True, close_fds
=True)
1125 raise BlackboxProcessError(retcode
, line
, p
.stdout
.read(), p
.stderr
.read())
1126 return p
.stdout
.read()
1129 def connect_samdb(samdb_url
, lp
=None, session_info
=None, credentials
=None,
1130 flags
=0, ldb_options
=None, ldap_only
=False, global_schema
=True):
1131 """Create SamDB instance and connects to samdb_url database.
1133 :param samdb_url: Url for database to connect to.
1134 :param lp: Optional loadparm object
1135 :param session_info: Optional session information
1136 :param credentials: Optional credentials, defaults to anonymous.
1137 :param flags: Optional LDB flags
1138 :param ldap_only: If set, only remote LDAP connection will be created.
1139 :param global_schema: Whether to use global schema.
1141 Added value for tests is that we have a shorthand function
1142 to make proper URL for ldb.connect() while using default
1143 parameters for connection based on test environment
1145 if not "://" in samdb_url
:
1146 if not ldap_only
and os
.path
.isfile(samdb_url
):
1147 samdb_url
= "tdb://%s" % samdb_url
1149 samdb_url
= "ldap://%s" % samdb_url
1150 # use 'paged_search' module when connecting remotely
1151 if samdb_url
.startswith("ldap://"):
1152 ldb_options
= ["modules:paged_searches"]
1154 raise AssertionError("Trying to connect to %s while remote "
1155 "connection is required" % samdb_url
)
1157 # set defaults for test environment
1160 if session_info
is None:
1161 session_info
= samba
.auth
.system_session(lp
)
1162 if credentials
is None:
1163 credentials
= cmdline_credentials
1165 return SamDB(url
=samdb_url
,
1167 session_info
=session_info
,
1168 credentials
=credentials
,
1170 options
=ldb_options
,
1171 global_schema
=global_schema
)
1174 def connect_samdb_ex(samdb_url
, lp
=None, session_info
=None, credentials
=None,
1175 flags
=0, ldb_options
=None, ldap_only
=False):
1176 """Connects to samdb_url database
1178 :param samdb_url: Url for database to connect to.
1179 :param lp: Optional loadparm object
1180 :param session_info: Optional session information
1181 :param credentials: Optional credentials, defaults to anonymous.
1182 :param flags: Optional LDB flags
1183 :param ldap_only: If set, only remote LDAP connection will be created.
1184 :return: (sam_db_connection, rootDse_record) tuple
1186 sam_db
= connect_samdb(samdb_url
, lp
, session_info
, credentials
,
1187 flags
, ldb_options
, ldap_only
)
1189 res
= sam_db
.search(base
="", expression
="", scope
=ldb
.SCOPE_BASE
,
1191 return (sam_db
, res
[0])
1194 def connect_samdb_env(env_url
, env_username
, env_password
, lp
=None):
1195 """Connect to SamDB by getting URL and Credentials from environment
1197 :param env_url: Environment variable name to get lsb url from
1198 :param env_username: Username environment variable
1199 :param env_password: Password environment variable
1200 :return: sam_db_connection
1202 samdb_url
= env_get_var_value(env_url
)
1203 creds
= credentials
.Credentials()
1205 # guess Credentials parameters here. Otherwise workstation
1206 # and domain fields are NULL and gencache code segfalts
1207 lp
= param
.LoadParm()
1209 creds
.set_username(env_get_var_value(env_username
))
1210 creds
.set_password(env_get_var_value(env_password
))
1211 return connect_samdb(samdb_url
, credentials
=creds
, lp
=lp
)
1214 def delete_force(samdb
, dn
):
1217 except ldb
.LdbError
, (num
, errstr
):
1218 assert num
== ldb
.ERR_NO_SUCH_OBJECT
, "ldb.delete() failed: %s" % errstr