s3: Fix Coverity ID 242726 Uninitialized scalar variable
[Samba/gebeck_regimport.git] / source3 / rpc_client / rpc_transport_tstream.c
blob01e7a5edc87a16947ecba1c9ac440787f03c5534
1 /*
2 * Unix SMB/CIFS implementation.
3 * RPC client transport over tstream
4 * Copyright (C) Simo Sorce 2010
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "../lib/util/tevent_ntstatus.h"
22 #include "rpc_client/rpc_transport.h"
23 #include "lib/tsocket/tsocket.h"
24 #include "libsmb/cli_np_tstream.h"
25 #include "cli_pipe.h"
27 #undef DBGC_CLASS
28 #define DBGC_CLASS DBGC_RPC_CLI
/*
 * Private data of an RPC client transport that runs on top of a tstream.
 */
struct rpc_tstream_state {
	/* the underlying stream; released with TALLOC_FREE() on disconnect */
	struct tstream_context *stream;
	/* queue handed to the tstream readv_pdu requests */
	struct tevent_queue *read_queue;
	/* queue handed to the tstream writev requests */
	struct tevent_queue *write_queue;
	/* request timeout, fed to timeval_current_ofs_msec() */
	unsigned int timeout;
};
37 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
39 TALLOC_FREE(s->stream);
42 static bool rpc_tstream_is_connected(void *priv)
44 struct rpc_tstream_state *transp =
45 talloc_get_type_abort(priv, struct rpc_tstream_state);
46 ssize_t ret;
48 if (!transp->stream) {
49 return false;
52 if (!tstream_is_cli_np(transp->stream)) {
53 return true;
56 ret = tstream_pending_bytes(transp->stream);
57 if (ret == -1) {
58 return false;
61 return true;
64 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
66 struct rpc_tstream_state *transp =
67 talloc_get_type_abort(priv, struct rpc_tstream_state);
68 int orig_timeout;
69 bool ok;
71 ok = rpc_tstream_is_connected(transp);
72 if (!ok) {
73 return 0;
76 if (tstream_is_cli_np(transp->stream)) {
77 transp->timeout = timeout;
78 return tstream_cli_np_set_timeout(transp->stream, timeout);
81 orig_timeout = transp->timeout;
83 transp->timeout = timeout;
85 return orig_timeout;
/*
 * Bookkeeping for rpc_tstream_next_vector(): describes the caller's
 * buffer and how much of it has been handed out so far.
 */
struct rpc_tstream_next_vector_state {
	/* start of the destination buffer */
	uint8_t *buf;
	/* usable length of buf (capped at UINT16_MAX by the init helper) */
	size_t len;
	/* offset into buf of the next byte to hand out */
	off_t ofs;
	/* bytes pending on the stream beyond what fits into buf */
	size_t remaining;
};
95 static void rpc_tstream_next_vector_init(
96 struct rpc_tstream_next_vector_state *s,
97 uint8_t *buf, size_t len)
99 ZERO_STRUCTP(s);
101 s->buf = buf;
102 s->len = MIN(len, UINT16_MAX);
105 static int rpc_tstream_next_vector(struct tstream_context *stream,
106 void *private_data,
107 TALLOC_CTX *mem_ctx,
108 struct iovec **_vector,
109 size_t *count)
111 struct rpc_tstream_next_vector_state *state =
112 (struct rpc_tstream_next_vector_state *)private_data;
113 struct iovec *vector;
114 ssize_t pending;
115 size_t wanted;
117 if (state->ofs == state->len) {
118 *_vector = NULL;
119 *count = 0;
120 return 0;
123 pending = tstream_pending_bytes(stream);
124 if (pending == -1) {
125 return -1;
128 if (pending == 0 && state->ofs != 0) {
129 /* return a short read */
130 *_vector = NULL;
131 *count = 0;
132 return 0;
135 if (pending == 0) {
136 /* we want at least one byte and recheck again */
137 wanted = 1;
138 } else {
139 size_t missing = state->len - state->ofs;
140 if (pending > missing) {
141 /* there's more available */
142 state->remaining = pending - missing;
143 wanted = missing;
144 } else {
145 /* read what we can get and recheck in the next cycle */
146 wanted = pending;
150 vector = talloc_array(mem_ctx, struct iovec, 1);
151 if (!vector) {
152 return -1;
155 vector[0].iov_base = state->buf + state->ofs;
156 vector[0].iov_len = wanted;
158 state->ofs += wanted;
160 *_vector = vector;
161 *count = 1;
162 return 0;
165 struct rpc_tstream_read_state {
166 struct rpc_tstream_state *transp;
167 struct rpc_tstream_next_vector_state next_vector;
168 ssize_t nread;
171 static void rpc_tstream_read_done(struct tevent_req *subreq);
173 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
174 struct event_context *ev,
175 uint8_t *data, size_t size,
176 void *priv)
178 struct rpc_tstream_state *transp =
179 talloc_get_type_abort(priv, struct rpc_tstream_state);
180 struct tevent_req *req, *subreq;
181 struct rpc_tstream_read_state *state;
182 struct timeval endtime;
184 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
185 if (req == NULL) {
186 return NULL;
188 if (!rpc_tstream_is_connected(transp)) {
189 tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED);
190 return tevent_req_post(req, ev);
192 state->transp = transp;
193 rpc_tstream_next_vector_init(&state->next_vector, data, size);
195 subreq = tstream_readv_pdu_queue_send(state, ev,
196 transp->stream,
197 transp->read_queue,
198 rpc_tstream_next_vector,
199 &state->next_vector);
200 if (subreq == NULL) {
201 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
202 return tevent_req_post(req, ev);
205 endtime = timeval_current_ofs_msec(transp->timeout);
206 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
207 goto fail;
210 tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
211 return req;
212 fail:
213 TALLOC_FREE(req);
214 return NULL;
217 static void rpc_tstream_read_done(struct tevent_req *subreq)
219 struct tevent_req *req =
220 tevent_req_callback_data(subreq, struct tevent_req);
221 struct rpc_tstream_read_state *state =
222 tevent_req_data(req, struct rpc_tstream_read_state);
223 int err;
225 state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
226 TALLOC_FREE(subreq);
227 if (state->nread < 0) {
228 rpc_tstream_disconnect(state->transp);
229 tevent_req_nterror(req, map_nt_error_from_unix(err));
230 return;
232 tevent_req_done(req);
235 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
237 struct rpc_tstream_read_state *state = tevent_req_data(
238 req, struct rpc_tstream_read_state);
239 NTSTATUS status;
241 if (tevent_req_is_nterror(req, &status)) {
242 return status;
244 *size = state->nread;
245 return NT_STATUS_OK;
/*
 * Per-request state of rpc_tstream_write_send()/_recv().
 */
struct rpc_tstream_write_state {
	/* event context the request was started on */
	struct event_context *ev;
	/* transport the write runs on; disconnected if the write fails */
	struct rpc_tstream_state *transp;
	/* the single vector describing the caller's data */
	struct iovec iov;
	/* result of tstream_writev_queue_recv() */
	ssize_t nwritten;
};
255 static void rpc_tstream_write_done(struct tevent_req *subreq);
257 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
258 struct event_context *ev,
259 const uint8_t *data, size_t size,
260 void *priv)
262 struct rpc_tstream_state *transp =
263 talloc_get_type_abort(priv, struct rpc_tstream_state);
264 struct tevent_req *req, *subreq;
265 struct rpc_tstream_write_state *state;
266 struct timeval endtime;
268 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
269 if (req == NULL) {
270 return NULL;
272 if (!rpc_tstream_is_connected(transp)) {
273 tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED);
274 return tevent_req_post(req, ev);
276 state->ev = ev;
277 state->transp = transp;
278 state->iov.iov_base = discard_const_p(void *, data);
279 state->iov.iov_len = size;
281 subreq = tstream_writev_queue_send(state, ev,
282 transp->stream,
283 transp->write_queue,
284 &state->iov, 1);
285 if (subreq == NULL) {
286 goto fail;
289 endtime = timeval_current_ofs_msec(transp->timeout);
290 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
291 goto fail;
294 tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
295 return req;
296 fail:
297 TALLOC_FREE(req);
298 return NULL;
301 static void rpc_tstream_write_done(struct tevent_req *subreq)
303 struct tevent_req *req =
304 tevent_req_callback_data(subreq, struct tevent_req);
305 struct rpc_tstream_write_state *state =
306 tevent_req_data(req, struct rpc_tstream_write_state);
307 int err;
309 state->nwritten = tstream_writev_queue_recv(subreq, &err);
310 TALLOC_FREE(subreq);
311 if (state->nwritten < 0) {
312 rpc_tstream_disconnect(state->transp);
313 tevent_req_nterror(req, map_nt_error_from_unix(err));
314 return;
316 tevent_req_done(req);
319 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
321 struct rpc_tstream_write_state *state =
322 tevent_req_data(req, struct rpc_tstream_write_state);
323 NTSTATUS status;
325 if (tevent_req_is_nterror(req, &status)) {
326 return status;
328 *sent = state->nwritten;
329 return NT_STATUS_OK;
/*
 * Per-request state of rpc_tstream_trans_send()/_recv().
 */
struct rpc_tstream_trans_state {
	/* event context the request was started on */
	struct tevent_context *ev;
	/* transport the transaction runs on; disconnected on failure */
	struct rpc_tstream_state *transp;
	/* the request data to write */
	struct iovec req;
	/* size of the reply buffer to allocate */
	uint32_t max_rdata_len;
	/* reply buffer, allocated by rpc_tstream_trans_next_vector() */
	struct iovec rep;
};
340 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
341 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
343 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
344 void *private_data,
345 TALLOC_CTX *mem_ctx,
346 struct iovec **_vector,
347 size_t *count);
349 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
350 struct tevent_context *ev,
351 uint8_t *data, size_t data_len,
352 uint32_t max_rdata_len,
353 void *priv)
355 struct rpc_tstream_state *transp =
356 talloc_get_type_abort(priv, struct rpc_tstream_state);
357 struct tevent_req *req, *subreq;
358 struct rpc_tstream_trans_state *state;
359 struct timeval endtime;
360 bool use_trans = false;
362 req = tevent_req_create(mem_ctx, &state,
363 struct rpc_tstream_trans_state);
364 if (req == NULL) {
365 return NULL;
368 if (!rpc_tstream_is_connected(transp)) {
369 tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED);
370 return tevent_req_post(req, ev);
372 state->ev = ev;
373 state->transp = transp;
374 state->req.iov_len = data_len;
375 state->req.iov_base = discard_const_p(void *, data);
376 state->max_rdata_len = max_rdata_len;
378 endtime = timeval_current_ofs_msec(transp->timeout);
380 if (tstream_is_cli_np(transp->stream)) {
381 use_trans = true;
383 if (tevent_queue_length(transp->write_queue) > 0) {
384 use_trans = false;
386 if (tevent_queue_length(transp->read_queue) > 0) {
387 use_trans = false;
390 if (use_trans) {
391 tstream_cli_np_use_trans(transp->stream);
394 subreq = tstream_writev_queue_send(state, ev,
395 transp->stream,
396 transp->write_queue,
397 &state->req, 1);
398 if (tevent_req_nomem(subreq, req)) {
399 return tevent_req_post(req, ev);
401 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
402 return tevent_req_post(req, ev);
404 tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
406 subreq = tstream_readv_pdu_queue_send(state, ev,
407 transp->stream,
408 transp->read_queue,
409 rpc_tstream_trans_next_vector,
410 state);
411 if (tevent_req_nomem(subreq, req)) {
412 return tevent_req_post(req, ev);
414 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
415 return tevent_req_post(req, ev);
417 tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
419 return req;
422 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
424 struct tevent_req *req =
425 tevent_req_callback_data(subreq,
426 struct tevent_req);
427 struct rpc_tstream_trans_state *state =
428 tevent_req_data(req,
429 struct rpc_tstream_trans_state);
430 int ret;
431 int err;
433 ret = tstream_writev_queue_recv(subreq, &err);
434 TALLOC_FREE(subreq);
435 if (ret == -1) {
436 rpc_tstream_disconnect(state->transp);
437 tevent_req_nterror(req, map_nt_error_from_unix(err));
438 return;
442 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
443 void *private_data,
444 TALLOC_CTX *mem_ctx,
445 struct iovec **_vector,
446 size_t *count)
448 struct rpc_tstream_trans_state *state =
449 talloc_get_type_abort(private_data,
450 struct rpc_tstream_trans_state);
451 struct iovec *vector;
453 if (state->max_rdata_len == state->rep.iov_len) {
454 *_vector = NULL;
455 *count = 0;
456 return 0;
459 state->rep.iov_base = talloc_array(state, uint8_t,
460 state->max_rdata_len);
461 if (state->rep.iov_base == NULL) {
462 return -1;
464 state->rep.iov_len = state->max_rdata_len;
466 vector = talloc_array(mem_ctx, struct iovec, 1);
467 if (!vector) {
468 return -1;
471 vector[0] = state->rep;
473 *_vector = vector;
474 *count = 1;
475 return 0;
478 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
480 struct tevent_req *req =
481 tevent_req_callback_data(subreq,
482 struct tevent_req);
483 struct rpc_tstream_trans_state *state =
484 tevent_req_data(req,
485 struct rpc_tstream_trans_state);
486 int ret;
487 int err;
489 ret = tstream_readv_pdu_queue_recv(subreq, &err);
490 TALLOC_FREE(subreq);
491 if (ret == -1) {
492 rpc_tstream_disconnect(state->transp);
493 tevent_req_nterror(req, map_nt_error_from_unix(err));
494 return;
497 tevent_req_done(req);
500 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
501 uint8_t **prdata, uint32_t *prdata_len)
503 struct rpc_tstream_trans_state *state =
504 tevent_req_data(req,
505 struct rpc_tstream_trans_state);
506 NTSTATUS status;
508 if (tevent_req_is_nterror(req, &status)) {
509 return status;
512 *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
513 *prdata_len = state->rep.iov_len;
514 return NT_STATUS_OK;
518 * @brief Initialize a tstream transport facility
519 * NOTE: this function will talloc_steal, the stream and the queues.
521 * @param mem_ctx - memory context used to allocate the transport
522 * @param stream - a ready to use tstream
523 * @param presult - the transport structure
525 * @return - a NT Status error code.
527 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
528 struct tstream_context **stream,
529 struct rpc_cli_transport **presult)
531 struct rpc_cli_transport *result;
532 struct rpc_tstream_state *state;
534 result = talloc(mem_ctx, struct rpc_cli_transport);
535 if (result == NULL) {
536 return NT_STATUS_NO_MEMORY;
538 state = talloc(result, struct rpc_tstream_state);
539 if (state == NULL) {
540 TALLOC_FREE(result);
541 return NT_STATUS_NO_MEMORY;
543 result->priv = state;
545 state->read_queue = tevent_queue_create(state, "read_queue");
546 if (state->read_queue == NULL) {
547 TALLOC_FREE(result);
548 return NT_STATUS_NO_MEMORY;
550 state->write_queue = tevent_queue_create(state, "write_queue");
551 if (state->write_queue == NULL) {
552 TALLOC_FREE(result);
553 return NT_STATUS_NO_MEMORY;
556 state->stream = talloc_move(state, stream);
557 state->timeout = 10000; /* 10 seconds. */
559 if (tstream_is_cli_np(state->stream)) {
560 result->trans_send = rpc_tstream_trans_send;
561 result->trans_recv = rpc_tstream_trans_recv;
562 } else {
563 result->trans_send = NULL;
564 result->trans_recv = NULL;
566 result->write_send = rpc_tstream_write_send;
567 result->write_recv = rpc_tstream_write_recv;
568 result->read_send = rpc_tstream_read_send;
569 result->read_recv = rpc_tstream_read_recv;
570 result->is_connected = rpc_tstream_is_connected;
571 result->set_timeout = rpc_tstream_set_timeout;
573 *presult = result;
574 return NT_STATUS_OK;
577 struct cli_state *rpc_pipe_np_smb_conn(struct rpc_pipe_client *p)
579 struct rpc_tstream_state *transp =
580 talloc_get_type_abort(p->transport->priv,
581 struct rpc_tstream_state);
582 bool ok;
584 ok = rpccli_is_connected(p);
585 if (!ok) {
586 return NULL;
589 if (!tstream_is_cli_np(transp->stream)) {
590 return NULL;
593 return tstream_cli_np_get_cli_state(transp->stream);