2 * Unix SMB/CIFS implementation.
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include "pthreadpool.h"
27 struct fncall_context
*ctx
;
/*
 * Bookkeeping shared by all function calls issued through one context.
 *
 * Members visible in this extract:
 *   pool     - the pthreadpool that jobs are submitted to
 *   pending  - talloc array of requests still registered with the context
 *   orphaned - array of fncall_state pointers; fncall_destructor moves the
 *              state of a destroyed request into it
 *
 * NOTE(review): other code in this file also uses ctx->sig_fd, ctx->fde,
 * ctx->next_job_id and ctx->num_orphaned. Their declarations are not
 * visible in this extract and should be confirmed against the full file.
 */
35 struct fncall_context
{
36 struct pthreadpool
*pool
;
39 struct tevent_req
**pending
;
41 struct fncall_state
**orphaned
;
/*
 * Forward declaration. fncall_handler is called directly from
 * fncall_context_destructor and is passed to tevent_add_fd in
 * fncall_set_pending, both of which appear before its definition.
 */
47 static void fncall_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
48 uint16_t flags
, void *private_data
);
/*
 * talloc destructor for a fncall_context (installed by
 * fncall_context_init).
 *
 * Teardown order, as far as this extract shows:
 *   1. Free ctx->pending[0] until the pending array is empty. Each request
 *      carries fncall_destructor, which calls fncall_unset_pending and so
 *      removes the request from the array.
 *   2. Call fncall_handler directly, with NULL event context and fd event,
 *      until ctx->num_orphaned reaches zero.
 *   3. Destroy the thread pool.
 *
 * NOTE(review): the closing braces and the return statement are not
 * visible in this extract.
 */
