4 Copyright (C) Jeremy Allison 2015
6 ** NOTE! The following LGPL license applies to the tevent
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 #include "system/filesys.h"
28 #include "tevent_internal.h"
29 #include "tevent_util.h"
32 #include "system/threads.h"
34 struct tevent_immediate_list
{
35 struct tevent_immediate_list
*next
, *prev
;
36 tevent_immediate_handler_t handler
;
37 struct tevent_immediate
*im
;
41 struct tevent_thread_proxy
{
42 pthread_mutex_t mutex
;
43 struct tevent_context
*dest_ev_ctx
;
46 struct tevent_fd
*pipe_read_fde
;
47 /* Pending events list. */
48 struct tevent_immediate_list
*im_list
;
49 /* Completed events list. */
50 struct tevent_immediate_list
*tofree_im_list
;
51 struct tevent_immediate
*free_im
;
54 static void free_im_list(struct tevent_immediate_list
**pp_list_head
)
56 struct tevent_immediate_list
*im_entry
= NULL
;
57 struct tevent_immediate_list
*im_next
= NULL
;
59 for (im_entry
= *pp_list_head
; im_entry
; im_entry
= im_next
) {
60 im_next
= im_entry
->next
;
61 DLIST_REMOVE(*pp_list_head
, im_entry
);
62 TALLOC_FREE(im_entry
);
66 static void free_list_handler(struct tevent_context
*ev
,
67 struct tevent_immediate
*im
,
70 struct tevent_thread_proxy
*tp
=
71 talloc_get_type_abort(private_ptr
, struct tevent_thread_proxy
);
74 ret
= pthread_mutex_lock(&tp
->mutex
);
81 free_im_list(&tp
->tofree_im_list
);
83 ret
= pthread_mutex_unlock(&tp
->mutex
);
91 static void schedule_immediate_functions(struct tevent_thread_proxy
*tp
)
93 struct tevent_immediate_list
*im_entry
= NULL
;
94 struct tevent_immediate_list
*im_next
= NULL
;
96 for (im_entry
= tp
->im_list
; im_entry
; im_entry
= im_next
) {
97 im_next
= im_entry
->next
;
98 DLIST_REMOVE(tp
->im_list
, im_entry
);
100 tevent_schedule_immediate(im_entry
->im
,
103 im_entry
->private_ptr
);
105 /* Move from pending list to free list. */
106 DLIST_ADD(tp
->tofree_im_list
, im_entry
);
108 if (tp
->tofree_im_list
!= NULL
) {
110 * Once the current immediate events
111 * are processed, we need to reschedule
112 * ourselves to free them. This works
113 * as tevent_schedule_immediate()
114 * always adds events to the *END* of
115 * the immediate events list.
117 tevent_schedule_immediate(tp
->free_im
,
124 static void pipe_read_handler(struct tevent_context
*ev
,
125 struct tevent_fd
*fde
,
129 struct tevent_thread_proxy
*tp
=
130 talloc_get_type_abort(private_ptr
, struct tevent_thread_proxy
);
134 ret
= pthread_mutex_lock(&tp
->mutex
);
142 * Clear out all data in the pipe. We
143 * don't really care if this returns -1.
147 len
= read(tp
->read_fd
, buf
, 64);
150 schedule_immediate_functions(tp
);
152 ret
= pthread_mutex_unlock(&tp
->mutex
);
160 static int tevent_thread_proxy_destructor(struct tevent_thread_proxy
*tp
)
164 ret
= pthread_mutex_lock(&tp
->mutex
);
171 TALLOC_FREE(tp
->pipe_read_fde
);
173 if (tp
->read_fd
!= -1) {
174 (void)close(tp
->read_fd
);
177 if (tp
->write_fd
!= -1) {
178 (void)close(tp
->write_fd
);
182 /* Hmmm. It's probably an error if we get here with
183 any non-NULL immediate entries.. */
185 free_im_list(&tp
->im_list
);
186 free_im_list(&tp
->tofree_im_list
);
188 TALLOC_FREE(tp
->free_im
);
190 ret
= pthread_mutex_unlock(&tp
->mutex
);
197 ret
= pthread_mutex_destroy(&tp
->mutex
);
208 * Create a struct that can be passed to other threads
209 * to allow them to signal the struct tevent_context *
213 struct tevent_thread_proxy
*tevent_thread_proxy_create(
214 struct tevent_context
*dest_ev_ctx
)
218 struct tevent_thread_proxy
*tp
;
220 if (dest_ev_ctx
->wrapper
.glue
!= NULL
) {
222 * stacking of wrappers is not supported
224 tevent_debug(dest_ev_ctx
->wrapper
.glue
->main_ev
,
226 "%s() not allowed on a wrapper context\n",
232 tp
= talloc_zero(dest_ev_ctx
, struct tevent_thread_proxy
);
237 ret
= pthread_mutex_init(&tp
->mutex
, NULL
);
242 tp
->dest_ev_ctx
= dest_ev_ctx
;
246 talloc_set_destructor(tp
, tevent_thread_proxy_destructor
);
253 tp
->read_fd
= pipefds
[0];
254 tp
->write_fd
= pipefds
[1];
256 ret
= ev_set_blocking(pipefds
[0], false);
260 ret
= ev_set_blocking(pipefds
[1], false);
264 if (!ev_set_close_on_exec(pipefds
[0])) {
267 if (!ev_set_close_on_exec(pipefds
[1])) {
271 tp
->pipe_read_fde
= tevent_add_fd(dest_ev_ctx
,
277 if (tp
->pipe_read_fde
== NULL
) {
282 * Create an immediate event to free
285 tp
->free_im
= tevent_create_immediate(tp
);
286 if (tp
->free_im
== NULL
) {
299 * This function schedules an immediate event to be called with argument
300 * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
301 * wait for activation to take place, this is simply fire-and-forget.
303 * pp_im must be a pointer to an immediate event talloced on
304 * a context owned by the calling thread, or the NULL context.
305 * Ownership of *pp_im will be transfered to the tevent library.
307 * pp_private can be null, or contents of *pp_private must be
308 * talloc'ed memory on a context owned by the calling thread
309 * or the NULL context. If non-null, ownership of *pp_private will
310 * be transfered to the tevent library.
312 * If you want to return a message, have the destination use the
313 * same function call to send back to the caller.
317 void tevent_thread_proxy_schedule(struct tevent_thread_proxy
*tp
,
318 struct tevent_immediate
**pp_im
,
319 tevent_immediate_handler_t handler
,
320 void *pp_private_data
)
322 struct tevent_immediate_list
*im_entry
;
327 ret
= pthread_mutex_lock(&tp
->mutex
);
334 if (tp
->write_fd
== -1) {
335 /* In the process of being destroyed. Ignore. */
339 /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
340 im_entry
= talloc_zero(NULL
, struct tevent_immediate_list
);
341 if (im_entry
== NULL
) {
345 im_entry
->handler
= handler
;
346 im_entry
->im
= talloc_move(im_entry
, pp_im
);
348 if (pp_private_data
!= NULL
) {
349 void **pptr
= (void **)pp_private_data
;
350 im_entry
->private_ptr
= talloc_move(im_entry
, pptr
);
353 DLIST_ADD(tp
->im_list
, im_entry
);
355 /* And notify the dest_ev_ctx to wake up. */
358 written
= write(tp
->write_fd
, &c
, 1);
359 } while (written
== -1 && errno
== EINTR
);
363 ret
= pthread_mutex_unlock(&tp
->mutex
);
371 struct tevent_thread_proxy
*tevent_thread_proxy_create(
372 struct tevent_context
*dest_ev_ctx
)
378 void tevent_thread_proxy_schedule(struct tevent_thread_proxy
*tp
,
379 struct tevent_immediate
**pp_im
,
380 tevent_immediate_handler_t handler
,
381 void *pp_private_data
)
387 static int tevent_threaded_context_destructor(
388 struct tevent_threaded_context
*tctx
)
390 struct tevent_context
*main_ev
= tevent_wrapper_main_ev(tctx
->event_ctx
);
393 if (main_ev
!= NULL
) {
394 DLIST_REMOVE(main_ev
->threaded_contexts
, tctx
);
398 * We have to coordinate with _tevent_threaded_schedule_immediate's
399 * unlock of the event_ctx_mutex. We're in the main thread here,
400 * and we can be scheduled before the helper thread finalizes its
401 * call _tevent_threaded_schedule_immediate. This means we would
402 * pthreadpool_destroy a locked mutex, which is illegal.
404 ret
= pthread_mutex_lock(&tctx
->event_ctx_mutex
);
409 ret
= pthread_mutex_unlock(&tctx
->event_ctx_mutex
);
414 ret
= pthread_mutex_destroy(&tctx
->event_ctx_mutex
);
422 struct tevent_threaded_context
*tevent_threaded_context_create(
423 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
)
426 struct tevent_context
*main_ev
= tevent_wrapper_main_ev(ev
);
427 struct tevent_threaded_context
*tctx
;
430 ret
= tevent_common_wakeup_init(main_ev
);
436 tctx
= talloc(mem_ctx
, struct tevent_threaded_context
);
440 tctx
->event_ctx
= ev
;
442 ret
= pthread_mutex_init(&tctx
->event_ctx_mutex
, NULL
);
448 DLIST_ADD(main_ev
->threaded_contexts
, tctx
);
449 talloc_set_destructor(tctx
, tevent_threaded_context_destructor
);
458 static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate
*im
)
460 if (im
->event_ctx
!= NULL
) {
466 void _tevent_threaded_schedule_immediate(struct tevent_threaded_context
*tctx
,
467 struct tevent_immediate
*im
,
468 tevent_immediate_handler_t handler
,
470 const char *handler_name
,
471 const char *location
)
474 const char *create_location
= im
->create_location
;
475 struct tevent_context
*main_ev
= NULL
;
476 struct tevent_wrapper_glue
*glue
= NULL
;
479 ret
= pthread_mutex_lock(&tctx
->event_ctx_mutex
);
484 if (tctx
->event_ctx
== NULL
) {
486 * Our event context is already gone.
488 ret
= pthread_mutex_unlock(&tctx
->event_ctx_mutex
);
495 glue
= tctx
->event_ctx
->wrapper
.glue
;
497 if ((im
->event_ctx
!= NULL
) || (handler
== NULL
)) {
507 main_ev
= tevent_wrapper_main_ev(tctx
->event_ctx
);
509 *im
= (struct tevent_immediate
) {
510 .event_ctx
= tctx
->event_ctx
,
513 .private_data
= private_data
,
514 .handler_name
= handler_name
,
515 .create_location
= create_location
,
516 .schedule_location
= location
,
520 * Make sure the event won't be destroyed while
521 * it's part of the ev->scheduled_immediates list.
522 * _tevent_schedule_immediate() will reset the destructor
523 * in tevent_common_threaded_activate_immediate().
525 talloc_set_destructor(im
, tevent_threaded_schedule_immediate_destructor
);
527 ret
= pthread_mutex_lock(&main_ev
->scheduled_mutex
);
532 DLIST_ADD_END(main_ev
->scheduled_immediates
, im
);
533 wakeup_fd
= main_ev
->wakeup_fd
;
535 ret
= pthread_mutex_unlock(&main_ev
->scheduled_mutex
);
540 ret
= pthread_mutex_unlock(&tctx
->event_ctx_mutex
);
546 * We might want to wake up the main thread under the lock. We
547 * had a slightly similar situation in pthreadpool, changed
548 * with 1c4284c7395f23. This is not exactly the same, as the
549 * wakeup is only a last-resort thing in case the main thread
550 * is sleeping. Doing the wakeup under the lock can easily
551 * lead to a contended mutex, which is much more expensive
552 * than a noncontended one. So I'd opt for the lower footprint
553 * initially. Maybe we have to change that later.
555 tevent_common_wakeup_fd(wakeup_fd
);
558 * tevent_threaded_context_create() returned NULL with ENOSYS...
564 void tevent_common_threaded_activate_immediate(struct tevent_context
*ev
)
568 ret
= pthread_mutex_lock(&ev
->scheduled_mutex
);
573 while (ev
->scheduled_immediates
!= NULL
) {
574 struct tevent_immediate
*im
= ev
->scheduled_immediates
;
575 struct tevent_immediate copy
= *im
;
577 DLIST_REMOVE(ev
->scheduled_immediates
, im
);
579 tevent_debug(ev
, TEVENT_DEBUG_TRACE
,
580 "Schedule immediate event \"%s\": %p from thread into main\n",
581 im
->handler_name
, im
);
582 im
->handler_name
= NULL
;
583 _tevent_schedule_immediate(im
,
588 copy
.schedule_location
);
591 ret
= pthread_mutex_unlock(&ev
->scheduled_mutex
);
597 * tevent_threaded_context_create() returned NULL with ENOSYS...