ldb:ldb_cache.c - fix memory contexts and memory leaks
[Samba.git] / source3 / rpc_client / rpc_transport_tstream.c
blobc62540cb3f83f05ad3cf44fcc33036b351ff1b75
1 /*
2 * Unix SMB/CIFS implementation.
3 * RPC client transport over tstream
4 * Copyright (C) Simo Sorce 2010
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "lib/tsocket/tsocket.h"
23 #undef DBGC_CLASS
24 #define DBGC_CLASS DBGC_RPC_CLI
26 struct rpc_tstream_state {
27 struct tstream_context *stream;
28 struct tevent_queue *read_queue;
29 struct tevent_queue *write_queue;
30 int timeout;
33 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
35 TALLOC_FREE(s->stream);
38 static bool rpc_tstream_is_connected(void *priv)
40 struct rpc_tstream_state *transp =
41 talloc_get_type_abort(priv, struct rpc_tstream_state);
43 if (!transp->stream) {
44 return false;
47 return true;
50 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
52 struct rpc_tstream_state *transp =
53 talloc_get_type_abort(priv, struct rpc_tstream_state);
54 int orig_timeout;
55 bool ok;
57 ok = rpc_tstream_is_connected(transp);
58 if (!ok) {
59 return 0;
62 orig_timeout = transp->timeout;
64 transp->timeout = timeout;
66 return orig_timeout;
69 struct rpc_tstream_next_vector_state {
70 uint8_t *buf;
71 size_t len;
72 off_t ofs;
73 size_t remaining;
76 static void rpc_tstream_next_vector_init(
77 struct rpc_tstream_next_vector_state *s,
78 uint8_t *buf, size_t len)
80 ZERO_STRUCTP(s);
82 s->buf = buf;
83 s->len = MIN(len, UINT16_MAX);
86 static int rpc_tstream_next_vector(struct tstream_context *stream,
87 void *private_data,
88 TALLOC_CTX *mem_ctx,
89 struct iovec **_vector,
90 size_t *count)
92 struct rpc_tstream_next_vector_state *state =
93 (struct rpc_tstream_next_vector_state *)private_data;
94 struct iovec *vector;
95 ssize_t pending;
96 size_t wanted;
98 if (state->ofs == state->len) {
99 *_vector = NULL;
100 *count = 0;
101 return 0;
104 pending = tstream_pending_bytes(stream);
105 if (pending == -1) {
106 return -1;
109 if (pending == 0 && state->ofs != 0) {
110 /* return a short read */
111 *_vector = NULL;
112 *count = 0;
113 return 0;
116 if (pending == 0) {
117 /* we want at least one byte and recheck again */
118 wanted = 1;
119 } else {
120 size_t missing = state->len - state->ofs;
121 if (pending > missing) {
122 /* there's more available */
123 state->remaining = pending - missing;
124 wanted = missing;
125 } else {
126 /* read what we can get and recheck in the next cycle */
127 wanted = pending;
131 vector = talloc_array(mem_ctx, struct iovec, 1);
132 if (!vector) {
133 return -1;
136 vector[0].iov_base = state->buf + state->ofs;
137 vector[0].iov_len = wanted;
139 state->ofs += wanted;
141 *_vector = vector;
142 *count = 1;
143 return 0;
146 struct rpc_tstream_read_state {
147 struct rpc_tstream_state *transp;
148 struct rpc_tstream_next_vector_state next_vector;
149 ssize_t nread;
152 static void rpc_tstream_read_done(struct tevent_req *subreq);
154 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
155 struct event_context *ev,
156 uint8_t *data, size_t size,
157 void *priv)
159 struct rpc_tstream_state *transp =
160 talloc_get_type_abort(priv, struct rpc_tstream_state);
161 struct tevent_req *req, *subreq;
162 struct rpc_tstream_read_state *state;
163 struct timeval endtime;
165 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
166 if (req == NULL) {
167 return NULL;
169 if (!rpc_tstream_is_connected(transp)) {
170 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
171 return tevent_req_post(req, ev);
173 state->transp = transp;
174 rpc_tstream_next_vector_init(&state->next_vector, data, size);
176 subreq = tstream_readv_pdu_queue_send(state, ev,
177 transp->stream,
178 transp->read_queue,
179 rpc_tstream_next_vector,
180 &state->next_vector);
181 if (subreq == NULL) {
182 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
183 return tevent_req_post(req, ev);
186 endtime = timeval_current_ofs(0, transp->timeout * 1000);
187 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
188 goto fail;
191 tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
192 return req;
193 fail:
194 TALLOC_FREE(req);
195 return NULL;
198 static void rpc_tstream_read_done(struct tevent_req *subreq)
200 struct tevent_req *req =
201 tevent_req_callback_data(subreq, struct tevent_req);
202 struct rpc_tstream_read_state *state =
203 tevent_req_data(req, struct rpc_tstream_read_state);
204 int err;
206 state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
207 TALLOC_FREE(subreq);
208 if (state->nread < 0) {
209 rpc_tstream_disconnect(state->transp);
210 tevent_req_nterror(req, map_nt_error_from_unix(err));
211 return;
213 tevent_req_done(req);
216 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
218 struct rpc_tstream_read_state *state = tevent_req_data(
219 req, struct rpc_tstream_read_state);
220 NTSTATUS status;
222 if (tevent_req_is_nterror(req, &status)) {
223 return status;
225 *size = state->nread;
226 return NT_STATUS_OK;
229 struct rpc_tstream_write_state {
230 struct event_context *ev;
231 struct rpc_tstream_state *transp;
232 struct iovec iov;
233 ssize_t nwritten;
236 static void rpc_tstream_write_done(struct tevent_req *subreq);
238 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
239 struct event_context *ev,
240 const uint8_t *data, size_t size,
241 void *priv)
243 struct rpc_tstream_state *transp =
244 talloc_get_type_abort(priv, struct rpc_tstream_state);
245 struct tevent_req *req, *subreq;
246 struct rpc_tstream_write_state *state;
247 struct timeval endtime;
249 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
250 if (req == NULL) {
251 return NULL;
253 if (!rpc_tstream_is_connected(transp)) {
254 tevent_req_nterror(req, NT_STATUS_CONNECTION_INVALID);
255 return tevent_req_post(req, ev);
257 state->ev = ev;
258 state->transp = transp;
259 state->iov.iov_base = discard_const_p(void *, data);
260 state->iov.iov_len = size;
262 subreq = tstream_writev_queue_send(state, ev,
263 transp->stream,
264 transp->write_queue,
265 &state->iov, 1);
266 if (subreq == NULL) {
267 goto fail;
270 endtime = timeval_current_ofs(0, transp->timeout * 1000);
271 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
272 goto fail;
275 tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
276 return req;
277 fail:
278 TALLOC_FREE(req);
279 return NULL;
282 static void rpc_tstream_write_done(struct tevent_req *subreq)
284 struct tevent_req *req =
285 tevent_req_callback_data(subreq, struct tevent_req);
286 struct rpc_tstream_write_state *state =
287 tevent_req_data(req, struct rpc_tstream_write_state);
288 int err;
290 state->nwritten = tstream_writev_queue_recv(subreq, &err);
291 TALLOC_FREE(subreq);
292 if (state->nwritten < 0) {
293 rpc_tstream_disconnect(state->transp);
294 tevent_req_nterror(req, map_nt_error_from_unix(err));
295 return;
297 tevent_req_done(req);
300 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
302 struct rpc_tstream_write_state *state =
303 tevent_req_data(req, struct rpc_tstream_write_state);
304 NTSTATUS status;
306 if (tevent_req_is_nterror(req, &status)) {
307 return status;
309 *sent = state->nwritten;
310 return NT_STATUS_OK;
314 * @brief Initialize a tstream transport facility
315 * NOTE: this function will talloc_steal, the stream and the queues.
317 * @param mem_ctx - memory context used to allocate the transport
318 * @param stream - a ready to use tstream
319 * @param read_queue - pre-createted tstream read queue
320 * @param write_queue - pre-createted tstream write queue
321 * @param presult - the transport structure
323 * @return - a NT Status error code.
325 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
326 struct tstream_context *stream,
327 struct tevent_queue *read_queue,
328 struct tevent_queue *write_queue,
329 struct rpc_cli_transport **presult)
331 struct rpc_cli_transport *result;
332 struct rpc_tstream_state *state;
334 result = talloc(mem_ctx, struct rpc_cli_transport);
335 if (result == NULL) {
336 return NT_STATUS_NO_MEMORY;
338 state = talloc(result, struct rpc_tstream_state);
339 if (state == NULL) {
340 TALLOC_FREE(result);
341 return NT_STATUS_NO_MEMORY;
343 result->priv = state;
345 state->stream = talloc_steal(state, stream);
346 state->read_queue = talloc_steal(state, read_queue);
347 state->write_queue = talloc_steal(state, write_queue);
348 state->timeout = 10000; /* 10 seconds. */
350 result->trans_send = NULL;
351 result->trans_recv = NULL;
352 result->write_send = rpc_tstream_write_send;
353 result->write_recv = rpc_tstream_write_recv;
354 result->read_send = rpc_tstream_read_send;
355 result->read_recv = rpc_tstream_read_recv;
356 result->is_connected = rpc_tstream_is_connected;
357 result->set_timeout = rpc_tstream_set_timeout;
359 *presult = result;
360 return NT_STATUS_OK;