50 static int fncall_context_destructor(struct fncall_context
*ctx
)
52 while (talloc_array_length(ctx
->pending
) != 0) {
53 /* No TALLOC_FREE here */
54 talloc_free(ctx
->pending
[0]);
57 while (ctx
->num_orphaned
!= 0) {
59 * We've got jobs in the queue for which the tevent_req has
60 * been finished already. Wait for all of them to finish.
62 fncall_handler(NULL
, NULL
, TEVENT_FD_READ
, ctx
);
65 pthreadpool_destroy(ctx
->pool
);
/*
 * Create a fncall_context as a talloc child of mem_ctx.
 *
 * Visible steps: allocate the context zero-initialised, create the thread
 * pool with pthreadpool_init(max_threads, &ctx->pool), install
 * fncall_context_destructor, then store the result of pthreadpool_sig_fd
 * in ctx->sig_fd and test it against -1.
 *
 * NOTE(review): the rest of the parameter list, the checks on the
 * allocation and on ret, the error paths and the return statements are
 * not visible in this extract.
 */
71 struct fncall_context
*fncall_context_init(TALLOC_CTX
*mem_ctx
,
74 struct fncall_context
*ctx
;
77 ctx
= talloc_zero(mem_ctx
, struct fncall_context
);
82 ret
= pthreadpool_init(max_threads
, &ctx
->pool
);
87 talloc_set_destructor(ctx
, fncall_context_destructor
);
89 ctx
->sig_fd
= pthreadpool_sig_fd(ctx
->pool
);
90 if (ctx
->sig_fd
== -1) {
/*
 * Choose a job id for a new call.
 *
 * Takes the current ctx->next_job_id as the candidate (post-incrementing
 * the counter) and compares it with the job_id of every request in
 * ctx->pending. The test i == num_pending is the case in which no pending
 * request was found to use the candidate.
 *
 * NOTE(review): the enclosing loop, the break and the return statement are
 * not visible in this extract; presumably a colliding candidate is
 * discarded and the next counter value is tried - confirm against the
 * full file.
 */
98 static int fncall_next_job_id(struct fncall_context
*ctx
)
100 int num_pending
= talloc_array_length(ctx
->pending
);
106 result
= ctx
->next_job_id
++;
111 for (i
=0; i
<num_pending
; i
++) {
112 struct fncall_state
*state
= tevent_req_data(
113 ctx
->pending
[i
], struct fncall_state
);
115 if (result
== state
->job_id
) {
119 if (i
== num_pending
) {
/*
 * Forward declarations: fncall_set_pending installs fncall_destructor on
 * the request and calls fncall_unset_pending on its failure paths, before
 * either function is defined.
 */
125 static void fncall_unset_pending(struct tevent_req
*req
);
126 static int fncall_destructor(struct tevent_req
*req
);
/*
 * Register req as a pending request of ctx.
 *
 * Visible steps:
 *   - grow ctx->pending with talloc_realloc and store req at index
 *     num_pending (the length of the array before the call);
 *   - install fncall_destructor on req;
 *   - if num_pending exceeds the current length of ctx->orphaned, grow that
 *     array by one element, so that fncall_destructor does not have to
 *     allocate later;
 *   - test ctx->fde against NULL, then create a TEVENT_FD_READ fd event on
 *     ctx->sig_fd with fncall_handler as callback and ctx as private data.
 *     ctx->pending is passed as the second argument of tevent_add_fd.
 *
 * When growing ctx->orphaned or adding the fd event fails,
 * fncall_unset_pending(req) is called to undo the registration.
 *
 * Returns a bool; fncall_send treats a false result as out of memory.
 *
 * NOTE(review): the new length passed to the first talloc_realloc and all
 * return statements are not visible in this extract.
 */
128 static bool fncall_set_pending(struct tevent_req
*req
,
129 struct fncall_context
*ctx
,
130 struct tevent_context
*ev
)
132 struct tevent_req
**pending
;
133 int num_pending
, orphaned_array_length
;
135 num_pending
= talloc_array_length(ctx
->pending
);
137 pending
= talloc_realloc(ctx
, ctx
->pending
, struct tevent_req
*,
139 if (pending
== NULL
) {
142 pending
[num_pending
] = req
;
144 ctx
->pending
= pending
;
145 talloc_set_destructor(req
, fncall_destructor
);
148 * Make sure that the orphaned array of fncall_state structs has
149 * enough space. A job can change from pending to orphaned in
150 * fncall_destructor, and to fail in a talloc destructor should be
151 * avoided if possible.
154 orphaned_array_length
= talloc_array_length(ctx
->orphaned
);
155 if (num_pending
> orphaned_array_length
) {
156 struct fncall_state
**orphaned
;
158 orphaned
= talloc_realloc(ctx
, ctx
->orphaned
,
159 struct fncall_state
*,
160 orphaned_array_length
+ 1);
161 if (orphaned
== NULL
) {
162 fncall_unset_pending(req
);
165 ctx
->orphaned
= orphaned
;
168 if (ctx
->fde
!= NULL
) {
172 ctx
->fde
= tevent_add_fd(ev
, ctx
->pending
, ctx
->sig_fd
, TEVENT_FD_READ
,
173 fncall_handler
, ctx
);
174 if (ctx
->fde
== NULL
) {
175 fncall_unset_pending(req
);
/*
 * Remove req from the pending array of its context (the context is read
 * from the request state).
 *
 * Visible logic:
 *   - with exactly one pending request, both ctx->fde and ctx->pending are
 *     released with TALLOC_FREE;
 *   - otherwise req is searched for linearly; i == num_pending is the
 *     not-found case;
 *   - when more than one request is pending, the last array element is
 *     copied into the slot of req and the array is handed to
 *     talloc_realloc.
 *
 * NOTE(review): the early returns, the handling of the not-found case and
 * the new length passed to talloc_realloc are not visible in this extract.
 */
181 static void fncall_unset_pending(struct tevent_req
*req
)
183 struct fncall_state
*state
= tevent_req_data(req
, struct fncall_state
);
184 struct fncall_context
*ctx
= state
->ctx
;
185 int num_pending
= talloc_array_length(ctx
->pending
);
188 if (num_pending
== 1) {
189 TALLOC_FREE(ctx
->fde
);
190 TALLOC_FREE(ctx
->pending
);
194 for (i
=0; i
<num_pending
; i
++) {
195 if (req
== ctx
->pending
[i
]) {
199 if (i
== num_pending
) {
202 if (num_pending
> 1) {
203 ctx
->pending
[i
] = ctx
->pending
[num_pending
-1];
205 ctx
->pending
= talloc_realloc(NULL
, ctx
->pending
, struct tevent_req
*,
/*
 * talloc destructor that fncall_set_pending installs on every pending
 * request.
 *
 * Removes req from the pending array via fncall_unset_pending. The visible
 * code then re-parents the request state onto ctx->orphaned with
 * talloc_move, stores it at index ctx->num_orphaned and increments that
 * counter, so the state outlives the request until fncall_handler frees it.
 *
 * NOTE(review): original lines between the fncall_unset_pending call and
 * the orphaning code are missing from this extract, so it cannot be told
 * from here whether the orphaning step is conditional. The return
 * statement is not visible either.
 */
209 static int fncall_destructor(struct tevent_req
*req
)
211 struct fncall_state
*state
= tevent_req_data(
212 req
, struct fncall_state
);
213 struct fncall_context
*ctx
= state
->ctx
;
215 fncall_unset_pending(req
);
222 * Keep around the state of the deleted request until the request has
223 * finished in the helper thread. fncall_handler will destroy it.
225 ctx
->orphaned
[ctx
->num_orphaned
] = talloc_move(ctx
->orphaned
, &state
);
226 ctx
->num_orphaned
+= 1;
/*
 * Start an asynchronous call of fn through the thread pool of a context.
 *
 * Visible steps:
 *   - create the tevent request together with its fncall_state;
 *   - obtain a job id from fncall_next_job_id(state->ctx);
 *   - remember the talloc parent of private_data in state->private_parent
 *     and move private_data under state as state->job_private, so that it
 *     stays alive while the job runs;
 *   - submit the job with pthreadpool_add_job;
 *   - register the request with fncall_set_pending.
 *
 * The two visible failure paths finish the request with
 * tevent_req_error(req, errno) or tevent_req_nomem(NULL, req) and return
 * tevent_req_post(req, ev).
 *
 * NOTE(review): the remaining parameters (private_data is used but its
 * declaration is not shown), the assignment of state->ctx, the conditions
 * guarding the error paths, the last argument of pthreadpool_add_job and
 * the success return are not visible in this extract.
 */
231 struct tevent_req
*fncall_send(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
232 struct fncall_context
*ctx
,
233 void (*fn
)(void *private_data
),
236 struct tevent_req
*req
;
237 struct fncall_state
*state
;
240 req
= tevent_req_create(mem_ctx
, &state
, struct fncall_state
);
245 state
->job_id
= fncall_next_job_id(state
->ctx
);
249 * We need to keep the private data we handed out to the thread around
250 * as long as the job is not finished. This is a bit of an abstraction
251 * violation, because the "req->state1->subreq->state2" (we're
252 * "subreq", "req" is the request our caller creates) is broken to
253 * "ctx->state2->state1", but we are right now in the destructor for
254 * "subreq2", so what can we do. We need to keep state1 around,
255 * otherwise the helper thread will have no place to put its results.
258 state
->private_parent
= talloc_parent(private_data
);
259 state
->job_private
= talloc_move(state
, &private_data
);
261 ret
= pthreadpool_add_job(state
->ctx
->pool
, state
->job_id
, fn
,
264 tevent_req_error(req
, errno
);
265 return tevent_req_post(req
, ev
);
267 if (!fncall_set_pending(req
, state
->ctx
, ev
)) {
268 tevent_req_nomem(NULL
, req
);
269 return tevent_req_post(req
, ev
);
/*
 * Callback for the fd event created in fncall_set_pending; it is also
 * called directly, with NULL ev and fde, by fncall_context_destructor.
 *
 * private_data must be a fncall_context (enforced by
 * talloc_get_type_abort). One finished job id is fetched with
 * pthreadpool_finished_job and then looked up:
 *   - in ctx->pending: on a match the job private data is moved back to
 *     its recorded parent (state->private_parent) and the request is
 *     marked done with tevent_req_done;
 *   - in the first ctx->num_orphaned entries of ctx->orphaned: the matching
 *     state is freed, the last live entry is copied into the freed slot
 *     unless the match was already the last one, and num_orphaned is
 *     decremented.
 *
 * NOTE(review): the check on the result of pthreadpool_finished_job, the
 * return and break statements, and the handling of an id found in neither
 * array are not visible in this extract.
 */
274 static void fncall_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
275 uint16_t flags
, void *private_data
)
277 struct fncall_context
*ctx
= talloc_get_type_abort(
278 private_data
, struct fncall_context
);
282 job_id
= pthreadpool_finished_job(ctx
->pool
);
287 num_pending
= talloc_array_length(ctx
->pending
);
289 for (i
=0; i
<num_pending
; i
++) {
290 struct fncall_state
*state
= tevent_req_data(
291 ctx
->pending
[i
], struct fncall_state
);
293 if (job_id
== state
->job_id
) {
295 talloc_move(state
->private_parent
,
296 &state
->job_private
);
297 tevent_req_done(ctx
->pending
[i
]);
302 for (i
=0; i
<ctx
->num_orphaned
; i
++) {
303 if (job_id
== ctx
->orphaned
[i
]->job_id
) {
307 if (i
== ctx
->num_orphaned
) {
311 TALLOC_FREE(ctx
->orphaned
[i
]);
313 if (i
< ctx
->num_orphaned
-1) {
314 ctx
->orphaned
[i
] = ctx
->orphaned
[ctx
->num_orphaned
-1];
316 ctx
->num_orphaned
-= 1;
/*
 * Collect the result of a request started with fncall_send.
 *
 * Checks the request with tevent_req_is_unix_error(req, perr).
 *
 * NOTE(review): the values returned on the error and success paths are not
 * visible in this extract.
 */
319 int fncall_recv(struct tevent_req
*req
, int *perr
)
321 if (tevent_req_is_unix_error(req
, perr
)) {
327 #else /* WITH_PTHREADPOOL */
/*
 * Definition of struct fncall_context used in the #else branch of the
 * WITH_PTHREADPOOL conditional.
 * NOTE(review): its members are not visible in this extract.
 */
329 struct fncall_context
{
/*
 * Variant of fncall_context_init in the #else branch of the
 * WITH_PTHREADPOOL conditional: returns a fncall_context allocated on
 * mem_ctx with talloc (not talloc_zero, unlike the variant above). No
 * thread pool setup is visible here.
 * NOTE(review): the rest of the parameter list is not visible in this
 * extract.
 */
333 struct fncall_context
*fncall_context_init(TALLOC_CTX
*mem_ctx
,
336 return talloc(mem_ctx
, struct fncall_context
);
/*
 * Definition of struct fncall_state used in the #else branch of the
 * WITH_PTHREADPOOL conditional; it is the state type passed to
 * tevent_req_create by the fncall_send variant below.
 * NOTE(review): its members are not visible in this extract.
 */
339 struct fncall_state
{
/*
 * Variant of fncall_send in the #else branch of the WITH_PTHREADPOOL
 * conditional.
 *
 * Visible steps: create the request with its fncall_state, then call
 * tevent_req_post(req, ev).
 *
 * NOTE(review): the remaining parameters, whatever happens between request
 * creation and tevent_req_post, and the return statement are not visible
 * in this extract.
 */
343 struct tevent_req
*fncall_send(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
344 struct fncall_context
*ctx
,
345 void (*fn
)(void *private_data
),
348 struct tevent_req
*req
;
349 struct fncall_state
*state
;
351 req
= tevent_req_create(mem_ctx
, &state
, struct fncall_state
);
356 tevent_req_post(req
, ev
);
360 int fncall_recv(struct tevent_req
*req
, int *perr
)