From d5b58bb730127a89feced87ad2218c4fdd1f8e1c Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 16 Sep 2016 11:11:58 +0200 Subject: [PATCH] python:tests: add more helper functions to RawDCERPCTest Signed-off-by: Stefan Metzmacher Reviewed-by: Andreas Schneider --- python/samba/tests/__init__.py | 420 ++++++++++++++++++++++++++++++++++------- 1 file changed, 354 insertions(+), 66 deletions(-) diff --git a/python/samba/tests/__init__.py b/python/samba/tests/__init__.py index dec9ff52e5a..de3dc9edd8e 100644 --- a/python/samba/tests/__init__.py +++ b/python/samba/tests/__init__.py @@ -29,6 +29,8 @@ import samba.ndr import samba.dcerpc.dcerpc import samba.dcerpc.base import samba.dcerpc.epmapper +from samba.credentials import Credentials +from samba import gensec import socket import struct import subprocess @@ -273,37 +275,352 @@ class RawDCERPCTest(TestCase): self.connect() - def epmap_reconnect(self, abstract): - ndr32 = samba.dcerpc.base.transfer_syntax_ndr() - - tsf0_list = [ndr32] - ctx0 = samba.dcerpc.dcerpc.ctx_list() - ctx0.context_id = 1 - ctx0.num_transfer_syntaxes = len(tsf0_list) - ctx0.abstract_syntax = samba.dcerpc.epmapper.abstract_syntax() - ctx0.transfer_syntaxes = tsf0_list + def get_user_creds(self): + c = Credentials() + c.guess() + username = samba.tests.env_get_var_value('USERNAME') + password = samba.tests.env_get_var_value('PASSWORD') + c.set_username(username) + c.set_password(password) + return c + + def get_anon_creds(self): + c = Credentials() + c.set_anonymous() + return c + + def get_auth_context_creds(self, creds, auth_type, auth_level, + auth_context_id, + g_auth_level=None): + + if g_auth_level is None: + g_auth_level = auth_level + + g = gensec.Security.start_client(self.settings) + g.set_credentials(creds) + g.want_feature(gensec.FEATURE_DCE_STYLE) + g.start_mech_by_authtype(auth_type, g_auth_level) + + auth_context = {} + auth_context["auth_type"] = auth_type + auth_context["auth_level"] = auth_level + auth_context["auth_context_id"] = auth_context_id + auth_context["g_auth_level"] = g_auth_level + auth_context["gensec"] = g + + return auth_context + + def do_generic_bind(self, ctx, auth_context=None, + pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + assoc_group_id=0, call_id=0, + nak_reason=None, alter_fault=None): + ctx_list = [ctx] + + if auth_context is not None: + from_server = "" + (finished, to_server) = auth_context["gensec"].update(from_server) + self.assertFalse(finished) + + auth_info = self.generate_auth(auth_type=auth_context["auth_type"], + auth_level=auth_context["auth_level"], + auth_context_id=auth_context["auth_context_id"], + auth_blob=to_server) + else: + auth_info = "" - req = self.generate_bind(call_id=0, ctx_list=[ctx0]) + req = self.generate_bind(call_id=call_id, + pfc_flags=pfc_flags, + ctx_list=ctx_list, + assoc_group_id=assoc_group_id, + auth_info=auth_info) self.send_pdu(req) rep = self.recv_pdu() - self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, - req.call_id, auth_length=0) - self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) - self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) - self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id) - self.assertEqual(rep.u.secondary_address_size, 4) - self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port) - self.assertEqual(len(rep.u._pad1), 2) - self.assertEqual(rep.u._pad1, '\0' * 2) - self.assertEqual(rep.u.num_results, 1) - self.assertEqual(rep.u.ctx_list[0].result, + if nak_reason is not None: + self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.reject_reason, nak_reason) + self.assertEquals(rep.u.num_versions, 1) + self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertEquals(len(rep.u._pad), 3) + self.assertEquals(rep.u._pad, '\0' * 3) + return + self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + pfc_flags=pfc_flags) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + if assoc_group_id != 0: + self.assertEquals(rep.u.assoc_group_id, assoc_group_id) + else: + self.assertNotEquals(rep.u.assoc_group_id, 0) + assoc_group_id = rep.u.assoc_group_id + port_str = "%d" % self.tcp_port + port_len = len(port_str) + 1 + mod_len = (2 + port_len) % 4 + if mod_len != 0: + port_pad = 4 - mod_len + else: + port_pad = 0 + self.assertEquals(rep.u.secondary_address_size, port_len) + self.assertEquals(rep.u.secondary_address, port_str) + self.assertEquals(len(rep.u._pad1), port_pad) + # sometimes windows sends random bytes + # self.assertEquals(rep.u._pad1, '\0' * port_pad) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) - self.assertEqual(rep.u.ctx_list[0].reason, + self.assertEquals(rep.u.ctx_list[0].reason, samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) - self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) - self.assertEqual(rep.u.auth_info, '\0' * 0) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0]) + ack = rep + if auth_context is None: + self.assertEquals(rep.auth_length, 0) + self.assertEquals(len(rep.u.auth_info), 0) + return ack + self.assertNotEquals(rep.auth_length, 0) + self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH) + self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH) + + a = self.parse_auth(rep.u.auth_info) + + from_server = a.credentials + (finished, to_server) = auth_context["gensec"].update(from_server) + self.assertFalse(finished) + + auth_info = self.generate_auth(auth_type=auth_context["auth_type"], + auth_level=auth_context["auth_level"], + auth_context_id=auth_context["auth_context_id"], + auth_blob=to_server) + req = self.generate_alter(call_id=call_id, + ctx_list=ctx_list, + assoc_group_id=0xffffffff-assoc_group_id, + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + if alter_fault is not None: + self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, 0) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, alter_fault) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + return None + self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertEquals(rep.u.assoc_group_id, assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(rep.u.secondary_address, '') + self.assertEquals(len(rep.u._pad1), 2) + # sometimes windows sends random bytes + # self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0]) + self.assertNotEquals(rep.auth_length, 0) + self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH) + self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH) + + a = self.parse_auth(rep.u.auth_info) + + from_server = a.credentials + (finished, to_server) = auth_context["gensec"].update(from_server) + self.assertTrue(finished) + + return ack + + def prepare_presentation(self, abstract, transfer, object=None, + context_id=0xffff, epmap=False, auth_context=None, + pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + assoc_group_id=0, + return_ack=False): + if epmap: + self.epmap_reconnect(abstract, transfer=transfer, object=object) + + tsf1_list = [transfer] + ctx = samba.dcerpc.dcerpc.ctx_list() + ctx.context_id = context_id + ctx.num_transfer_syntaxes = len(tsf1_list) + ctx.abstract_syntax = abstract + ctx.transfer_syntaxes = tsf1_list + + ack = self.do_generic_bind(ctx=ctx, + auth_context=auth_context, + pfc_flags=pfc_flags, + assoc_group_id=assoc_group_id) + if ack is None: + ctx = None + + if return_ack: + return (ctx, ack) + return ctx + + def do_single_request(self, call_id, ctx, io, + auth_context=None, + object=None, + bigendian=False, ndr64=False, + allow_remaining=False, + send_req=True, + recv_rep=True, + fault_pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + fault_status=None, + fault_context_id=None, + timeout=None, + ndr_print=None, + hexdump=None): + + if fault_context_id is None: + fault_context_id = ctx.context_id + + if ndr_print is None: + ndr_print = self.do_ndr_print + if hexdump is None: + hexdump = self.do_hexdump + + if send_req: + if ndr_print: + sys.stderr.write("in: %s" % samba.ndr.ndr_print_in(io)) + stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64) + if hexdump: + sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in))) + else: + # only used for sig_size calculation + stub_in = '\xff' * samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT + + sig_size = 0 + if auth_context is not None: + mod_len = len(stub_in) % samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT + auth_pad_length = 0 + if mod_len > 0: + auth_pad_length = samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len + stub_in += '\x00' * auth_pad_length + + if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET: + sig_size = auth_context["gensec"].sig_size(len(stub_in)) + else: + sig_size = 16 + + zero_sig = "\x00"*sig_size + auth_info = self.generate_auth(auth_type=auth_context["auth_type"], + auth_level=auth_context["auth_level"], + auth_pad_length=auth_pad_length, + auth_context_id=auth_context["auth_context_id"], + auth_blob=zero_sig) + else: + auth_info="" + + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST + pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST + if object is not None: + pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID + + req = self.generate_request(call_id=call_id, + context_id=ctx.context_id, + pfc_flags=pfc_flags, + object=object, + opnum=io.opnum(), + stub=stub_in, + auth_info=auth_info) + + if send_req: + if sig_size != 0 and auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET: + req_blob = samba.ndr.ndr_pack(req) + ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH + ofs_sig = len(req_blob) - req.auth_length + ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH + req_data = req_blob[ofs_stub:ofs_trailer] + req_whole = req_blob[0:ofs_sig] + sig = auth_context["gensec"].sign_packet(req_data, req_whole) + auth_info = self.generate_auth(auth_type=auth_context["auth_type"], + auth_level=auth_context["auth_level"], + auth_pad_length=auth_pad_length, + auth_context_id=auth_context["auth_context_id"], + auth_blob=sig) + req = self.generate_request(call_id=call_id, + context_id=ctx.context_id, + pfc_flags=pfc_flags, + object=object, + opnum=io.opnum(), + stub=stub_in, + auth_info=auth_info) + self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump) + if recv_rep: + (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout, + ndr_print=ndr_print, + hexdump=hexdump) + if fault_status: + self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=fault_pfc_flags, auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, fault_context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, fault_status) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + return + + self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=sig_size) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id & 0xff) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + if sig_size != 0: + + ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH + ofs_sig = rep.frag_length - rep.auth_length + ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH + rep_data = rep_blob[ofs_stub:ofs_trailer] + rep_whole = rep_blob[0:ofs_sig] + rep_sig = rep_blob[ofs_sig:] + rep_auth_info_blob = rep_blob[ofs_trailer:] + + rep_auth_info = self.parse_auth(rep_auth_info_blob) + self.assertEquals(rep_auth_info.auth_type, auth_context["auth_type"]) + self.assertEquals(rep_auth_info.auth_level, auth_context["auth_level"]) + self.assertLessEqual(rep_auth_info.auth_pad_length, len(rep_data)) + self.assertEquals(rep_auth_info.auth_reserved, 0) + self.assertEquals(rep_auth_info.auth_context_id, auth_context["auth_context_id"]) + self.assertEquals(rep_auth_info.credentials, rep_sig) + + if auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET: + auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig) + + stub_out = rep_data[0:-rep_auth_info.auth_pad_length] + else: + stub_out = rep.u.stub_and_verifier + + if hexdump: + sys.stderr.write("stub_out: %d\n%s" % (len(stub_out), self.hexdump(stub_out))) + samba.ndr.ndr_unpack_out(io, stub_out, bigendian=bigendian, ndr64=ndr64, + allow_remaining=allow_remaining) + if ndr_print: + sys.stderr.write("out: %s" % samba.ndr.ndr_print_out(io)) + + def epmap_reconnect(self, abstract, transfer=None, object=None): + ndr32 = samba.dcerpc.base.transfer_syntax_ndr() + + if transfer is None: + transfer = ndr32 + + if object is None: + object = samba.dcerpc.misc.GUID() + + ctx = self.prepare_presentation(samba.dcerpc.epmapper.abstract_syntax(), + transfer, context_id=0) - # And now try a request data1 = samba.ndr.ndr_pack(abstract) lhs1 = samba.dcerpc.epmapper.epm_lhs() lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID @@ -313,7 +630,7 @@ class RawDCERPCTest(TestCase): floor1 = samba.dcerpc.epmapper.epm_floor() floor1.lhs = lhs1 floor1.rhs = rhs1 - data2 = samba.ndr.ndr_pack(ndr32) + data2 = samba.ndr.ndr_pack(transfer) lhs2 = samba.dcerpc.epmapper.epm_lhs() lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID lhs2.lhs_data = data2[:18] @@ -348,46 +665,17 @@ class RawDCERPCTest(TestCase): req_twr = samba.dcerpc.epmapper.epm_twr_t() req_twr.tower = req_tower - pack_twr = samba.ndr.ndr_pack(req_twr) - - # object - stub = "\x01\x00\x00\x00" - stub += "\x00" * 16 - # tower - stub += "\x02\x00\x00\x00" - stub += pack_twr - # padding? - stub += "\x00" * 1 - # handle - stub += "\x00" * 20 - # max_towers - stub += "\x04\x00\x00\x00" - - # we do an epm_Map() request - req = self.generate_request(call_id = 1, - context_id=ctx0.context_id, - opnum=3, - stub=stub) - self.send_pdu(req) - rep = self.recv_pdu() - self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, - req.call_id, auth_length=0) - self.assertNotEqual(rep.u.alloc_hint, 0) - self.assertEqual(rep.u.context_id, req.u.context_id) - self.assertEqual(rep.u.cancel_count, 0) - self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) - - num_towers = struct.unpack_from("