py:dcerpc/raw_testcase: add start_with_alter to do_generic_bind()
[Samba.git] / python / samba / tests / dcerpc / raw_testcase.py
blobbda13d8d4ccc4b96e34470bd3d67c733dd4f5df4
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import samba.dcerpc.dcerpc as dcerpc
22 import samba.dcerpc.base
23 import samba.dcerpc.epmapper
24 import samba.tests
25 from samba import gensec
26 from samba.credentials import Credentials
27 from samba.tests import TestCase
28 from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
29 from samba.compat import text_type
32 class RawDCERPCTest(TestCase):
33 """A raw DCE/RPC Test case."""
def _disconnect(self, reason):
    """Close the raw socket, if one is open, and forget about it.

    Calling this while already disconnected is a no-op.

    :param reason: free-form text, only used for the debug message that
        is written to stderr when ``self.do_hexdump`` is enabled.
    """
    sock = self.s
    if sock is not None:
        sock.close()
        self.s = None
        if self.do_hexdump:
            sys.stderr.write("disconnect[%s]\n" % reason)
def connect(self):
    """Open a TCP connection to ``self.host`` on ``self.tcp_port``.

    The first result of ``socket.getaddrinfo()`` is used and the socket
    gets a 10 second timeout.  On success the connected socket is stored
    in ``self.s`` (the address list is kept in ``self.a``).

    :raises socket.error, IOError: when resolving, creating the socket or
        connecting fails.  A partially set up socket is closed and
        ``self.s`` is left as None before the error is re-raised.
    """
    # Make sure self.s exists even when getaddrinfo() or socket.socket()
    # raise before a socket was created; otherwise the cleanup below (and
    # a later _disconnect()) would die with AttributeError and hide the
    # real error.
    self.s = None
    try:
        # NOTE(review): the last (flags) argument of this call was not
        # visible in the reviewed text; 0, the documented default, is
        # assumed here -- please confirm against the original file.
        self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
                                    socket.SOCK_STREAM, socket.SOL_TCP,
                                    0)
        self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
        self.s.settimeout(10)
        self.s.connect(self.a[0][4])
    except (socket.error, IOError):
        if self.s is not None:
            self.s.close()
            self.s = None
        raise
def setUp(self):
    """Read the connection parameters from the environment and connect.

    ``SERVER`` is mandatory.  ``TARGET_HOSTNAME`` and
    ``IGNORE_RANDOM_PAD`` may be missing; the target hostname then falls
    back to the server name.  The initial connection goes to port 135.
    """
    super(RawDCERPCTest, self).setUp()

    # Debug output is disabled unless a test switches it on.
    self.do_ndr_print = False
    self.do_hexdump = False

    env = samba.tests.env_get_var_value
    self.ignore_random_pad = env('IGNORE_RANDOM_PAD',
                                 allow_missing=True)
    self.host = env('SERVER')
    self.target_hostname = env('TARGET_HOSTNAME', allow_missing=True)
    if self.target_hostname is None:
        self.target_hostname = self.host
    self.tcp_port = 135

    # Settings handed to gensec when an auth context is created.
    self.settings = {}
    loadparm = samba.tests.env_loadparm()
    self.settings["lp_ctx"] = loadparm
    self.lp_ctx = loadparm
    self.settings["target_hostname"] = self.target_hostname

    self.connect()
def tearDown(self):
    """Drop the connection and run the base class cleanup."""
    self._disconnect("tearDown")
    # Start the MRO lookup at this class, exactly like setUp() does.
    # super(TestCase, self) would begin *after* TestCase and therefore
    # skip any tearDown() that TestCase itself provides.
    super(RawDCERPCTest, self).tearDown()
def noop(self):
    """Do nothing.

    second_connection() names this method as ``methodName`` when it
    creates an additional RawDCERPCTest instance.
    """
    return None
def second_connection(self, tcp_port=None):
    """Return a new, connected RawDCERPCTest that mirrors this one.

    The debug switches, host names and settings are copied from ``self``.

    :param tcp_port: port to connect to; defaults to ``self.tcp_port``.
    :return: the new instance with its own socket.
    """
    conn = RawDCERPCTest(methodName='noop')

    copied = ("do_ndr_print", "do_hexdump", "ignore_random_pad",
              "host", "target_hostname")
    for attr in copied:
        setattr(conn, attr, getattr(self, attr))

    conn.tcp_port = self.tcp_port if tcp_port is None else tcp_port

    # The settings dict is shared, not copied.
    conn.settings = self.settings

    conn.connect()
    return conn
def get_user_creds(self):
    """Build user credentials from the test environment.

    Reads ``DOMAIN``, ``REALM``, ``USERNAME`` and ``PASSWORD`` (all of
    them are looked up before any of them is applied).

    :return: a guessed ``Credentials`` object with those values set.
    """
    creds = Credentials()
    creds.guess()

    env = samba.tests.env_get_var_value
    names = ('DOMAIN', 'REALM', 'USERNAME', 'PASSWORD')
    domain, realm, username, password = [env(name) for name in names]

    creds.set_domain(domain)
    creds.set_realm(realm)
    creds.set_username(username)
    creds.set_password(password)
    return creds
def get_anon_creds(self):
    """Return a fresh ``Credentials`` object marked as anonymous."""
    anon = Credentials()
    anon.set_anonymous()
    return anon
