python: Fix erroneous increments of reference counts
[Samba.git] / source3 / rpc_client / rpc_transport_tstream.c
blob3103fa4603746a96a22c04ebaf27e1efe02d7c4d
1 /*
2 * Unix SMB/CIFS implementation.
3 * RPC client transport over tstream
4 * Copyright (C) Simo Sorce 2010
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "../lib/util/tevent_ntstatus.h"
22 #include "rpc_client/rpc_transport.h"
23 #include "lib/tsocket/tsocket.h"
24 #include "libcli/smb/tstream_smbXcli_np.h"
25 #include "cli_pipe.h"
27 #undef DBGC_CLASS
28 #define DBGC_CLASS DBGC_RPC_CLI
30 struct rpc_tstream_state {
31 struct tstream_context *stream;
32 struct tevent_queue *read_queue;
33 struct tevent_queue *write_queue;
34 unsigned int timeout;
37 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
39 TALLOC_FREE(s->stream);
42 static bool rpc_tstream_is_connected(void *priv)
44 struct rpc_tstream_state *transp =
45 talloc_get_type_abort(priv, struct rpc_tstream_state);
46 ssize_t ret;
48 if (!transp->stream) {
49 return false;
52 if (!tstream_is_smbXcli_np(transp->stream)) {
53 return true;
56 ret = tstream_pending_bytes(transp->stream);
57 if (ret == -1) {
58 return false;
61 return true;
64 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
66 struct rpc_tstream_state *transp =
67 talloc_get_type_abort(priv, struct rpc_tstream_state);
68 int orig_timeout;
69 bool ok;
71 ok = rpc_tstream_is_connected(transp);
72 if (!ok) {
73 return 0;
76 if (tstream_is_smbXcli_np(transp->stream)) {
77 transp->timeout = timeout;
78 return tstream_smbXcli_np_set_timeout(transp->stream, timeout);
81 orig_timeout = transp->timeout;
83 transp->timeout = timeout;
85 return orig_timeout;
88 struct rpc_tstream_next_vector_state {
89 uint8_t *buf;
90 size_t len;
91 off_t ofs;
94 static void rpc_tstream_next_vector_init(
95 struct rpc_tstream_next_vector_state *s,
96 uint8_t *buf, size_t len)
98 ZERO_STRUCTP(s);
100 s->buf = buf;
101 s->len = MIN(len, UINT16_MAX);
104 static int rpc_tstream_next_vector(struct tstream_context *stream,
105 void *private_data,
106 TALLOC_CTX *mem_ctx,
107 struct iovec **_vector,
108 size_t *count)
110 struct rpc_tstream_next_vector_state *state =
111 (struct rpc_tstream_next_vector_state *)private_data;
112 struct iovec *vector;
114 if (state->ofs == state->len) {
115 *_vector = NULL;
116 *count = 0;
117 return 0;
120 vector = talloc_array(mem_ctx, struct iovec, 1);
121 if (!vector) {
122 return -1;
125 vector[0].iov_base = state->buf;
126 vector[0].iov_len = state->len;
128 state->ofs = state->len;
130 *_vector = vector;
131 *count = 1;
132 return 0;
135 struct rpc_tstream_read_state {
136 struct rpc_tstream_state *transp;
137 struct rpc_tstream_next_vector_state next_vector;
138 ssize_t nread;
141 static void rpc_tstream_read_done(struct tevent_req *subreq);
143 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
144 struct tevent_context *ev,
145 uint8_t *data, size_t size,
146 void *priv)
148 struct rpc_tstream_state *transp =
149 talloc_get_type_abort(priv, struct rpc_tstream_state);
150 struct tevent_req *req, *subreq;
151 struct rpc_tstream_read_state *state;
152 struct timeval endtime;
154 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
155 if (req == NULL) {
156 return NULL;
158 if (!rpc_tstream_is_connected(transp)) {
159 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
160 if (tstream_is_smbXcli_np(transp->stream)) {
161 status = NT_STATUS_PIPE_DISCONNECTED;
163 tevent_req_nterror(req, status);
164 return tevent_req_post(req, ev);
166 state->transp = transp;
167 rpc_tstream_next_vector_init(&state->next_vector, data, size);
169 subreq = tstream_readv_pdu_queue_send(state, ev,
170 transp->stream,
171 transp->read_queue,
172 rpc_tstream_next_vector,
173 &state->next_vector);
174 if (subreq == NULL) {
175 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
176 return tevent_req_post(req, ev);
179 endtime = timeval_current_ofs_msec(transp->timeout);
180 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
181 goto fail;
184 tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
185 return req;
186 fail:
187 TALLOC_FREE(req);
188 return NULL;
191 static void rpc_tstream_read_done(struct tevent_req *subreq)
193 struct tevent_req *req =
194 tevent_req_callback_data(subreq, struct tevent_req);
195 struct rpc_tstream_read_state *state =
196 tevent_req_data(req, struct rpc_tstream_read_state);
197 int err;
199 state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
200 TALLOC_FREE(subreq);
201 if (state->nread < 0) {
202 rpc_tstream_disconnect(state->transp);
203 tevent_req_nterror(req, map_nt_error_from_unix(err));
204 return;
206 tevent_req_done(req);
209 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
211 struct rpc_tstream_read_state *state = tevent_req_data(
212 req, struct rpc_tstream_read_state);
213 NTSTATUS status;
215 if (tevent_req_is_nterror(req, &status)) {
216 return status;
218 *size = state->nread;
219 return NT_STATUS_OK;
222 struct rpc_tstream_write_state {
223 struct tevent_context *ev;
224 struct rpc_tstream_state *transp;
225 struct iovec iov;
226 ssize_t nwritten;
229 static void rpc_tstream_write_done(struct tevent_req *subreq);
231 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
232 struct tevent_context *ev,
233 const uint8_t *data, size_t size,
234 void *priv)
236 struct rpc_tstream_state *transp =
237 talloc_get_type_abort(priv, struct rpc_tstream_state);
238 struct tevent_req *req, *subreq;
239 struct rpc_tstream_write_state *state;
240 struct timeval endtime;
242 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
243 if (req == NULL) {
244 return NULL;
246 if (!rpc_tstream_is_connected(transp)) {
247 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
248 if (tstream_is_smbXcli_np(transp->stream)) {
249 status = NT_STATUS_PIPE_DISCONNECTED;
251 tevent_req_nterror(req, status);
252 return tevent_req_post(req, ev);
254 state->ev = ev;
255 state->transp = transp;
256 state->iov.iov_base = discard_const_p(void *, data);
257 state->iov.iov_len = size;
259 subreq = tstream_writev_queue_send(state, ev,
260 transp->stream,
261 transp->write_queue,
262 &state->iov, 1);
263 if (subreq == NULL) {
264 goto fail;
267 endtime = timeval_current_ofs_msec(transp->timeout);
268 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
269 goto fail;
272 tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
273 return req;
274 fail:
275 TALLOC_FREE(req);
276 return NULL;
279 static void rpc_tstream_write_done(struct tevent_req *subreq)
281 struct tevent_req *req =
282 tevent_req_callback_data(subreq, struct tevent_req);
283 struct rpc_tstream_write_state *state =
284 tevent_req_data(req, struct rpc_tstream_write_state);
285 int err;
287 state->nwritten = tstream_writev_queue_recv(subreq, &err);
288 TALLOC_FREE(subreq);
289 if (state->nwritten < 0) {
290 rpc_tstream_disconnect(state->transp);
291 tevent_req_nterror(req, map_nt_error_from_unix(err));
292 return;
294 tevent_req_done(req);
297 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
299 struct rpc_tstream_write_state *state =
300 tevent_req_data(req, struct rpc_tstream_write_state);
301 NTSTATUS status;
303 if (tevent_req_is_nterror(req, &status)) {
304 return status;
306 *sent = state->nwritten;
307 return NT_STATUS_OK;
310 struct rpc_tstream_trans_state {
311 struct tevent_context *ev;
312 struct rpc_tstream_state *transp;
313 struct iovec req;
314 uint32_t max_rdata_len;
315 struct iovec rep;
318 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
319 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
321 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
322 void *private_data,
323 TALLOC_CTX *mem_ctx,
324 struct iovec **_vector,
325 size_t *count);
327 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
328 struct tevent_context *ev,
329 const uint8_t *data, size_t data_len,
330 uint32_t max_rdata_len,
331 void *priv)
333 struct rpc_tstream_state *transp =
334 talloc_get_type_abort(priv, struct rpc_tstream_state);
335 struct tevent_req *req, *subreq;
336 struct rpc_tstream_trans_state *state;
337 struct timeval endtime;
338 bool use_trans = false;
340 req = tevent_req_create(mem_ctx, &state,
341 struct rpc_tstream_trans_state);
342 if (req == NULL) {
343 return NULL;
346 if (!rpc_tstream_is_connected(transp)) {
347 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
348 if (tstream_is_smbXcli_np(transp->stream)) {
349 status = NT_STATUS_PIPE_DISCONNECTED;
351 tevent_req_nterror(req, status);
352 return tevent_req_post(req, ev);
354 state->ev = ev;
355 state->transp = transp;
356 state->req.iov_len = data_len;
357 state->req.iov_base = discard_const_p(void *, data);
358 state->max_rdata_len = max_rdata_len;
360 endtime = timeval_current_ofs_msec(transp->timeout);
362 if (tstream_is_smbXcli_np(transp->stream)) {
363 use_trans = true;
365 if (tevent_queue_length(transp->write_queue) > 0) {
366 use_trans = false;
368 if (tevent_queue_length(transp->read_queue) > 0) {
369 use_trans = false;
372 if (use_trans) {
373 tstream_smbXcli_np_use_trans(transp->stream);
376 subreq = tstream_writev_queue_send(state, ev,
377 transp->stream,
378 transp->write_queue,
379 &state->req, 1);
380 if (tevent_req_nomem(subreq, req)) {
381 return tevent_req_post(req, ev);
383 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
384 return tevent_req_post(req, ev);
386 tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
388 subreq = tstream_readv_pdu_queue_send(state, ev,
389 transp->stream,
390 transp->read_queue,
391 rpc_tstream_trans_next_vector,
392 state);
393 if (tevent_req_nomem(subreq, req)) {
394 return tevent_req_post(req, ev);
396 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
397 return tevent_req_post(req, ev);
399 tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
401 return req;
404 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
406 struct tevent_req *req =
407 tevent_req_callback_data(subreq,
408 struct tevent_req);
409 struct rpc_tstream_trans_state *state =
410 tevent_req_data(req,
411 struct rpc_tstream_trans_state);
412 int ret;
413 int err;
415 ret = tstream_writev_queue_recv(subreq, &err);
416 TALLOC_FREE(subreq);
417 if (ret == -1) {
418 rpc_tstream_disconnect(state->transp);
419 tevent_req_nterror(req, map_nt_error_from_unix(err));
420 return;
424 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
425 void *private_data,
426 TALLOC_CTX *mem_ctx,
427 struct iovec **_vector,
428 size_t *count)
430 struct rpc_tstream_trans_state *state =
431 talloc_get_type_abort(private_data,
432 struct rpc_tstream_trans_state);
433 struct iovec *vector;
435 if (state->max_rdata_len == state->rep.iov_len) {
436 *_vector = NULL;
437 *count = 0;
438 return 0;
441 state->rep.iov_base = talloc_array(state, uint8_t,
442 state->max_rdata_len);
443 if (state->rep.iov_base == NULL) {
444 return -1;
446 state->rep.iov_len = state->max_rdata_len;
448 vector = talloc_array(mem_ctx, struct iovec, 1);
449 if (!vector) {
450 return -1;
453 vector[0] = state->rep;
455 *_vector = vector;
456 *count = 1;
457 return 0;
460 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
462 struct tevent_req *req =
463 tevent_req_callback_data(subreq,
464 struct tevent_req);
465 struct rpc_tstream_trans_state *state =
466 tevent_req_data(req,
467 struct rpc_tstream_trans_state);
468 int ret;
469 int err;
471 ret = tstream_readv_pdu_queue_recv(subreq, &err);
472 TALLOC_FREE(subreq);
473 if (ret == -1) {
474 rpc_tstream_disconnect(state->transp);
475 tevent_req_nterror(req, map_nt_error_from_unix(err));
476 return;
479 tevent_req_done(req);
482 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
483 uint8_t **prdata, uint32_t *prdata_len)
485 struct rpc_tstream_trans_state *state =
486 tevent_req_data(req,
487 struct rpc_tstream_trans_state);
488 NTSTATUS status;
490 if (tevent_req_is_nterror(req, &status)) {
491 return status;
494 *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
495 *prdata_len = state->rep.iov_len;
496 return NT_STATUS_OK;
500 * @brief Initialize a tstream transport facility
501 * NOTE: this function will talloc_steal, the stream and the queues.
503 * @param mem_ctx - memory context used to allocate the transport
504 * @param stream - a ready to use tstream
505 * @param presult - the transport structure
507 * @return - a NT Status error code.
509 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
510 struct tstream_context **stream,
511 struct rpc_cli_transport **presult)
513 struct rpc_cli_transport *result;
514 struct rpc_tstream_state *state;
516 result = talloc(mem_ctx, struct rpc_cli_transport);
517 if (result == NULL) {
518 return NT_STATUS_NO_MEMORY;
520 state = talloc(result, struct rpc_tstream_state);
521 if (state == NULL) {
522 TALLOC_FREE(result);
523 return NT_STATUS_NO_MEMORY;
525 result->priv = state;
527 state->read_queue = tevent_queue_create(state, "read_queue");
528 if (state->read_queue == NULL) {
529 TALLOC_FREE(result);
530 return NT_STATUS_NO_MEMORY;
532 state->write_queue = tevent_queue_create(state, "write_queue");
533 if (state->write_queue == NULL) {
534 TALLOC_FREE(result);
535 return NT_STATUS_NO_MEMORY;
538 state->stream = talloc_move(state, stream);
539 state->timeout = 10000; /* 10 seconds. */
541 if (tstream_is_smbXcli_np(state->stream)) {
542 result->trans_send = rpc_tstream_trans_send;
543 result->trans_recv = rpc_tstream_trans_recv;
544 } else {
545 result->trans_send = NULL;
546 result->trans_recv = NULL;
548 result->write_send = rpc_tstream_write_send;
549 result->write_recv = rpc_tstream_write_recv;
550 result->read_send = rpc_tstream_read_send;
551 result->read_recv = rpc_tstream_read_recv;
552 result->is_connected = rpc_tstream_is_connected;
553 result->set_timeout = rpc_tstream_set_timeout;
555 *presult = result;
556 return NT_STATUS_OK;