python/tests: add a second_connection() helper function
[Samba.git] / python / samba / tests / __init__.py
blob62b2d99e045f70e04482fa8986a9a3262e13e6d3
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Samba Python tests."""
21 import os
22 import ldb
23 import samba
24 import samba.auth
25 from samba import param
26 from samba.samdb import SamDB
27 from samba import credentials
28 import samba.ndr
29 import samba.dcerpc.dcerpc
30 import samba.dcerpc.base
31 import samba.dcerpc.epmapper
32 from samba.credentials import Credentials
33 from samba import gensec
34 import socket
35 import struct
36 import subprocess
37 import sys
38 import tempfile
39 import unittest
41 try:
42 from unittest import SkipTest
43 except ImportError:
44 class SkipTest(Exception):
45 """Test skipped."""
47 HEXDUMP_FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
49 class TestCase(unittest.TestCase):
50 """A Samba test case."""
52 def setUp(self):
53 super(TestCase, self).setUp()
54 test_debug_level = os.getenv("TEST_DEBUG_LEVEL")
55 if test_debug_level is not None:
56 test_debug_level = int(test_debug_level)
57 self._old_debug_level = samba.get_debug_level()
58 samba.set_debug_level(test_debug_level)
59 self.addCleanup(samba.set_debug_level, test_debug_level)
61 def get_loadparm(self):
62 return env_loadparm()
64 def get_credentials(self):
65 return cmdline_credentials
67 def hexdump(self, src):
68 N = 0
69 result = ''
70 while src:
71 ll = src[:8]
72 lr = src[8:16]
73 src = src[16:]
74 hl = ' '.join(["%02X" % ord(x) for x in ll])
75 hr = ' '.join(["%02X" % ord(x) for x in lr])
76 ll = ll.translate(HEXDUMP_FILTER)
77 lr = lr.translate(HEXDUMP_FILTER)
78 result += "[%04X] %-*s %-*s %s %s\n" % (N, 8*3, hl, 8*3, hr, ll, lr)
79 N += 16
80 return result
82 # These functions didn't exist before Python2.7:
83 if sys.version_info < (2, 7):
84 import warnings
86 def skipTest(self, reason):
87 raise SkipTest(reason)
89 def assertIn(self, member, container, msg=None):
90 self.assertTrue(member in container, msg)
92 def assertIs(self, a, b, msg=None):
93 self.assertTrue(a is b, msg)
95 def assertIsNot(self, a, b, msg=None):
96 self.assertTrue(a is not b, msg)
98 def assertIsNotNone(self, a, msg=None):
99 self.assertTrue(a is not None)
101 def assertIsInstance(self, a, b, msg=None):
102 self.assertTrue(isinstance(a, b), msg)
104 def assertIsNone(self, a, msg=None):
105 self.assertTrue(a is None, msg)
107 def assertGreater(self, a, b, msg=None):
108 self.assertTrue(a > b, msg)
110 def assertGreaterEqual(self, a, b, msg=None):
111 self.assertTrue(a >= b, msg)
113 def assertLess(self, a, b, msg=None):
114 self.assertTrue(a < b, msg)
116 def assertLessEqual(self, a, b, msg=None):
117 self.assertTrue(a <= b, msg)
119 def addCleanup(self, fn, *args, **kwargs):
120 self._cleanups = getattr(self, "_cleanups", []) + [
121 (fn, args, kwargs)]
123 def _addSkip(self, result, reason):
124 addSkip = getattr(result, 'addSkip', None)
125 if addSkip is not None:
126 addSkip(self, reason)
127 else:
128 warnings.warn("TestResult has no addSkip method, skips not reported",
129 RuntimeWarning, 2)
130 result.addSuccess(self)
132 def run(self, result=None):
133 if result is None: result = self.defaultTestResult()
134 result.startTest(self)
135 testMethod = getattr(self, self._testMethodName)
136 try:
137 try:
138 self.setUp()
139 except SkipTest, e:
140 self._addSkip(result, str(e))
141 return
142 except KeyboardInterrupt:
143 raise
144 except:
145 result.addError(self, self._exc_info())
146 return
148 ok = False
149 try:
150 testMethod()
151 ok = True
152 except SkipTest, e:
153 self._addSkip(result, str(e))
154 return
155 except self.failureException:
156 result.addFailure(self, self._exc_info())
157 except KeyboardInterrupt:
158 raise
159 except:
160 result.addError(self, self._exc_info())
162 try:
163 self.tearDown()
164 except SkipTest, e:
165 self._addSkip(result, str(e))
166 except KeyboardInterrupt:
167 raise
168 except:
169 result.addError(self, self._exc_info())
170 ok = False
172 for (fn, args, kwargs) in reversed(getattr(self, "_cleanups", [])):
173 fn(*args, **kwargs)
174 if ok: result.addSuccess(self)
175 finally:
176 result.stopTest(self)
179 class LdbTestCase(TestCase):
180 """Trivial test case for running tests against a LDB."""
182 def setUp(self):
183 super(LdbTestCase, self).setUp()
184 self.filename = os.tempnam()
185 self.ldb = samba.Ldb(self.filename)
187 def set_modules(self, modules=[]):
188 """Change the modules for this Ldb."""
189 m = ldb.Message()
190 m.dn = ldb.Dn(self.ldb, "@MODULES")
191 m["@LIST"] = ",".join(modules)
192 self.ldb.add(m)
193 self.ldb = samba.Ldb(self.filename)
196 class TestCaseInTempDir(TestCase):
198 def setUp(self):
199 super(TestCaseInTempDir, self).setUp()
200 self.tempdir = tempfile.mkdtemp()
201 self.addCleanup(self._remove_tempdir)
203 def _remove_tempdir(self):
204 self.assertEquals([], os.listdir(self.tempdir))
205 os.rmdir(self.tempdir)
206 self.tempdir = None
209 def env_loadparm():
210 lp = param.LoadParm()
211 try:
212 lp.load(os.environ["SMB_CONF_PATH"])
213 except KeyError:
214 raise KeyError("SMB_CONF_PATH not set")
215 return lp
218 def env_get_var_value(var_name, allow_missing=False):
219 """Returns value for variable in os.environ
221 Function throws AssertionError if variable is defined.
222 Unit-test based python tests require certain input params
223 to be set in environment, otherwise they can't be run
225 if allow_missing:
226 if var_name not in os.environ.keys():
227 return None
228 assert var_name in os.environ.keys(), "Please supply %s in environment" % var_name
229 return os.environ[var_name]
232 cmdline_credentials = None
234 class RpcInterfaceTestCase(TestCase):
235 """DCE/RPC Test case."""
237 class RawDCERPCTest(TestCase):
238 """A raw DCE/RPC Test case."""
240 def _disconnect(self, reason):
241 if self.s is None:
242 return
243 self.s.close()
244 self.s = None
245 if self.do_hexdump:
246 sys.stderr.write("disconnect[%s]\n" % reason)
248 def connect(self):
249 try:
250 self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
251 socket.SOCK_STREAM, socket.SOL_TCP,
253 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
254 self.s.settimeout(10)
255 self.s.connect(self.a[0][4])
256 except socket.error as e:
257 self.s.close()
258 raise
259 except IOError as e:
260 self.s.close()
261 raise
262 except Exception as e:
263 raise
264 finally:
265 pass
267 def setUp(self):
268 super(RawDCERPCTest, self).setUp()
269 self.do_ndr_print = False
270 self.do_hexdump = False
272 self.host = samba.tests.env_get_var_value('SERVER')
273 self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True)
274 if self.target_hostname is None:
275 self.target_hostname = self.host
276 self.tcp_port = 135
278 self.settings = {}
279 self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
280 self.settings["target_hostname"] = self.target_hostname
282 self.connect()
284 def noop(self):
285 return
287 def second_connection(self, tcp_port=None):
288 c = RawDCERPCTest(methodName='noop')
289 c.do_ndr_print = self.do_ndr_print
290 c.do_hexdump = self.do_hexdump
292 c.host = self.host
293 c.target_hostname = self.target_hostname
294 if tcp_port is not None:
295 c.tcp_port = tcp_port
296 else:
297 c.tcp_port = self.tcp_port
299 c.settings = self.settings
301 c.connect()
302 return c
304 def get_user_creds(self):
305 c = Credentials()
306 c.guess()
307 username = samba.tests.env_get_var_value('USERNAME')
308 password = samba.tests.env_get_var_value('PASSWORD')
309 c.set_username(username)
310 c.set_password(password)
311 return c
313 def get_anon_creds(self):
314 c = Credentials()
315 c.set_anonymous()
316 return c
318 def get_auth_context_creds(self, creds, auth_type, auth_level,
319 auth_context_id,
320 g_auth_level=None):
322 if g_auth_level is None:
323 g_auth_level = auth_level
325 g = gensec.Security.start_client(self.settings)
326 g.set_credentials(creds)
327 g.want_feature(gensec.FEATURE_DCE_STYLE)
328 g.start_mech_by_authtype(auth_type, g_auth_level)
330 auth_context = {}
331 auth_context["auth_type"] = auth_type
332 auth_context["auth_level"] = auth_level
333 auth_context["auth_context_id"] = auth_context_id
334 auth_context["g_auth_level"] = g_auth_level
335 auth_context["gensec"] = g
337 return auth_context
339 def do_generic_bind(self, ctx, auth_context=None,
340 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
341 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
342 assoc_group_id=0, call_id=0,
343 nak_reason=None, alter_fault=None):
344 ctx_list = [ctx]
346 if auth_context is not None:
347 from_server = ""
348 (finished, to_server) = auth_context["gensec"].update(from_server)
349 self.assertFalse(finished)
351 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
352 auth_level=auth_context["auth_level"],
353 auth_context_id=auth_context["auth_context_id"],
354 auth_blob=to_server)
355 else:
356 auth_info = ""
358 req = self.generate_bind(call_id=call_id,
359 pfc_flags=pfc_flags,
360 ctx_list=ctx_list,
361 assoc_group_id=assoc_group_id,
362 auth_info=auth_info)
363 self.send_pdu(req)
364 rep = self.recv_pdu()
365 if nak_reason is not None:
366 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
367 auth_length=0)
368 self.assertEquals(rep.u.reject_reason, nak_reason)
369 self.assertEquals(rep.u.num_versions, 1)
370 self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
371 self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
372 self.assertEquals(len(rep.u._pad), 3)
373 self.assertEquals(rep.u._pad, '\0' * 3)
374 return
375 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
376 pfc_flags=pfc_flags)
377 self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
378 self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
379 if assoc_group_id != 0:
380 self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
381 else:
382 self.assertNotEquals(rep.u.assoc_group_id, 0)
383 assoc_group_id = rep.u.assoc_group_id
384 port_str = "%d" % self.tcp_port
385 port_len = len(port_str) + 1
386 mod_len = (2 + port_len) % 4
387 if mod_len != 0:
388 port_pad = 4 - mod_len
389 else:
390 port_pad = 0
391 self.assertEquals(rep.u.secondary_address_size, port_len)
392 self.assertEquals(rep.u.secondary_address, port_str)
393 self.assertEquals(len(rep.u._pad1), port_pad)
394 # sometimes windows sends random bytes
395 # self.assertEquals(rep.u._pad1, '\0' * port_pad)
396 self.assertEquals(rep.u.num_results, 1)
397 self.assertEquals(rep.u.ctx_list[0].result,
398 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
399 self.assertEquals(rep.u.ctx_list[0].reason,
400 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
401 self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
402 ack = rep
403 if auth_context is None:
404 self.assertEquals(rep.auth_length, 0)
405 self.assertEquals(len(rep.u.auth_info), 0)
406 return ack
407 self.assertNotEquals(rep.auth_length, 0)
408 self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
409 self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
411 a = self.parse_auth(rep.u.auth_info)
413 from_server = a.credentials
414 (finished, to_server) = auth_context["gensec"].update(from_server)
415 self.assertFalse(finished)
417 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
418 auth_level=auth_context["auth_level"],
419 auth_context_id=auth_context["auth_context_id"],
420 auth_blob=to_server)
421 req = self.generate_alter(call_id=call_id,
422 ctx_list=ctx_list,
423 assoc_group_id=0xffffffff-assoc_group_id,
424 auth_info=auth_info)
425 self.send_pdu(req)
426 rep = self.recv_pdu()
427 if alter_fault is not None:
428 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
429 pfc_flags=req.pfc_flags |
430 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
431 auth_length=0)
432 self.assertNotEquals(rep.u.alloc_hint, 0)
433 self.assertEquals(rep.u.context_id, 0)
434 self.assertEquals(rep.u.cancel_count, 0)
435 self.assertEquals(rep.u.flags, 0)
436 self.assertEquals(rep.u.status, alter_fault)
437 self.assertEquals(rep.u.reserved, 0)
438 self.assertEquals(len(rep.u.error_and_verifier), 0)
439 return None
440 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
441 self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
442 self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
443 self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
444 self.assertEquals(rep.u.secondary_address_size, 0)
445 self.assertEquals(rep.u.secondary_address, '')
446 self.assertEquals(len(rep.u._pad1), 2)
447 # sometimes windows sends random bytes
448 # self.assertEquals(rep.u._pad1, '\0' * 2)
449 self.assertEquals(rep.u.num_results, 1)
450 self.assertEquals(rep.u.ctx_list[0].result,
451 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
452 self.assertEquals(rep.u.ctx_list[0].reason,
453 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
454 self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
455 self.assertNotEquals(rep.auth_length, 0)
456 self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
457 self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
459 a = self.parse_auth(rep.u.auth_info)
461 from_server = a.credentials
462 (finished, to_server) = auth_context["gensec"].update(from_server)
463 self.assertTrue(finished)
465 return ack
467 def prepare_presentation(self, abstract, transfer, object=None,
468 context_id=0xffff, epmap=False, auth_context=None,
469 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
470 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
471 assoc_group_id=0,
472 return_ack=False):
473 if epmap:
474 self.epmap_reconnect(abstract, transfer=transfer, object=object)
476 tsf1_list = [transfer]
477 ctx = samba.dcerpc.dcerpc.ctx_list()
478 ctx.context_id = context_id
479 ctx.num_transfer_syntaxes = len(tsf1_list)
480 ctx.abstract_syntax = abstract
481 ctx.transfer_syntaxes = tsf1_list
483 ack = self.do_generic_bind(ctx=ctx,
484 auth_context=auth_context,
485 pfc_flags=pfc_flags,
486 assoc_group_id=assoc_group_id)
487 if ack is None:
488 ctx = None
490 if return_ack:
491 return (ctx, ack)
492 return ctx
494 def do_single_request(self, call_id, ctx, io,
495 auth_context=None,
496 object=None,
497 bigendian=False, ndr64=False,
498 allow_remaining=False,
499 send_req=True,
500 recv_rep=True,
501 fault_pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
502 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
503 fault_status=None,
504 fault_context_id=None,
505 timeout=None,
506 ndr_print=None,
507 hexdump=None):
509 if fault_context_id is None:
510 fault_context_id = ctx.context_id
512 if ndr_print is None:
513 ndr_print = self.do_ndr_print
514 if hexdump is None:
515 hexdump = self.do_hexdump
517 if send_req:
518 if ndr_print:
519 sys.stderr.write("in: %s" % samba.ndr.ndr_print_in(io))
520 stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64)
521 if hexdump:
522 sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in)))
523 else:
524 # only used for sig_size calculation
525 stub_in = '\xff' * samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
527 sig_size = 0
528 if auth_context is not None:
529 mod_len = len(stub_in) % samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
530 auth_pad_length = 0
531 if mod_len > 0:
532 auth_pad_length = samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
533 stub_in += '\x00' * auth_pad_length
535 if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
536 sig_size = auth_context["gensec"].sig_size(len(stub_in))
537 else:
538 sig_size = 16
540 zero_sig = "\x00"*sig_size
541 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
542 auth_level=auth_context["auth_level"],
543 auth_pad_length=auth_pad_length,
544 auth_context_id=auth_context["auth_context_id"],
545 auth_blob=zero_sig)
546 else:
547 auth_info=""
549 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
550 pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
551 if object is not None:
552 pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID
554 req = self.generate_request(call_id=call_id,
555 context_id=ctx.context_id,
556 pfc_flags=pfc_flags,
557 object=object,
558 opnum=io.opnum(),
559 stub=stub_in,
560 auth_info=auth_info)
562 if send_req:
563 if sig_size != 0 and auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
564 req_blob = samba.ndr.ndr_pack(req)
565 ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
566 ofs_sig = len(req_blob) - req.auth_length
567 ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
568 req_data = req_blob[ofs_stub:ofs_trailer]
569 req_whole = req_blob[0:ofs_sig]
570 sig = auth_context["gensec"].sign_packet(req_data, req_whole)
571 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
572 auth_level=auth_context["auth_level"],
573 auth_pad_length=auth_pad_length,
574 auth_context_id=auth_context["auth_context_id"],
575 auth_blob=sig)
576 req = self.generate_request(call_id=call_id,
577 context_id=ctx.context_id,
578 pfc_flags=pfc_flags,
579 object=object,
580 opnum=io.opnum(),
581 stub=stub_in,
582 auth_info=auth_info)
583 self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump)
584 if recv_rep:
585 (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout,
586 ndr_print=ndr_print,
587 hexdump=hexdump)
588 if fault_status:
589 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
590 pfc_flags=fault_pfc_flags, auth_length=0)
591 self.assertNotEquals(rep.u.alloc_hint, 0)
592 self.assertEquals(rep.u.context_id, fault_context_id)
593 self.assertEquals(rep.u.cancel_count, 0)
594 self.assertEquals(rep.u.flags, 0)
595 self.assertEquals(rep.u.status, fault_status)
596 self.assertEquals(rep.u.reserved, 0)
597 self.assertEquals(len(rep.u.error_and_verifier), 0)
598 return
600 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
601 auth_length=sig_size)
602 self.assertNotEquals(rep.u.alloc_hint, 0)
603 self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
604 self.assertEquals(rep.u.cancel_count, 0)
605 self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
606 if sig_size != 0:
608 ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
609 ofs_sig = rep.frag_length - rep.auth_length
610 ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
611 rep_data = rep_blob[ofs_stub:ofs_trailer]
612 rep_whole = rep_blob[0:ofs_sig]
613 rep_sig = rep_blob[ofs_sig:]
614 rep_auth_info_blob = rep_blob[ofs_trailer:]
616 rep_auth_info = self.parse_auth(rep_auth_info_blob)
617 self.assertEquals(rep_auth_info.auth_type, auth_context["auth_type"])
618 self.assertEquals(rep_auth_info.auth_level, auth_context["auth_level"])
619 self.assertLessEqual(rep_auth_info.auth_pad_length, len(rep_data))
620 self.assertEquals(rep_auth_info.auth_reserved, 0)
621 self.assertEquals(rep_auth_info.auth_context_id, auth_context["auth_context_id"])
622 self.assertEquals(rep_auth_info.credentials, rep_sig)
624 if auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
625 auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)
627 stub_out = rep_data[0:-rep_auth_info.auth_pad_length]
628 else:
629 stub_out = rep.u.stub_and_verifier
631 if hexdump:
632 sys.stderr.write("stub_out: %d\n%s" % (len(stub_out), self.hexdump(stub_out)))
633 samba.ndr.ndr_unpack_out(io, stub_out, bigendian=bigendian, ndr64=ndr64,
634 allow_remaining=allow_remaining)
635 if ndr_print:
636 sys.stderr.write("out: %s" % samba.ndr.ndr_print_out(io))
638 def epmap_reconnect(self, abstract, transfer=None, object=None):
639 ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
641 if transfer is None:
642 transfer = ndr32
644 if object is None:
645 object = samba.dcerpc.misc.GUID()
647 ctx = self.prepare_presentation(samba.dcerpc.epmapper.abstract_syntax(),
648 transfer, context_id=0)
650 data1 = samba.ndr.ndr_pack(abstract)
651 lhs1 = samba.dcerpc.epmapper.epm_lhs()
652 lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
653 lhs1.lhs_data = data1[:18]
654 rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid()
655 rhs1.unknown = data1[18:]
656 floor1 = samba.dcerpc.epmapper.epm_floor()
657 floor1.lhs = lhs1
658 floor1.rhs = rhs1
659 data2 = samba.ndr.ndr_pack(transfer)
660 lhs2 = samba.dcerpc.epmapper.epm_lhs()
661 lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
662 lhs2.lhs_data = data2[:18]
663 rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid()
664 rhs2.unknown = data1[18:]
665 floor2 = samba.dcerpc.epmapper.epm_floor()
666 floor2.lhs = lhs2
667 floor2.rhs = rhs2
668 lhs3 = samba.dcerpc.epmapper.epm_lhs()
669 lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN
670 lhs3.lhs_data = ""
671 floor3 = samba.dcerpc.epmapper.epm_floor()
672 floor3.lhs = lhs3
673 floor3.rhs.minor_version = 0
674 lhs4 = samba.dcerpc.epmapper.epm_lhs()
675 lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP
676 lhs4.lhs_data = ""
677 floor4 = samba.dcerpc.epmapper.epm_floor()
678 floor4.lhs = lhs4
679 floor4.rhs.port = self.tcp_port
680 lhs5 = samba.dcerpc.epmapper.epm_lhs()
681 lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
682 lhs5.lhs_data = ""
683 floor5 = samba.dcerpc.epmapper.epm_floor()
684 floor5.lhs = lhs5
685 floor5.rhs.ipaddr = "0.0.0.0"
687 floors = [floor1,floor2,floor3,floor4,floor5]
688 req_tower = samba.dcerpc.epmapper.epm_tower()
689 req_tower.num_floors = len(floors)
690 req_tower.floors = floors
691 req_twr = samba.dcerpc.epmapper.epm_twr_t()
692 req_twr.tower = req_tower
694 epm_map = samba.dcerpc.epmapper.epm_Map()
695 epm_map.in_object = object
696 epm_map.in_map_tower = req_twr
697 epm_map.in_entry_handle = samba.dcerpc.misc.policy_handle()
698 epm_map.in_max_towers = 4
700 self.do_single_request(call_id=2, ctx=ctx, io=epm_map)
702 self.assertGreaterEqual(epm_map.out_num_towers, 1)
703 rep_twr = epm_map.out_towers[0].twr
704 self.assertIsNotNone(rep_twr)
705 self.assertEqual(rep_twr.tower_length, 75)
706 self.assertEqual(rep_twr.tower.num_floors, 5)
707 self.assertEqual(len(rep_twr.tower.floors), 5)
708 self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
709 samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
710 self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
711 samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
713 # reconnect to the given port
714 self._disconnect("epmap_reconnect")
715 self.tcp_port = rep_twr.tower.floors[3].rhs.port
716 self.connect()
718 def send_pdu(self, req, ndr_print=None, hexdump=None):
719 if ndr_print is None:
720 ndr_print = self.do_ndr_print
721 if hexdump is None:
722 hexdump = self.do_hexdump
723 try:
724 req_pdu = samba.ndr.ndr_pack(req)
725 if ndr_print:
726 sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
727 if hexdump:
728 sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
729 while True:
730 sent = self.s.send(req_pdu, 0)
731 if sent == len(req_pdu):
732 break
733 req_pdu = req_pdu[sent:]
734 except socket.error as e:
735 self._disconnect("send_pdu: %s" % e)
736 raise
737 except IOError as e:
738 self._disconnect("send_pdu: %s" % e)
739 raise
740 finally:
741 pass
743 def recv_raw(self, hexdump=None, timeout=None):
744 rep_pdu = None
745 if hexdump is None:
746 hexdump = self.do_hexdump
747 try:
748 if timeout is not None:
749 self.s.settimeout(timeout)
750 rep_pdu = self.s.recv(0xffff, 0)
751 self.s.settimeout(10)
752 if len(rep_pdu) == 0:
753 self._disconnect("recv_raw: EOF")
754 return None
755 if hexdump:
756 sys.stderr.write("recv_raw: %d\n%s" % (len(rep_pdu), self.hexdump(rep_pdu)))
757 except socket.timeout as e:
758 self.s.settimeout(10)
759 sys.stderr.write("recv_raw: TIMEOUT\n")
760 pass
761 except socket.error as e:
762 self._disconnect("recv_raw: %s" % e)
763 raise
764 except IOError as e:
765 self._disconnect("recv_raw: %s" % e)
766 raise
767 finally:
768 pass
769 return rep_pdu
771 def recv_pdu_raw(self, ndr_print=None, hexdump=None, timeout=None):
772 rep_pdu = None
773 rep = None
774 if ndr_print is None:
775 ndr_print = self.do_ndr_print
776 if hexdump is None:
777 hexdump = self.do_hexdump
778 try:
779 rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
780 if rep_pdu is None:
781 return (None,None)
782 rep = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
783 if ndr_print:
784 sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
785 self.assertEqual(rep.frag_length, len(rep_pdu))
786 finally:
787 pass
788 return (rep, rep_pdu)
790 def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
791 (rep, rep_pdu) = self.recv_pdu_raw(ndr_print=ndr_print,
792 hexdump=hexdump,
793 timeout=timeout)
794 return rep
796 def generate_auth(self,
797 auth_type=None,
798 auth_level=None,
799 auth_pad_length=0,
800 auth_context_id=None,
801 auth_blob=None,
802 ndr_print=None, hexdump=None):
803 if ndr_print is None:
804 ndr_print = self.do_ndr_print
805 if hexdump is None:
806 hexdump = self.do_hexdump
808 if auth_type is not None:
809 a = samba.dcerpc.dcerpc.auth()
810 a.auth_type = auth_type
811 a.auth_level = auth_level
812 a.auth_pad_length = auth_pad_length
813 a.auth_context_id= auth_context_id
814 a.credentials = auth_blob
816 ai = samba.ndr.ndr_pack(a)
817 if ndr_print:
818 sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(a))
819 if hexdump:
820 sys.stderr.write("generate_auth: %d\n%s" % (len(ai), self.hexdump(ai)))
821 else:
822 ai = ""
824 return ai
826 def parse_auth(self, auth_info, ndr_print=None, hexdump=None):
827 if ndr_print is None:
828 ndr_print = self.do_ndr_print
829 if hexdump is None:
830 hexdump = self.do_hexdump
832 if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
833 return None
835 if hexdump:
836 sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
837 a = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
838 if ndr_print:
839 sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))
841 return a
843 def generate_pdu(self, ptype, call_id, payload,
844 rpc_vers=5,
845 rpc_vers_minor=0,
846 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
847 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
848 drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
849 ndr_print=None, hexdump=None):
851 if getattr(payload, 'auth_info', None):
852 ai = payload.auth_info
853 else:
854 ai = ""
856 p = samba.dcerpc.dcerpc.ncacn_packet()
857 p.rpc_vers = rpc_vers
858 p.rpc_vers_minor = rpc_vers_minor
859 p.ptype = ptype
860 p.pfc_flags = pfc_flags
861 p.drep = drep
862 p.frag_length = 0
863 if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
864 p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
865 else:
866 p.auth_length = 0
867 p.call_id = call_id
868 p.u = payload
870 pdu = samba.ndr.ndr_pack(p)
871 p.frag_length = len(pdu)
873 return p
875 def verify_pdu(self, p, ptype, call_id,
876 rpc_vers=5,
877 rpc_vers_minor=0,
878 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
879 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
880 drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
881 auth_length=None):
883 self.assertIsNotNone(p, "No valid pdu")
885 if getattr(p.u, 'auth_info', None):
886 ai = p.u.auth_info
887 else:
888 ai = ""
890 self.assertEqual(p.rpc_vers, rpc_vers)
891 self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
892 self.assertEqual(p.ptype, ptype)
893 self.assertEqual(p.pfc_flags, pfc_flags)
894 self.assertEqual(p.drep, drep)
895 self.assertGreaterEqual(p.frag_length,
896 samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)
897 if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
898 self.assertEqual(p.auth_length,
899 len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
900 elif auth_length is not None:
901 self.assertEqual(p.auth_length, auth_length)
902 else:
903 self.assertEqual(p.auth_length, 0)
904 self.assertEqual(p.call_id, call_id)
906 return
908 def generate_bind(self, call_id,
909 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
910 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
911 max_xmit_frag=5840,
912 max_recv_frag=5840,
913 assoc_group_id=0,
914 ctx_list=[],
915 auth_info="",
916 ndr_print=None, hexdump=None):
918 b = samba.dcerpc.dcerpc.bind()
919 b.max_xmit_frag = max_xmit_frag
920 b.max_recv_frag = max_recv_frag
921 b.assoc_group_id = assoc_group_id
922 b.num_contexts = len(ctx_list)
923 b.ctx_list = ctx_list
924 b.auth_info = auth_info
926 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
927 pfc_flags=pfc_flags,
928 call_id=call_id,
929 payload=b,
930 ndr_print=ndr_print, hexdump=hexdump)
932 return p
934 def generate_alter(self, call_id,
935 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
936 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
937 max_xmit_frag=5840,
938 max_recv_frag=5840,
939 assoc_group_id=0,
940 ctx_list=[],
941 auth_info="",
942 ndr_print=None, hexdump=None):
944 a = samba.dcerpc.dcerpc.bind()
945 a.max_xmit_frag = max_xmit_frag
946 a.max_recv_frag = max_recv_frag
947 a.assoc_group_id = assoc_group_id
948 a.num_contexts = len(ctx_list)
949 a.ctx_list = ctx_list
950 a.auth_info = auth_info
952 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
953 pfc_flags=pfc_flags,
954 call_id=call_id,
955 payload=a,
956 ndr_print=ndr_print, hexdump=hexdump)
958 return p
960 def generate_auth3(self, call_id,
961 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
962 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
963 auth_info="",
964 ndr_print=None, hexdump=None):
966 a = samba.dcerpc.dcerpc.auth3()
967 a.auth_info = auth_info
969 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
970 pfc_flags=pfc_flags,
971 call_id=call_id,
972 payload=a,
973 ndr_print=ndr_print, hexdump=hexdump)
975 return p
977 def generate_request(self, call_id,
978 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
979 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
980 alloc_hint=None,
981 context_id=None,
982 opnum=None,
983 object=None,
984 stub=None,
985 auth_info="",
986 ndr_print=None, hexdump=None):
988 if alloc_hint is None:
989 alloc_hint = len(stub)
991 r = samba.dcerpc.dcerpc.request()
992 r.alloc_hint = alloc_hint
993 r.context_id = context_id
994 r.opnum = opnum
995 if object is not None:
996 r.object = object
997 r.stub_and_verifier = stub + auth_info
999 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
1000 pfc_flags=pfc_flags,
1001 call_id=call_id,
1002 payload=r,
1003 ndr_print=ndr_print, hexdump=hexdump)
1005 if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
1006 p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
1008 return p
1010 def generate_co_cancel(self, call_id,
1011 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
1012 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
1013 auth_info="",
1014 ndr_print=None, hexdump=None):
1016 c = samba.dcerpc.dcerpc.co_cancel()
1017 c.auth_info = auth_info
1019 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
1020 pfc_flags=pfc_flags,
1021 call_id=call_id,
1022 payload=c,
1023 ndr_print=ndr_print, hexdump=hexdump)
1025 return p
1027 def generate_orphaned(self, call_id,
1028 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
1029 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
1030 auth_info="",
1031 ndr_print=None, hexdump=None):
1033 o = samba.dcerpc.dcerpc.orphaned()
1034 o.auth_info = auth_info
1036 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
1037 pfc_flags=pfc_flags,
1038 call_id=call_id,
1039 payload=o,
1040 ndr_print=ndr_print, hexdump=hexdump)
1042 return p
1044 def generate_shutdown(self, call_id,
1045 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
1046 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
1047 ndr_print=None, hexdump=None):
1049 s = samba.dcerpc.dcerpc.shutdown()
1051 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
1052 pfc_flags=pfc_flags,
1053 call_id=call_id,
1054 payload=s,
1055 ndr_print=ndr_print, hexdump=hexdump)
1057 return p
1059 def assertIsConnected(self):
1060 self.assertIsNotNone(self.s, msg="Not connected")
1061 return
1063 def assertNotConnected(self):
1064 self.assertIsNone(self.s, msg="Is connected")
1065 return
1067 def assertNDRSyntaxEquals(self, s1, s2):
1068 self.assertEqual(s1.uuid, s2.uuid)
1069 self.assertEqual(s1.if_version, s2.if_version)
1070 return
1072 class ValidNetbiosNameTests(TestCase):
1074 def test_valid(self):
1075 self.assertTrue(samba.valid_netbios_name("FOO"))
1077 def test_too_long(self):
1078 self.assertFalse(samba.valid_netbios_name("FOO"*10))
1080 def test_invalid_characters(self):
1081 self.assertFalse(samba.valid_netbios_name("*BLA"))
1084 class BlackboxProcessError(Exception):
1085 """This is raised when check_output() process returns a non-zero exit status
1087 Exception instance should contain the exact exit code (S.returncode),
1088 command line (S.cmd), process output (S.stdout) and process error stream
1089 (S.stderr)
1092 def __init__(self, returncode, cmd, stdout, stderr):
1093 self.returncode = returncode
1094 self.cmd = cmd
1095 self.stdout = stdout
1096 self.stderr = stderr
1098 def __str__(self):
1099 return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self.cmd, self.returncode,
1100 self.stdout, self.stderr)
1102 class BlackboxTestCase(TestCaseInTempDir):
1103 """Base test case for blackbox tests."""
1105 def _make_cmdline(self, line):
1106 bindir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../bin"))
1107 parts = line.split(" ")
1108 if os.path.exists(os.path.join(bindir, parts[0])):
1109 parts[0] = os.path.join(bindir, parts[0])
1110 line = " ".join(parts)
1111 return line
1113 def check_run(self, line):
1114 line = self._make_cmdline(line)
1115 p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
1116 retcode = p.wait()
1117 if retcode:
1118 raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
1120 def check_output(self, line):
1121 line = self._make_cmdline(line)
1122 p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
1123 retcode = p.wait()
1124 if retcode:
1125 raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
1126 return p.stdout.read()
1129 def connect_samdb(samdb_url, lp=None, session_info=None, credentials=None,
1130 flags=0, ldb_options=None, ldap_only=False, global_schema=True):
1131 """Create SamDB instance and connects to samdb_url database.
1133 :param samdb_url: Url for database to connect to.
1134 :param lp: Optional loadparm object
1135 :param session_info: Optional session information
1136 :param credentials: Optional credentials, defaults to anonymous.
1137 :param flags: Optional LDB flags
1138 :param ldap_only: If set, only remote LDAP connection will be created.
1139 :param global_schema: Whether to use global schema.
1141 Added value for tests is that we have a shorthand function
1142 to make proper URL for ldb.connect() while using default
1143 parameters for connection based on test environment
1145 if not "://" in samdb_url:
1146 if not ldap_only and os.path.isfile(samdb_url):
1147 samdb_url = "tdb://%s" % samdb_url
1148 else:
1149 samdb_url = "ldap://%s" % samdb_url
1150 # use 'paged_search' module when connecting remotely
1151 if samdb_url.startswith("ldap://"):
1152 ldb_options = ["modules:paged_searches"]
1153 elif ldap_only:
1154 raise AssertionError("Trying to connect to %s while remote "
1155 "connection is required" % samdb_url)
1157 # set defaults for test environment
1158 if lp is None:
1159 lp = env_loadparm()
1160 if session_info is None:
1161 session_info = samba.auth.system_session(lp)
1162 if credentials is None:
1163 credentials = cmdline_credentials
1165 return SamDB(url=samdb_url,
1166 lp=lp,
1167 session_info=session_info,
1168 credentials=credentials,
1169 flags=flags,
1170 options=ldb_options,
1171 global_schema=global_schema)
1174 def connect_samdb_ex(samdb_url, lp=None, session_info=None, credentials=None,
1175 flags=0, ldb_options=None, ldap_only=False):
1176 """Connects to samdb_url database
1178 :param samdb_url: Url for database to connect to.
1179 :param lp: Optional loadparm object
1180 :param session_info: Optional session information
1181 :param credentials: Optional credentials, defaults to anonymous.
1182 :param flags: Optional LDB flags
1183 :param ldap_only: If set, only remote LDAP connection will be created.
1184 :return: (sam_db_connection, rootDse_record) tuple
1186 sam_db = connect_samdb(samdb_url, lp, session_info, credentials,
1187 flags, ldb_options, ldap_only)
1188 # fetch RootDse
1189 res = sam_db.search(base="", expression="", scope=ldb.SCOPE_BASE,
1190 attrs=["*"])
1191 return (sam_db, res[0])
1194 def connect_samdb_env(env_url, env_username, env_password, lp=None):
1195 """Connect to SamDB by getting URL and Credentials from environment
1197 :param env_url: Environment variable name to get lsb url from
1198 :param env_username: Username environment variable
1199 :param env_password: Password environment variable
1200 :return: sam_db_connection
1202 samdb_url = env_get_var_value(env_url)
1203 creds = credentials.Credentials()
1204 if lp is None:
1205 # guess Credentials parameters here. Otherwise workstation
1206 # and domain fields are NULL and gencache code segfalts
1207 lp = param.LoadParm()
1208 creds.guess(lp)
1209 creds.set_username(env_get_var_value(env_username))
1210 creds.set_password(env_get_var_value(env_password))
1211 return connect_samdb(samdb_url, credentials=creds, lp=lp)
1214 def delete_force(samdb, dn):
1215 try:
1216 samdb.delete(dn)
1217 except ldb.LdbError, (num, errstr):
1218 assert num == ldb.ERR_NO_SUCH_OBJECT, "ldb.delete() failed: %s" % errstr