def get_auth_context_creds(self, creds, auth_type, auth_level,
                           auth_context_id,
                           g_auth_level=None,
                           hdr_signing=False):
    """Start a gensec client and describe it as an auth context dict.

    :param creds: credentials handed to gensec.
    :param auth_type: DCERPC_AUTH_TYPE_* value used to pick the mechanism.
    :param auth_level: auth level recorded in the context.
    :param auth_context_id: auth context id recorded in the context.
    :param g_auth_level: auth level handed to gensec; defaults to
        ``auth_level``.
    :param hdr_signing: recorded as ``"hdr_signing"`` in the context.
    :return: dict with the keys ``auth_type``, ``auth_level``,
        ``auth_context_id``, ``g_auth_level``, ``gensec``,
        ``hdr_signing`` and ``expect_3legs``.
    """
    if g_auth_level is None:
        g_auth_level = auth_level

    security = gensec.Security.start_client(self.settings)
    security.set_credentials(creds)
    security.want_feature(gensec.FEATURE_DCE_STYLE)
    security.start_mech_by_authtype(auth_type, g_auth_level)

    # Only KRB5 and NTLMSSP are flagged as three-leg exchanges.
    three_leg_types = (dcerpc.DCERPC_AUTH_TYPE_KRB5,
                       dcerpc.DCERPC_AUTH_TYPE_NTLMSSP)

    return {
        "auth_type": auth_type,
        "auth_level": auth_level,
        "auth_context_id": auth_context_id,
        "g_auth_level": g_auth_level,
        "gensec": security,
        "hdr_signing": hdr_signing,
        "expect_3legs": auth_type in three_leg_types,
    }
