2 * Unix SMB/CIFS implementation.
3 * RPC client transport over tstream
4 * Copyright (C) Simo Sorce 2010
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "../lib/util/tevent_ntstatus.h"
22 #include "rpc_client/rpc_transport.h"
23 #include "lib/tsocket/tsocket.h"
24 #include "libsmb/cli_np_tstream.h"
28 #define DBGC_CLASS DBGC_RPC_CLI
30 struct rpc_tstream_state
{
31 struct tstream_context
*stream
;
32 struct tevent_queue
*read_queue
;
33 struct tevent_queue
*write_queue
;
37 static void rpc_tstream_disconnect(struct rpc_tstream_state
*s
)
39 TALLOC_FREE(s
->stream
);
42 static bool rpc_tstream_is_connected(void *priv
)
44 struct rpc_tstream_state
*transp
=
45 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
48 if (!transp
->stream
) {
52 if (!tstream_is_cli_np(transp
->stream
)) {
56 ret
= tstream_pending_bytes(transp
->stream
);
64 static unsigned int rpc_tstream_set_timeout(void *priv
, unsigned int timeout
)
66 struct rpc_tstream_state
*transp
=
67 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
71 ok
= rpc_tstream_is_connected(transp
);
76 if (tstream_is_cli_np(transp
->stream
)) {
77 transp
->timeout
= timeout
;
78 return tstream_cli_np_set_timeout(transp
->stream
, timeout
);
81 orig_timeout
= transp
->timeout
;
83 transp
->timeout
= timeout
;
88 struct rpc_tstream_next_vector_state
{
95 static void rpc_tstream_next_vector_init(
96 struct rpc_tstream_next_vector_state
*s
,
97 uint8_t *buf
, size_t len
)
102 s
->len
= MIN(len
, UINT16_MAX
);
105 static int rpc_tstream_next_vector(struct tstream_context
*stream
,
108 struct iovec
**_vector
,
111 struct rpc_tstream_next_vector_state
*state
=
112 (struct rpc_tstream_next_vector_state
*)private_data
;
113 struct iovec
*vector
;
117 if (state
->ofs
== state
->len
) {
123 pending
= tstream_pending_bytes(stream
);
128 if (pending
== 0 && state
->ofs
!= 0) {
129 /* return a short read */
136 /* we want at least one byte and recheck again */
139 size_t missing
= state
->len
- state
->ofs
;
140 if (pending
> missing
) {
141 /* there's more available */
142 state
->remaining
= pending
- missing
;
145 /* read what we can get and recheck in the next cycle */
150 vector
= talloc_array(mem_ctx
, struct iovec
, 1);
155 vector
[0].iov_base
= state
->buf
+ state
->ofs
;
156 vector
[0].iov_len
= wanted
;
158 state
->ofs
+= wanted
;
165 struct rpc_tstream_read_state
{
166 struct rpc_tstream_state
*transp
;
167 struct rpc_tstream_next_vector_state next_vector
;
171 static void rpc_tstream_read_done(struct tevent_req
*subreq
);
173 static struct tevent_req
*rpc_tstream_read_send(TALLOC_CTX
*mem_ctx
,
174 struct event_context
*ev
,
175 uint8_t *data
, size_t size
,
178 struct rpc_tstream_state
*transp
=
179 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
180 struct tevent_req
*req
, *subreq
;
181 struct rpc_tstream_read_state
*state
;
182 struct timeval endtime
;
184 req
= tevent_req_create(mem_ctx
, &state
, struct rpc_tstream_read_state
);
188 if (!rpc_tstream_is_connected(transp
)) {
189 tevent_req_nterror(req
, NT_STATUS_CONNECTION_INVALID
);
190 return tevent_req_post(req
, ev
);
192 state
->transp
= transp
;
193 rpc_tstream_next_vector_init(&state
->next_vector
, data
, size
);
195 subreq
= tstream_readv_pdu_queue_send(state
, ev
,
198 rpc_tstream_next_vector
,
199 &state
->next_vector
);
200 if (subreq
== NULL
) {
201 tevent_req_nterror(req
, NT_STATUS_NO_MEMORY
);
202 return tevent_req_post(req
, ev
);
205 endtime
= timeval_current_ofs_msec(transp
->timeout
);
206 if (!tevent_req_set_endtime(subreq
, ev
, endtime
)) {
210 tevent_req_set_callback(subreq
, rpc_tstream_read_done
, req
);
217 static void rpc_tstream_read_done(struct tevent_req
*subreq
)
219 struct tevent_req
*req
=
220 tevent_req_callback_data(subreq
, struct tevent_req
);
221 struct rpc_tstream_read_state
*state
=
222 tevent_req_data(req
, struct rpc_tstream_read_state
);
225 state
->nread
= tstream_readv_pdu_queue_recv(subreq
, &err
);
227 if (state
->nread
< 0) {
228 rpc_tstream_disconnect(state
->transp
);
229 tevent_req_nterror(req
, map_nt_error_from_unix(err
));
232 tevent_req_done(req
);
235 static NTSTATUS
rpc_tstream_read_recv(struct tevent_req
*req
, ssize_t
*size
)
237 struct rpc_tstream_read_state
*state
= tevent_req_data(
238 req
, struct rpc_tstream_read_state
);
241 if (tevent_req_is_nterror(req
, &status
)) {
244 *size
= state
->nread
;
248 struct rpc_tstream_write_state
{
249 struct event_context
*ev
;
250 struct rpc_tstream_state
*transp
;
255 static void rpc_tstream_write_done(struct tevent_req
*subreq
);
257 static struct tevent_req
*rpc_tstream_write_send(TALLOC_CTX
*mem_ctx
,
258 struct event_context
*ev
,
259 const uint8_t *data
, size_t size
,
262 struct rpc_tstream_state
*transp
=
263 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
264 struct tevent_req
*req
, *subreq
;
265 struct rpc_tstream_write_state
*state
;
266 struct timeval endtime
;
268 req
= tevent_req_create(mem_ctx
, &state
, struct rpc_tstream_write_state
);
272 if (!rpc_tstream_is_connected(transp
)) {
273 tevent_req_nterror(req
, NT_STATUS_CONNECTION_INVALID
);
274 return tevent_req_post(req
, ev
);
277 state
->transp
= transp
;
278 state
->iov
.iov_base
= discard_const_p(void *, data
);
279 state
->iov
.iov_len
= size
;
281 subreq
= tstream_writev_queue_send(state
, ev
,
285 if (subreq
== NULL
) {
289 endtime
= timeval_current_ofs_msec(transp
->timeout
);
290 if (!tevent_req_set_endtime(subreq
, ev
, endtime
)) {
294 tevent_req_set_callback(subreq
, rpc_tstream_write_done
, req
);
301 static void rpc_tstream_write_done(struct tevent_req
*subreq
)
303 struct tevent_req
*req
=
304 tevent_req_callback_data(subreq
, struct tevent_req
);
305 struct rpc_tstream_write_state
*state
=
306 tevent_req_data(req
, struct rpc_tstream_write_state
);
309 state
->nwritten
= tstream_writev_queue_recv(subreq
, &err
);
311 if (state
->nwritten
< 0) {
312 rpc_tstream_disconnect(state
->transp
);
313 tevent_req_nterror(req
, map_nt_error_from_unix(err
));
316 tevent_req_done(req
);
319 static NTSTATUS
rpc_tstream_write_recv(struct tevent_req
*req
, ssize_t
*sent
)
321 struct rpc_tstream_write_state
*state
=
322 tevent_req_data(req
, struct rpc_tstream_write_state
);
325 if (tevent_req_is_nterror(req
, &status
)) {
328 *sent
= state
->nwritten
;
332 struct rpc_tstream_trans_state
{
333 struct tevent_context
*ev
;
334 struct rpc_tstream_state
*transp
;
336 uint32_t max_rdata_len
;
340 static void rpc_tstream_trans_writev(struct tevent_req
*subreq
);
341 static void rpc_tstream_trans_readv_pdu(struct tevent_req
*subreq
);
343 static int rpc_tstream_trans_next_vector(struct tstream_context
*stream
,
346 struct iovec
**_vector
,
349 static struct tevent_req
*rpc_tstream_trans_send(TALLOC_CTX
*mem_ctx
,
350 struct tevent_context
*ev
,
351 uint8_t *data
, size_t data_len
,
352 uint32_t max_rdata_len
,
355 struct rpc_tstream_state
*transp
=
356 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
357 struct tevent_req
*req
, *subreq
;
358 struct rpc_tstream_trans_state
*state
;
359 struct timeval endtime
;
360 bool use_trans
= false;
362 req
= tevent_req_create(mem_ctx
, &state
,
363 struct rpc_tstream_trans_state
);
368 if (!rpc_tstream_is_connected(transp
)) {
369 tevent_req_nterror(req
, NT_STATUS_CONNECTION_INVALID
);
370 return tevent_req_post(req
, ev
);
373 state
->transp
= transp
;
374 state
->req
.iov_len
= data_len
;
375 state
->req
.iov_base
= discard_const_p(void *, data
);
376 state
->max_rdata_len
= max_rdata_len
;
378 endtime
= timeval_current_ofs_msec(transp
->timeout
);
380 if (tstream_is_cli_np(transp
->stream
)) {
383 if (tevent_queue_length(transp
->write_queue
) > 0) {
386 if (tevent_queue_length(transp
->read_queue
) > 0) {
391 tstream_cli_np_use_trans(transp
->stream
);
394 subreq
= tstream_writev_queue_send(state
, ev
,
398 if (tevent_req_nomem(subreq
, req
)) {
399 return tevent_req_post(req
, ev
);
401 if (!tevent_req_set_endtime(subreq
, ev
, endtime
)) {
402 return tevent_req_post(req
, ev
);
404 tevent_req_set_callback(subreq
, rpc_tstream_trans_writev
, req
);
406 subreq
= tstream_readv_pdu_queue_send(state
, ev
,
409 rpc_tstream_trans_next_vector
,
411 if (tevent_req_nomem(subreq
, req
)) {
412 return tevent_req_post(req
, ev
);
414 if (!tevent_req_set_endtime(subreq
, ev
, endtime
)) {
415 return tevent_req_post(req
, ev
);
417 tevent_req_set_callback(subreq
, rpc_tstream_trans_readv_pdu
, req
);
422 static void rpc_tstream_trans_writev(struct tevent_req
*subreq
)
424 struct tevent_req
*req
=
425 tevent_req_callback_data(subreq
,
427 struct rpc_tstream_trans_state
*state
=
429 struct rpc_tstream_trans_state
);
433 ret
= tstream_writev_queue_recv(subreq
, &err
);
436 rpc_tstream_disconnect(state
->transp
);
437 tevent_req_nterror(req
, map_nt_error_from_unix(err
));
442 static int rpc_tstream_trans_next_vector(struct tstream_context
*stream
,
445 struct iovec
**_vector
,
448 struct rpc_tstream_trans_state
*state
=
449 talloc_get_type_abort(private_data
,
450 struct rpc_tstream_trans_state
);
451 struct iovec
*vector
;
453 if (state
->max_rdata_len
== state
->rep
.iov_len
) {
459 state
->rep
.iov_base
= talloc_array(state
, uint8_t,
460 state
->max_rdata_len
);
461 if (state
->rep
.iov_base
== NULL
) {
464 state
->rep
.iov_len
= state
->max_rdata_len
;
466 vector
= talloc_array(mem_ctx
, struct iovec
, 1);
471 vector
[0] = state
->rep
;
478 static void rpc_tstream_trans_readv_pdu(struct tevent_req
*subreq
)
480 struct tevent_req
*req
=
481 tevent_req_callback_data(subreq
,
483 struct rpc_tstream_trans_state
*state
=
485 struct rpc_tstream_trans_state
);
489 ret
= tstream_readv_pdu_queue_recv(subreq
, &err
);
492 rpc_tstream_disconnect(state
->transp
);
493 tevent_req_nterror(req
, map_nt_error_from_unix(err
));
497 tevent_req_done(req
);
500 static NTSTATUS
rpc_tstream_trans_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
501 uint8_t **prdata
, uint32_t *prdata_len
)
503 struct rpc_tstream_trans_state
*state
=
505 struct rpc_tstream_trans_state
);
508 if (tevent_req_is_nterror(req
, &status
)) {
512 *prdata
= (uint8_t *)talloc_move(mem_ctx
, &state
->rep
.iov_base
);
513 *prdata_len
= state
->rep
.iov_len
;
518 * @brief Initialize a tstream transport facility
519 * NOTE: this function will talloc_steal, the stream and the queues.
521 * @param mem_ctx - memory context used to allocate the transport
522 * @param stream - a ready to use tstream
523 * @param presult - the transport structure
525 * @return - a NT Status error code.
527 NTSTATUS
rpc_transport_tstream_init(TALLOC_CTX
*mem_ctx
,
528 struct tstream_context
**stream
,
529 struct rpc_cli_transport
**presult
)
531 struct rpc_cli_transport
*result
;
532 struct rpc_tstream_state
*state
;
534 result
= talloc(mem_ctx
, struct rpc_cli_transport
);
535 if (result
== NULL
) {
536 return NT_STATUS_NO_MEMORY
;
538 state
= talloc(result
, struct rpc_tstream_state
);
541 return NT_STATUS_NO_MEMORY
;
543 result
->priv
= state
;
545 state
->read_queue
= tevent_queue_create(state
, "read_queue");
546 if (state
->read_queue
== NULL
) {
548 return NT_STATUS_NO_MEMORY
;
550 state
->write_queue
= tevent_queue_create(state
, "write_queue");
551 if (state
->write_queue
== NULL
) {
553 return NT_STATUS_NO_MEMORY
;
556 state
->stream
= talloc_move(state
, stream
);
557 state
->timeout
= 10000; /* 10 seconds. */
559 if (tstream_is_cli_np(state
->stream
)) {
560 result
->trans_send
= rpc_tstream_trans_send
;
561 result
->trans_recv
= rpc_tstream_trans_recv
;
563 result
->trans_send
= NULL
;
564 result
->trans_recv
= NULL
;
566 result
->write_send
= rpc_tstream_write_send
;
567 result
->write_recv
= rpc_tstream_write_recv
;
568 result
->read_send
= rpc_tstream_read_send
;
569 result
->read_recv
= rpc_tstream_read_recv
;
570 result
->is_connected
= rpc_tstream_is_connected
;
571 result
->set_timeout
= rpc_tstream_set_timeout
;
577 struct cli_state
*rpc_pipe_np_smb_conn(struct rpc_pipe_client
*p
)
579 struct rpc_tstream_state
*transp
=
580 talloc_get_type_abort(p
->transport
->priv
,
581 struct rpc_tstream_state
);
584 ok
= rpccli_is_connected(p
);
589 if (!tstream_is_cli_np(transp
->stream
)) {
593 return tstream_cli_np_get_cli_state(transp
->stream
);