py:dcerpc/raw_testcase: prepare do_generic_bind() for raw NTLMSSP and Kerberos authen...
[Samba.git] / python / samba / tests / dcerpc / raw_testcase.py
blob8d213e5ac79a64620e3a74512329831e10bfe527
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 import socket
21 import samba.dcerpc.dcerpc as dcerpc
22 import samba.dcerpc.base
23 import samba.dcerpc.epmapper
24 import samba.tests
25 from samba import gensec
26 from samba.credentials import Credentials
27 from samba.tests import TestCase
28 from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
29 from samba.compat import text_type
32 class RawDCERPCTest(TestCase):
33 """A raw DCE/RPC Test case."""
35 def _disconnect(self, reason):
36 if self.s is None:
37 return
38 self.s.close()
39 self.s = None
40 if self.do_hexdump:
41 sys.stderr.write("disconnect[%s]\n" % reason)
43 def connect(self):
44 try:
45 self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
46 socket.SOCK_STREAM, socket.SOL_TCP,
48 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
49 self.s.settimeout(10)
50 self.s.connect(self.a[0][4])
51 except socket.error as e:
52 self.s.close()
53 raise
54 except IOError as e:
55 self.s.close()
56 raise
57 except Exception as e:
58 raise
59 finally:
60 pass
62 def setUp(self):
63 super(RawDCERPCTest, self).setUp()
64 self.do_ndr_print = False
65 self.do_hexdump = False
67 self.ignore_random_pad = samba.tests.env_get_var_value('IGNORE_RANDOM_PAD',
68 allow_missing=True)
69 self.host = samba.tests.env_get_var_value('SERVER')
70 self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True)
71 if self.target_hostname is None:
72 self.target_hostname = self.host
73 self.tcp_port = 135
75 self.settings = {}
76 self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
77 self.settings["target_hostname"] = self.target_hostname
79 self.connect()
81 def tearDown(self):
82 self._disconnect("tearDown")
83 super(TestCase, self).tearDown()
85 def noop(self):
86 return
88 def second_connection(self, tcp_port=None):
89 c = RawDCERPCTest(methodName='noop')
90 c.do_ndr_print = self.do_ndr_print
91 c.do_hexdump = self.do_hexdump
92 c.ignore_random_pad = self.ignore_random_pad
94 c.host = self.host
95 c.target_hostname = self.target_hostname
96 if tcp_port is not None:
97 c.tcp_port = tcp_port
98 else:
99 c.tcp_port = self.tcp_port
101 c.settings = self.settings
103 c.connect()
104 return c
106 def get_user_creds(self):
107 c = Credentials()
108 c.guess()
109 domain = samba.tests.env_get_var_value('DOMAIN')
110 realm = samba.tests.env_get_var_value('REALM')
111 username = samba.tests.env_get_var_value('USERNAME')
112 password = samba.tests.env_get_var_value('PASSWORD')
113 c.set_domain(domain)
114 c.set_realm(realm)
115 c.set_username(username)
116 c.set_password(password)
117 return c
119 def get_anon_creds(self):
120 c = Credentials()
121 c.set_anonymous()
122 return c
124 def get_auth_context_creds(self, creds, auth_type, auth_level,
125 auth_context_id,
126 g_auth_level=None):
128 if g_auth_level is None:
129 g_auth_level = auth_level
131 g = gensec.Security.start_client(self.settings)
132 g.set_credentials(creds)
133 g.want_feature(gensec.FEATURE_DCE_STYLE)
134 g.start_mech_by_authtype(auth_type, g_auth_level)
136 if auth_type == dcerpc.DCERPC_AUTH_TYPE_KRB5:
137 expect_3legs = True
138 elif auth_type == dcerpc.DCERPC_AUTH_TYPE_NTLMSSP:
139 expect_3legs = True
140 else:
141 expect_3legs = False
143 auth_context = {}
144 auth_context["auth_type"] = auth_type
145 auth_context["auth_level"] = auth_level
146 auth_context["auth_context_id"] = auth_context_id
147 auth_context["g_auth_level"] = g_auth_level
148 auth_context["gensec"] = g
149 auth_context["expect_3legs"] = expect_3legs
151 return auth_context
153 def do_generic_bind(self, ctx, auth_context=None,
154 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
155 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
156 assoc_group_id=0, call_id=0,
157 nak_reason=None, alter_fault=None):
158 ctx_list = [ctx]
160 if auth_context is not None:
161 expect_3legs = auth_context["expect_3legs"]
163 from_server = b""
164 (finished, to_server) = auth_context["gensec"].update(from_server)
165 self.assertFalse(finished)
167 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
168 auth_level=auth_context["auth_level"],
169 auth_context_id=auth_context["auth_context_id"],
170 auth_blob=to_server)
171 else:
172 auth_info = b""
174 req = self.generate_bind(call_id=call_id,
175 pfc_flags=pfc_flags,
176 ctx_list=ctx_list,
177 assoc_group_id=assoc_group_id,
178 auth_info=auth_info)
179 self.send_pdu(req)
180 rep = self.recv_pdu()
181 if nak_reason is not None:
182 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
183 auth_length=0)
184 self.assertEquals(rep.u.reject_reason, nak_reason)
185 self.assertEquals(rep.u.num_versions, 1)
186 self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
187 self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
188 self.assertPadding(rep.u._pad, 3)
189 return
190 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
191 pfc_flags=pfc_flags)
192 self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
193 self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
194 if assoc_group_id != 0:
195 self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
196 else:
197 self.assertNotEquals(rep.u.assoc_group_id, 0)
198 assoc_group_id = rep.u.assoc_group_id
199 port_str = "%d" % self.tcp_port
200 port_len = len(port_str) + 1
201 mod_len = (2 + port_len) % 4
202 if mod_len != 0:
203 port_pad = 4 - mod_len
204 else:
205 port_pad = 0
206 self.assertEquals(rep.u.secondary_address_size, port_len)
207 self.assertEquals(rep.u.secondary_address, port_str)
208 self.assertPadding(rep.u._pad1, port_pad)
209 self.assertEquals(rep.u.num_results, 1)
210 self.assertEquals(rep.u.ctx_list[0].result,
211 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
212 self.assertEquals(rep.u.ctx_list[0].reason,
213 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
214 self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
215 ack = rep
216 if auth_context is None:
217 self.assertEquals(rep.auth_length, 0)
218 self.assertEquals(len(rep.u.auth_info), 0)
219 return ack
220 self.assertNotEquals(rep.auth_length, 0)
221 self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
222 self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
224 a = self.parse_auth(rep.u.auth_info, auth_context=auth_context)
226 from_server = a.credentials
227 (finished, to_server) = auth_context["gensec"].update(from_server)
228 if expect_3legs:
229 self.assertTrue(finished)
230 else:
231 self.assertFalse(finished)
233 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
234 auth_level=auth_context["auth_level"],
235 auth_context_id=auth_context["auth_context_id"],
236 auth_blob=to_server)
237 req = self.generate_alter(call_id=call_id,
238 ctx_list=ctx_list,
239 assoc_group_id=0xffffffff - assoc_group_id,
240 auth_info=auth_info)
241 self.send_pdu(req)
242 rep = self.recv_pdu()
243 if alter_fault is not None:
244 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
245 pfc_flags=req.pfc_flags |
246 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
247 auth_length=0)
248 self.assertNotEquals(rep.u.alloc_hint, 0)
249 self.assertEquals(rep.u.context_id, 0)
250 self.assertEquals(rep.u.cancel_count, 0)
251 self.assertEquals(rep.u.flags, 0)
252 self.assertEquals(rep.u.status, alter_fault)
253 self.assertEquals(rep.u.reserved, 0)
254 self.assertEquals(len(rep.u.error_and_verifier), 0)
255 return None
256 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
257 self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
258 self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
259 self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
260 self.assertEquals(rep.u.secondary_address_size, 0)
261 self.assertEquals(rep.u.secondary_address, '')
262 self.assertPadding(rep.u._pad1, 2)
263 self.assertEquals(rep.u.num_results, 1)
264 self.assertEquals(rep.u.ctx_list[0].result,
265 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
266 self.assertEquals(rep.u.ctx_list[0].reason,
267 samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
268 self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
269 if finished:
270 self.assertEquals(rep.auth_length, 0)
271 else:
272 self.assertNotEquals(rep.auth_length, 0)
273 self.assertGreaterEqual(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
274 self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
276 a = self.parse_auth(rep.u.auth_info, auth_context=auth_context)
278 if finished:
279 return ack
281 from_server = a.credentials
282 (finished, to_server) = auth_context["gensec"].update(from_server)
283 self.assertTrue(finished)
285 return ack
287 def prepare_presentation(self, abstract, transfer, object=None,
288 context_id=0xffff, epmap=False, auth_context=None,
289 pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
290 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
291 assoc_group_id=0,
292 return_ack=False):
293 if epmap:
294 self.epmap_reconnect(abstract, transfer=transfer, object=object)
296 tsf1_list = [transfer]
297 ctx = samba.dcerpc.dcerpc.ctx_list()
298 ctx.context_id = context_id
299 ctx.num_transfer_syntaxes = len(tsf1_list)
300 ctx.abstract_syntax = abstract
301 ctx.transfer_syntaxes = tsf1_list
303 ack = self.do_generic_bind(ctx=ctx,
304 auth_context=auth_context,
305 pfc_flags=pfc_flags,
306 assoc_group_id=assoc_group_id)
307 if ack is None:
308 ctx = None
310 if return_ack:
311 return (ctx, ack)
312 return ctx
314 def do_single_request(self, call_id, ctx, io,
315 auth_context=None,
316 object=None,
317 bigendian=False, ndr64=False,
318 allow_remaining=False,
319 send_req=True,
320 recv_rep=True,
321 fault_pfc_flags=(
322 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
323 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
324 fault_status=None,
325 fault_context_id=None,
326 timeout=None,
327 ndr_print=None,
328 hexdump=None):
330 if fault_context_id is None:
331 fault_context_id = ctx.context_id
333 if ndr_print is None:
334 ndr_print = self.do_ndr_print
335 if hexdump is None:
336 hexdump = self.do_hexdump
338 if send_req:
339 if ndr_print:
340 sys.stderr.write("in: %s" % samba.ndr.ndr_print_in(io))
341 stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64)
342 if hexdump:
343 sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in)))
345 pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
346 pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
347 if object is not None:
348 pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID
350 req = self.generate_request_auth(call_id=call_id,
351 context_id=ctx.context_id,
352 pfc_flags=pfc_flags,
353 object=object,
354 opnum=io.opnum(),
355 stub=stub_in,
356 auth_context=auth_context)
357 if send_req:
358 self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump)
359 if recv_rep:
360 (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout,
361 ndr_print=ndr_print,
362 hexdump=hexdump)
363 if fault_status:
364 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
365 pfc_flags=fault_pfc_flags, auth_length=0)
366 self.assertNotEquals(rep.u.alloc_hint, 0)
367 self.assertEquals(rep.u.context_id, fault_context_id)
368 self.assertEquals(rep.u.cancel_count, 0)
369 self.assertEquals(rep.u.flags, 0)
370 self.assertEquals(rep.u.status, fault_status)
371 self.assertEquals(rep.u.reserved, 0)
372 self.assertEquals(len(rep.u.error_and_verifier), 0)
373 return
375 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
376 auth_length=req.auth_length)
377 self.assertNotEquals(rep.u.alloc_hint, 0)
378 self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
379 self.assertEquals(rep.u.cancel_count, 0)
380 self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
381 stub_out = self.check_response_auth(rep, rep_blob, auth_context)
382 self.assertEqual(len(stub_out), rep.u.alloc_hint)
384 if hexdump:
385 sys.stderr.write("stub_out: %d\n%s" % (len(stub_out), self.hexdump(stub_out)))
386 ndr_unpack_out(io, stub_out, bigendian=bigendian, ndr64=ndr64,
387 allow_remaining=allow_remaining)
388 if ndr_print:
389 sys.stderr.write("out: %s" % samba.ndr.ndr_print_out(io))
391 def epmap_reconnect(self, abstract, transfer=None, object=None):
392 ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
394 if transfer is None:
395 transfer = ndr32
397 if object is None:
398 object = samba.dcerpc.misc.GUID()
400 ctx = self.prepare_presentation(samba.dcerpc.epmapper.abstract_syntax(),
401 transfer, context_id=0)
403 data1 = ndr_pack(abstract)
404 lhs1 = samba.dcerpc.epmapper.epm_lhs()
405 lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
406 lhs1.lhs_data = data1[:18]
407 rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid()
408 rhs1.unknown = data1[18:]
409 floor1 = samba.dcerpc.epmapper.epm_floor()
410 floor1.lhs = lhs1
411 floor1.rhs = rhs1
412 data2 = ndr_pack(transfer)
413 lhs2 = samba.dcerpc.epmapper.epm_lhs()
414 lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
415 lhs2.lhs_data = data2[:18]
416 rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid()
417 rhs2.unknown = data1[18:]
418 floor2 = samba.dcerpc.epmapper.epm_floor()
419 floor2.lhs = lhs2
420 floor2.rhs = rhs2
421 lhs3 = samba.dcerpc.epmapper.epm_lhs()
422 lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN
423 lhs3.lhs_data = b""
424 floor3 = samba.dcerpc.epmapper.epm_floor()
425 floor3.lhs = lhs3
426 floor3.rhs.minor_version = 0
427 lhs4 = samba.dcerpc.epmapper.epm_lhs()
428 lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP
429 lhs4.lhs_data = b""
430 floor4 = samba.dcerpc.epmapper.epm_floor()
431 floor4.lhs = lhs4
432 floor4.rhs.port = self.tcp_port
433 lhs5 = samba.dcerpc.epmapper.epm_lhs()
434 lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
435 lhs5.lhs_data = b""
436 floor5 = samba.dcerpc.epmapper.epm_floor()
437 floor5.lhs = lhs5
438 floor5.rhs.ipaddr = "0.0.0.0"
440 floors = [floor1, floor2, floor3, floor4, floor5]
441 req_tower = samba.dcerpc.epmapper.epm_tower()
442 req_tower.num_floors = len(floors)
443 req_tower.floors = floors
444 req_twr = samba.dcerpc.epmapper.epm_twr_t()
445 req_twr.tower = req_tower
447 epm_map = samba.dcerpc.epmapper.epm_Map()
448 epm_map.in_object = object
449 epm_map.in_map_tower = req_twr
450 epm_map.in_entry_handle = samba.dcerpc.misc.policy_handle()
451 epm_map.in_max_towers = 4
453 self.do_single_request(call_id=2, ctx=ctx, io=epm_map)
455 self.assertGreaterEqual(epm_map.out_num_towers, 1)
456 rep_twr = epm_map.out_towers[0].twr
457 self.assertIsNotNone(rep_twr)
458 self.assertEqual(rep_twr.tower_length, 75)
459 self.assertEqual(rep_twr.tower.num_floors, 5)
460 self.assertEqual(len(rep_twr.tower.floors), 5)
461 self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
462 samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
463 self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
464 samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
466 # reconnect to the given port
467 self._disconnect("epmap_reconnect")
468 self.tcp_port = rep_twr.tower.floors[3].rhs.port
469 self.connect()
471 def send_pdu(self, req, ndr_print=None, hexdump=None):
472 if ndr_print is None:
473 ndr_print = self.do_ndr_print
474 if hexdump is None:
475 hexdump = self.do_hexdump
476 try:
477 req_pdu = ndr_pack(req)
478 if ndr_print:
479 sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
480 if hexdump:
481 sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
482 while True:
483 sent = self.s.send(req_pdu, 0)
484 if sent == len(req_pdu):
485 break
486 req_pdu = req_pdu[sent:]
487 except socket.error as e:
488 self._disconnect("send_pdu: %s" % e)
489 raise
490 except IOError as e:
491 self._disconnect("send_pdu: %s" % e)
492 raise
493 finally:
494 pass
496 def recv_raw(self, hexdump=None, timeout=None):
497 rep_pdu = None
498 if hexdump is None:
499 hexdump = self.do_hexdump
500 try:
501 if timeout is not None:
502 self.s.settimeout(timeout)
503 rep_pdu = self.s.recv(0xffff, 0)
504 self.s.settimeout(10)
505 if len(rep_pdu) == 0:
506 self._disconnect("recv_raw: EOF")
507 return None
508 if hexdump:
509 sys.stderr.write("recv_raw: %d\n%s" % (len(rep_pdu), self.hexdump(rep_pdu)))
510 except socket.timeout as e:
511 self.s.settimeout(10)
512 sys.stderr.write("recv_raw: TIMEOUT\n")
513 pass
514 except socket.error as e:
515 self._disconnect("recv_raw: %s" % e)
516 raise
517 except IOError as e:
518 self._disconnect("recv_raw: %s" % e)
519 raise
520 finally:
521 pass
522 return rep_pdu
524 def recv_pdu_raw(self, ndr_print=None, hexdump=None, timeout=None):
525 rep_pdu = None
526 rep = None
527 if ndr_print is None:
528 ndr_print = self.do_ndr_print
529 if hexdump is None:
530 hexdump = self.do_hexdump
531 try:
532 rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
533 if rep_pdu is None:
534 return (None, None)
535 rep = ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
536 if ndr_print:
537 sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
538 self.assertEqual(rep.frag_length, len(rep_pdu))
539 finally:
540 pass
541 return (rep, rep_pdu)
543 def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
544 (rep, rep_pdu) = self.recv_pdu_raw(ndr_print=ndr_print,
545 hexdump=hexdump,
546 timeout=timeout)
547 return rep
549 def generate_auth(self,
550 auth_type=None,
551 auth_level=None,
552 auth_pad_length=0,
553 auth_context_id=None,
554 auth_blob=None,
555 ndr_print=None, hexdump=None):
556 if ndr_print is None:
557 ndr_print = self.do_ndr_print
558 if hexdump is None:
559 hexdump = self.do_hexdump
561 if auth_type is not None:
562 a = samba.dcerpc.dcerpc.auth()
563 a.auth_type = auth_type
564 a.auth_level = auth_level
565 a.auth_pad_length = auth_pad_length
566 a.auth_context_id = auth_context_id
567 a.credentials = auth_blob
569 ai = ndr_pack(a)
570 if ndr_print:
571 sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(a))
572 if hexdump:
573 sys.stderr.write("generate_auth: %d\n%s" % (len(ai), self.hexdump(ai)))
574 else:
575 ai = b""
577 return ai
579 def parse_auth(self, auth_info, ndr_print=None, hexdump=None,
580 auth_context=None, stub_len=0):
581 if ndr_print is None:
582 ndr_print = self.do_ndr_print
583 if hexdump is None:
584 hexdump = self.do_hexdump
586 if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
587 return None
589 if hexdump:
590 sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
591 a = ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
592 if ndr_print:
593 sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))
595 if auth_context is not None:
596 self.assertEquals(a.auth_type, auth_context["auth_type"])
597 self.assertEquals(a.auth_level, auth_context["auth_level"])
598 self.assertEquals(a.auth_reserved, 0)
599 self.assertEquals(a.auth_context_id, auth_context["auth_context_id"])
601 self.assertLessEqual(a.auth_pad_length, dcerpc.DCERPC_AUTH_PAD_ALIGNMENT)
602 self.assertLessEqual(a.auth_pad_length, stub_len)
604 return a
606 def check_response_auth(self, rep, rep_blob, auth_context=None,
607 auth_pad_length=None):
609 if auth_context is None:
610 self.assertEquals(rep.auth_length, 0)
611 return rep.u.stub_and_verifier
613 ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
614 ofs_sig = rep.frag_length - rep.auth_length
615 ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
616 rep_data = rep_blob[ofs_stub:ofs_trailer]
617 rep_whole = rep_blob[0:ofs_sig]
618 rep_sig = rep_blob[ofs_sig:]
619 rep_auth_info_blob = rep_blob[ofs_trailer:]
621 rep_auth_info = self.parse_auth(rep_auth_info_blob,
622 auth_context=auth_context,
623 stub_len=len(rep_data))
624 if auth_pad_length is not None:
625 self.assertEquals(rep_auth_info.auth_pad_length, auth_pad_length)
626 self.assertEquals(rep_auth_info.credentials, rep_sig)
628 if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
629 # TODO: not yet supported here
630 self.assertTrue(False)
631 elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
632 auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)
634 stub_out = rep_data[0:len(rep_data)-rep_auth_info.auth_pad_length]
636 return stub_out
638 def generate_pdu(self, ptype, call_id, payload,
639 rpc_vers=5,
640 rpc_vers_minor=0,
641 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
642 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
643 drep=[samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
644 ndr_print=None, hexdump=None):
646 if getattr(payload, 'auth_info', None):
647 ai = payload.auth_info
648 else:
649 ai = b""
651 p = samba.dcerpc.dcerpc.ncacn_packet()
652 p.rpc_vers = rpc_vers
653 p.rpc_vers_minor = rpc_vers_minor
654 p.ptype = ptype
655 p.pfc_flags = pfc_flags
656 p.drep = drep
657 p.frag_length = 0
658 if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
659 p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
660 else:
661 p.auth_length = 0
662 p.call_id = call_id
663 p.u = payload
665 pdu = ndr_pack(p)
666 p.frag_length = len(pdu)
668 return p
670 def generate_request_auth(self, call_id,
671 pfc_flags=(dcerpc.DCERPC_PFC_FLAG_FIRST |
672 dcerpc.DCERPC_PFC_FLAG_LAST),
673 alloc_hint=None,
674 context_id=None,
675 opnum=None,
676 object=None,
677 stub=None,
678 auth_context=None,
679 ndr_print=None, hexdump=None):
681 if stub is None:
682 stub = b""
684 sig_size = 0
685 if auth_context is not None:
686 mod_len = len(stub) % dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
687 auth_pad_length = 0
688 if mod_len > 0:
689 auth_pad_length = dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
690 stub += b'\x00' * auth_pad_length
692 if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
693 sig_size = auth_context["gensec"].sig_size(len(stub))
694 else:
695 sig_size = 16
697 zero_sig = b"\x00" * sig_size
698 auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
699 auth_level=auth_context["auth_level"],
700 auth_pad_length=auth_pad_length,
701 auth_context_id=auth_context["auth_context_id"],
702 auth_blob=zero_sig)
703 else:
704 auth_info = b""
706 req = self.generate_request(call_id=call_id,
707 pfc_flags=pfc_flags,
708 alloc_hint=alloc_hint,
709 context_id=context_id,
710 opnum=opnum,
711 object=object,
712 stub=stub,
713 auth_info=auth_info,
714 ndr_print=ndr_print,
715 hexdump=hexdump)
716 if auth_context is None:
717 return req
719 req_blob = samba.ndr.ndr_pack(req)
720 ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
721 ofs_sig = len(req_blob) - req.auth_length
722 ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
723 req_data = req_blob[ofs_stub:ofs_trailer]
724 req_whole = req_blob[0:ofs_sig]
726 if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
727 # TODO: not yet supported here
728 self.assertTrue(False)
729 elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
730 req_sig = auth_context["gensec"].sign_packet(req_data, req_whole)
731 else:
732 return req
733 self.assertEquals(len(req_sig), req.auth_length)
734 self.assertEquals(len(req_sig), sig_size)
736 stub_sig_ofs = len(req.u.stub_and_verifier) - sig_size
737 stub = req.u.stub_and_verifier[0:stub_sig_ofs] + req_sig
738 req.u.stub_and_verifier = stub
740 return req
742 def verify_pdu(self, p, ptype, call_id,
743 rpc_vers=5,
744 rpc_vers_minor=0,
745 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
746 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
747 drep=[samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
748 auth_length=None):
750 self.assertIsNotNone(p, "No valid pdu")
752 if getattr(p.u, 'auth_info', None):
753 ai = p.u.auth_info
754 else:
755 ai = b""
757 self.assertEqual(p.rpc_vers, rpc_vers)
758 self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
759 self.assertEqual(p.ptype, ptype)
760 self.assertEqual(p.pfc_flags, pfc_flags)
761 self.assertEqual(p.drep, drep)
762 self.assertGreaterEqual(p.frag_length,
763 samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)
764 if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
765 self.assertEqual(p.auth_length,
766 len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
767 elif auth_length is not None:
768 self.assertEqual(p.auth_length, auth_length)
769 else:
770 self.assertEqual(p.auth_length, 0)
771 self.assertEqual(p.call_id, call_id)
773 return
775 def generate_bind(self, call_id,
776 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
777 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
778 max_xmit_frag=5840,
779 max_recv_frag=5840,
780 assoc_group_id=0,
781 ctx_list=[],
782 auth_info=b"",
783 ndr_print=None, hexdump=None):
785 b = samba.dcerpc.dcerpc.bind()
786 b.max_xmit_frag = max_xmit_frag
787 b.max_recv_frag = max_recv_frag
788 b.assoc_group_id = assoc_group_id
789 b.num_contexts = len(ctx_list)
790 b.ctx_list = ctx_list
791 b.auth_info = auth_info
793 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
794 pfc_flags=pfc_flags,
795 call_id=call_id,
796 payload=b,
797 ndr_print=ndr_print, hexdump=hexdump)
799 return p
801 def generate_alter(self, call_id,
802 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
803 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
804 max_xmit_frag=5840,
805 max_recv_frag=5840,
806 assoc_group_id=0,
807 ctx_list=[],
808 auth_info=b"",
809 ndr_print=None, hexdump=None):
811 a = samba.dcerpc.dcerpc.bind()
812 a.max_xmit_frag = max_xmit_frag
813 a.max_recv_frag = max_recv_frag
814 a.assoc_group_id = assoc_group_id
815 a.num_contexts = len(ctx_list)
816 a.ctx_list = ctx_list
817 a.auth_info = auth_info
819 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
820 pfc_flags=pfc_flags,
821 call_id=call_id,
822 payload=a,
823 ndr_print=ndr_print, hexdump=hexdump)
825 return p
827 def generate_auth3(self, call_id,
828 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
829 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
830 auth_info=b"",
831 ndr_print=None, hexdump=None):
833 a = samba.dcerpc.dcerpc.auth3()
834 a.auth_info = auth_info
836 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
837 pfc_flags=pfc_flags,
838 call_id=call_id,
839 payload=a,
840 ndr_print=ndr_print, hexdump=hexdump)
842 return p
844 def generate_request(self, call_id,
845 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
846 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
847 alloc_hint=None,
848 context_id=None,
849 opnum=None,
850 object=None,
851 stub=None,
852 auth_info=b"",
853 ndr_print=None, hexdump=None):
855 if alloc_hint is None:
856 alloc_hint = len(stub)
858 r = samba.dcerpc.dcerpc.request()
859 r.alloc_hint = alloc_hint
860 r.context_id = context_id
861 r.opnum = opnum
862 if object is not None:
863 r.object = object
864 r.stub_and_verifier = stub + auth_info
866 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
867 pfc_flags=pfc_flags,
868 call_id=call_id,
869 payload=r,
870 ndr_print=ndr_print, hexdump=hexdump)
872 if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
873 p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
875 return p
877 def generate_co_cancel(self, call_id,
878 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
879 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
880 auth_info=b"",
881 ndr_print=None, hexdump=None):
883 c = samba.dcerpc.dcerpc.co_cancel()
884 c.auth_info = auth_info
886 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
887 pfc_flags=pfc_flags,
888 call_id=call_id,
889 payload=c,
890 ndr_print=ndr_print, hexdump=hexdump)
892 return p
894 def generate_orphaned(self, call_id,
895 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
896 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
897 auth_info=b"",
898 ndr_print=None, hexdump=None):
900 o = samba.dcerpc.dcerpc.orphaned()
901 o.auth_info = auth_info
903 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
904 pfc_flags=pfc_flags,
905 call_id=call_id,
906 payload=o,
907 ndr_print=ndr_print, hexdump=hexdump)
909 return p
911 def generate_shutdown(self, call_id,
912 pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
913 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
914 ndr_print=None, hexdump=None):
916 s = samba.dcerpc.dcerpc.shutdown()
918 p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
919 pfc_flags=pfc_flags,
920 call_id=call_id,
921 payload=s,
922 ndr_print=ndr_print, hexdump=hexdump)
924 return p
926 def assertIsConnected(self):
927 self.assertIsNotNone(self.s, msg="Not connected")
928 return
930 def assertNotConnected(self):
931 self.assertIsNone(self.s, msg="Is connected")
932 return
934 def assertNDRSyntaxEquals(self, s1, s2):
935 self.assertEqual(s1.uuid, s2.uuid)
936 self.assertEqual(s1.if_version, s2.if_version)
937 return
939 def assertPadding(self, pad, length):
940 self.assertEquals(len(pad), length)
942 # sometimes windows sends random bytes
944 # we have IGNORE_RANDOM_PAD=1 to
945 # disable the check
947 if self.ignore_random_pad:
948 return
949 zero_pad = b'\0' * length
950 self.assertEquals(pad, zero_pad)