/*
 *  Unix SMB/CIFS implementation.
 *  RPC client transport over tstream
 *  Copyright (C) Simo Sorce 2010
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
21 #include "lib/tsocket/tsocket.h"
24 #define DBGC_CLASS DBGC_RPC_CLI
/*
 * Private state of a tstream based RPC client transport.
 */
struct rpc_tstream_state {
	/* The underlying stream; NULL means "not connected". */
	struct tstream_context *stream;
	/* Queue handed to the tstream readv call. */
	struct tevent_queue *read_queue;
	/* Queue handed to the tstream writev call. */
	struct tevent_queue *write_queue;
	/*
	 * I/O timeout in milliseconds (initialised to 10000, i.e. 10 seconds).
	 * NOTE(review): member type inferred from rpc_tstream_set_timeout(),
	 * which stores an unsigned int here.
	 */
	unsigned int timeout;
};
33 static void rpc_tstream_disconnect(struct rpc_tstream_state
*s
)
35 TALLOC_FREE(s
->stream
);
38 static bool rpc_tstream_is_connected(void *priv
)
40 struct rpc_tstream_state
*transp
=
41 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
43 if (!transp
->stream
) {
50 static unsigned int rpc_tstream_set_timeout(void *priv
, unsigned int timeout
)
52 struct rpc_tstream_state
*transp
=
53 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
57 ok
= rpc_tstream_is_connected(transp
);
62 orig_timeout
= transp
->timeout
;
64 transp
->timeout
= timeout
;
/*
 * Bookkeeping for one read request, consumed by rpc_tstream_next_vector().
 *
 * NOTE(review): the member list was lost from this copy of the file; the
 * members below are the ones the init and next_vector functions access,
 * with types inferred from how they are used - confirm.
 */
struct rpc_tstream_next_vector_state {
	/* Destination buffer supplied by the caller. */
	uint8_t *buf;
	/* Number of bytes wanted, capped at UINT16_MAX by the init function. */
	size_t len;
	/* Number of bytes of buf already handed out as iovecs. */
	size_t ofs;
	/* Bytes pending on the stream beyond what this request still needs. */
	size_t remaining;
};
76 static void rpc_tstream_next_vector_init(
77 struct rpc_tstream_next_vector_state
*s
,
78 uint8_t *buf
, size_t len
)
83 s
->len
= MIN(len
, UINT16_MAX
);
86 static int rpc_tstream_next_vector(struct tstream_context
*stream
,
89 struct iovec
**_vector
,
92 struct rpc_tstream_next_vector_state
*state
=
93 (struct rpc_tstream_next_vector_state
*)private_data
;
98 if (state
->ofs
== state
->len
) {
104 pending
= tstream_pending_bytes(stream
);
109 if (pending
== 0 && state
->ofs
!= 0) {
110 /* return a short read */
117 /* we want at least one byte and recheck again */
120 size_t missing
= state
->len
- state
->ofs
;
121 if (pending
> missing
) {
122 /* there's more available */
123 state
->remaining
= pending
- missing
;
126 /* read what we can get and recheck in the next cycle */
131 vector
= talloc_array(mem_ctx
, struct iovec
, 1);
136 vector
[0].iov_base
= state
->buf
+ state
->ofs
;
137 vector
[0].iov_len
= wanted
;
139 state
->ofs
+= wanted
;
146 struct rpc_tstream_read_state
{
147 struct rpc_tstream_state
*transp
;
148 struct rpc_tstream_next_vector_state next_vector
;
152 static void rpc_tstream_read_done(struct tevent_req
*subreq
);
154 static struct tevent_req
*rpc_tstream_read_send(TALLOC_CTX
*mem_ctx
,
155 struct event_context
*ev
,
156 uint8_t *data
, size_t size
,
159 struct rpc_tstream_state
*transp
=
160 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
161 struct tevent_req
*req
, *subreq
;
162 struct rpc_tstream_read_state
*state
;
163 struct timeval endtime
;
165 req
= tevent_req_create(mem_ctx
, &state
, struct rpc_tstream_read_state
);
169 if (!rpc_tstream_is_connected(transp
)) {
170 tevent_req_nterror(req
, NT_STATUS_CONNECTION_INVALID
);
171 return tevent_req_post(req
, ev
);
173 state
->transp
= transp
;
174 rpc_tstream_next_vector_init(&state
->next_vector
, data
, size
);
176 subreq
= tstream_readv_pdu_queue_send(state
, ev
,
179 rpc_tstream_next_vector
,
180 &state
->next_vector
);
181 if (subreq
== NULL
) {
182 tevent_req_nterror(req
, NT_STATUS_NO_MEMORY
);
183 return tevent_req_post(req
, ev
);
186 endtime
= timeval_current_ofs(0, transp
->timeout
* 1000);
187 if (!tevent_req_set_endtime(subreq
, ev
, endtime
)) {
191 tevent_req_set_callback(subreq
, rpc_tstream_read_done
, req
);
198 static void rpc_tstream_read_done(struct tevent_req
*subreq
)
200 struct tevent_req
*req
=
201 tevent_req_callback_data(subreq
, struct tevent_req
);
202 struct rpc_tstream_read_state
*state
=
203 tevent_req_data(req
, struct rpc_tstream_read_state
);
206 state
->nread
= tstream_readv_pdu_queue_recv(subreq
, &err
);
208 if (state
->nread
< 0) {
209 rpc_tstream_disconnect(state
->transp
);
210 tevent_req_nterror(req
, map_nt_error_from_unix(err
));
213 tevent_req_done(req
);
216 static NTSTATUS
rpc_tstream_read_recv(struct tevent_req
*req
, ssize_t
*size
)
218 struct rpc_tstream_read_state
*state
= tevent_req_data(
219 req
, struct rpc_tstream_read_state
);
222 if (tevent_req_is_nterror(req
, &status
)) {
225 *size
= state
->nread
;
/*
 * Per-request state of an asynchronous transport write.
 *
 * NOTE(review): the iov and nwritten members were lost from this copy of
 * the file and are restored from their uses in the write functions.
 */
struct rpc_tstream_write_state {
	struct event_context *ev;
	/* Transport the write runs on; disconnected if the write fails. */
	struct rpc_tstream_state *transp;
	/* Single vector describing the caller's data buffer. */
	struct iovec iov;
	/* Result of tstream_writev_queue_recv(); negative on error. */
	ssize_t nwritten;
};
236 static void rpc_tstream_write_done(struct tevent_req
*subreq
);
238 static struct tevent_req
*rpc_tstream_write_send(TALLOC_CTX
*mem_ctx
,
239 struct event_context
*ev
,
240 const uint8_t *data
, size_t size
,
243 struct rpc_tstream_state
*transp
=
244 talloc_get_type_abort(priv
, struct rpc_tstream_state
);
245 struct tevent_req
*req
, *subreq
;
246 struct rpc_tstream_write_state
*state
;
247 struct timeval endtime
;
249 req
= tevent_req_create(mem_ctx
, &state
, struct rpc_tstream_write_state
);
253 if (!rpc_tstream_is_connected(transp
)) {
254 tevent_req_nterror(req
, NT_STATUS_CONNECTION_INVALID
);
255 return tevent_req_post(req
, ev
);
258 state
->transp
= transp
;
259 state
->iov
.iov_base
= discard_const_p(void *, data
);
260 state
->iov
.iov_len
= size
;
262 subreq
= tstream_writev_queue_send(state
, ev
,
266 if (subreq
== NULL
) {
270 endtime
= timeval_current_ofs(0, transp
->timeout
* 1000);
271 if (!tevent_req_set_endtime(subreq
, ev
, endtime
)) {
275 tevent_req_set_callback(subreq
, rpc_tstream_write_done
, req
);
282 static void rpc_tstream_write_done(struct tevent_req
*subreq
)
284 struct tevent_req
*req
=
285 tevent_req_callback_data(subreq
, struct tevent_req
);
286 struct rpc_tstream_write_state
*state
=
287 tevent_req_data(req
, struct rpc_tstream_write_state
);
290 state
->nwritten
= tstream_writev_queue_recv(subreq
, &err
);
292 if (state
->nwritten
< 0) {
293 rpc_tstream_disconnect(state
->transp
);
294 tevent_req_nterror(req
, map_nt_error_from_unix(err
));
297 tevent_req_done(req
);
300 static NTSTATUS
rpc_tstream_write_recv(struct tevent_req
*req
, ssize_t
*sent
)
302 struct rpc_tstream_write_state
*state
=
303 tevent_req_data(req
, struct rpc_tstream_write_state
);
306 if (tevent_req_is_nterror(req
, &status
)) {
309 *sent
= state
->nwritten
;
/**
 * @brief Initialize a tstream transport facility
 *        NOTE: this function will talloc_steal the stream and the queues.
 *
 * @param mem_ctx	- memory context used to allocate the transport
 * @param stream	- a ready to use tstream
 * @param read_queue	- pre-created tstream read queue
 * @param write_queue	- pre-created tstream write queue
 * @param presult	- the transport structure
 *
 * @return		- a NT Status error code.
 */
325 NTSTATUS
rpc_transport_tstream_init(TALLOC_CTX
*mem_ctx
,
326 struct tstream_context
*stream
,
327 struct tevent_queue
*read_queue
,
328 struct tevent_queue
*write_queue
,
329 struct rpc_cli_transport
**presult
)
331 struct rpc_cli_transport
*result
;
332 struct rpc_tstream_state
*state
;
334 result
= talloc(mem_ctx
, struct rpc_cli_transport
);
335 if (result
== NULL
) {
336 return NT_STATUS_NO_MEMORY
;
338 state
= talloc(result
, struct rpc_tstream_state
);
341 return NT_STATUS_NO_MEMORY
;
343 result
->priv
= state
;
345 state
->stream
= talloc_steal(state
, stream
);
346 state
->read_queue
= talloc_steal(state
, read_queue
);
347 state
->write_queue
= talloc_steal(state
, write_queue
);
348 state
->timeout
= 10000; /* 10 seconds. */
350 result
->trans_send
= NULL
;
351 result
->trans_recv
= NULL
;
352 result
->write_send
= rpc_tstream_write_send
;
353 result
->write_recv
= rpc_tstream_write_recv
;
354 result
->read_send
= rpc_tstream_read_send
;
355 result
->read_recv
= rpc_tstream_read_recv
;
356 result
->is_connected
= rpc_tstream_is_connected
;
357 result
->set_timeout
= rpc_tstream_set_timeout
;