From 3c474cd4890a37c22b69f716164e2c830ab76c41 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Sun, 11 Sep 2016 23:25:49 +0200 Subject: [PATCH] python/tests: add simple dcerpc co_cancel tests CO_CANCEL is mostly ignored. It's up to the application server implementation to install a cancel handler. The only implementation I found so far is the witness server (see [MS-SWN] WitnessrAsyncNotify), which triggers a FAULT with DCERPC_FAULT_SERVER_UNAVAILABLE. Signed-off-by: Stefan Metzmacher Reviewed-by: Andreas Schneider --- python/samba/tests/dcerpc/raw_protocol.py | 74 +++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index 78aa720bed7..9220f06a56f 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -2317,6 +2317,80 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertEquals(rep.u.reserved, 0) self.assertEquals(len(rep.u.error_and_verifier), 0) + def test_co_cancel_no_request(self): + ndr32 = base.transfer_syntax_ndr() + abstract = samba.dcerpc.mgmt.abstract_syntax() + ctx = self.prepare_presentation(abstract, ndr32, context_id=0xff) + + req = self.generate_co_cancel(call_id = 3) + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.01) + self.assertIsNone(rep) + self.assertIsConnected() + + # And now try a request + req = self.generate_request(call_id = 1, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + def test_co_cancel_request_after_first(self): + ndr32 = base.transfer_syntax_ndr() + abstract = samba.dcerpc.mgmt.abstract_syntax() + ctx = self.prepare_presentation(abstract, ndr32, context_id=0xff) + + req = self.generate_request(call_id = 1, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.01) + self.assertIsNone(rep) + self.assertIsConnected() + + req = self.generate_co_cancel(call_id = 1) + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.01) + self.assertIsNone(rep) + self.assertIsConnected() + + req = self.generate_request(call_id = 1, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_LAST, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + # And now try a request + req = self.generate_request(call_id = 2, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + def test_spnego_connect_request(self): ndr32 = base.transfer_syntax_ndr() -- 2.11.4.GIT