s3-rpc_client: move rpc_cli_transport structs and protos to rpc_transport.h
[Samba/gbeck.git] / source3 / rpc_client / rpc_transport_tstream.c
blob248b09f04912b2db942feda2130e6c9cf3fa61db
/*
 *  Unix SMB/CIFS implementation.
 *  RPC client transport over tstream
 *  Copyright (C) Simo Sorce 2010
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
20 #include "includes.h"
21 #include "rpc_client/rpc_transport.h"
22 #include "lib/tsocket/tsocket.h"
23 #include "libsmb/cli_np_tstream.h"
24 #include "cli_pipe.h"
26 #undef DBGC_CLASS
27 #define DBGC_CLASS DBGC_RPC_CLI
/*
 * Private data of a tstream based RPC client transport.
 */
struct rpc_tstream_state {
	/* the stream all I/O goes through; freed by rpc_tstream_disconnect() */
	struct tstream_context *stream;

	/* queue passed to the readv_pdu requests issued on the stream */
	struct tevent_queue *read_queue;

	/* queue passed to the writev requests issued on the stream */
	struct tevent_queue *write_queue;

	/* per request timeout, in milliseconds */
	unsigned int timeout;
};
36 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
38 TALLOC_FREE(s->stream);
41 static bool rpc_tstream_is_connected(void *priv)
43 struct rpc_tstream_state *transp =
44 talloc_get_type_abort(priv, struct rpc_tstream_state);
45 ssize_t ret;
47 if (!transp->stream) {
48 return false;
51 if (!tstream_is_cli_np(transp->stream)) {
52 return true;
55 ret = tstream_pending_bytes(transp->stream);
56 if (ret == -1) {
57 return false;
60 return true;
63 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
65 struct rpc_tstream_state *transp =
66 talloc_get_type_abort(priv, struct rpc_tstream_state);
67 int orig_timeout;
68 bool ok;
70 ok = rpc_tstream_is_connected(transp);
71 if (!ok) {
72 return 0;
75 if (tstream_is_cli_np(transp->stream)) {
76 transp->timeout = timeout;
77 return tstream_cli_np_set_timeout(transp->stream, timeout);
80 orig_timeout = transp->timeout;
82 transp->timeout = timeout;
84 return orig_timeout;
/*
 * Bookkeeping for rpc_tstream_next_vector(): describes the caller's
 * buffer and how much of it has been handed out for reading so far.
 */
struct rpc_tstream_next_vector_state {
	/* destination buffer */
	uint8_t *buf;

	/* number of bytes to read at most (capped in the init function) */
	size_t len;

	/* offset into buf of the next byte to be filled */
	off_t ofs;

	/* bytes pending on the stream beyond what this read asked for */
	size_t remaining;
};
94 static void rpc_tstream_next_vector_init(
95 struct rpc_tstream_next_vector_state *s,
96 uint8_t *buf, size_t len)
98 ZERO_STRUCTP(s);
100 s->buf = buf;
101 s->len = MIN(len, UINT16_MAX);
104 static int rpc_tstream_next_vector(struct tstream_context *stream,
105 void *private_data,
106 TALLOC_CTX *mem_ctx,
107 struct iovec **_vector,
108 size_t *count)
110 struct rpc_tstream_next_vector_state *state =
111 (struct rpc_tstream_next_vector_state *)private_data;
112 struct iovec *vector;
113 ssize_t pending;
114 size_t wanted;
116 if (state->ofs == state->len) {
117 *_vector = NULL;
118 *count = 0;
119 return 0;
122 pending = tstream_pending_bytes(stream);
123 if (pending == -1) {
124 return -1;
127 if (pending == 0 && state->ofs != 0) {
128 /* return a short read */
129 *_vector = NULL;
130 *count = 0;
131 return 0;
134 if (pending == 0) {
135 /* we want at least one byte and recheck again */
136 wanted = 1;
137 } else {
138 size_t missing = state->len - state->ofs;
139 if (pending > missing) {
140 /* there's more available */
141 state->remaining = pending - missing;
142 wanted = missing;
143 } else {
144 /* read what we can get and recheck in the next cycle */
145 wanted = pending;
149 vector = talloc_array(mem_ctx, struct iovec, 1);
150 if (!vector) {
151 return -1;
154 vector[0].iov_base = state->buf + state->ofs;
155 vector[0].iov_len = wanted;
157 state->ofs += wanted;
159 *_vector = vector;
160 *count = 1;
161 return 0;
164 struct rpc_tstream_read_state {
165 struct rpc_tstream_state *transp;
166 struct rpc_tstream_next_vector_state next_vector;
167 ssize_t nread;
170 static void rpc_tstream_read_done(struct tevent_req *subreq);
172 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
173 struct event_context *ev,
174 uint8_t *data, size_t size,
175 void *priv)
177 struct rpc_tstream_state *transp =
178 talloc_get_type_abort(priv, struct rpc_tstream_state);
179 struct tevent_req *req, *subreq;
180 struct rpc_tstream_read_state *state;
181 struct timeval endtime;
183 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
184 if (req == NULL) {
185 return NULL;
187 if (!rpc_tstream_is_connected(transp)) {
188 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
189 return tevent_req_post(req, ev);
191 state->transp = transp;
192 rpc_tstream_next_vector_init(&state->next_vector, data, size);
194 subreq = tstream_readv_pdu_queue_send(state, ev,
195 transp->stream,
196 transp->read_queue,
197 rpc_tstream_next_vector,
198 &state->next_vector);
199 if (subreq == NULL) {
200 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
201 return tevent_req_post(req, ev);
204 endtime = timeval_current_ofs(0, transp->timeout * 1000);
205 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
206 goto fail;
209 tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
210 return req;
211 fail:
212 TALLOC_FREE(req);
213 return NULL;
216 static void rpc_tstream_read_done(struct tevent_req *subreq)
218 struct tevent_req *req =
219 tevent_req_callback_data(subreq, struct tevent_req);
220 struct rpc_tstream_read_state *state =
221 tevent_req_data(req, struct rpc_tstream_read_state);
222 int err;
224 state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
225 TALLOC_FREE(subreq);
226 if (state->nread < 0) {
227 rpc_tstream_disconnect(state->transp);
228 tevent_req_nterror(req, map_nt_error_from_unix(err));
229 return;
231 tevent_req_done(req);
234 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
236 struct rpc_tstream_read_state *state = tevent_req_data(
237 req, struct rpc_tstream_read_state);
238 NTSTATUS status;
240 if (tevent_req_is_nterror(req, &status)) {
241 return status;
243 *size = state->nread;
244 return NT_STATUS_OK;
/*
 * State of one rpc_tstream_write_send()/rpc_tstream_write_recv() request.
 */
struct rpc_tstream_write_state {
	/* event context the request was started with */
	struct event_context *ev;

	/* transport the write runs on; disconnected if the write fails */
	struct rpc_tstream_state *transp;

	/* the caller's buffer, wrapped for the writev request */
	struct iovec iov;

	/* result of the writev request, handed back by the recv function */
	ssize_t nwritten;
};
254 static void rpc_tstream_write_done(struct tevent_req *subreq);
256 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
257 struct event_context *ev,
258 const uint8_t *data, size_t size,
259 void *priv)
261 struct rpc_tstream_state *transp =
262 talloc_get_type_abort(priv, struct rpc_tstream_state);
263 struct tevent_req *req, *subreq;
264 struct rpc_tstream_write_state *state;
265 struct timeval endtime;
267 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
268 if (req == NULL) {
269 return NULL;
271 if (!rpc_tstream_is_connected(transp)) {
272 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
273 return tevent_req_post(req, ev);
275 state->ev = ev;
276 state->transp = transp;
277 state->iov.iov_base = discard_const_p(void *, data);
278 state->iov.iov_len = size;
280 subreq = tstream_writev_queue_send(state, ev,
281 transp->stream,
282 transp->write_queue,
283 &state->iov, 1);
284 if (subreq == NULL) {
285 goto fail;
288 endtime = timeval_current_ofs(0, transp->timeout * 1000);
289 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
290 goto fail;
293 tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
294 return req;
295 fail:
296 TALLOC_FREE(req);
297 return NULL;
300 static void rpc_tstream_write_done(struct tevent_req *subreq)
302 struct tevent_req *req =
303 tevent_req_callback_data(subreq, struct tevent_req);
304 struct rpc_tstream_write_state *state =
305 tevent_req_data(req, struct rpc_tstream_write_state);
306 int err;
308 state->nwritten = tstream_writev_queue_recv(subreq, &err);
309 TALLOC_FREE(subreq);
310 if (state->nwritten < 0) {
311 rpc_tstream_disconnect(state->transp);
312 tevent_req_nterror(req, map_nt_error_from_unix(err));
313 return;
315 tevent_req_done(req);
318 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
320 struct rpc_tstream_write_state *state =
321 tevent_req_data(req, struct rpc_tstream_write_state);
322 NTSTATUS status;
324 if (tevent_req_is_nterror(req, &status)) {
325 return status;
327 *sent = state->nwritten;
328 return NT_STATUS_OK;
/*
 * State of one rpc_tstream_trans_send()/rpc_tstream_trans_recv() request
 * (a write of the request data followed by a read of the reply).
 */
struct rpc_tstream_trans_state {
	/* event context the request was started with */
	struct tevent_context *ev;

	/* transport the transaction runs on; disconnected on I/O failure */
	struct rpc_tstream_state *transp;

	/* the outgoing data, wrapped for the writev request */
	struct iovec req;

	/* size of the reply buffer to allocate */
	uint32_t max_rdata_len;

	/* the reply buffer, allocated by rpc_tstream_trans_next_vector() */
	struct iovec rep;
};
339 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
340 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
342 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
343 void *private_data,
344 TALLOC_CTX *mem_ctx,
345 struct iovec **_vector,
346 size_t *count);
348 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
349 struct tevent_context *ev,
350 uint8_t *data, size_t data_len,
351 uint32_t max_rdata_len,
352 void *priv)
354 struct rpc_tstream_state *transp =
355 talloc_get_type_abort(priv, struct rpc_tstream_state);
356 struct tevent_req *req, *subreq;
357 struct rpc_tstream_trans_state *state;
358 struct timeval endtime;
360 req = tevent_req_create(mem_ctx, &state,
361 struct rpc_tstream_trans_state);
362 if (req == NULL) {
363 return NULL;
366 if (!rpc_tstream_is_connected(transp)) {
367 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
368 return tevent_req_post(req, ev);
370 state->ev = ev;
371 state->transp = transp;
372 state->req.iov_len = data_len;
373 state->req.iov_base = discard_const_p(void *, data);
374 state->max_rdata_len = max_rdata_len;
376 endtime = timeval_current_ofs(0, transp->timeout * 1000);
378 subreq = tstream_writev_queue_send(state, ev,
379 transp->stream,
380 transp->write_queue,
381 &state->req, 1);
382 if (tevent_req_nomem(subreq, req)) {
383 return tevent_req_post(req, ev);
385 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
386 return tevent_req_post(req, ev);
388 tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
390 if (tstream_is_cli_np(transp->stream)) {
391 tstream_cli_np_use_trans(transp->stream);
394 subreq = tstream_readv_pdu_queue_send(state, ev,
395 transp->stream,
396 transp->read_queue,
397 rpc_tstream_trans_next_vector,
398 state);
399 if (tevent_req_nomem(subreq, req)) {
400 return tevent_req_post(req, ev);
402 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
403 return tevent_req_post(req, ev);
405 tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
407 return req;
410 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
412 struct tevent_req *req =
413 tevent_req_callback_data(subreq,
414 struct tevent_req);
415 struct rpc_tstream_trans_state *state =
416 tevent_req_data(req,
417 struct rpc_tstream_trans_state);
418 int ret;
419 int err;
421 ret = tstream_writev_queue_recv(subreq, &err);
422 TALLOC_FREE(subreq);
423 if (ret == -1) {
424 rpc_tstream_disconnect(state->transp);
425 tevent_req_nterror(req, map_nt_error_from_unix(err));
426 return;
430 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
431 void *private_data,
432 TALLOC_CTX *mem_ctx,
433 struct iovec **_vector,
434 size_t *count)
436 struct rpc_tstream_trans_state *state =
437 talloc_get_type_abort(private_data,
438 struct rpc_tstream_trans_state);
439 struct iovec *vector;
441 if (state->max_rdata_len == state->rep.iov_len) {
442 *_vector = NULL;
443 *count = 0;
444 return 0;
447 state->rep.iov_base = talloc_array(state, uint8_t,
448 state->max_rdata_len);
449 if (state->rep.iov_base == NULL) {
450 return -1;
452 state->rep.iov_len = state->max_rdata_len;
454 vector = talloc_array(mem_ctx, struct iovec, 1);
455 if (!vector) {
456 return -1;
459 vector[0] = state->rep;
461 *_vector = vector;
462 *count = 1;
463 return 0;
466 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
468 struct tevent_req *req =
469 tevent_req_callback_data(subreq,
470 struct tevent_req);
471 struct rpc_tstream_trans_state *state =
472 tevent_req_data(req,
473 struct rpc_tstream_trans_state);
474 int ret;
475 int err;
477 ret = tstream_readv_pdu_queue_recv(subreq, &err);
478 TALLOC_FREE(subreq);
479 if (ret == -1) {
480 rpc_tstream_disconnect(state->transp);
481 tevent_req_nterror(req, map_nt_error_from_unix(err));
482 return;
485 tevent_req_done(req);
488 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
489 uint8_t **prdata, uint32_t *prdata_len)
491 struct rpc_tstream_trans_state *state =
492 tevent_req_data(req,
493 struct rpc_tstream_trans_state);
494 NTSTATUS status;
496 if (tevent_req_is_nterror(req, &status)) {
497 return status;
500 *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
501 *prdata_len = state->rep.iov_len;
502 return NT_STATUS_OK;
506 * @brief Initialize a tstream transport facility
507 * NOTE: this function will talloc_steal, the stream and the queues.
509 * @param mem_ctx - memory context used to allocate the transport
510 * @param stream - a ready to use tstream
511 * @param presult - the transport structure
513 * @return - a NT Status error code.
515 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
516 struct tstream_context **stream,
517 struct rpc_cli_transport **presult)
519 struct rpc_cli_transport *result;
520 struct rpc_tstream_state *state;
522 result = talloc(mem_ctx, struct rpc_cli_transport);
523 if (result == NULL) {
524 return NT_STATUS_NO_MEMORY;
526 state = talloc(result, struct rpc_tstream_state);
527 if (state == NULL) {
528 TALLOC_FREE(result);
529 return NT_STATUS_NO_MEMORY;
531 result->priv = state;
533 state->read_queue = tevent_queue_create(state, "read_queue");
534 if (state->read_queue == NULL) {
535 TALLOC_FREE(result);
536 return NT_STATUS_NO_MEMORY;
538 state->write_queue = tevent_queue_create(state, "write_queue");
539 if (state->write_queue == NULL) {
540 TALLOC_FREE(result);
541 return NT_STATUS_NO_MEMORY;
544 state->stream = talloc_move(state, stream);
545 state->timeout = 10000; /* 10 seconds. */
547 if (tstream_is_cli_np(state->stream)) {
548 result->trans_send = rpc_tstream_trans_send;
549 result->trans_recv = rpc_tstream_trans_recv;
550 } else {
551 result->trans_send = NULL;
552 result->trans_recv = NULL;
554 result->write_send = rpc_tstream_write_send;
555 result->write_recv = rpc_tstream_write_recv;
556 result->read_send = rpc_tstream_read_send;
557 result->read_recv = rpc_tstream_read_recv;
558 result->is_connected = rpc_tstream_is_connected;
559 result->set_timeout = rpc_tstream_set_timeout;
561 *presult = result;
562 return NT_STATUS_OK;
565 struct cli_state *rpc_pipe_np_smb_conn(struct rpc_pipe_client *p)
567 struct rpc_tstream_state *transp =
568 talloc_get_type_abort(p->transport->priv,
569 struct rpc_tstream_state);
570 bool ok;
572 ok = rpccli_is_connected(p);
573 if (!ok) {
574 return NULL;
577 if (!tstream_is_cli_np(transp->stream)) {
578 return NULL;
581 return tstream_cli_np_get_cli_state(transp->stream);