1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import samba
.dcerpc
.dcerpc
as dcerpc
22 import samba
.dcerpc
.base
23 import samba
.dcerpc
.epmapper
25 from samba
import gensec
26 from samba
.credentials
import Credentials
27 from samba
.tests
import TestCase
28 from samba
.ndr
import ndr_pack
, ndr_unpack
, ndr_unpack_out
29 from samba
.compat
import text_type
32 class RawDCERPCTest(TestCase
):
33 """A raw DCE/RPC Test case."""
35 def _disconnect(self
, reason
):
41 sys
.stderr
.write("disconnect[%s]\n" % reason
)
45 self
.a
= socket
.getaddrinfo(self
.host
, self
.tcp_port
, socket
.AF_UNSPEC
,
46 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
48 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
50 self
.s
.connect(self
.a
[0][4])
51 except socket
.error
as e
:
57 except Exception as e
:
63 super(RawDCERPCTest
, self
).setUp()
64 self
.do_ndr_print
= False
65 self
.do_hexdump
= False
67 self
.ignore_random_pad
= samba
.tests
.env_get_var_value('IGNORE_RANDOM_PAD',
69 self
.host
= samba
.tests
.env_get_var_value('SERVER')
70 self
.target_hostname
= samba
.tests
.env_get_var_value('TARGET_HOSTNAME', allow_missing
=True)
71 if self
.target_hostname
is None:
72 self
.target_hostname
= self
.host
76 self
.settings
["lp_ctx"] = self
.lp_ctx
= samba
.tests
.env_loadparm()
77 self
.settings
["target_hostname"] = self
.target_hostname
82 self
._disconnect
("tearDown")
83 super(TestCase
, self
).tearDown()
88 def second_connection(self
, tcp_port
=None):
89 c
= RawDCERPCTest(methodName
='noop')
90 c
.do_ndr_print
= self
.do_ndr_print
91 c
.do_hexdump
= self
.do_hexdump
92 c
.ignore_random_pad
= self
.ignore_random_pad
95 c
.target_hostname
= self
.target_hostname
96 if tcp_port
is not None:
99 c
.tcp_port
= self
.tcp_port
101 c
.settings
= self
.settings
106 def get_user_creds(self
):
109 domain
= samba
.tests
.env_get_var_value('DOMAIN')
110 realm
= samba
.tests
.env_get_var_value('REALM')
111 username
= samba
.tests
.env_get_var_value('USERNAME')
112 password
= samba
.tests
.env_get_var_value('PASSWORD')
115 c
.set_username(username
)
116 c
.set_password(password
)
119 def get_anon_creds(self
):
124 def get_auth_context_creds(self
, creds
, auth_type
, auth_level
,
129 if g_auth_level
is None:
130 g_auth_level
= auth_level
132 g
= gensec
.Security
.start_client(self
.settings
)
133 g
.set_credentials(creds
)
134 g
.want_feature(gensec
.FEATURE_DCE_STYLE
)
135 g
.start_mech_by_authtype(auth_type
, g_auth_level
)
137 if auth_type
== dcerpc
.DCERPC_AUTH_TYPE_KRB5
:
139 elif auth_type
== dcerpc
.DCERPC_AUTH_TYPE_NTLMSSP
:
145 auth_context
["auth_type"] = auth_type
146 auth_context
["auth_level"] = auth_level
147 auth_context
["auth_context_id"] = auth_context_id
148 auth_context
["g_auth_level"] = g_auth_level
149 auth_context
["gensec"] = g
150 auth_context
["hdr_signing"] = hdr_signing
151 auth_context
["expect_3legs"] = expect_3legs
155 def do_generic_bind(self
, ctx
, auth_context
=None,
156 pfc_flags
=samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
157 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
158 assoc_group_id
=0, call_id
=0,
159 nak_reason
=None, alter_fault
=None,
160 start_with_alter
=False):
163 if auth_context
is not None:
164 if auth_context
['hdr_signing']:
165 pfc_flags |
= dcerpc
.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN
167 expect_3legs
= auth_context
["expect_3legs"]
170 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
171 self
.assertFalse(finished
)
173 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
174 auth_level
=auth_context
["auth_level"],
175 auth_context_id
=auth_context
["auth_context_id"],
181 req
= self
.generate_alter(call_id
=call_id
,
184 assoc_group_id
=0xffffffff - assoc_group_id
,
187 rep
= self
.recv_pdu()
188 if alter_fault
is not None:
189 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_FAULT
, req
.call_id
,
190 pfc_flags
=req
.pfc_flags |
191 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
,
193 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
194 self
.assertEquals(rep
.u
.context_id
, 0)
195 self
.assertEquals(rep
.u
.cancel_count
, 0)
196 self
.assertEquals(rep
.u
.flags
, 0)
197 self
.assertEquals(rep
.u
.status
, alter_fault
)
198 self
.assertEquals(rep
.u
.reserved
, 0)
199 self
.assertEquals(len(rep
.u
.error_and_verifier
), 0)
201 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_ALTER_RESP
, req
.call_id
,
202 pfc_flags
=req
.pfc_flags
)
203 self
.assertEquals(rep
.u
.max_xmit_frag
, req
.u
.max_xmit_frag
)
204 self
.assertEquals(rep
.u
.max_recv_frag
, req
.u
.max_recv_frag
)
205 self
.assertEquals(rep
.u
.assoc_group_id
, assoc_group_id
)
206 self
.assertEquals(rep
.u
.secondary_address_size
, 0)
207 self
.assertEquals(rep
.u
.secondary_address
, '')
208 self
.assertPadding(rep
.u
._pad
1, 2)
210 req
= self
.generate_bind(call_id
=call_id
,
213 assoc_group_id
=assoc_group_id
,
216 rep
= self
.recv_pdu()
217 if nak_reason
is not None:
218 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND_NAK
, req
.call_id
,
220 self
.assertEquals(rep
.u
.reject_reason
, nak_reason
)
221 self
.assertEquals(rep
.u
.num_versions
, 1)
222 self
.assertEquals(rep
.u
.versions
[0].rpc_vers
, req
.rpc_vers
)
223 self
.assertEquals(rep
.u
.versions
[0].rpc_vers_minor
, req
.rpc_vers_minor
)
224 self
.assertPadding(rep
.u
._pad
, 3)
226 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND_ACK
, req
.call_id
,
228 self
.assertEquals(rep
.u
.max_xmit_frag
, req
.u
.max_xmit_frag
)
229 self
.assertEquals(rep
.u
.max_recv_frag
, req
.u
.max_recv_frag
)
230 if assoc_group_id
!= 0:
231 self
.assertEquals(rep
.u
.assoc_group_id
, assoc_group_id
)
233 self
.assertNotEquals(rep
.u
.assoc_group_id
, 0)
234 assoc_group_id
= rep
.u
.assoc_group_id
235 port_str
= "%d" % self
.tcp_port
236 port_len
= len(port_str
) + 1
237 mod_len
= (2 + port_len
) % 4
239 port_pad
= 4 - mod_len
242 self
.assertEquals(rep
.u
.secondary_address_size
, port_len
)
243 self
.assertEquals(rep
.u
.secondary_address
, port_str
)
244 self
.assertPadding(rep
.u
._pad
1, port_pad
)
245 self
.assertEquals(rep
.u
.num_results
, 1)
246 self
.assertEquals(rep
.u
.ctx_list
[0].result
,
247 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_RESULT_ACCEPTANCE
)
248 self
.assertEquals(rep
.u
.ctx_list
[0].reason
,
249 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED
)
250 self
.assertNDRSyntaxEquals(rep
.u
.ctx_list
[0].syntax
, ctx
.transfer_syntaxes
[0])
252 if auth_context
is None:
253 self
.assertEquals(rep
.auth_length
, 0)
254 self
.assertEquals(len(rep
.u
.auth_info
), 0)
256 self
.assertNotEquals(rep
.auth_length
, 0)
257 self
.assertGreater(len(rep
.u
.auth_info
), samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
258 self
.assertEquals(rep
.auth_length
, len(rep
.u
.auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
260 a
= self
.parse_auth(rep
.u
.auth_info
, auth_context
=auth_context
)
262 from_server
= a
.credentials
263 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
265 self
.assertTrue(finished
)
266 if auth_context
['hdr_signing']:
267 auth_context
["gensec"].want_feature(gensec
.FEATURE_SIGN_PKT_HEADER
)
269 self
.assertFalse(finished
)
271 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
272 auth_level
=auth_context
["auth_level"],
273 auth_context_id
=auth_context
["auth_context_id"],
275 req
= self
.generate_alter(call_id
=call_id
,
277 assoc_group_id
=0xffffffff - assoc_group_id
,
280 rep
= self
.recv_pdu()
281 if alter_fault
is not None:
282 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_FAULT
, req
.call_id
,
283 pfc_flags
=req
.pfc_flags |
284 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
,
286 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
287 self
.assertEquals(rep
.u
.context_id
, 0)
288 self
.assertEquals(rep
.u
.cancel_count
, 0)
289 self
.assertEquals(rep
.u
.flags
, 0)
290 self
.assertEquals(rep
.u
.status
, alter_fault
)
291 self
.assertEquals(rep
.u
.reserved
, 0)
292 self
.assertEquals(len(rep
.u
.error_and_verifier
), 0)
294 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_ALTER_RESP
, req
.call_id
)
295 self
.assertEquals(rep
.u
.max_xmit_frag
, req
.u
.max_xmit_frag
)
296 self
.assertEquals(rep
.u
.max_recv_frag
, req
.u
.max_recv_frag
)
297 self
.assertEquals(rep
.u
.assoc_group_id
, assoc_group_id
)
298 self
.assertEquals(rep
.u
.secondary_address_size
, 0)
299 self
.assertEquals(rep
.u
.secondary_address
, '')
300 self
.assertPadding(rep
.u
._pad
1, 2)
301 self
.assertEquals(rep
.u
.num_results
, 1)
302 self
.assertEquals(rep
.u
.ctx_list
[0].result
,
303 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_RESULT_ACCEPTANCE
)
304 self
.assertEquals(rep
.u
.ctx_list
[0].reason
,
305 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED
)
306 self
.assertNDRSyntaxEquals(rep
.u
.ctx_list
[0].syntax
, ctx
.transfer_syntaxes
[0])
308 self
.assertEquals(rep
.auth_length
, 0)
310 self
.assertNotEquals(rep
.auth_length
, 0)
311 self
.assertGreaterEqual(len(rep
.u
.auth_info
), samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
312 self
.assertEquals(rep
.auth_length
, len(rep
.u
.auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
314 a
= self
.parse_auth(rep
.u
.auth_info
, auth_context
=auth_context
)
319 from_server
= a
.credentials
320 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
321 self
.assertTrue(finished
)
322 if auth_context
['hdr_signing']:
323 auth_context
["gensec"].want_feature(gensec
.FEATURE_SIGN_PKT_HEADER
)
327 def prepare_presentation(self
, abstract
, transfer
, object=None,
328 context_id
=0xffff, epmap
=False, auth_context
=None,
329 pfc_flags
=samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
330 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
334 self
.epmap_reconnect(abstract
, transfer
=transfer
, object=object)
336 tsf1_list
= [transfer
]
337 ctx
= samba
.dcerpc
.dcerpc
.ctx_list()
338 ctx
.context_id
= context_id
339 ctx
.num_transfer_syntaxes
= len(tsf1_list
)
340 ctx
.abstract_syntax
= abstract
341 ctx
.transfer_syntaxes
= tsf1_list
343 ack
= self
.do_generic_bind(ctx
=ctx
,
344 auth_context
=auth_context
,
346 assoc_group_id
=assoc_group_id
)
354 def do_single_request(self
, call_id
, ctx
, io
,
357 bigendian
=False, ndr64
=False,
358 allow_remaining
=False,
362 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
363 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
365 fault_context_id
=None,
370 if fault_context_id
is None:
371 fault_context_id
= ctx
.context_id
373 if ndr_print
is None:
374 ndr_print
= self
.do_ndr_print
376 hexdump
= self
.do_hexdump
380 sys
.stderr
.write("in: %s" % samba
.ndr
.ndr_print_in(io
))
381 stub_in
= samba
.ndr
.ndr_pack_in(io
, bigendian
=bigendian
, ndr64
=ndr64
)
383 sys
.stderr
.write("stub_in: %d\n%s" % (len(stub_in
), self
.hexdump(stub_in
)))
385 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST
386 pfc_flags |
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
387 if object is not None:
388 pfc_flags |
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_OBJECT_UUID
390 req
= self
.generate_request_auth(call_id
=call_id
,
391 context_id
=ctx
.context_id
,
396 auth_context
=auth_context
)
398 self
.send_pdu(req
, ndr_print
=ndr_print
, hexdump
=hexdump
)
400 (rep
, rep_blob
) = self
.recv_pdu_raw(timeout
=timeout
,
404 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_FAULT
, req
.call_id
,
405 pfc_flags
=fault_pfc_flags
, auth_length
=0)
406 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
407 self
.assertEquals(rep
.u
.context_id
, fault_context_id
)
408 self
.assertEquals(rep
.u
.cancel_count
, 0)
409 self
.assertEquals(rep
.u
.flags
, 0)
410 self
.assertEquals(rep
.u
.status
, fault_status
)
411 self
.assertEquals(rep
.u
.reserved
, 0)
412 self
.assertEquals(len(rep
.u
.error_and_verifier
), 0)
415 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_RESPONSE
, req
.call_id
,
416 auth_length
=req
.auth_length
)
417 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
418 self
.assertEquals(rep
.u
.context_id
, req
.u
.context_id
& 0xff)
419 self
.assertEquals(rep
.u
.cancel_count
, 0)
420 self
.assertGreaterEqual(len(rep
.u
.stub_and_verifier
), rep
.u
.alloc_hint
)
421 stub_out
= self
.check_response_auth(rep
, rep_blob
, auth_context
)
422 self
.assertEqual(len(stub_out
), rep
.u
.alloc_hint
)
425 sys
.stderr
.write("stub_out: %d\n%s" % (len(stub_out
), self
.hexdump(stub_out
)))
426 ndr_unpack_out(io
, stub_out
, bigendian
=bigendian
, ndr64
=ndr64
,
427 allow_remaining
=allow_remaining
)
429 sys
.stderr
.write("out: %s" % samba
.ndr
.ndr_print_out(io
))
431 def epmap_reconnect(self
, abstract
, transfer
=None, object=None):
432 ndr32
= samba
.dcerpc
.base
.transfer_syntax_ndr()
438 object = samba
.dcerpc
.misc
.GUID()
440 ctx
= self
.prepare_presentation(samba
.dcerpc
.epmapper
.abstract_syntax(),
441 transfer
, context_id
=0)
443 data1
= ndr_pack(abstract
)
444 lhs1
= samba
.dcerpc
.epmapper
.epm_lhs()
445 lhs1
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_UUID
446 lhs1
.lhs_data
= data1
[:18]
447 rhs1
= samba
.dcerpc
.epmapper
.epm_rhs_uuid()
448 rhs1
.unknown
= data1
[18:]
449 floor1
= samba
.dcerpc
.epmapper
.epm_floor()
452 data2
= ndr_pack(transfer
)
453 lhs2
= samba
.dcerpc
.epmapper
.epm_lhs()
454 lhs2
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_UUID
455 lhs2
.lhs_data
= data2
[:18]
456 rhs2
= samba
.dcerpc
.epmapper
.epm_rhs_uuid()
457 rhs2
.unknown
= data1
[18:]
458 floor2
= samba
.dcerpc
.epmapper
.epm_floor()
461 lhs3
= samba
.dcerpc
.epmapper
.epm_lhs()
462 lhs3
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_NCACN
464 floor3
= samba
.dcerpc
.epmapper
.epm_floor()
466 floor3
.rhs
.minor_version
= 0
467 lhs4
= samba
.dcerpc
.epmapper
.epm_lhs()
468 lhs4
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
470 floor4
= samba
.dcerpc
.epmapper
.epm_floor()
472 floor4
.rhs
.port
= self
.tcp_port
473 lhs5
= samba
.dcerpc
.epmapper
.epm_lhs()
474 lhs5
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_IP
476 floor5
= samba
.dcerpc
.epmapper
.epm_floor()
478 floor5
.rhs
.ipaddr
= "0.0.0.0"
480 floors
= [floor1
, floor2
, floor3
, floor4
, floor5
]
481 req_tower
= samba
.dcerpc
.epmapper
.epm_tower()
482 req_tower
.num_floors
= len(floors
)
483 req_tower
.floors
= floors
484 req_twr
= samba
.dcerpc
.epmapper
.epm_twr_t()
485 req_twr
.tower
= req_tower
487 epm_map
= samba
.dcerpc
.epmapper
.epm_Map()
488 epm_map
.in_object
= object
489 epm_map
.in_map_tower
= req_twr
490 epm_map
.in_entry_handle
= samba
.dcerpc
.misc
.policy_handle()
491 epm_map
.in_max_towers
= 4
493 self
.do_single_request(call_id
=2, ctx
=ctx
, io
=epm_map
)
495 self
.assertGreaterEqual(epm_map
.out_num_towers
, 1)
496 rep_twr
= epm_map
.out_towers
[0].twr
497 self
.assertIsNotNone(rep_twr
)
498 self
.assertEqual(rep_twr
.tower_length
, 75)
499 self
.assertEqual(rep_twr
.tower
.num_floors
, 5)
500 self
.assertEqual(len(rep_twr
.tower
.floors
), 5)
501 self
.assertEqual(rep_twr
.tower
.floors
[3].lhs
.protocol
,
502 samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
)
503 self
.assertEqual(rep_twr
.tower
.floors
[3].lhs
.protocol
,
504 samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
)
506 # reconnect to the given port
507 self
._disconnect
("epmap_reconnect")
508 self
.tcp_port
= rep_twr
.tower
.floors
[3].rhs
.port
511 def send_pdu(self
, req
, ndr_print
=None, hexdump
=None):
512 if ndr_print
is None:
513 ndr_print
= self
.do_ndr_print
515 hexdump
= self
.do_hexdump
517 req_pdu
= ndr_pack(req
)
519 sys
.stderr
.write("send_pdu: %s" % samba
.ndr
.ndr_print(req
))
521 sys
.stderr
.write("send_pdu: %d\n%s" % (len(req_pdu
), self
.hexdump(req_pdu
)))
523 sent
= self
.s
.send(req_pdu
, 0)
524 if sent
== len(req_pdu
):
526 req_pdu
= req_pdu
[sent
:]
527 except socket
.error
as e
:
528 self
._disconnect
("send_pdu: %s" % e
)
531 self
._disconnect
("send_pdu: %s" % e
)
536 def recv_raw(self
, hexdump
=None, timeout
=None):
539 hexdump
= self
.do_hexdump
541 if timeout
is not None:
542 self
.s
.settimeout(timeout
)
543 rep_pdu
= self
.s
.recv(0xffff, 0)
544 self
.s
.settimeout(10)
545 if len(rep_pdu
) == 0:
546 self
._disconnect
("recv_raw: EOF")
549 sys
.stderr
.write("recv_raw: %d\n%s" % (len(rep_pdu
), self
.hexdump(rep_pdu
)))
550 except socket
.timeout
as e
:
551 self
.s
.settimeout(10)
552 sys
.stderr
.write("recv_raw: TIMEOUT\n")
554 except socket
.error
as e
:
555 self
._disconnect
("recv_raw: %s" % e
)
558 self
._disconnect
("recv_raw: %s" % e
)
564 def recv_pdu_raw(self
, ndr_print
=None, hexdump
=None, timeout
=None):
567 if ndr_print
is None:
568 ndr_print
= self
.do_ndr_print
570 hexdump
= self
.do_hexdump
572 rep_pdu
= self
.recv_raw(hexdump
=hexdump
, timeout
=timeout
)
575 rep
= ndr_unpack(samba
.dcerpc
.dcerpc
.ncacn_packet
, rep_pdu
, allow_remaining
=True)
577 sys
.stderr
.write("recv_pdu: %s" % samba
.ndr
.ndr_print(rep
))
578 self
.assertEqual(rep
.frag_length
, len(rep_pdu
))
581 return (rep
, rep_pdu
)
583 def recv_pdu(self
, ndr_print
=None, hexdump
=None, timeout
=None):
584 (rep
, rep_pdu
) = self
.recv_pdu_raw(ndr_print
=ndr_print
,
589 def generate_auth(self
,
593 auth_context_id
=None,
595 ndr_print
=None, hexdump
=None):
596 if ndr_print
is None:
597 ndr_print
= self
.do_ndr_print
599 hexdump
= self
.do_hexdump
601 if auth_type
is not None:
602 a
= samba
.dcerpc
.dcerpc
.auth()
603 a
.auth_type
= auth_type
604 a
.auth_level
= auth_level
605 a
.auth_pad_length
= auth_pad_length
606 a
.auth_context_id
= auth_context_id
607 a
.credentials
= auth_blob
611 sys
.stderr
.write("generate_auth: %s" % samba
.ndr
.ndr_print(a
))
613 sys
.stderr
.write("generate_auth: %d\n%s" % (len(ai
), self
.hexdump(ai
)))
619 def parse_auth(self
, auth_info
, ndr_print
=None, hexdump
=None,
620 auth_context
=None, stub_len
=0):
621 if ndr_print
is None:
622 ndr_print
= self
.do_ndr_print
624 hexdump
= self
.do_hexdump
626 if (len(auth_info
) <= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
):
630 sys
.stderr
.write("parse_auth: %d\n%s" % (len(auth_info
), self
.hexdump(auth_info
)))
631 a
= ndr_unpack(samba
.dcerpc
.dcerpc
.auth
, auth_info
, allow_remaining
=True)
633 sys
.stderr
.write("parse_auth: %s" % samba
.ndr
.ndr_print(a
))
635 if auth_context
is not None:
636 self
.assertEquals(a
.auth_type
, auth_context
["auth_type"])
637 self
.assertEquals(a
.auth_level
, auth_context
["auth_level"])
638 self
.assertEquals(a
.auth_reserved
, 0)
639 self
.assertEquals(a
.auth_context_id
, auth_context
["auth_context_id"])
641 self
.assertLessEqual(a
.auth_pad_length
, dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
)
642 self
.assertLessEqual(a
.auth_pad_length
, stub_len
)
def check_response_auth(self, rep, rep_blob, auth_context=None,
                        auth_pad_length=None):
    """Verify the auth trailer of a response PDU and return its stub data.

    :param rep: the unpacked response packet; ``frag_length``,
        ``auth_length`` and ``u.stub_and_verifier`` are read from it.
    :param rep_blob: the raw bytes the packet was unpacked from.
    :param auth_context: the auth context dictionary (its ``"auth_level"``
        and ``"gensec"`` entries are used here), or None when the call is
        expected to be unauthenticated.
    :param auth_pad_length: when not None, the exact ``auth_pad_length``
        the trailer must carry.
    :return: the stub bytes with the auth padding removed; for an
        unauthenticated call, ``rep.u.stub_and_verifier`` unchanged.
    """
    if auth_context is None:
        # Without an auth context the response must not carry a signature.
        self.assertEqual(rep.auth_length, 0)
        return rep.u.stub_and_verifier

    # Layout of the raw PDU:
    #   [0, ofs_stub)            header
    #   [ofs_stub, ofs_trailer)  stub data plus auth padding
    #   [ofs_trailer, ofs_sig)   auth trailer
    #   [ofs_sig, end)           signature (auth_length bytes)
    ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
    ofs_sig = rep.frag_length - rep.auth_length
    ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
    rep_data = rep_blob[ofs_stub:ofs_trailer]
    rep_whole = rep_blob[0:ofs_sig]
    rep_sig = rep_blob[ofs_sig:]
    rep_auth_info_blob = rep_blob[ofs_trailer:]

    rep_auth_info = self.parse_auth(rep_auth_info_blob,
                                    auth_context=auth_context,
                                    stub_len=len(rep_data))
    if auth_pad_length is not None:
        self.assertEqual(rep_auth_info.auth_pad_length, auth_pad_length)
    self.assertEqual(rep_auth_info.credentials, rep_sig)

    if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
        # TODO: not yet supported here
        self.fail("DCERPC_AUTH_LEVEL_PRIVACY is not yet supported here")
    elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
        auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)

    # Keep the explicit end index: a negative-index slice such as
    # rep_data[:-pad] would return an empty stub when the padding is 0.
    stub_out = rep_data[0:len(rep_data) - rep_auth_info.auth_pad_length]

    return stub_out
678 def generate_pdu(self
, ptype
, call_id
, payload
,
681 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
682 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
683 drep
=[samba
.dcerpc
.dcerpc
.DCERPC_DREP_LE
, 0, 0, 0],
684 ndr_print
=None, hexdump
=None):
686 if getattr(payload
, 'auth_info', None):
687 ai
= payload
.auth_info
691 p
= samba
.dcerpc
.dcerpc
.ncacn_packet()
692 p
.rpc_vers
= rpc_vers
693 p
.rpc_vers_minor
= rpc_vers_minor
695 p
.pfc_flags
= pfc_flags
698 if len(ai
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
699 p
.auth_length
= len(ai
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
706 p
.frag_length
= len(pdu
)
710 def generate_request_auth(self
, call_id
,
711 pfc_flags
=(dcerpc
.DCERPC_PFC_FLAG_FIRST |
712 dcerpc
.DCERPC_PFC_FLAG_LAST
),
719 ndr_print
=None, hexdump
=None):
725 if auth_context
is not None:
726 mod_len
= len(stub
) % dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
729 auth_pad_length
= dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
- mod_len
730 stub
+= b
'\x00' * auth_pad_length
732 if auth_context
["g_auth_level"] >= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_LEVEL_PACKET
:
733 sig_size
= auth_context
["gensec"].sig_size(len(stub
))
737 zero_sig
= b
"\x00" * sig_size
738 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
739 auth_level
=auth_context
["auth_level"],
740 auth_pad_length
=auth_pad_length
,
741 auth_context_id
=auth_context
["auth_context_id"],
746 req
= self
.generate_request(call_id
=call_id
,
748 alloc_hint
=alloc_hint
,
749 context_id
=context_id
,
756 if auth_context
is None:
759 req_blob
= samba
.ndr
.ndr_pack(req
)
760 ofs_stub
= dcerpc
.DCERPC_REQUEST_LENGTH
761 ofs_sig
= len(req_blob
) - req
.auth_length
762 ofs_trailer
= ofs_sig
- dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
763 req_data
= req_blob
[ofs_stub
:ofs_trailer
]
764 req_whole
= req_blob
[0:ofs_sig
]
766 if auth_context
["auth_level"] >= dcerpc
.DCERPC_AUTH_LEVEL_PRIVACY
:
767 # TODO: not yet supported here
768 self
.assertTrue(False)
769 elif auth_context
["auth_level"] >= dcerpc
.DCERPC_AUTH_LEVEL_PACKET
:
770 req_sig
= auth_context
["gensec"].sign_packet(req_data
, req_whole
)
773 self
.assertEquals(len(req_sig
), req
.auth_length
)
774 self
.assertEquals(len(req_sig
), sig_size
)
776 stub_sig_ofs
= len(req
.u
.stub_and_verifier
) - sig_size
777 stub
= req
.u
.stub_and_verifier
[0:stub_sig_ofs
] + req_sig
778 req
.u
.stub_and_verifier
= stub
782 def verify_pdu(self
, p
, ptype
, call_id
,
785 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
786 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
787 drep
=[samba
.dcerpc
.dcerpc
.DCERPC_DREP_LE
, 0, 0, 0],
790 self
.assertIsNotNone(p
, "No valid pdu")
792 if getattr(p
.u
, 'auth_info', None):
797 self
.assertEqual(p
.rpc_vers
, rpc_vers
)
798 self
.assertEqual(p
.rpc_vers_minor
, rpc_vers_minor
)
799 self
.assertEqual(p
.ptype
, ptype
)
800 self
.assertEqual(p
.pfc_flags
, pfc_flags
)
801 self
.assertEqual(p
.drep
, drep
)
802 self
.assertGreaterEqual(p
.frag_length
,
803 samba
.dcerpc
.dcerpc
.DCERPC_NCACN_PAYLOAD_OFFSET
)
804 if len(ai
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
805 self
.assertEqual(p
.auth_length
,
806 len(ai
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
807 elif auth_length
is not None:
808 self
.assertEqual(p
.auth_length
, auth_length
)
810 self
.assertEqual(p
.auth_length
, 0)
811 self
.assertEqual(p
.call_id
, call_id
)
815 def generate_bind(self
, call_id
,
816 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
817 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
823 ndr_print
=None, hexdump
=None):
825 b
= samba
.dcerpc
.dcerpc
.bind()
826 b
.max_xmit_frag
= max_xmit_frag
827 b
.max_recv_frag
= max_recv_frag
828 b
.assoc_group_id
= assoc_group_id
829 b
.num_contexts
= len(ctx_list
)
830 b
.ctx_list
= ctx_list
831 b
.auth_info
= auth_info
833 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND
,
837 ndr_print
=ndr_print
, hexdump
=hexdump
)
841 def generate_alter(self
, call_id
,
842 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
843 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
849 ndr_print
=None, hexdump
=None):
851 a
= samba
.dcerpc
.dcerpc
.bind()
852 a
.max_xmit_frag
= max_xmit_frag
853 a
.max_recv_frag
= max_recv_frag
854 a
.assoc_group_id
= assoc_group_id
855 a
.num_contexts
= len(ctx_list
)
856 a
.ctx_list
= ctx_list
857 a
.auth_info
= auth_info
859 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_ALTER
,
863 ndr_print
=ndr_print
, hexdump
=hexdump
)
867 def generate_auth3(self
, call_id
,
868 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
869 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
871 ndr_print
=None, hexdump
=None):
873 a
= samba
.dcerpc
.dcerpc
.auth3()
874 a
.auth_info
= auth_info
876 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_AUTH3
,
880 ndr_print
=ndr_print
, hexdump
=hexdump
)
884 def generate_request(self
, call_id
,
885 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
886 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
893 ndr_print
=None, hexdump
=None):
895 if alloc_hint
is None:
896 alloc_hint
= len(stub
)
898 r
= samba
.dcerpc
.dcerpc
.request()
899 r
.alloc_hint
= alloc_hint
900 r
.context_id
= context_id
902 if object is not None:
904 r
.stub_and_verifier
= stub
+ auth_info
906 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_REQUEST
,
910 ndr_print
=ndr_print
, hexdump
=hexdump
)
912 if len(auth_info
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
913 p
.auth_length
= len(auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
917 def generate_co_cancel(self
, call_id
,
918 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
919 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
921 ndr_print
=None, hexdump
=None):
923 c
= samba
.dcerpc
.dcerpc
.co_cancel()
924 c
.auth_info
= auth_info
926 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_CO_CANCEL
,
930 ndr_print
=ndr_print
, hexdump
=hexdump
)
934 def generate_orphaned(self
, call_id
,
935 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
936 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
938 ndr_print
=None, hexdump
=None):
940 o
= samba
.dcerpc
.dcerpc
.orphaned()
941 o
.auth_info
= auth_info
943 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_ORPHANED
,
947 ndr_print
=ndr_print
, hexdump
=hexdump
)
951 def generate_shutdown(self
, call_id
,
952 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
953 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
954 ndr_print
=None, hexdump
=None):
956 s
= samba
.dcerpc
.dcerpc
.shutdown()
958 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_SHUTDOWN
,
962 ndr_print
=ndr_print
, hexdump
=hexdump
)
def assertIsConnected(self):
    """Fail the test unless ``self.s`` currently holds a socket."""
    self.assertIsNotNone(self.s, msg="Not connected")
def assertNotConnected(self):
    """Fail the test if ``self.s`` is anything other than ``None``."""
    self.assertIsNone(self.s, msg="Is connected")
def assertNDRSyntaxEquals(self, s1, s2):
    """Assert that two syntax identifiers are equal.

    Both the ``uuid`` and the ``if_version`` attributes of *s1* and *s2*
    are compared; the test fails on the first one that differs.
    """
    self.assertEqual(s1.uuid, s2.uuid)
    self.assertEqual(s1.if_version, s2.if_version)
def assertPadding(self, pad, length):
    """Assert that *pad* consists of exactly *length* zero bytes.

    The length is always checked. The content check is skipped when
    ``self.ignore_random_pad`` is true.

    :param pad: the padding bytes taken from a received PDU.
    :param length: the number of padding bytes expected.
    """
    self.assertEqual(len(pad), length)
    # Sometimes Windows sends random bytes as padding; IGNORE_RANDOM_PAD=1
    # exists so the zero-content check can be switched off.
    # NOTE(review): the early return is inferred from that intent - confirm.
    if self.ignore_random_pad:
        return
    self.assertEqual(pad, b'\0' * length)