1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import samba
.dcerpc
.dcerpc
as dcerpc
22 import samba
.dcerpc
.base
23 import samba
.dcerpc
.epmapper
25 from samba
import gensec
26 from samba
.credentials
import Credentials
27 from samba
.tests
import TestCase
28 from samba
.ndr
import ndr_pack
, ndr_unpack
, ndr_unpack_out
29 from samba
.compat
import text_type
32 class RawDCERPCTest(TestCase
):
33 """A raw DCE/RPC Test case."""
35 def _disconnect(self
, reason
):
41 sys
.stderr
.write("disconnect[%s]\n" % reason
)
45 self
.a
= socket
.getaddrinfo(self
.host
, self
.tcp_port
, socket
.AF_UNSPEC
,
46 socket
.SOCK_STREAM
, socket
.SOL_TCP
,
48 self
.s
= socket
.socket(self
.a
[0][0], self
.a
[0][1], self
.a
[0][2])
50 self
.s
.connect(self
.a
[0][4])
51 except socket
.error
as e
:
57 except Exception as e
:
63 super(RawDCERPCTest
, self
).setUp()
64 self
.do_ndr_print
= False
65 self
.do_hexdump
= False
67 self
.ignore_random_pad
= samba
.tests
.env_get_var_value('IGNORE_RANDOM_PAD',
69 self
.host
= samba
.tests
.env_get_var_value('SERVER')
70 self
.target_hostname
= samba
.tests
.env_get_var_value('TARGET_HOSTNAME', allow_missing
=True)
71 if self
.target_hostname
is None:
72 self
.target_hostname
= self
.host
76 self
.settings
["lp_ctx"] = self
.lp_ctx
= samba
.tests
.env_loadparm()
77 self
.settings
["target_hostname"] = self
.target_hostname
82 self
._disconnect
("tearDown")
83 super(TestCase
, self
).tearDown()
88 def second_connection(self
, tcp_port
=None):
89 c
= RawDCERPCTest(methodName
='noop')
90 c
.do_ndr_print
= self
.do_ndr_print
91 c
.do_hexdump
= self
.do_hexdump
92 c
.ignore_random_pad
= self
.ignore_random_pad
95 c
.target_hostname
= self
.target_hostname
96 if tcp_port
is not None:
99 c
.tcp_port
= self
.tcp_port
101 c
.settings
= self
.settings
106 def get_user_creds(self
):
109 domain
= samba
.tests
.env_get_var_value('DOMAIN')
110 realm
= samba
.tests
.env_get_var_value('REALM')
111 username
= samba
.tests
.env_get_var_value('USERNAME')
112 password
= samba
.tests
.env_get_var_value('PASSWORD')
115 c
.set_username(username
)
116 c
.set_password(password
)
119 def get_anon_creds(self
):
124 def get_auth_context_creds(self
, creds
, auth_type
, auth_level
,
128 if g_auth_level
is None:
129 g_auth_level
= auth_level
131 g
= gensec
.Security
.start_client(self
.settings
)
132 g
.set_credentials(creds
)
133 g
.want_feature(gensec
.FEATURE_DCE_STYLE
)
134 g
.start_mech_by_authtype(auth_type
, g_auth_level
)
136 if auth_type
== dcerpc
.DCERPC_AUTH_TYPE_KRB5
:
138 elif auth_type
== dcerpc
.DCERPC_AUTH_TYPE_NTLMSSP
:
144 auth_context
["auth_type"] = auth_type
145 auth_context
["auth_level"] = auth_level
146 auth_context
["auth_context_id"] = auth_context_id
147 auth_context
["g_auth_level"] = g_auth_level
148 auth_context
["gensec"] = g
149 auth_context
["expect_3legs"] = expect_3legs
153 def do_generic_bind(self
, ctx
, auth_context
=None,
154 pfc_flags
=samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
155 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
156 assoc_group_id
=0, call_id
=0,
157 nak_reason
=None, alter_fault
=None):
160 if auth_context
is not None:
161 expect_3legs
= auth_context
["expect_3legs"]
164 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
165 self
.assertFalse(finished
)
167 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
168 auth_level
=auth_context
["auth_level"],
169 auth_context_id
=auth_context
["auth_context_id"],
174 req
= self
.generate_bind(call_id
=call_id
,
177 assoc_group_id
=assoc_group_id
,
180 rep
= self
.recv_pdu()
181 if nak_reason
is not None:
182 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND_NAK
, req
.call_id
,
184 self
.assertEquals(rep
.u
.reject_reason
, nak_reason
)
185 self
.assertEquals(rep
.u
.num_versions
, 1)
186 self
.assertEquals(rep
.u
.versions
[0].rpc_vers
, req
.rpc_vers
)
187 self
.assertEquals(rep
.u
.versions
[0].rpc_vers_minor
, req
.rpc_vers_minor
)
188 self
.assertPadding(rep
.u
._pad
, 3)
190 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND_ACK
, req
.call_id
,
192 self
.assertEquals(rep
.u
.max_xmit_frag
, req
.u
.max_xmit_frag
)
193 self
.assertEquals(rep
.u
.max_recv_frag
, req
.u
.max_recv_frag
)
194 if assoc_group_id
!= 0:
195 self
.assertEquals(rep
.u
.assoc_group_id
, assoc_group_id
)
197 self
.assertNotEquals(rep
.u
.assoc_group_id
, 0)
198 assoc_group_id
= rep
.u
.assoc_group_id
199 port_str
= "%d" % self
.tcp_port
200 port_len
= len(port_str
) + 1
201 mod_len
= (2 + port_len
) % 4
203 port_pad
= 4 - mod_len
206 self
.assertEquals(rep
.u
.secondary_address_size
, port_len
)
207 self
.assertEquals(rep
.u
.secondary_address
, port_str
)
208 self
.assertPadding(rep
.u
._pad
1, port_pad
)
209 self
.assertEquals(rep
.u
.num_results
, 1)
210 self
.assertEquals(rep
.u
.ctx_list
[0].result
,
211 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_RESULT_ACCEPTANCE
)
212 self
.assertEquals(rep
.u
.ctx_list
[0].reason
,
213 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED
)
214 self
.assertNDRSyntaxEquals(rep
.u
.ctx_list
[0].syntax
, ctx
.transfer_syntaxes
[0])
216 if auth_context
is None:
217 self
.assertEquals(rep
.auth_length
, 0)
218 self
.assertEquals(len(rep
.u
.auth_info
), 0)
220 self
.assertNotEquals(rep
.auth_length
, 0)
221 self
.assertGreater(len(rep
.u
.auth_info
), samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
222 self
.assertEquals(rep
.auth_length
, len(rep
.u
.auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
224 a
= self
.parse_auth(rep
.u
.auth_info
, auth_context
=auth_context
)
226 from_server
= a
.credentials
227 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
229 self
.assertTrue(finished
)
231 self
.assertFalse(finished
)
233 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
234 auth_level
=auth_context
["auth_level"],
235 auth_context_id
=auth_context
["auth_context_id"],
237 req
= self
.generate_alter(call_id
=call_id
,
239 assoc_group_id
=0xffffffff - assoc_group_id
,
242 rep
= self
.recv_pdu()
243 if alter_fault
is not None:
244 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_FAULT
, req
.call_id
,
245 pfc_flags
=req
.pfc_flags |
246 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
,
248 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
249 self
.assertEquals(rep
.u
.context_id
, 0)
250 self
.assertEquals(rep
.u
.cancel_count
, 0)
251 self
.assertEquals(rep
.u
.flags
, 0)
252 self
.assertEquals(rep
.u
.status
, alter_fault
)
253 self
.assertEquals(rep
.u
.reserved
, 0)
254 self
.assertEquals(len(rep
.u
.error_and_verifier
), 0)
256 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_ALTER_RESP
, req
.call_id
)
257 self
.assertEquals(rep
.u
.max_xmit_frag
, req
.u
.max_xmit_frag
)
258 self
.assertEquals(rep
.u
.max_recv_frag
, req
.u
.max_recv_frag
)
259 self
.assertEquals(rep
.u
.assoc_group_id
, assoc_group_id
)
260 self
.assertEquals(rep
.u
.secondary_address_size
, 0)
261 self
.assertEquals(rep
.u
.secondary_address
, '')
262 self
.assertPadding(rep
.u
._pad
1, 2)
263 self
.assertEquals(rep
.u
.num_results
, 1)
264 self
.assertEquals(rep
.u
.ctx_list
[0].result
,
265 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_RESULT_ACCEPTANCE
)
266 self
.assertEquals(rep
.u
.ctx_list
[0].reason
,
267 samba
.dcerpc
.dcerpc
.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED
)
268 self
.assertNDRSyntaxEquals(rep
.u
.ctx_list
[0].syntax
, ctx
.transfer_syntaxes
[0])
270 self
.assertEquals(rep
.auth_length
, 0)
272 self
.assertNotEquals(rep
.auth_length
, 0)
273 self
.assertGreaterEqual(len(rep
.u
.auth_info
), samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
274 self
.assertEquals(rep
.auth_length
, len(rep
.u
.auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
276 a
= self
.parse_auth(rep
.u
.auth_info
, auth_context
=auth_context
)
281 from_server
= a
.credentials
282 (finished
, to_server
) = auth_context
["gensec"].update(from_server
)
283 self
.assertTrue(finished
)
287 def prepare_presentation(self
, abstract
, transfer
, object=None,
288 context_id
=0xffff, epmap
=False, auth_context
=None,
289 pfc_flags
=samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
290 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
,
294 self
.epmap_reconnect(abstract
, transfer
=transfer
, object=object)
296 tsf1_list
= [transfer
]
297 ctx
= samba
.dcerpc
.dcerpc
.ctx_list()
298 ctx
.context_id
= context_id
299 ctx
.num_transfer_syntaxes
= len(tsf1_list
)
300 ctx
.abstract_syntax
= abstract
301 ctx
.transfer_syntaxes
= tsf1_list
303 ack
= self
.do_generic_bind(ctx
=ctx
,
304 auth_context
=auth_context
,
306 assoc_group_id
=assoc_group_id
)
314 def do_single_request(self
, call_id
, ctx
, io
,
317 bigendian
=False, ndr64
=False,
318 allow_remaining
=False,
322 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
323 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
325 fault_context_id
=None,
330 if fault_context_id
is None:
331 fault_context_id
= ctx
.context_id
333 if ndr_print
is None:
334 ndr_print
= self
.do_ndr_print
336 hexdump
= self
.do_hexdump
340 sys
.stderr
.write("in: %s" % samba
.ndr
.ndr_print_in(io
))
341 stub_in
= samba
.ndr
.ndr_pack_in(io
, bigendian
=bigendian
, ndr64
=ndr64
)
343 sys
.stderr
.write("stub_in: %d\n%s" % (len(stub_in
), self
.hexdump(stub_in
)))
345 pfc_flags
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST
346 pfc_flags |
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
347 if object is not None:
348 pfc_flags |
= samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_OBJECT_UUID
350 req
= self
.generate_request_auth(call_id
=call_id
,
351 context_id
=ctx
.context_id
,
356 auth_context
=auth_context
)
358 self
.send_pdu(req
, ndr_print
=ndr_print
, hexdump
=hexdump
)
360 (rep
, rep_blob
) = self
.recv_pdu_raw(timeout
=timeout
,
364 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_FAULT
, req
.call_id
,
365 pfc_flags
=fault_pfc_flags
, auth_length
=0)
366 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
367 self
.assertEquals(rep
.u
.context_id
, fault_context_id
)
368 self
.assertEquals(rep
.u
.cancel_count
, 0)
369 self
.assertEquals(rep
.u
.flags
, 0)
370 self
.assertEquals(rep
.u
.status
, fault_status
)
371 self
.assertEquals(rep
.u
.reserved
, 0)
372 self
.assertEquals(len(rep
.u
.error_and_verifier
), 0)
375 self
.verify_pdu(rep
, samba
.dcerpc
.dcerpc
.DCERPC_PKT_RESPONSE
, req
.call_id
,
376 auth_length
=req
.auth_length
)
377 self
.assertNotEquals(rep
.u
.alloc_hint
, 0)
378 self
.assertEquals(rep
.u
.context_id
, req
.u
.context_id
& 0xff)
379 self
.assertEquals(rep
.u
.cancel_count
, 0)
380 self
.assertGreaterEqual(len(rep
.u
.stub_and_verifier
), rep
.u
.alloc_hint
)
381 stub_out
= self
.check_response_auth(rep
, rep_blob
, auth_context
)
382 self
.assertEqual(len(stub_out
), rep
.u
.alloc_hint
)
385 sys
.stderr
.write("stub_out: %d\n%s" % (len(stub_out
), self
.hexdump(stub_out
)))
386 ndr_unpack_out(io
, stub_out
, bigendian
=bigendian
, ndr64
=ndr64
,
387 allow_remaining
=allow_remaining
)
389 sys
.stderr
.write("out: %s" % samba
.ndr
.ndr_print_out(io
))
391 def epmap_reconnect(self
, abstract
, transfer
=None, object=None):
392 ndr32
= samba
.dcerpc
.base
.transfer_syntax_ndr()
398 object = samba
.dcerpc
.misc
.GUID()
400 ctx
= self
.prepare_presentation(samba
.dcerpc
.epmapper
.abstract_syntax(),
401 transfer
, context_id
=0)
403 data1
= ndr_pack(abstract
)
404 lhs1
= samba
.dcerpc
.epmapper
.epm_lhs()
405 lhs1
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_UUID
406 lhs1
.lhs_data
= data1
[:18]
407 rhs1
= samba
.dcerpc
.epmapper
.epm_rhs_uuid()
408 rhs1
.unknown
= data1
[18:]
409 floor1
= samba
.dcerpc
.epmapper
.epm_floor()
412 data2
= ndr_pack(transfer
)
413 lhs2
= samba
.dcerpc
.epmapper
.epm_lhs()
414 lhs2
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_UUID
415 lhs2
.lhs_data
= data2
[:18]
416 rhs2
= samba
.dcerpc
.epmapper
.epm_rhs_uuid()
417 rhs2
.unknown
= data1
[18:]
418 floor2
= samba
.dcerpc
.epmapper
.epm_floor()
421 lhs3
= samba
.dcerpc
.epmapper
.epm_lhs()
422 lhs3
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_NCACN
424 floor3
= samba
.dcerpc
.epmapper
.epm_floor()
426 floor3
.rhs
.minor_version
= 0
427 lhs4
= samba
.dcerpc
.epmapper
.epm_lhs()
428 lhs4
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
430 floor4
= samba
.dcerpc
.epmapper
.epm_floor()
432 floor4
.rhs
.port
= self
.tcp_port
433 lhs5
= samba
.dcerpc
.epmapper
.epm_lhs()
434 lhs5
.protocol
= samba
.dcerpc
.epmapper
.EPM_PROTOCOL_IP
436 floor5
= samba
.dcerpc
.epmapper
.epm_floor()
438 floor5
.rhs
.ipaddr
= "0.0.0.0"
440 floors
= [floor1
, floor2
, floor3
, floor4
, floor5
]
441 req_tower
= samba
.dcerpc
.epmapper
.epm_tower()
442 req_tower
.num_floors
= len(floors
)
443 req_tower
.floors
= floors
444 req_twr
= samba
.dcerpc
.epmapper
.epm_twr_t()
445 req_twr
.tower
= req_tower
447 epm_map
= samba
.dcerpc
.epmapper
.epm_Map()
448 epm_map
.in_object
= object
449 epm_map
.in_map_tower
= req_twr
450 epm_map
.in_entry_handle
= samba
.dcerpc
.misc
.policy_handle()
451 epm_map
.in_max_towers
= 4
453 self
.do_single_request(call_id
=2, ctx
=ctx
, io
=epm_map
)
455 self
.assertGreaterEqual(epm_map
.out_num_towers
, 1)
456 rep_twr
= epm_map
.out_towers
[0].twr
457 self
.assertIsNotNone(rep_twr
)
458 self
.assertEqual(rep_twr
.tower_length
, 75)
459 self
.assertEqual(rep_twr
.tower
.num_floors
, 5)
460 self
.assertEqual(len(rep_twr
.tower
.floors
), 5)
461 self
.assertEqual(rep_twr
.tower
.floors
[3].lhs
.protocol
,
462 samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
)
463 self
.assertEqual(rep_twr
.tower
.floors
[3].lhs
.protocol
,
464 samba
.dcerpc
.epmapper
.EPM_PROTOCOL_TCP
)
466 # reconnect to the given port
467 self
._disconnect
("epmap_reconnect")
468 self
.tcp_port
= rep_twr
.tower
.floors
[3].rhs
.port
471 def send_pdu(self
, req
, ndr_print
=None, hexdump
=None):
472 if ndr_print
is None:
473 ndr_print
= self
.do_ndr_print
475 hexdump
= self
.do_hexdump
477 req_pdu
= ndr_pack(req
)
479 sys
.stderr
.write("send_pdu: %s" % samba
.ndr
.ndr_print(req
))
481 sys
.stderr
.write("send_pdu: %d\n%s" % (len(req_pdu
), self
.hexdump(req_pdu
)))
483 sent
= self
.s
.send(req_pdu
, 0)
484 if sent
== len(req_pdu
):
486 req_pdu
= req_pdu
[sent
:]
487 except socket
.error
as e
:
488 self
._disconnect
("send_pdu: %s" % e
)
491 self
._disconnect
("send_pdu: %s" % e
)
496 def recv_raw(self
, hexdump
=None, timeout
=None):
499 hexdump
= self
.do_hexdump
501 if timeout
is not None:
502 self
.s
.settimeout(timeout
)
503 rep_pdu
= self
.s
.recv(0xffff, 0)
504 self
.s
.settimeout(10)
505 if len(rep_pdu
) == 0:
506 self
._disconnect
("recv_raw: EOF")
509 sys
.stderr
.write("recv_raw: %d\n%s" % (len(rep_pdu
), self
.hexdump(rep_pdu
)))
510 except socket
.timeout
as e
:
511 self
.s
.settimeout(10)
512 sys
.stderr
.write("recv_raw: TIMEOUT\n")
514 except socket
.error
as e
:
515 self
._disconnect
("recv_raw: %s" % e
)
518 self
._disconnect
("recv_raw: %s" % e
)
524 def recv_pdu_raw(self
, ndr_print
=None, hexdump
=None, timeout
=None):
527 if ndr_print
is None:
528 ndr_print
= self
.do_ndr_print
530 hexdump
= self
.do_hexdump
532 rep_pdu
= self
.recv_raw(hexdump
=hexdump
, timeout
=timeout
)
535 rep
= ndr_unpack(samba
.dcerpc
.dcerpc
.ncacn_packet
, rep_pdu
, allow_remaining
=True)
537 sys
.stderr
.write("recv_pdu: %s" % samba
.ndr
.ndr_print(rep
))
538 self
.assertEqual(rep
.frag_length
, len(rep_pdu
))
541 return (rep
, rep_pdu
)
543 def recv_pdu(self
, ndr_print
=None, hexdump
=None, timeout
=None):
544 (rep
, rep_pdu
) = self
.recv_pdu_raw(ndr_print
=ndr_print
,
549 def generate_auth(self
,
553 auth_context_id
=None,
555 ndr_print
=None, hexdump
=None):
556 if ndr_print
is None:
557 ndr_print
= self
.do_ndr_print
559 hexdump
= self
.do_hexdump
561 if auth_type
is not None:
562 a
= samba
.dcerpc
.dcerpc
.auth()
563 a
.auth_type
= auth_type
564 a
.auth_level
= auth_level
565 a
.auth_pad_length
= auth_pad_length
566 a
.auth_context_id
= auth_context_id
567 a
.credentials
= auth_blob
571 sys
.stderr
.write("generate_auth: %s" % samba
.ndr
.ndr_print(a
))
573 sys
.stderr
.write("generate_auth: %d\n%s" % (len(ai
), self
.hexdump(ai
)))
579 def parse_auth(self
, auth_info
, ndr_print
=None, hexdump
=None,
580 auth_context
=None, stub_len
=0):
581 if ndr_print
is None:
582 ndr_print
= self
.do_ndr_print
584 hexdump
= self
.do_hexdump
586 if (len(auth_info
) <= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
):
590 sys
.stderr
.write("parse_auth: %d\n%s" % (len(auth_info
), self
.hexdump(auth_info
)))
591 a
= ndr_unpack(samba
.dcerpc
.dcerpc
.auth
, auth_info
, allow_remaining
=True)
593 sys
.stderr
.write("parse_auth: %s" % samba
.ndr
.ndr_print(a
))
595 if auth_context
is not None:
596 self
.assertEquals(a
.auth_type
, auth_context
["auth_type"])
597 self
.assertEquals(a
.auth_level
, auth_context
["auth_level"])
598 self
.assertEquals(a
.auth_reserved
, 0)
599 self
.assertEquals(a
.auth_context_id
, auth_context
["auth_context_id"])
601 self
.assertLessEqual(a
.auth_pad_length
, dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
)
602 self
.assertLessEqual(a
.auth_pad_length
, stub_len
)
606 def check_response_auth(self
, rep
, rep_blob
, auth_context
=None,
607 auth_pad_length
=None):
609 if auth_context
is None:
610 self
.assertEquals(rep
.auth_length
, 0)
611 return rep
.u
.stub_and_verifier
613 ofs_stub
= dcerpc
.DCERPC_REQUEST_LENGTH
614 ofs_sig
= rep
.frag_length
- rep
.auth_length
615 ofs_trailer
= ofs_sig
- dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
616 rep_data
= rep_blob
[ofs_stub
:ofs_trailer
]
617 rep_whole
= rep_blob
[0:ofs_sig
]
618 rep_sig
= rep_blob
[ofs_sig
:]
619 rep_auth_info_blob
= rep_blob
[ofs_trailer
:]
621 rep_auth_info
= self
.parse_auth(rep_auth_info_blob
,
622 auth_context
=auth_context
,
623 stub_len
=len(rep_data
))
624 if auth_pad_length
is not None:
625 self
.assertEquals(rep_auth_info
.auth_pad_length
, auth_pad_length
)
626 self
.assertEquals(rep_auth_info
.credentials
, rep_sig
)
628 if auth_context
["auth_level"] >= dcerpc
.DCERPC_AUTH_LEVEL_PRIVACY
:
629 # TODO: not yet supported here
630 self
.assertTrue(False)
631 elif auth_context
["auth_level"] >= dcerpc
.DCERPC_AUTH_LEVEL_PACKET
:
632 auth_context
["gensec"].check_packet(rep_data
, rep_whole
, rep_sig
)
634 stub_out
= rep_data
[0:len(rep_data
)-rep_auth_info
.auth_pad_length
]
638 def generate_pdu(self
, ptype
, call_id
, payload
,
641 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
642 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
643 drep
=[samba
.dcerpc
.dcerpc
.DCERPC_DREP_LE
, 0, 0, 0],
644 ndr_print
=None, hexdump
=None):
646 if getattr(payload
, 'auth_info', None):
647 ai
= payload
.auth_info
651 p
= samba
.dcerpc
.dcerpc
.ncacn_packet()
652 p
.rpc_vers
= rpc_vers
653 p
.rpc_vers_minor
= rpc_vers_minor
655 p
.pfc_flags
= pfc_flags
658 if len(ai
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
659 p
.auth_length
= len(ai
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
666 p
.frag_length
= len(pdu
)
670 def generate_request_auth(self
, call_id
,
671 pfc_flags
=(dcerpc
.DCERPC_PFC_FLAG_FIRST |
672 dcerpc
.DCERPC_PFC_FLAG_LAST
),
679 ndr_print
=None, hexdump
=None):
685 if auth_context
is not None:
686 mod_len
= len(stub
) % dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
689 auth_pad_length
= dcerpc
.DCERPC_AUTH_PAD_ALIGNMENT
- mod_len
690 stub
+= b
'\x00' * auth_pad_length
692 if auth_context
["g_auth_level"] >= samba
.dcerpc
.dcerpc
.DCERPC_AUTH_LEVEL_PACKET
:
693 sig_size
= auth_context
["gensec"].sig_size(len(stub
))
697 zero_sig
= b
"\x00" * sig_size
698 auth_info
= self
.generate_auth(auth_type
=auth_context
["auth_type"],
699 auth_level
=auth_context
["auth_level"],
700 auth_pad_length
=auth_pad_length
,
701 auth_context_id
=auth_context
["auth_context_id"],
706 req
= self
.generate_request(call_id
=call_id
,
708 alloc_hint
=alloc_hint
,
709 context_id
=context_id
,
716 if auth_context
is None:
719 req_blob
= samba
.ndr
.ndr_pack(req
)
720 ofs_stub
= dcerpc
.DCERPC_REQUEST_LENGTH
721 ofs_sig
= len(req_blob
) - req
.auth_length
722 ofs_trailer
= ofs_sig
- dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
723 req_data
= req_blob
[ofs_stub
:ofs_trailer
]
724 req_whole
= req_blob
[0:ofs_sig
]
726 if auth_context
["auth_level"] >= dcerpc
.DCERPC_AUTH_LEVEL_PRIVACY
:
727 # TODO: not yet supported here
728 self
.assertTrue(False)
729 elif auth_context
["auth_level"] >= dcerpc
.DCERPC_AUTH_LEVEL_PACKET
:
730 req_sig
= auth_context
["gensec"].sign_packet(req_data
, req_whole
)
733 self
.assertEquals(len(req_sig
), req
.auth_length
)
734 self
.assertEquals(len(req_sig
), sig_size
)
736 stub_sig_ofs
= len(req
.u
.stub_and_verifier
) - sig_size
737 stub
= req
.u
.stub_and_verifier
[0:stub_sig_ofs
] + req_sig
738 req
.u
.stub_and_verifier
= stub
742 def verify_pdu(self
, p
, ptype
, call_id
,
745 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
746 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
747 drep
=[samba
.dcerpc
.dcerpc
.DCERPC_DREP_LE
, 0, 0, 0],
750 self
.assertIsNotNone(p
, "No valid pdu")
752 if getattr(p
.u
, 'auth_info', None):
757 self
.assertEqual(p
.rpc_vers
, rpc_vers
)
758 self
.assertEqual(p
.rpc_vers_minor
, rpc_vers_minor
)
759 self
.assertEqual(p
.ptype
, ptype
)
760 self
.assertEqual(p
.pfc_flags
, pfc_flags
)
761 self
.assertEqual(p
.drep
, drep
)
762 self
.assertGreaterEqual(p
.frag_length
,
763 samba
.dcerpc
.dcerpc
.DCERPC_NCACN_PAYLOAD_OFFSET
)
764 if len(ai
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
765 self
.assertEqual(p
.auth_length
,
766 len(ai
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
)
767 elif auth_length
is not None:
768 self
.assertEqual(p
.auth_length
, auth_length
)
770 self
.assertEqual(p
.auth_length
, 0)
771 self
.assertEqual(p
.call_id
, call_id
)
775 def generate_bind(self
, call_id
,
776 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
777 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
783 ndr_print
=None, hexdump
=None):
785 b
= samba
.dcerpc
.dcerpc
.bind()
786 b
.max_xmit_frag
= max_xmit_frag
787 b
.max_recv_frag
= max_recv_frag
788 b
.assoc_group_id
= assoc_group_id
789 b
.num_contexts
= len(ctx_list
)
790 b
.ctx_list
= ctx_list
791 b
.auth_info
= auth_info
793 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_BIND
,
797 ndr_print
=ndr_print
, hexdump
=hexdump
)
801 def generate_alter(self
, call_id
,
802 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
803 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
809 ndr_print
=None, hexdump
=None):
811 a
= samba
.dcerpc
.dcerpc
.bind()
812 a
.max_xmit_frag
= max_xmit_frag
813 a
.max_recv_frag
= max_recv_frag
814 a
.assoc_group_id
= assoc_group_id
815 a
.num_contexts
= len(ctx_list
)
816 a
.ctx_list
= ctx_list
817 a
.auth_info
= auth_info
819 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_ALTER
,
823 ndr_print
=ndr_print
, hexdump
=hexdump
)
827 def generate_auth3(self
, call_id
,
828 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
829 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
831 ndr_print
=None, hexdump
=None):
833 a
= samba
.dcerpc
.dcerpc
.auth3()
834 a
.auth_info
= auth_info
836 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_AUTH3
,
840 ndr_print
=ndr_print
, hexdump
=hexdump
)
844 def generate_request(self
, call_id
,
845 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
846 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
853 ndr_print
=None, hexdump
=None):
855 if alloc_hint
is None:
856 alloc_hint
= len(stub
)
858 r
= samba
.dcerpc
.dcerpc
.request()
859 r
.alloc_hint
= alloc_hint
860 r
.context_id
= context_id
862 if object is not None:
864 r
.stub_and_verifier
= stub
+ auth_info
866 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_REQUEST
,
870 ndr_print
=ndr_print
, hexdump
=hexdump
)
872 if len(auth_info
) > samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
:
873 p
.auth_length
= len(auth_info
) - samba
.dcerpc
.dcerpc
.DCERPC_AUTH_TRAILER_LENGTH
877 def generate_co_cancel(self
, call_id
,
878 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
879 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
881 ndr_print
=None, hexdump
=None):
883 c
= samba
.dcerpc
.dcerpc
.co_cancel()
884 c
.auth_info
= auth_info
886 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_CO_CANCEL
,
890 ndr_print
=ndr_print
, hexdump
=hexdump
)
894 def generate_orphaned(self
, call_id
,
895 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
896 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
898 ndr_print
=None, hexdump
=None):
900 o
= samba
.dcerpc
.dcerpc
.orphaned()
901 o
.auth_info
= auth_info
903 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_ORPHANED
,
907 ndr_print
=ndr_print
, hexdump
=hexdump
)
911 def generate_shutdown(self
, call_id
,
912 pfc_flags
=(samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_FIRST |
913 samba
.dcerpc
.dcerpc
.DCERPC_PFC_FLAG_LAST
),
914 ndr_print
=None, hexdump
=None):
916 s
= samba
.dcerpc
.dcerpc
.shutdown()
918 p
= self
.generate_pdu(ptype
=samba
.dcerpc
.dcerpc
.DCERPC_PKT_SHUTDOWN
,
922 ndr_print
=ndr_print
, hexdump
=hexdump
)
926 def assertIsConnected(self
):
927 self
.assertIsNotNone(self
.s
, msg
="Not connected")
930 def assertNotConnected(self
):
931 self
.assertIsNone(self
.s
, msg
="Is connected")
934 def assertNDRSyntaxEquals(self
, s1
, s2
):
935 self
.assertEqual(s1
.uuid
, s2
.uuid
)
936 self
.assertEqual(s1
.if_version
, s2
.if_version
)
939 def assertPadding(self
, pad
, length
):
940 self
.assertEquals(len(pad
), length
)
942 # sometimes windows sends random bytes
944 # we have IGNORE_RANDOM_PAD=1 to
947 if self
.ignore_random_pad
:
949 zero_pad
= b
'\0' * length
950 self
.assertEquals(pad
, zero_pad
)