def do_generic_bind(self, ctx, auth_context=None,
                    pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                    samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
                    assoc_group_id=0, call_id=0,
                    nak_reason=None, alter_fault=None,
                    start_with_alter=False):
    """Negotiate one presentation context and verify every reply.

    A bind (or, with ``start_with_alter``, an alter_context) carrying
    ``ctx`` is sent and the answer is checked field by field.  When an
    ``auth_context`` is given, the gensec exchange is driven as well,
    which includes a follow-up alter_context.

    :param ctx: the presentation context to negotiate.
    :param auth_context: dict as built by get_auth_context_creds(), or
        None for an unauthenticated bind.
    :param pfc_flags: flags of the first request; the header signing
        flag is added when the auth context asks for it.
    :param assoc_group_id: association group to request; 0 accepts
        whatever the server assigns on a bind.
    :param call_id: call id used for all requests sent here.
    :param nak_reason: when not None, a bind_nak with this reject reason
        is expected as answer to the bind.
    :param alter_fault: when not None, a fault with this status is
        expected as answer to an alter_context.
    :param start_with_alter: send an alter_context instead of a bind as
        the first request.
    :return: the first (bind_ack / alter_resp) reply, or None when the
        expected bind_nak or fault was received.
    """

    def verify_alter_fault(req, rep):
        # Shared check for "the alter_context was answered by a fault".
        self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
                        pfc_flags=req.pfc_flags |
                        samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
                        auth_length=0)
        self.assertNotEqual(rep.u.alloc_hint, 0)
        self.assertEqual(rep.u.context_id, 0)
        self.assertEqual(rep.u.cancel_count, 0)
        self.assertEqual(rep.u.flags, 0)
        self.assertEqual(rep.u.status, alter_fault)
        self.assertEqual(rep.u.reserved, 0)
        self.assertEqual(len(rep.u.error_and_verifier), 0)

    ctx_list = [ctx]

    if auth_context is not None:
        if auth_context['hdr_signing']:
            pfc_flags |= dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN

        expect_3legs = auth_context["expect_3legs"]

        # First leg: gensec produces the initial token without input.
        from_server = b""
        (finished, to_server) = auth_context["gensec"].update(from_server)
        self.assertFalse(finished)

        auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
                                       auth_level=auth_context["auth_level"],
                                       auth_context_id=auth_context["auth_context_id"],
                                       auth_blob=to_server)
    else:
        auth_info = b""

    if start_with_alter:
        # The request carries the bit-inverted association group id,
        # the reply is still expected to carry the plain one.
        req = self.generate_alter(call_id=call_id,
                                  pfc_flags=pfc_flags,
                                  ctx_list=ctx_list,
                                  assoc_group_id=0xffffffff - assoc_group_id,
                                  auth_info=auth_info)
        self.send_pdu(req)
        rep = self.recv_pdu()
        if alter_fault is not None:
            verify_alter_fault(req, rep)
            return None
        self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id,
                        pfc_flags=req.pfc_flags)
        self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
        self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
        self.assertEqual(rep.u.assoc_group_id, assoc_group_id)
        self.assertEqual(rep.u.secondary_address_size, 0)
        self.assertEqual(rep.u.secondary_address, '')
        self.assertPadding(rep.u._pad1, 2)
    else:
        req = self.generate_bind(call_id=call_id,
                                 pfc_flags=pfc_flags,
                                 ctx_list=ctx_list,
                                 assoc_group_id=assoc_group_id,
                                 auth_info=auth_info)
        self.send_pdu(req)
        rep = self.recv_pdu()
        if nak_reason is not None:
            self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
                            auth_length=0)
            self.assertEqual(rep.u.reject_reason, nak_reason)
            self.assertEqual(rep.u.num_versions, 1)
            self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers)
            self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
            self.assertPadding(rep.u._pad, 3)
            return None
        self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
                        pfc_flags=pfc_flags)
        self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
        self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
        if assoc_group_id != 0:
            self.assertEqual(rep.u.assoc_group_id, assoc_group_id)
        else:
            # Remember the group the server assigned for later checks.
            self.assertNotEqual(rep.u.assoc_group_id, 0)
            assoc_group_id = rep.u.assoc_group_id
        # The secondary address is the port as a NUL terminated string,
        # followed by padding up to a 4 byte boundary (the 2 accounts
        # for the 16 bit size field in front of the string).
        port_str = "%d" % self.tcp_port
        port_len = len(port_str) + 1
        mod_len = (2 + port_len) % 4
        if mod_len != 0:
            port_pad = 4 - mod_len
        else:
            port_pad = 0
        self.assertEqual(rep.u.secondary_address_size, port_len)
        self.assertEqual(rep.u.secondary_address, port_str)
        self.assertPadding(rep.u._pad1, port_pad)

    self.assertEqual(rep.u.num_results, 1)
    self.assertEqual(rep.u.ctx_list[0].result,
                     samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
    self.assertEqual(rep.u.ctx_list[0].reason,
                     samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
    self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
    ack = rep

    if auth_context is None:
        self.assertEqual(rep.auth_length, 0)
        self.assertEqual(len(rep.u.auth_info), 0)
        return ack

    self.assertNotEqual(rep.auth_length, 0)
    self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
    self.assertEqual(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)

    a = self.parse_auth(rep.u.auth_info, auth_context=auth_context)

    # Second leg: feed the server token into gensec.
    from_server = a.credentials
    (finished, to_server) = auth_context["gensec"].update(from_server)
    if expect_3legs:
        self.assertTrue(finished)
        if auth_context['hdr_signing']:
            auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
    else:
        self.assertFalse(finished)

    auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
                                   auth_level=auth_context["auth_level"],
                                   auth_context_id=auth_context["auth_context_id"],
                                   auth_blob=to_server)
    req = self.generate_alter(call_id=call_id,
                              ctx_list=ctx_list,
                              assoc_group_id=0xffffffff - assoc_group_id,
                              auth_info=auth_info)
    self.send_pdu(req)
    rep = self.recv_pdu()
    if alter_fault is not None:
        verify_alter_fault(req, rep)
        return None
    self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
    self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
    self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
    self.assertEqual(rep.u.assoc_group_id, assoc_group_id)
    self.assertEqual(rep.u.secondary_address_size, 0)
    self.assertEqual(rep.u.secondary_address, '')
    self.assertPadding(rep.u._pad1, 2)
    self.assertEqual(rep.u.num_results, 1)
    self.assertEqual(rep.u.ctx_list[0].result,
                     samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
    self.assertEqual(rep.u.ctx_list[0].reason,
                     samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
    self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
    if finished:
        self.assertEqual(rep.auth_length, 0)
    else:
        self.assertNotEqual(rep.auth_length, 0)
    self.assertGreaterEqual(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
    self.assertEqual(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)

    a = self.parse_auth(rep.u.auth_info, auth_context=auth_context)

    if finished:
        return ack

    # Final leg: gensec has to be done after the alter_resp token.
    from_server = a.credentials
    (finished, to_server) = auth_context["gensec"].update(from_server)
    self.assertTrue(finished)
    if auth_context['hdr_signing']:
        auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)

    return ack
def prepare_presentation(self, abstract, transfer, object=None,
                         context_id=0xffff, epmap=False, auth_context=None,
                         pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                         samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
                         assoc_group_id=0,
                         return_ack=False):
    """Bind a single abstract/transfer syntax pair.

    :param abstract: abstract syntax of the interface.
    :param transfer: the one transfer syntax to offer.
    :param object: only forwarded to epmap_reconnect().
    :param context_id: presentation context id to use.
    :param epmap: reconnect via epmap_reconnect() before binding.
    :param auth_context: forwarded to do_generic_bind().
    :param pfc_flags: forwarded to do_generic_bind().
    :param assoc_group_id: forwarded to do_generic_bind().
    :param return_ack: also return the reply from do_generic_bind().
    :return: the presentation context (None when do_generic_bind()
        returned None), or ``(ctx, ack)`` with ``return_ack``.
    """
    if epmap:
        self.epmap_reconnect(abstract, transfer=transfer, object=object)

    transfer_syntaxes = [transfer]
    pres_ctx = samba.dcerpc.dcerpc.ctx_list()
    pres_ctx.context_id = context_id
    pres_ctx.num_transfer_syntaxes = len(transfer_syntaxes)
    pres_ctx.abstract_syntax = abstract
    pres_ctx.transfer_syntaxes = transfer_syntaxes

    ack = self.do_generic_bind(ctx=pres_ctx,
                               auth_context=auth_context,
                               pfc_flags=pfc_flags,
                               assoc_group_id=assoc_group_id)

    # No ack means the context was not established.
    result_ctx = None if ack is None else pres_ctx

    if not return_ack:
        return result_ctx
    return (result_ctx, ack)
def do_single_request(self, call_id, ctx, io,
                      auth_context=None,
                      object=None,
                      bigendian=False, ndr64=False,
                      allow_remaining=False,
                      send_req=True,
                      recv_rep=True,
                      fault_pfc_flags=(
                          samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                          samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                      fault_status=None,
                      fault_context_id=None,
                      timeout=None,
                      ndr_print=None,
                      hexdump=None):
    """Send one unfragmented request and/or check its response.

    :param call_id: call id of the request.
    :param ctx: presentation context; its ``context_id`` is used.
    :param io: the call object; the input side is packed for the request
        and the output side is filled from the response stub.
    :param auth_context: optional auth context used to sign the request
        and to verify the response.
    :param object: optional object uuid; sets the OBJECT_UUID flag.
    :param bigendian, ndr64: forwarded to the NDR pack/unpack calls.
    :param allow_remaining: forwarded to ``ndr_unpack_out``.
    :param send_req: pack ``io`` and send the request.
    :param recv_rep: receive and verify the reply.
    :param fault_pfc_flags: flags expected on a fault reply.
    :param fault_status: when set (truthy), a fault with this status is
        expected instead of a response.
    :param fault_context_id: context id expected in the fault; defaults
        to ``ctx.context_id``.
    :param timeout: receive timeout forwarded to recv_pdu_raw().
    :param ndr_print, hexdump: override the instance debug switches.
    :return: None; results are stored in ``io``.
    """
    if fault_context_id is None:
        fault_context_id = ctx.context_id

    if ndr_print is None:
        ndr_print = self.do_ndr_print
    if hexdump is None:
        hexdump = self.do_hexdump

    # The request is always generated because the reply checks below
    # need req.call_id and req.auth_length.  Without send_req there is
    # no packed input; use "no stub" instead of leaving stub_in unbound
    # (which raised NameError for send_req=False).
    stub_in = None
    if send_req:
        if ndr_print:
            sys.stderr.write("in: %s" % samba.ndr.ndr_print_in(io))
        stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64)
        if hexdump:
            sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in)))

    pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
    pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
    if object is not None:
        pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID

    req = self.generate_request_auth(call_id=call_id,
                                     context_id=ctx.context_id,
                                     pfc_flags=pfc_flags,
                                     object=object,
                                     opnum=io.opnum(),
                                     stub=stub_in,
                                     auth_context=auth_context)
    if send_req:
        self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump)

    if not recv_rep:
        return

    (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout,
                                        ndr_print=ndr_print,
                                        hexdump=hexdump)
    if fault_status:
        self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
                        pfc_flags=fault_pfc_flags, auth_length=0)
        self.assertNotEqual(rep.u.alloc_hint, 0)
        self.assertEqual(rep.u.context_id, fault_context_id)
        self.assertEqual(rep.u.cancel_count, 0)
        self.assertEqual(rep.u.flags, 0)
        self.assertEqual(rep.u.status, fault_status)
        self.assertEqual(rep.u.reserved, 0)
        self.assertEqual(len(rep.u.error_and_verifier), 0)
        return

    self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
                    auth_length=req.auth_length)
    self.assertNotEqual(rep.u.alloc_hint, 0)
    # Only the low byte of the request's context id is compared.
    self.assertEqual(rep.u.context_id, req.u.context_id & 0xff)
    self.assertEqual(rep.u.cancel_count, 0)
    self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
    stub_out = self.check_response_auth(rep, rep_blob, auth_context)
    self.assertEqual(len(stub_out), rep.u.alloc_hint)

    if hexdump:
        sys.stderr.write("stub_out: %d\n%s" % (len(stub_out), self.hexdump(stub_out)))
    ndr_unpack_out(io, stub_out, bigendian=bigendian, ndr64=ndr64,
                   allow_remaining=allow_remaining)
    if ndr_print:
        sys.stderr.write("out: %s" % samba.ndr.ndr_print_out(io))
