2 * Unix SMB/CIFS implementation.
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "../lib/util/tevent_unix.h"
25 #include "lib/pthreadpool/pthreadpool.h"
28 struct fncall_context
*ctx
;
36 struct fncall_context
{
37 struct pthreadpool
*pool
;
40 struct tevent_req
**pending
;
42 struct fncall_state
**orphaned
;
48 static void fncall_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
49 uint16_t flags
, void *private_data
);
51 static int fncall_context_destructor(struct fncall_context
*ctx
)
53 while (talloc_array_length(ctx
->pending
) != 0) {
54 /* No TALLOC_FREE here */
55 talloc_free(ctx
->pending
[0]);
58 while (ctx
->num_orphaned
!= 0) {
60 * We've got jobs in the queue for which the tevent_req has
61 * been finished already. Wait for all of them to finish.
63 fncall_handler(NULL
, NULL
, TEVENT_FD_READ
, ctx
);
66 pthreadpool_destroy(ctx
->pool
);
72 struct fncall_context
*fncall_context_init(TALLOC_CTX
*mem_ctx
,
75 struct fncall_context
*ctx
;
78 ctx
= talloc_zero(mem_ctx
, struct fncall_context
);
83 ret
= pthreadpool_init(max_threads
, &ctx
->pool
);
88 talloc_set_destructor(ctx
, fncall_context_destructor
);
90 ctx
->sig_fd
= pthreadpool_signal_fd(ctx
->pool
);
91 if (ctx
->sig_fd
== -1) {
99 static int fncall_next_job_id(struct fncall_context
*ctx
)
101 int num_pending
= talloc_array_length(ctx
->pending
);
107 result
= ctx
->next_job_id
++;
112 for (i
=0; i
<num_pending
; i
++) {
113 struct fncall_state
*state
= tevent_req_data(
114 ctx
->pending
[i
], struct fncall_state
);
116 if (result
== state
->job_id
) {
120 if (i
== num_pending
) {
126 static void fncall_unset_pending(struct tevent_req
*req
);
127 static int fncall_destructor(struct tevent_req
*req
);
129 static bool fncall_set_pending(struct tevent_req
*req
,
130 struct fncall_context
*ctx
,
131 struct tevent_context
*ev
)
133 struct tevent_req
**pending
;
134 int num_pending
, orphaned_array_length
;
136 num_pending
= talloc_array_length(ctx
->pending
);
138 pending
= talloc_realloc(ctx
, ctx
->pending
, struct tevent_req
*,
140 if (pending
== NULL
) {
143 pending
[num_pending
] = req
;
145 ctx
->pending
= pending
;
146 talloc_set_destructor(req
, fncall_destructor
);
149 * Make sure that the orphaned array of fncall_state structs has
150 * enough space. A job can change from pending to orphaned in
151 * fncall_destructor, and to fail in a talloc destructor should be
152 * avoided if possible.
155 orphaned_array_length
= talloc_array_length(ctx
->orphaned
);
156 if (num_pending
> orphaned_array_length
) {
157 struct fncall_state
**orphaned
;
159 orphaned
= talloc_realloc(ctx
, ctx
->orphaned
,
160 struct fncall_state
*,
161 orphaned_array_length
+ 1);
162 if (orphaned
== NULL
) {
163 fncall_unset_pending(req
);
166 ctx
->orphaned
= orphaned
;
169 if (ctx
->fde
!= NULL
) {
173 ctx
->fde
= tevent_add_fd(ev
, ctx
->pending
, ctx
->sig_fd
, TEVENT_FD_READ
,
174 fncall_handler
, ctx
);
175 if (ctx
->fde
== NULL
) {
176 fncall_unset_pending(req
);
182 static void fncall_unset_pending(struct tevent_req
*req
)
184 struct fncall_state
*state
= tevent_req_data(req
, struct fncall_state
);
185 struct fncall_context
*ctx
= state
->ctx
;
186 int num_pending
= talloc_array_length(ctx
->pending
);
189 if (num_pending
== 1) {
190 TALLOC_FREE(ctx
->fde
);
191 TALLOC_FREE(ctx
->pending
);
195 for (i
=0; i
<num_pending
; i
++) {
196 if (req
== ctx
->pending
[i
]) {
200 if (i
== num_pending
) {
203 if (num_pending
> 1) {
204 ctx
->pending
[i
] = ctx
->pending
[num_pending
-1];
206 ctx
->pending
= talloc_realloc(NULL
, ctx
->pending
, struct tevent_req
*,
210 static int fncall_destructor(struct tevent_req
*req
)
212 struct fncall_state
*state
= tevent_req_data(
213 req
, struct fncall_state
);
214 struct fncall_context
*ctx
= state
->ctx
;
216 fncall_unset_pending(req
);
223 * Keep around the state of the deleted request until the request has
224 * finished in the helper thread. fncall_handler will destroy it.
226 ctx
->orphaned
[ctx
->num_orphaned
] = talloc_move(ctx
->orphaned
, &state
);
227 ctx
->num_orphaned
+= 1;
232 struct tevent_req
*fncall_send(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
233 struct fncall_context
*ctx
,
234 void (*fn
)(void *private_data
),
237 struct tevent_req
*req
;
238 struct fncall_state
*state
;
241 req
= tevent_req_create(mem_ctx
, &state
, struct fncall_state
);
246 state
->job_id
= fncall_next_job_id(state
->ctx
);
250 * We need to keep the private data we handed out to the thread around
251 * as long as the job is not finished. This is a bit of an abstraction
252 * violation, because the "req->state1->subreq->state2" (we're
253 * "subreq", "req" is the request our caller creates) is broken to
254 * "ctx->state2->state1", but we are right now in the destructor for
255 * "subreq2", so what can we do. We need to keep state1 around,
256 * otherwise the helper thread will have no place to put its results.
259 state
->private_parent
= talloc_parent(private_data
);
260 state
->job_private
= talloc_move(state
, &private_data
);
262 ret
= pthreadpool_add_job(state
->ctx
->pool
, state
->job_id
, fn
,
265 tevent_req_error(req
, errno
);
266 return tevent_req_post(req
, ev
);
268 if (!fncall_set_pending(req
, state
->ctx
, ev
)) {
269 tevent_req_nomem(NULL
, req
);
270 return tevent_req_post(req
, ev
);
275 static void fncall_handler(struct tevent_context
*ev
, struct tevent_fd
*fde
,
276 uint16_t flags
, void *private_data
)
278 struct fncall_context
*ctx
= talloc_get_type_abort(
279 private_data
, struct fncall_context
);
283 job_id
= pthreadpool_finished_job(ctx
->pool
);
288 num_pending
= talloc_array_length(ctx
->pending
);
290 for (i
=0; i
<num_pending
; i
++) {
291 struct fncall_state
*state
= tevent_req_data(
292 ctx
->pending
[i
], struct fncall_state
);
294 if (job_id
== state
->job_id
) {
296 talloc_move(state
->private_parent
,
297 &state
->job_private
);
298 tevent_req_done(ctx
->pending
[i
]);
303 for (i
=0; i
<ctx
->num_orphaned
; i
++) {
304 if (job_id
== ctx
->orphaned
[i
]->job_id
) {
308 if (i
== ctx
->num_orphaned
) {
312 TALLOC_FREE(ctx
->orphaned
[i
]);
314 if (i
< ctx
->num_orphaned
-1) {
315 ctx
->orphaned
[i
] = ctx
->orphaned
[ctx
->num_orphaned
-1];
317 ctx
->num_orphaned
-= 1;
320 int fncall_recv(struct tevent_req
*req
, int *perr
)
322 if (tevent_req_is_unix_error(req
, perr
)) {
328 #else /* WITH_PTHREADPOOL */
330 struct fncall_context
{
334 struct fncall_context
*fncall_context_init(TALLOC_CTX
*mem_ctx
,
337 return talloc(mem_ctx
, struct fncall_context
);
340 struct fncall_state
{
344 struct tevent_req
*fncall_send(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
345 struct fncall_context
*ctx
,
346 void (*fn
)(void *private_data
),
349 struct tevent_req
*req
;
350 struct fncall_state
*state
;
352 req
= tevent_req_create(mem_ctx
, &state
, struct fncall_state
);
357 tevent_req_post(req
, ev
);
361 int fncall_recv(struct tevent_req
*req
, int *perr
)