ctdb-protocol: Add marshalling for tdb_data with size
[Samba.git] / lib / pthreadpool / pthreadpool_tevent.c
blob253a8673518da6f7f53a430c02dcf748a7eaeaf0
/*
 * Unix SMB/CIFS implementation.
 * threadpool implementation based on pthreads
 * Copyright (C) Volker Lendecke 2009,2011
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
20 #include "replace.h"
21 #include "pthreadpool_tevent.h"
22 #include "pthreadpool.h"
23 #include "lib/util/tevent_unix.h"
24 #include "lib/util/dlinklist.h"
struct pthreadpool_tevent_job_state;

/*
 * A pthreadpool plus the list of tevent jobs currently
 * tracked for it.
 */
struct pthreadpool_tevent {
	/* Backend thread pool; reset to NULL by the destructor. */
	struct pthreadpool *pool;

	/* Head of the doubly linked list of tracked jobs. */
	struct pthreadpool_tevent_job_state *jobs;
};
/*
 * State of one job; allocated as the private state of the
 * tevent_req created in pthreadpool_tevent_job_send().
 */
struct pthreadpool_tevent_job_state {
	/* Linkage for the pthreadpool_tevent.jobs list (DLIST macros). */
	struct pthreadpool_tevent_job_state *prev;
	struct pthreadpool_tevent_job_state *next;

	/* Owning pool; NULL once the job is no longer tracked. */
	struct pthreadpool_tevent *pool;

	/* Event context the completion is delivered on. */
	struct tevent_context *ev;

	/* Threaded context; only created when built with HAVE_PTHREAD. */
	struct tevent_threaded_context *tctx;

	/* Immediate event used to run pthreadpool_tevent_job_done(). */
	struct tevent_immediate *im;

	/* The request; NULL after it was freed while the job was pending. */
	struct tevent_req *req;

	/* Caller supplied job function and its argument. */
	void (*fn)(void *private_data);
	void *private_data;
};
/* talloc destructor of struct pthreadpool_tevent, defined below. */
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);

/* Signal function handed to pthreadpool_init(), defined below. */
static int pthreadpool_tevent_job_signal(
	int jobid,
	void (*job_fn)(void *private_data),
	void *job_private_data,
	void *private_data);
53 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
54 struct pthreadpool_tevent **presult)
56 struct pthreadpool_tevent *pool;
57 int ret;
59 pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
60 if (pool == NULL) {
61 return ENOMEM;
64 ret = pthreadpool_init(max_threads, &pool->pool,
65 pthreadpool_tevent_job_signal, pool);
66 if (ret != 0) {
67 TALLOC_FREE(pool);
68 return ret;
71 talloc_set_destructor(pool, pthreadpool_tevent_destructor);
73 *presult = pool;
74 return 0;
77 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
79 struct pthreadpool_tevent_job_state *state, *next;
80 int ret;
82 ret = pthreadpool_destroy(pool->pool);
83 if (ret != 0) {
84 return ret;
86 pool->pool = NULL;
88 for (state = pool->jobs; state != NULL; state = next) {
89 next = state->next;
90 DLIST_REMOVE(pool->jobs, state);
91 state->pool = NULL;
94 return 0;
/* Job function passed to pthreadpool_add_job(), defined below. */
static void pthreadpool_tevent_job_fn(void *private_data);

/* Immediate event handler that completes a job, defined below. */
static void pthreadpool_tevent_job_done(
	struct tevent_context *ctx,
	struct tevent_immediate *im,
	void *private_data);
102 static int pthreadpool_tevent_job_state_destructor(
103 struct pthreadpool_tevent_job_state *state)
105 if (state->pool == NULL) {
106 return 0;
110 * We should never be called with state->req == NULL,
111 * state->pool must be cleared before the 2nd talloc_free().
113 if (state->req == NULL) {
114 abort();
118 * We need to reparent to a long term context.
120 (void)talloc_reparent(state->req, NULL, state);
121 state->req = NULL;
122 return -1;
125 struct tevent_req *pthreadpool_tevent_job_send(
126 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
127 struct pthreadpool_tevent *pool,
128 void (*fn)(void *private_data), void *private_data)
130 struct tevent_req *req;
131 struct pthreadpool_tevent_job_state *state;
132 int ret;
134 req = tevent_req_create(mem_ctx, &state,
135 struct pthreadpool_tevent_job_state);
136 if (req == NULL) {
137 return NULL;
139 state->pool = pool;
140 state->ev = ev;
141 state->req = req;
142 state->fn = fn;
143 state->private_data = private_data;
145 state->im = tevent_create_immediate(state);
146 if (tevent_req_nomem(state->im, req)) {
147 return tevent_req_post(req, ev);
150 #ifdef HAVE_PTHREAD
151 state->tctx = tevent_threaded_context_create(state, ev);
152 if (state->tctx == NULL && errno == ENOSYS) {
154 * Samba build with pthread support but
155 * tevent without???
157 tevent_req_error(req, ENOSYS);
158 return tevent_req_post(req, ev);
160 if (tevent_req_nomem(state->tctx, req)) {
161 return tevent_req_post(req, ev);
163 #endif
165 ret = pthreadpool_add_job(pool->pool, 0,
166 pthreadpool_tevent_job_fn,
167 state);
168 if (tevent_req_error(req, ret)) {
169 return tevent_req_post(req, ev);
173 * Once the job is scheduled, we need to protect
174 * our memory.
176 talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor);
178 DLIST_ADD_END(pool->jobs, state);
180 return req;
183 static void pthreadpool_tevent_job_fn(void *private_data)
185 struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
186 private_data, struct pthreadpool_tevent_job_state);
187 state->fn(state->private_data);
190 static int pthreadpool_tevent_job_signal(int jobid,
191 void (*job_fn)(void *private_data),
192 void *job_private_data,
193 void *private_data)
195 struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
196 job_private_data, struct pthreadpool_tevent_job_state);
198 if (state->tctx != NULL) {
199 /* with HAVE_PTHREAD */
200 tevent_threaded_schedule_immediate(state->tctx, state->im,
201 pthreadpool_tevent_job_done,
202 state);
203 } else {
204 /* without HAVE_PTHREAD */
205 tevent_schedule_immediate(state->im, state->ev,
206 pthreadpool_tevent_job_done,
207 state);
210 return 0;
213 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
214 struct tevent_immediate *im,
215 void *private_data)
217 struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
218 private_data, struct pthreadpool_tevent_job_state);
220 if (state->pool != NULL) {
221 DLIST_REMOVE(state->pool->jobs, state);
222 state->pool = NULL;
225 TALLOC_FREE(state->tctx);
227 if (state->req == NULL) {
229 * There was a talloc_free() state->req
230 * while the job was pending,
231 * which mean we're reparented on a longterm
232 * talloc context.
234 * We just cleanup here...
236 talloc_free(state);
237 return;
240 tevent_req_done(state->req);
/*
 * Collect the result of a request created by
 * pthreadpool_tevent_job_send(): returns whatever
 * tevent_req_simple_recv_unix() reports for req.
 */
int pthreadpool_tevent_job_recv(struct tevent_req *req)
{
	int result = tevent_req_simple_recv_unix(req);

	return result;
}