def epmap_reconnect(self, abstract, transfer=None, object=None):
    """Ask the endpoint mapper for the interface's port and reconnect.

    An epm_Map call for ``abstract`` is issued on the current
    connection; afterwards the connection is closed, ``self.tcp_port``
    is replaced by the port from the first returned tower and a new
    connection is opened.

    :param abstract: abstract syntax of the wanted interface.
    :param transfer: transfer syntax; defaults to NDR32.
    :param object: object GUID for the lookup; defaults to an empty GUID.
    """
    epm = samba.dcerpc.epmapper

    def uuid_floor(syntax):
        # Floor describing a syntax id: the first 18 packed bytes go
        # into the lhs, the remaining bytes into the rhs.
        packed = ndr_pack(syntax)
        lhs = epm.epm_lhs()
        lhs.protocol = epm.EPM_PROTOCOL_UUID
        lhs.lhs_data = packed[:18]
        rhs = epm.epm_rhs_uuid()
        rhs.unknown = packed[18:]
        floor = epm.epm_floor()
        floor.lhs = lhs
        floor.rhs = rhs
        return floor

    def protocol_floor(protocol):
        # Floor with an empty lhs payload; the caller fills in the rhs.
        lhs = epm.epm_lhs()
        lhs.protocol = protocol
        lhs.lhs_data = b""
        floor = epm.epm_floor()
        floor.lhs = lhs
        return floor

    ndr32 = samba.dcerpc.base.transfer_syntax_ndr()

    if transfer is None:
        transfer = ndr32

    if object is None:
        object = samba.dcerpc.misc.GUID()

    ctx = self.prepare_presentation(epm.abstract_syntax(),
                                    transfer, context_id=0)

    floor1 = uuid_floor(abstract)
    # The rhs of this floor used to be built from the *abstract* syntax
    # bytes (copy and paste slip); it now uses the transfer syntax.
    floor2 = uuid_floor(transfer)
    floor3 = protocol_floor(epm.EPM_PROTOCOL_NCACN)
    floor3.rhs.minor_version = 0
    floor4 = protocol_floor(epm.EPM_PROTOCOL_TCP)
    floor4.rhs.port = self.tcp_port
    floor5 = protocol_floor(epm.EPM_PROTOCOL_IP)
    floor5.rhs.ipaddr = "0.0.0.0"

    floors = [floor1, floor2, floor3, floor4, floor5]
    req_tower = epm.epm_tower()
    req_tower.num_floors = len(floors)
    req_tower.floors = floors
    req_twr = epm.epm_twr_t()
    req_twr.tower = req_tower

    epm_map = epm.epm_Map()
    epm_map.in_object = object
    epm_map.in_map_tower = req_twr
    epm_map.in_entry_handle = samba.dcerpc.misc.policy_handle()
    epm_map.in_max_towers = 4

    self.do_single_request(call_id=2, ctx=ctx, io=epm_map)

    self.assertGreaterEqual(epm_map.out_num_towers, 1)
    rep_twr = epm_map.out_towers[0].twr
    self.assertIsNotNone(rep_twr)
    self.assertEqual(rep_twr.tower_length, 75)
    self.assertEqual(rep_twr.tower.num_floors, 5)
    self.assertEqual(len(rep_twr.tower.floors), 5)
    # This check used to be written twice; once is enough.
    self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
                     epm.EPM_PROTOCOL_TCP)

    # reconnect to the given port
    self._disconnect("epmap_reconnect")
    self.tcp_port = rep_twr.tower.floors[3].rhs.port
    self.connect()
