s3: Avoid a thundering herd in g_lock_unlock
[Samba.git] / lib / tsocket / tsocket_helpers.c
blobd8db8640580b963463e8cdcd1c5e21f0131cf842
/*
   Unix SMB/CIFS implementation.

   Copyright (C) Stefan Metzmacher 2009

     ** NOTE! The following LGPL license applies to the tsocket
     ** library. This does NOT imply that all of Samba is released
     ** under the LGPL

   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 3 of the License, or (at your option) any later version.

   This library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include <limits.h>
#include "system/filesys.h"
#include "tsocket.h"
#include "tsocket_internal.h"
/*
 * Per-request state of tdgram_sendto_queue_send().
 */
struct tdgram_sendto_queue_state {
	/* these members are owned by the caller, we only keep references */
	struct {
		struct tevent_context *ev;
		struct tdgram_context *dgram;
		const uint8_t *buf;
		size_t len;
		const struct tsocket_address *dst;
	} caller;

	/*
	 * initialised to -1, replaced by the result of the
	 * sendto subrequest once it finished successfully
	 */
	ssize_t ret;
};

static void tdgram_sendto_queue_trigger(struct tevent_req *req,
					 void *private_data);
static void tdgram_sendto_queue_done(struct tevent_req *subreq);
45 /**
46 * @brief Queue a dgram blob for sending through the socket
47 * @param[in] mem_ctx The memory context for the result
48 * @param[in] ev The event context the operation should work on
49 * @param[in] dgram The tdgram_context to send the message buffer
50 * @param[in] queue The existing dgram queue
51 * @param[in] buf The message buffer
52 * @param[in] len The message length
53 * @param[in] dst The destination socket address
54 * @retval The async request handle
56 * This function queues a blob for sending to destination through an existing
57 * dgram socket. The async callback is triggered when the whole blob is
58 * delivered to the underlying system socket.
60 * The caller needs to make sure that all non-scalar input parameters hang
61 * arround for the whole lifetime of the request.
63 struct tevent_req *tdgram_sendto_queue_send(TALLOC_CTX *mem_ctx,
64 struct tevent_context *ev,
65 struct tdgram_context *dgram,
66 struct tevent_queue *queue,
67 const uint8_t *buf,
68 size_t len,
69 struct tsocket_address *dst)
71 struct tevent_req *req;
72 struct tdgram_sendto_queue_state *state;
73 bool ok;
75 req = tevent_req_create(mem_ctx, &state,
76 struct tdgram_sendto_queue_state);
77 if (!req) {
78 return NULL;
81 state->caller.ev = ev;
82 state->caller.dgram = dgram;
83 state->caller.buf = buf;
84 state->caller.len = len;
85 state->caller.dst = dst;
86 state->ret = -1;
88 ok = tevent_queue_add(queue,
89 ev,
90 req,
91 tdgram_sendto_queue_trigger,
92 NULL);
93 if (!ok) {
94 tevent_req_nomem(NULL, req);
95 goto post;
98 return req;
100 post:
101 tevent_req_post(req, ev);
102 return req;
105 static void tdgram_sendto_queue_trigger(struct tevent_req *req,
106 void *private_data)
108 struct tdgram_sendto_queue_state *state = tevent_req_data(req,
109 struct tdgram_sendto_queue_state);
110 struct tevent_req *subreq;
112 subreq = tdgram_sendto_send(state,
113 state->caller.ev,
114 state->caller.dgram,
115 state->caller.buf,
116 state->caller.len,
117 state->caller.dst);
118 if (tevent_req_nomem(subreq, req)) {
119 return;
121 tevent_req_set_callback(subreq, tdgram_sendto_queue_done, req);
124 static void tdgram_sendto_queue_done(struct tevent_req *subreq)
126 struct tevent_req *req = tevent_req_callback_data(subreq,
127 struct tevent_req);
128 struct tdgram_sendto_queue_state *state = tevent_req_data(req,
129 struct tdgram_sendto_queue_state);
130 ssize_t ret;
131 int sys_errno;
133 ret = tdgram_sendto_recv(subreq, &sys_errno);
134 talloc_free(subreq);
135 if (ret == -1) {
136 tevent_req_error(req, sys_errno);
137 return;
139 state->ret = ret;
141 tevent_req_done(req);
144 ssize_t tdgram_sendto_queue_recv(struct tevent_req *req, int *perrno)
146 struct tdgram_sendto_queue_state *state = tevent_req_data(req,
147 struct tdgram_sendto_queue_state);
148 ssize_t ret;
150 ret = tsocket_simple_int_recv(req, perrno);
151 if (ret == 0) {
152 ret = state->ret;
155 tevent_req_received(req);
156 return ret;
159 struct tstream_readv_pdu_state {
160 /* this structs are owned by the caller */
161 struct {
162 struct tevent_context *ev;
163 struct tstream_context *stream;
164 tstream_readv_pdu_next_vector_t next_vector_fn;
165 void *next_vector_private;
166 } caller;
169 * Each call to the callback resets iov and count
170 * the callback allocated the iov as child of our state,
171 * that means we are allowed to modify and free it.
173 * we should call the callback every time we filled the given
174 * vector and ask for a new vector. We return if the callback
175 * ask for 0 bytes.
177 struct iovec *vector;
178 size_t count;
181 * the total number of bytes we read,
182 * the return value of the _recv function
184 int total_read;
187 static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req);
188 static void tstream_readv_pdu_readv_done(struct tevent_req *subreq);
190 struct tevent_req *tstream_readv_pdu_send(TALLOC_CTX *mem_ctx,
191 struct tevent_context *ev,
192 struct tstream_context *stream,
193 tstream_readv_pdu_next_vector_t next_vector_fn,
194 void *next_vector_private)
196 struct tevent_req *req;
197 struct tstream_readv_pdu_state *state;
199 req = tevent_req_create(mem_ctx, &state,
200 struct tstream_readv_pdu_state);
201 if (!req) {
202 return NULL;
205 state->caller.ev = ev;
206 state->caller.stream = stream;
207 state->caller.next_vector_fn = next_vector_fn;
208 state->caller.next_vector_private = next_vector_private;
210 state->vector = NULL;
211 state->count = 0;
212 state->total_read = 0;
214 tstream_readv_pdu_ask_for_next_vector(req);
215 if (!tevent_req_is_in_progress(req)) {
216 goto post;
219 return req;
221 post:
222 return tevent_req_post(req, ev);
225 static void tstream_readv_pdu_ask_for_next_vector(struct tevent_req *req)
227 struct tstream_readv_pdu_state *state = tevent_req_data(req,
228 struct tstream_readv_pdu_state);
229 int ret;
230 size_t to_read = 0;
231 size_t i;
232 struct tevent_req *subreq;
234 TALLOC_FREE(state->vector);
235 state->count = 0;
237 ret = state->caller.next_vector_fn(state->caller.stream,
238 state->caller.next_vector_private,
239 state, &state->vector, &state->count);
240 if (ret == -1) {
241 tevent_req_error(req, errno);
242 return;
245 if (state->count == 0) {
246 tevent_req_done(req);
247 return;
250 for (i=0; i < state->count; i++) {
251 size_t tmp = to_read;
252 tmp += state->vector[i].iov_len;
254 if (tmp < to_read) {
255 tevent_req_error(req, EMSGSIZE);
256 return;
259 to_read = tmp;
263 * this is invalid the next vector function should have
264 * reported count == 0.
266 if (to_read == 0) {
267 tevent_req_error(req, EINVAL);
268 return;
271 if (state->total_read + to_read < state->total_read) {
272 tevent_req_error(req, EMSGSIZE);
273 return;
276 subreq = tstream_readv_send(state,
277 state->caller.ev,
278 state->caller.stream,
279 state->vector,
280 state->count);
281 if (tevent_req_nomem(subreq, req)) {
282 return;
284 tevent_req_set_callback(subreq, tstream_readv_pdu_readv_done, req);
287 static void tstream_readv_pdu_readv_done(struct tevent_req *subreq)
289 struct tevent_req *req = tevent_req_callback_data(subreq,
290 struct tevent_req);
291 struct tstream_readv_pdu_state *state = tevent_req_data(req,
292 struct tstream_readv_pdu_state);
293 int ret;
294 int sys_errno;
296 ret = tstream_readv_recv(subreq, &sys_errno);
297 if (ret == -1) {
298 tevent_req_error(req, sys_errno);
299 return;
302 state->total_read += ret;
304 /* ask the callback for a new vector we should fill */
305 tstream_readv_pdu_ask_for_next_vector(req);
308 int tstream_readv_pdu_recv(struct tevent_req *req, int *perrno)
310 struct tstream_readv_pdu_state *state = tevent_req_data(req,
311 struct tstream_readv_pdu_state);
312 int ret;
314 ret = tsocket_simple_int_recv(req, perrno);
315 if (ret == 0) {
316 ret = state->total_read;
319 tevent_req_received(req);
320 return ret;
323 struct tstream_readv_pdu_queue_state {
324 /* this structs are owned by the caller */
325 struct {
326 struct tevent_context *ev;
327 struct tstream_context *stream;
328 tstream_readv_pdu_next_vector_t next_vector_fn;
329 void *next_vector_private;
330 } caller;
331 int ret;
334 static void tstream_readv_pdu_queue_trigger(struct tevent_req *req,
335 void *private_data);
336 static void tstream_readv_pdu_queue_done(struct tevent_req *subreq);
339 * @brief Queue a dgram blob for sending through the socket
340 * @param[in] mem_ctx The memory context for the result
341 * @param[in] ev The tevent_context to run on
342 * @param[in] stream The stream to send data through
343 * @param[in] queue The existing send queue
344 * @param[in] next_vector_fn The next vector function
345 * @param[in] next_vector_private The private_data of the next vector function
346 * @retval The async request handle
348 * This function queues a blob for sending to destination through an existing
349 * dgram socket. The async callback is triggered when the whole blob is
350 * delivered to the underlying system socket.
352 * The caller needs to make sure that all non-scalar input parameters hang
353 * arround for the whole lifetime of the request.
355 struct tevent_req *tstream_readv_pdu_queue_send(TALLOC_CTX *mem_ctx,
356 struct tevent_context *ev,
357 struct tstream_context *stream,
358 struct tevent_queue *queue,
359 tstream_readv_pdu_next_vector_t next_vector_fn,
360 void *next_vector_private)
362 struct tevent_req *req;
363 struct tstream_readv_pdu_queue_state *state;
364 bool ok;
366 req = tevent_req_create(mem_ctx, &state,
367 struct tstream_readv_pdu_queue_state);
368 if (!req) {
369 return NULL;
372 state->caller.ev = ev;
373 state->caller.stream = stream;
374 state->caller.next_vector_fn = next_vector_fn;
375 state->caller.next_vector_private = next_vector_private;
376 state->ret = -1;
378 ok = tevent_queue_add(queue,
380 req,
381 tstream_readv_pdu_queue_trigger,
382 NULL);
383 if (!ok) {
384 tevent_req_nomem(NULL, req);
385 goto post;
388 return req;
390 post:
391 return tevent_req_post(req, ev);
394 static void tstream_readv_pdu_queue_trigger(struct tevent_req *req,
395 void *private_data)
397 struct tstream_readv_pdu_queue_state *state = tevent_req_data(req,
398 struct tstream_readv_pdu_queue_state);
399 struct tevent_req *subreq;
401 subreq = tstream_readv_pdu_send(state,
402 state->caller.ev,
403 state->caller.stream,
404 state->caller.next_vector_fn,
405 state->caller.next_vector_private);
406 if (tevent_req_nomem(subreq, req)) {
407 return;
409 tevent_req_set_callback(subreq, tstream_readv_pdu_queue_done ,req);
412 static void tstream_readv_pdu_queue_done(struct tevent_req *subreq)
414 struct tevent_req *req = tevent_req_callback_data(subreq,
415 struct tevent_req);
416 struct tstream_readv_pdu_queue_state *state = tevent_req_data(req,
417 struct tstream_readv_pdu_queue_state);
418 int ret;
419 int sys_errno;
421 ret = tstream_readv_pdu_recv(subreq, &sys_errno);
422 talloc_free(subreq);
423 if (ret == -1) {
424 tevent_req_error(req, sys_errno);
425 return;
427 state->ret = ret;
429 tevent_req_done(req);
432 int tstream_readv_pdu_queue_recv(struct tevent_req *req, int *perrno)
434 struct tstream_readv_pdu_queue_state *state = tevent_req_data(req,
435 struct tstream_readv_pdu_queue_state);
436 int ret;
438 ret = tsocket_simple_int_recv(req, perrno);
439 if (ret == 0) {
440 ret = state->ret;
443 tevent_req_received(req);
444 return ret;
/*
 * Per-request state of tstream_writev_queue_send().
 */
struct tstream_writev_queue_state {
	/* these members are owned by the caller, we only keep references */
	struct {
		struct tevent_context *ev;
		struct tstream_context *stream;
		const struct iovec *vector;
		size_t count;
	} caller;

	/*
	 * initialised to -1, replaced by the result of the
	 * writev subrequest once it finished successfully
	 */
	int ret;
};

static void tstream_writev_queue_trigger(struct tevent_req *req,
					  void *private_data);
static void tstream_writev_queue_done(struct tevent_req *subreq);
463 * @brief Queue a dgram blob for sending through the socket
464 * @param[in] mem_ctx The memory context for the result
465 * @param[in] ev The tevent_context to run on
466 * @param[in] stream The stream to send data through
467 * @param[in] queue The existing send queue
468 * @param[in] vector The iovec vector so write
469 * @param[in] count The size of the vector
470 * @retval The async request handle
472 * This function queues a blob for sending to destination through an existing
473 * dgram socket. The async callback is triggered when the whole blob is
474 * delivered to the underlying system socket.
476 * The caller needs to make sure that all non-scalar input parameters hang
477 * arround for the whole lifetime of the request.
479 struct tevent_req *tstream_writev_queue_send(TALLOC_CTX *mem_ctx,
480 struct tevent_context *ev,
481 struct tstream_context *stream,
482 struct tevent_queue *queue,
483 const struct iovec *vector,
484 size_t count)
486 struct tevent_req *req;
487 struct tstream_writev_queue_state *state;
488 bool ok;
490 req = tevent_req_create(mem_ctx, &state,
491 struct tstream_writev_queue_state);
492 if (!req) {
493 return NULL;
496 state->caller.ev = ev;
497 state->caller.stream = stream;
498 state->caller.vector = vector;
499 state->caller.count = count;
500 state->ret = -1;
502 ok = tevent_queue_add(queue,
504 req,
505 tstream_writev_queue_trigger,
506 NULL);
507 if (!ok) {
508 tevent_req_nomem(NULL, req);
509 goto post;
512 return req;
514 post:
515 return tevent_req_post(req, ev);
518 static void tstream_writev_queue_trigger(struct tevent_req *req,
519 void *private_data)
521 struct tstream_writev_queue_state *state = tevent_req_data(req,
522 struct tstream_writev_queue_state);
523 struct tevent_req *subreq;
525 subreq = tstream_writev_send(state,
526 state->caller.ev,
527 state->caller.stream,
528 state->caller.vector,
529 state->caller.count);
530 if (tevent_req_nomem(subreq, req)) {
531 return;
533 tevent_req_set_callback(subreq, tstream_writev_queue_done ,req);
536 static void tstream_writev_queue_done(struct tevent_req *subreq)
538 struct tevent_req *req = tevent_req_callback_data(subreq,
539 struct tevent_req);
540 struct tstream_writev_queue_state *state = tevent_req_data(req,
541 struct tstream_writev_queue_state);
542 int ret;
543 int sys_errno;
545 ret = tstream_writev_recv(subreq, &sys_errno);
546 talloc_free(subreq);
547 if (ret == -1) {
548 tevent_req_error(req, sys_errno);
549 return;
551 state->ret = ret;
553 tevent_req_done(req);
556 int tstream_writev_queue_recv(struct tevent_req *req, int *perrno)
558 struct tstream_writev_queue_state *state = tevent_req_data(req,
559 struct tstream_writev_queue_state);
560 int ret;
562 ret = tsocket_simple_int_recv(req, perrno);
563 if (ret == 0) {
564 ret = state->ret;
567 tevent_req_received(req);
568 return ret;