From 60099d491b18d460330aaeb49c1560cc5cd1816d Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 23 Oct 2015 15:39:34 +0200 Subject: [PATCH] python/tests: add bind time feature related tests to dcerpc raw protocol tests Signed-off-by: Stefan Metzmacher Reviewed-by: Andreas Schneider --- python/samba/tests/dcerpc/raw_protocol.py | 146 ++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index 3c1d4adb3bc..ef660ea32b7 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -1296,6 +1296,152 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertEquals(rep.u.cancel_count, 0) self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + def test_no_auth_bind_time_none_simple(self): + features = 0 + btf = base.bind_time_features_syntax(features) + + zero_syntax = misc.ndr_syntax_id() + + tsf1_list = [btf] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = zero_syntax + ctx1.transfer_syntaxes = tsf1_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_NEGOTIATE_ACK) + self.assertEquals(rep.u.ctx_list[0].reason, features) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + def test_no_auth_bind_time_none_ignore_additional(self): + features1 = 0 + btf1 = base.bind_time_features_syntax(features1) + + features2 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN + features2 |= dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING + btf2 = base.bind_time_features_syntax(features2) + + zero_syntax = misc.ndr_syntax_id() + ndr64 = base.transfer_syntax_ndr64() + + tsf1_list = [btf1,btf2,zero_syntax] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = ndr64 + ctx1.transfer_syntaxes = tsf1_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_NEGOTIATE_ACK) + self.assertEquals(rep.u.ctx_list[0].reason, features1) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + def test_no_auth_bind_time_only_first(self): + features1 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN + btf1 = base.bind_time_features_syntax(features1) + + features2 = dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING + btf2 = base.bind_time_features_syntax(features2) + + zero_syntax = misc.ndr_syntax_id() + + tsf1_list = [zero_syntax,btf1,btf2,zero_syntax] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = zero_syntax + ctx1.transfer_syntaxes = tsf1_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_ABSTRACT_SYNTAX_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + def test_no_auth_bind_time_twice(self): + features1 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN + btf1 = base.bind_time_features_syntax(features1) + + features2 = dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING + btf2 = base.bind_time_features_syntax(features2) + + zero_syntax = misc.ndr_syntax_id() + + tsf1_list = [btf1] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = zero_syntax + ctx1.transfer_syntaxes = tsf1_list + + tsf2_list = [btf2] + ctx2 = dcerpc.ctx_list() + ctx2.context_id = 2 + ctx2.num_transfer_syntaxes = len(tsf2_list) + ctx2.abstract_syntax = zero_syntax + ctx2.transfer_syntaxes = tsf2_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1,ctx2]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.reject_reason, + dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED) + self.assertEquals(rep.u.num_versions, 1) + self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertEquals(len(rep.u._pad), 3) + self.assertEquals(rep.u._pad, '\0' * 3) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + def _test_auth_none_level_bind(self, auth_level, reason=dcerpc.DCERPC_BIND_NAK_REASON_INVALID_AUTH_TYPE): ndr32 = base.transfer_syntax_ndr() -- 2.11.4.GIT