def send_pdu(self, req, ndr_print=None, hexdump=None):
    """NDR-pack ``req`` and write all of it to the socket.

    :param req: the PDU object to send.
    :param ndr_print, hexdump: override the instance debug switches.
    :raises socket.error, IOError: on send failures; the connection is
        dropped via _disconnect() before the error is re-raised.
    """
    if ndr_print is None:
        ndr_print = self.do_ndr_print
    if hexdump is None:
        hexdump = self.do_hexdump

    try:
        wire = ndr_pack(req)
        if ndr_print:
            sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
        if hexdump:
            sys.stderr.write("send_pdu: %d\n%s" % (len(wire), self.hexdump(wire)))

        # send() may accept only part of the buffer; keep going with
        # the remainder until one call takes everything that is left.
        pending = wire
        sent = self.s.send(pending, 0)
        while sent != len(pending):
            pending = pending[sent:]
            sent = self.s.send(pending, 0)
    except (socket.error, IOError) as e:
        self._disconnect("send_pdu: %s" % e)
        raise
def recv_raw(self, hexdump=None, timeout=None):
    """Read up to 0xffff bytes from the socket.

    :param hexdump: override ``self.do_hexdump``.
    :param timeout: timeout for this read only; the socket timeout is
        set back to 10 seconds afterwards.
    :return: the received bytes, or None on EOF (the connection is then
        dropped) and on timeout (the connection is kept).
    :raises socket.error, IOError: on other receive errors, after the
        connection was dropped.
    """
    show_hex = self.do_hexdump if hexdump is None else hexdump
    data = None
    try:
        if timeout is not None:
            self.s.settimeout(timeout)
        data = self.s.recv(0xffff, 0)
        self.s.settimeout(10)
        if len(data) == 0:
            self._disconnect("recv_raw: EOF")
            return None
        if show_hex:
            sys.stderr.write("recv_raw: %d\n%s" % (len(data), self.hexdump(data)))
    except socket.timeout:
        # A timeout is reported but not treated as an error.
        self.s.settimeout(10)
        sys.stderr.write("recv_raw: TIMEOUT\n")
    except (socket.error, IOError) as e:
        self._disconnect("recv_raw: %s" % e)
        raise
    return data
def recv_pdu_raw(self, ndr_print=None, hexdump=None, timeout=None):
    """Receive one PDU and return it parsed together with its raw bytes.

    :param ndr_print, hexdump: override the instance debug switches.
    :param timeout: forwarded to recv_raw().
    :return: ``(packet, blob)``, or ``(None, None)`` when recv_raw()
        returned nothing.
    """
    show_ndr = self.do_ndr_print if ndr_print is None else ndr_print
    show_hex = self.do_hexdump if hexdump is None else hexdump

    blob = self.recv_raw(hexdump=show_hex, timeout=timeout)
    if blob is None:
        return (None, None)

    packet = ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, blob, allow_remaining=True)
    if show_ndr:
        sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(packet))
    # Exactly one complete fragment is expected per read.
    self.assertEqual(packet.frag_length, len(blob))
    return (packet, blob)
def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
    """Like recv_pdu_raw(), but return only the parsed packet."""
    received = self.recv_pdu_raw(ndr_print=ndr_print,
                                 hexdump=hexdump,
                                 timeout=timeout)
    packet, _blob = received
    return packet
