s3:rpc_client/rpc_transport_tstream.c: add some logic to handle cli_np tstreams
[Samba.git] / source3 / rpc_client / rpc_transport_tstream.c
blob6d8200e8b4e60e10dbe9517ac1002a6d5108d852
1 /*
2 * Unix SMB/CIFS implementation.
3 * RPC client transport over tstream
4 * Copyright (C) Simo Sorce 2010
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "lib/tsocket/tsocket.h"
22 #include "libsmb/cli_np_tstream.h"
24 #undef DBGC_CLASS
25 #define DBGC_CLASS DBGC_RPC_CLI
27 struct rpc_tstream_state {
28 struct tstream_context *stream;
29 struct tevent_queue *read_queue;
30 struct tevent_queue *write_queue;
31 unsigned int timeout;
34 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
36 TALLOC_FREE(s->stream);
39 static bool rpc_tstream_is_connected(void *priv)
41 struct rpc_tstream_state *transp =
42 talloc_get_type_abort(priv, struct rpc_tstream_state);
43 ssize_t ret;
45 if (!transp->stream) {
46 return false;
49 if (!tstream_is_cli_np(transp->stream)) {
50 return true;
53 ret = tstream_pending_bytes(transp->stream);
54 if (ret == -1) {
55 return false;
58 return true;
61 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
63 struct rpc_tstream_state *transp =
64 talloc_get_type_abort(priv, struct rpc_tstream_state);
65 int orig_timeout;
66 bool ok;
68 ok = rpc_tstream_is_connected(transp);
69 if (!ok) {
70 return 0;
73 if (tstream_is_cli_np(transp->stream)) {
74 transp->timeout = timeout;
75 return tstream_cli_np_set_timeout(transp->stream, timeout);
78 orig_timeout = transp->timeout;
80 transp->timeout = timeout;
82 return orig_timeout;
85 struct rpc_tstream_next_vector_state {
86 uint8_t *buf;
87 size_t len;
88 off_t ofs;
89 size_t remaining;
92 static void rpc_tstream_next_vector_init(
93 struct rpc_tstream_next_vector_state *s,
94 uint8_t *buf, size_t len)
96 ZERO_STRUCTP(s);
98 s->buf = buf;
99 s->len = MIN(len, UINT16_MAX);
102 static int rpc_tstream_next_vector(struct tstream_context *stream,
103 void *private_data,
104 TALLOC_CTX *mem_ctx,
105 struct iovec **_vector,
106 size_t *count)
108 struct rpc_tstream_next_vector_state *state =
109 (struct rpc_tstream_next_vector_state *)private_data;
110 struct iovec *vector;
111 ssize_t pending;
112 size_t wanted;
114 if (state->ofs == state->len) {
115 *_vector = NULL;
116 *count = 0;
117 return 0;
120 pending = tstream_pending_bytes(stream);
121 if (pending == -1) {
122 return -1;
125 if (pending == 0 && state->ofs != 0) {
126 /* return a short read */
127 *_vector = NULL;
128 *count = 0;
129 return 0;
132 if (pending == 0) {
133 /* we want at least one byte and recheck again */
134 wanted = 1;
135 } else {
136 size_t missing = state->len - state->ofs;
137 if (pending > missing) {
138 /* there's more available */
139 state->remaining = pending - missing;
140 wanted = missing;
141 } else {
142 /* read what we can get and recheck in the next cycle */
143 wanted = pending;
147 vector = talloc_array(mem_ctx, struct iovec, 1);
148 if (!vector) {
149 return -1;
152 vector[0].iov_base = state->buf + state->ofs;
153 vector[0].iov_len = wanted;
155 state->ofs += wanted;
157 *_vector = vector;
158 *count = 1;
159 return 0;
162 struct rpc_tstream_read_state {
163 struct rpc_tstream_state *transp;
164 struct rpc_tstream_next_vector_state next_vector;
165 ssize_t nread;
168 static void rpc_tstream_read_done(struct tevent_req *subreq);
170 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
171 struct event_context *ev,
172 uint8_t *data, size_t size,
173 void *priv)
175 struct rpc_tstream_state *transp =
176 talloc_get_type_abort(priv, struct rpc_tstream_state);
177 struct tevent_req *req, *subreq;
178 struct rpc_tstream_read_state *state;
179 struct timeval endtime;
181 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
182 if (req == NULL) {
183 return NULL;
185 if (!rpc_tstream_is_connected(transp)) {
186 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
187 return tevent_req_post(req, ev);
189 state->transp = transp;
190 rpc_tstream_next_vector_init(&state->next_vector, data, size);
192 subreq = tstream_readv_pdu_queue_send(state, ev,
193 transp->stream,
194 transp->read_queue,
195 rpc_tstream_next_vector,
196 &state->next_vector);
197 if (subreq == NULL) {
198 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
199 return tevent_req_post(req, ev);
202 endtime = timeval_current_ofs(0, transp->timeout * 1000);
203 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
204 goto fail;
207 tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
208 return req;
209 fail:
210 TALLOC_FREE(req);
211 return NULL;
214 static void rpc_tstream_read_done(struct tevent_req *subreq)
216 struct tevent_req *req =
217 tevent_req_callback_data(subreq, struct tevent_req);
218 struct rpc_tstream_read_state *state =
219 tevent_req_data(req, struct rpc_tstream_read_state);
220 int err;
222 state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
223 TALLOC_FREE(subreq);
224 if (state->nread < 0) {
225 rpc_tstream_disconnect(state->transp);
226 tevent_req_nterror(req, map_nt_error_from_unix(err));
227 return;
229 tevent_req_done(req);
232 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
234 struct rpc_tstream_read_state *state = tevent_req_data(
235 req, struct rpc_tstream_read_state);
236 NTSTATUS status;
238 if (tevent_req_is_nterror(req, &status)) {
239 return status;
241 *size = state->nread;
242 return NT_STATUS_OK;
245 struct rpc_tstream_write_state {
246 struct event_context *ev;
247 struct rpc_tstream_state *transp;
248 struct iovec iov;
249 ssize_t nwritten;
252 static void rpc_tstream_write_done(struct tevent_req *subreq);
254 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
255 struct event_context *ev,
256 const uint8_t *data, size_t size,
257 void *priv)
259 struct rpc_tstream_state *transp =
260 talloc_get_type_abort(priv, struct rpc_tstream_state);
261 struct tevent_req *req, *subreq;
262 struct rpc_tstream_write_state *state;
263 struct timeval endtime;
265 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
266 if (req == NULL) {
267 return NULL;
269 if (!rpc_tstream_is_connected(transp)) {
270 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
271 return tevent_req_post(req, ev);
273 state->ev = ev;
274 state->transp = transp;
275 state->iov.iov_base = discard_const_p(void *, data);
276 state->iov.iov_len = size;
278 subreq = tstream_writev_queue_send(state, ev,
279 transp->stream,
280 transp->write_queue,
281 &state->iov, 1);
282 if (subreq == NULL) {
283 goto fail;
286 endtime = timeval_current_ofs(0, transp->timeout * 1000);
287 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
288 goto fail;
291 tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
292 return req;
293 fail:
294 TALLOC_FREE(req);
295 return NULL;
298 static void rpc_tstream_write_done(struct tevent_req *subreq)
300 struct tevent_req *req =
301 tevent_req_callback_data(subreq, struct tevent_req);
302 struct rpc_tstream_write_state *state =
303 tevent_req_data(req, struct rpc_tstream_write_state);
304 int err;
306 state->nwritten = tstream_writev_queue_recv(subreq, &err);
307 TALLOC_FREE(subreq);
308 if (state->nwritten < 0) {
309 rpc_tstream_disconnect(state->transp);
310 tevent_req_nterror(req, map_nt_error_from_unix(err));
311 return;
313 tevent_req_done(req);
316 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
318 struct rpc_tstream_write_state *state =
319 tevent_req_data(req, struct rpc_tstream_write_state);
320 NTSTATUS status;
322 if (tevent_req_is_nterror(req, &status)) {
323 return status;
325 *sent = state->nwritten;
326 return NT_STATUS_OK;
329 struct rpc_tstream_trans_state {
330 struct tevent_context *ev;
331 struct rpc_tstream_state *transp;
332 struct iovec req;
333 uint32_t max_rdata_len;
334 struct iovec rep;
337 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
338 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
340 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
341 void *private_data,
342 TALLOC_CTX *mem_ctx,
343 struct iovec **_vector,
344 size_t *count);
346 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
347 struct tevent_context *ev,
348 uint8_t *data, size_t data_len,
349 uint32_t max_rdata_len,
350 void *priv)
352 struct rpc_tstream_state *transp =
353 talloc_get_type_abort(priv, struct rpc_tstream_state);
354 struct tevent_req *req, *subreq;
355 struct rpc_tstream_trans_state *state;
356 struct timeval endtime;
358 req = tevent_req_create(mem_ctx, &state,
359 struct rpc_tstream_trans_state);
360 if (req == NULL) {
361 return NULL;
364 if (!rpc_tstream_is_connected(transp)) {
365 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
366 return tevent_req_post(req, ev);
368 state->ev = ev;
369 state->transp = transp;
370 state->req.iov_len = data_len;
371 state->req.iov_base = discard_const_p(void *, data);
372 state->max_rdata_len = max_rdata_len;
374 endtime = timeval_current_ofs(0, transp->timeout * 1000);
376 subreq = tstream_writev_queue_send(state, ev,
377 transp->stream,
378 transp->write_queue,
379 &state->req, 1);
380 if (tevent_req_nomem(subreq, req)) {
381 return tevent_req_post(req, ev);
383 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
384 return tevent_req_post(req, ev);
386 tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
388 if (tstream_is_cli_np(transp->stream)) {
389 tstream_cli_np_use_trans(transp->stream);
392 subreq = tstream_readv_pdu_queue_send(state, ev,
393 transp->stream,
394 transp->read_queue,
395 rpc_tstream_trans_next_vector,
396 state);
397 if (tevent_req_nomem(subreq, req)) {
398 return tevent_req_post(req, ev);
400 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
401 return tevent_req_post(req, ev);
403 tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
405 return req;
408 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
410 struct tevent_req *req =
411 tevent_req_callback_data(subreq,
412 struct tevent_req);
413 struct rpc_tstream_trans_state *state =
414 tevent_req_data(req,
415 struct rpc_tstream_trans_state);
416 int ret;
417 int err;
419 ret = tstream_writev_queue_recv(subreq, &err);
420 TALLOC_FREE(subreq);
421 if (ret == -1) {
422 rpc_tstream_disconnect(state->transp);
423 tevent_req_nterror(req, map_nt_error_from_unix(err));
424 return;
428 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
429 void *private_data,
430 TALLOC_CTX *mem_ctx,
431 struct iovec **_vector,
432 size_t *count)
434 struct rpc_tstream_trans_state *state =
435 talloc_get_type_abort(private_data,
436 struct rpc_tstream_trans_state);
437 struct iovec *vector;
439 if (state->max_rdata_len == state->rep.iov_len) {
440 *_vector = NULL;
441 *count = 0;
442 return 0;
445 state->rep.iov_base = talloc_array(state, uint8_t,
446 state->max_rdata_len);
447 if (state->rep.iov_base == NULL) {
448 return -1;
450 state->rep.iov_len = state->max_rdata_len;
452 vector = talloc_array(mem_ctx, struct iovec, 1);
453 if (!vector) {
454 return -1;
457 vector[0] = state->rep;
459 *_vector = vector;
460 *count = 1;
461 return 0;
464 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
466 struct tevent_req *req =
467 tevent_req_callback_data(subreq,
468 struct tevent_req);
469 struct rpc_tstream_trans_state *state =
470 tevent_req_data(req,
471 struct rpc_tstream_trans_state);
472 int ret;
473 int err;
475 ret = tstream_readv_pdu_queue_recv(subreq, &err);
476 TALLOC_FREE(subreq);
477 if (ret == -1) {
478 rpc_tstream_disconnect(state->transp);
479 tevent_req_nterror(req, map_nt_error_from_unix(err));
480 return;
483 tevent_req_done(req);
486 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
487 uint8_t **prdata, uint32_t *prdata_len)
489 struct rpc_tstream_trans_state *state =
490 tevent_req_data(req,
491 struct rpc_tstream_trans_state);
492 NTSTATUS status;
494 if (tevent_req_is_nterror(req, &status)) {
495 return status;
498 *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
499 *prdata_len = state->rep.iov_len;
500 return NT_STATUS_OK;
504 * @brief Initialize a tstream transport facility
505 * NOTE: this function will talloc_steal, the stream and the queues.
507 * @param mem_ctx - memory context used to allocate the transport
508 * @param stream - a ready to use tstream
509 * @param presult - the transport structure
511 * @return - a NT Status error code.
513 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
514 struct tstream_context **stream,
515 struct rpc_cli_transport **presult)
517 struct rpc_cli_transport *result;
518 struct rpc_tstream_state *state;
520 result = talloc(mem_ctx, struct rpc_cli_transport);
521 if (result == NULL) {
522 return NT_STATUS_NO_MEMORY;
524 state = talloc(result, struct rpc_tstream_state);
525 if (state == NULL) {
526 TALLOC_FREE(result);
527 return NT_STATUS_NO_MEMORY;
529 result->priv = state;
531 state->read_queue = tevent_queue_create(state, "read_queue");
532 if (state->read_queue == NULL) {
533 TALLOC_FREE(result);
534 return NT_STATUS_NO_MEMORY;
536 state->write_queue = tevent_queue_create(state, "write_queue");
537 if (state->write_queue == NULL) {
538 TALLOC_FREE(result);
539 return NT_STATUS_NO_MEMORY;
542 state->stream = talloc_move(state, stream);
543 state->timeout = 10000; /* 10 seconds. */
545 if (tstream_is_cli_np(state->stream)) {
546 result->trans_send = rpc_tstream_trans_send;
547 result->trans_recv = rpc_tstream_trans_recv;
548 } else {
549 result->trans_send = NULL;
550 result->trans_recv = NULL;
552 result->write_send = rpc_tstream_write_send;
553 result->write_recv = rpc_tstream_write_recv;
554 result->read_send = rpc_tstream_read_send;
555 result->read_recv = rpc_tstream_read_recv;
556 result->is_connected = rpc_tstream_is_connected;
557 result->set_timeout = rpc_tstream_set_timeout;
559 *presult = result;
560 return NT_STATUS_OK;