def generate_auth(self,
                  auth_type=None,
                  auth_level=None,
                  auth_pad_length=0,
                  auth_context_id=None,
                  auth_blob=None,
                  ndr_print=None, hexdump=None):
    """Build the packed auth trailer for a PDU.

    :param auth_type: auth type; None means "no authentication".
    :param auth_level, auth_pad_length, auth_context_id: copied into the
        trailer.
    :param auth_blob: the credentials (token or signature) bytes.
    :param ndr_print, hexdump: override the instance debug switches.
    :return: the NDR-packed trailer, or ``b""`` without an auth type.
    """
    if ndr_print is None:
        ndr_print = self.do_ndr_print
    if hexdump is None:
        hexdump = self.do_hexdump

    if auth_type is None:
        return b""

    trailer = samba.dcerpc.dcerpc.auth()
    trailer.auth_type = auth_type
    trailer.auth_level = auth_level
    trailer.auth_pad_length = auth_pad_length
    trailer.auth_context_id = auth_context_id
    trailer.credentials = auth_blob

    packed = ndr_pack(trailer)
    if ndr_print:
        sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(trailer))
    if hexdump:
        sys.stderr.write("generate_auth: %d\n%s" % (len(packed), self.hexdump(packed)))

    return packed
def parse_auth(self, auth_info, ndr_print=None, hexdump=None,
               auth_context=None, stub_len=0):
    """Unpack an auth trailer and optionally check it against a context.

    :param auth_info: raw bytes of the trailer including credentials.
    :param ndr_print, hexdump: override the instance debug switches.
    :param auth_context: when given, type, level, reserved field and
        context id of the trailer must match it.
    :param stub_len: upper bound for the trailer's pad length.
    :return: the unpacked auth structure, or None when ``auth_info`` is
        not longer than the fixed trailer header.
    """
    if ndr_print is None:
        ndr_print = self.do_ndr_print
    if hexdump is None:
        hexdump = self.do_hexdump

    if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
        return None

    if hexdump:
        sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
    a = ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
    if ndr_print:
        sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))

    if auth_context is not None:
        self.assertEqual(a.auth_type, auth_context["auth_type"])
        self.assertEqual(a.auth_level, auth_context["auth_level"])
        self.assertEqual(a.auth_reserved, 0)
        self.assertEqual(a.auth_context_id, auth_context["auth_context_id"])

        # NOTE(review): the reviewed text had lost its indentation; the
        # two pad checks are assumed to belong to this branch -- confirm.
        self.assertLessEqual(a.auth_pad_length, dcerpc.DCERPC_AUTH_PAD_ALIGNMENT)
        self.assertLessEqual(a.auth_pad_length, stub_len)

    return a
def check_response_auth(self, rep, rep_blob, auth_context=None,
                        auth_pad_length=None):
    """Verify the auth trailer of a response and return the bare stub.

    :param rep: the parsed response packet.
    :param rep_blob: the raw bytes ``rep`` was parsed from.
    :param auth_context: auth context the response has to match; without
        one the response must not carry authentication at all.
    :param auth_pad_length: when given, the exact pad length expected in
        the trailer.
    :return: the stub data without padding and trailer.
    """
    if auth_context is None:
        self.assertEqual(rep.auth_length, 0)
        return rep.u.stub_and_verifier

    # Layout of the blob: header | stub (+pad) | auth trailer | signature
    ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
    ofs_sig = rep.frag_length - rep.auth_length
    ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
    rep_data = rep_blob[ofs_stub:ofs_trailer]
    rep_whole = rep_blob[0:ofs_sig]
    rep_sig = rep_blob[ofs_sig:]
    rep_auth_info_blob = rep_blob[ofs_trailer:]

    rep_auth_info = self.parse_auth(rep_auth_info_blob,
                                    auth_context=auth_context,
                                    stub_len=len(rep_data))
    # parse_auth() returns None for a trailer without credentials; fail
    # with a readable message instead of an AttributeError below.
    self.assertIsNotNone(rep_auth_info, "response carries no auth credentials")
    if auth_pad_length is not None:
        self.assertEqual(rep_auth_info.auth_pad_length, auth_pad_length)
    self.assertEqual(rep_auth_info.credentials, rep_sig)

    if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
        # TODO: not yet supported here
        self.fail("DCERPC_AUTH_LEVEL_PRIVACY is not yet supported here")
    elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
        auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)

    stub_out = rep_data[0:len(rep_data)-rep_auth_info.auth_pad_length]

    return stub_out
def generate_pdu(self, ptype, call_id, payload,
                 rpc_vers=5,
                 rpc_vers_minor=0,
                 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                            samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                 drep=[samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
                 ndr_print=None, hexdump=None):
    """Wrap ``payload`` into an ncacn_packet with a filled-in header.

    ``auth_length`` is derived from the payload's ``auth_info`` (if it
    has one) and ``frag_length`` from the packed size of the packet.

    :param ptype: packet type.
    :param call_id: call id.
    :param payload: the type specific body, stored as ``u``.
    :param rpc_vers, rpc_vers_minor, pfc_flags, drep: header fields.
    :param ndr_print, hexdump: accepted but not used here.
    :return: the packet object (not packed).
    """
    trailer_len = samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
    auth_blob = getattr(payload, 'auth_info', None) or b""

    pkt = samba.dcerpc.dcerpc.ncacn_packet()
    pkt.rpc_vers = rpc_vers
    pkt.rpc_vers_minor = rpc_vers_minor
    pkt.ptype = ptype
    pkt.pfc_flags = pfc_flags
    pkt.drep = drep
    pkt.frag_length = 0
    # Only the part behind the fixed trailer header counts.
    pkt.auth_length = max(len(auth_blob) - trailer_len, 0)
    pkt.call_id = call_id
    pkt.u = payload

    # Pack once just to learn the final size.
    pkt.frag_length = len(ndr_pack(pkt))

    return pkt
def generate_request_auth(self, call_id,
                          pfc_flags=(dcerpc.DCERPC_PFC_FLAG_FIRST |
                                     dcerpc.DCERPC_PFC_FLAG_LAST),
                          alloc_hint=None,
                          context_id=None,
                          opnum=None,
                          object=None,
                          stub=None,
                          auth_context=None,
                          ndr_print=None, hexdump=None):
    """Build a request PDU and sign it when an auth context is given.

    :param call_id, pfc_flags, alloc_hint, context_id, opnum, object:
        forwarded to generate_request().
    :param stub: request stub bytes; None is treated as empty.
    :param auth_context: optional auth context.  The stub is padded to
        the auth alignment, an auth trailer with a zeroed signature is
        appended and, from packet level upwards, the real signature
        is filled in.
    :param ndr_print, hexdump: forwarded to generate_request().
    :return: the request packet object.
    """
    if stub is None:
        stub = b""

    sig_size = 0
    if auth_context is not None:
        mod_len = len(stub) % dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
        auth_pad_length = 0
        if mod_len > 0:
            auth_pad_length = dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
        stub += b'\x00' * auth_pad_length

        if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
            sig_size = auth_context["gensec"].sig_size(len(stub))
        else:
            sig_size = 16

        # Placeholder of the right size; replaced by the signature below.
        zero_sig = b"\x00" * sig_size
        auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
                                       auth_level=auth_context["auth_level"],
                                       auth_pad_length=auth_pad_length,
                                       auth_context_id=auth_context["auth_context_id"],
                                       auth_blob=zero_sig)
    else:
        auth_info = b""

    req = self.generate_request(call_id=call_id,
                                pfc_flags=pfc_flags,
                                alloc_hint=alloc_hint,
                                context_id=context_id,
                                opnum=opnum,
                                object=object,
                                stub=stub,
                                auth_info=auth_info,
                                ndr_print=ndr_print,
                                hexdump=hexdump)
    if auth_context is None:
        return req

    req_blob = samba.ndr.ndr_pack(req)
    ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
    ofs_sig = len(req_blob) - req.auth_length
    ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
    req_data = req_blob[ofs_stub:ofs_trailer]
    req_whole = req_blob[0:ofs_sig]

    if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
        # TODO: not yet supported here
        self.fail("DCERPC_AUTH_LEVEL_PRIVACY is not yet supported here")
    elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
        req_sig = auth_context["gensec"].sign_packet(req_data, req_whole)
    else:
        # Below packet level the zeroed placeholder stays in place.
        return req
    self.assertEqual(len(req_sig), req.auth_length)
    self.assertEqual(len(req_sig), sig_size)

    # Swap the placeholder at the end of the body for the signature.
    stub_sig_ofs = len(req.u.stub_and_verifier) - sig_size
    stub = req.u.stub_and_verifier[0:stub_sig_ofs] + req_sig
    req.u.stub_and_verifier = stub

    return req
def verify_pdu(self, p, ptype, call_id,
               rpc_vers=5,
               rpc_vers_minor=0,
               pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                          samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
               drep=[samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
               auth_length=None):
    """Check the common header fields of a received PDU.

    :param p: the parsed packet; must not be None.
    :param ptype: expected packet type.
    :param call_id: expected call id.
    :param rpc_vers, rpc_vers_minor, pfc_flags, drep: expected header
        values.
    :param auth_length: expected auth length when the payload carries
        no ``auth_info`` longer than the trailer header; None means 0.
    """
    self.assertIsNotNone(p, "No valid pdu")

    trailer_len = samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
    auth_blob = getattr(p.u, 'auth_info', None) or b""

    self.assertEqual(p.rpc_vers, rpc_vers)
    self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
    self.assertEqual(p.ptype, ptype)
    self.assertEqual(p.pfc_flags, pfc_flags)
    self.assertEqual(p.drep, drep)
    self.assertGreaterEqual(p.frag_length,
                            samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)

    # A usable auth_info in the payload wins over the caller's value.
    if len(auth_blob) > trailer_len:
        expected_auth_length = len(auth_blob) - trailer_len
    elif auth_length is not None:
        expected_auth_length = auth_length
    else:
        expected_auth_length = 0
    self.assertEqual(p.auth_length, expected_auth_length)

    self.assertEqual(p.call_id, call_id)
def generate_bind(self, call_id,
                  pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                             samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                  max_xmit_frag=5840,
                  max_recv_frag=5840,
                  assoc_group_id=0,
                  ctx_list=[],
                  auth_info=b"",
                  ndr_print=None, hexdump=None):
    """Build a bind PDU.

    :param call_id, pfc_flags: header fields.
    :param max_xmit_frag, max_recv_frag, assoc_group_id: bind fields.
    :param ctx_list: presentation contexts to offer.
    :param auth_info: packed auth trailer, see generate_auth().
    :param ndr_print, hexdump: forwarded to generate_pdu().
    :return: the packet object.
    """
    body = samba.dcerpc.dcerpc.bind()
    body.max_xmit_frag = max_xmit_frag
    body.max_recv_frag = max_recv_frag
    body.assoc_group_id = assoc_group_id
    body.num_contexts = len(ctx_list)
    body.ctx_list = ctx_list
    body.auth_info = auth_info

    return self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
                             pfc_flags=pfc_flags,
                             call_id=call_id,
                             payload=body,
                             ndr_print=ndr_print, hexdump=hexdump)
def generate_alter(self, call_id,
                   pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                              samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                   max_xmit_frag=5840,
                   max_recv_frag=5840,
                   assoc_group_id=0,
                   ctx_list=[],
                   auth_info=b"",
                   ndr_print=None, hexdump=None):
    """Build an alter_context PDU.

    The body is the same ``bind`` structure generate_bind() uses; only
    the packet type differs.

    :param call_id, pfc_flags: header fields.
    :param max_xmit_frag, max_recv_frag, assoc_group_id: body fields.
    :param ctx_list: presentation contexts to offer.
    :param auth_info: packed auth trailer, see generate_auth().
    :param ndr_print, hexdump: forwarded to generate_pdu().
    :return: the packet object.
    """
    body = samba.dcerpc.dcerpc.bind()
    body.max_xmit_frag = max_xmit_frag
    body.max_recv_frag = max_recv_frag
    body.assoc_group_id = assoc_group_id
    body.num_contexts = len(ctx_list)
    body.ctx_list = ctx_list
    body.auth_info = auth_info

    return self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
                             pfc_flags=pfc_flags,
                             call_id=call_id,
                             payload=body,
                             ndr_print=ndr_print, hexdump=hexdump)
def generate_auth3(self, call_id,
                   pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                              samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                   auth_info=b"",
                   ndr_print=None, hexdump=None):
    """Build an auth3 PDU carrying ``auth_info``.

    :return: the packet object.
    """
    body = samba.dcerpc.dcerpc.auth3()
    body.auth_info = auth_info

    return self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
                             pfc_flags=pfc_flags,
                             call_id=call_id,
                             payload=body,
                             ndr_print=ndr_print, hexdump=hexdump)
def generate_request(self, call_id,
                     pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                                samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                     alloc_hint=None,
                     context_id=None,
                     opnum=None,
                     object=None,
                     stub=None,
                     auth_info=b"",
                     ndr_print=None, hexdump=None):
    """Build a request PDU.

    :param call_id, pfc_flags: header fields.
    :param alloc_hint: allocation hint; defaults to the stub length.
    :param context_id, opnum: request fields.
    :param object: optional object uuid.
    :param stub: stub bytes; None is treated as empty.
    :param auth_info: packed auth trailer appended behind the stub.
    :param ndr_print, hexdump: forwarded to generate_pdu().
    :return: the packet object.
    """
    # The advertised default (None) used to blow up in len() and in the
    # concatenation below; treat it as "no stub data".
    if stub is None:
        stub = b""

    if alloc_hint is None:
        alloc_hint = len(stub)

    r = samba.dcerpc.dcerpc.request()
    r.alloc_hint = alloc_hint
    r.context_id = context_id
    r.opnum = opnum
    if object is not None:
        r.object = object
    r.stub_and_verifier = stub + auth_info

    p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
                          pfc_flags=pfc_flags,
                          call_id=call_id,
                          payload=r,
                          ndr_print=ndr_print, hexdump=hexdump)

    # The trailer is part of stub_and_verifier here, so generate_pdu()
    # gets no separate auth_info to derive auth_length from; set it now.
    if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
        p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH

    return p
def generate_co_cancel(self, call_id,
                       pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                                  samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                       auth_info=b"",
                       ndr_print=None, hexdump=None):
    """Build a co_cancel PDU carrying ``auth_info``.

    :return: the packet object.
    """
    body = samba.dcerpc.dcerpc.co_cancel()
    body.auth_info = auth_info

    return self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
                             pfc_flags=pfc_flags,
                             call_id=call_id,
                             payload=body,
                             ndr_print=ndr_print, hexdump=hexdump)
def generate_orphaned(self, call_id,
                      pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                                 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                      auth_info=b"",
                      ndr_print=None, hexdump=None):
    """Build an orphaned PDU carrying ``auth_info``.

    :return: the packet object.
    """
    body = samba.dcerpc.dcerpc.orphaned()
    body.auth_info = auth_info

    return self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
                             pfc_flags=pfc_flags,
                             call_id=call_id,
                             payload=body,
                             ndr_print=ndr_print, hexdump=hexdump)
def generate_shutdown(self, call_id,
                      pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
                                 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
                      ndr_print=None, hexdump=None):
    """Build a shutdown PDU (it has an empty body).

    :return: the packet object.
    """
    body = samba.dcerpc.dcerpc.shutdown()

    return self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
                             pfc_flags=pfc_flags,
                             call_id=call_id,
                             payload=body,
                             ndr_print=ndr_print, hexdump=hexdump)
def assertIsConnected(self):
    """Fail unless a socket is currently held in ``self.s``."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail if a socket is still held in ``self.s``."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
def assertNDRSyntaxEquals(self, s1, s2):
    """Fail unless both syntax ids agree in uuid and interface version."""
    for field in ("uuid", "if_version"):
        self.assertEqual(getattr(s1, field), getattr(s2, field))
def assertPadding(self, pad, length):
    """Fail unless ``pad`` is ``length`` bytes long and all zero.

    :param pad: the padding bytes taken from a received PDU.
    :param length: the expected number of padding bytes.
    """
    self.assertEqual(len(pad), length)

    # Windows sometimes sends random bytes as padding.  With
    # IGNORE_RANDOM_PAD=1 the content check is disabled and only the
    # length (above) is verified.
    if self.ignore_random_pad:
        return
    zero_pad = b'\0' * length
    self.assertEqual(pad, zero_pad)