/*
 * Unix SMB/CIFS implementation.
 * threadpool implementation based on pthreads
 * Copyright (C) Volker Lendecke 2009,2011
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "system/filesys.h"
#include "pthreadpool_pipe.h"
#include "pthreadpool.h"

/*
 * A thread pool whose finished job ids are reported through a pipe:
 * pthreadpool_pipe_signal() writes each id to pipe_fds[1] and
 * pthreadpool_pipe_finished_jobs() reads them back from pipe_fds[0].
 *
 * NOTE(review): only the "pool" member survived in the extracted text.
 * The other members are reconstructed from how the rest of this file
 * uses them (pool->num_jobs, pool->pid, pool->pipe_fds[0..1]); confirm
 * their exact types and order against the original.
 */
struct pthreadpool_pipe {
	struct pthreadpool *pool;	/* underlying thread pool */
	int num_jobs;			/* jobs added but not yet reaped */
	pid_t pid;			/* compared with getpid() to detect a fork */
	int pipe_fds[2];		/* [0] = read end, [1] = write end */
};

/*
 * Forward declaration: the function is passed to pthreadpool_init()
 * before its definition appears.
 *
 * NOTE(review): the last parameter was cut off in the extracted text;
 * "void *private_data" is restored from the definition further down,
 * which reads private_data.
 */
static int pthreadpool_pipe_signal(int jobid,
				   void (*job_fn)(void *private_data),
				   void *job_private_data,
				   void *private_data);

37 int pthreadpool_pipe_init(unsigned max_threads
,
38 struct pthreadpool_pipe
**presult
)
40 struct pthreadpool_pipe
*pool
;
43 pool
= calloc(1, sizeof(struct pthreadpool_pipe
));
49 ret
= pipe(pool
->pipe_fds
);
56 ret
= pthreadpool_init(max_threads
, &pool
->pool
,
57 pthreadpool_pipe_signal
, pool
);
59 close(pool
->pipe_fds
[0]);
60 close(pool
->pipe_fds
[1]);
69 static int pthreadpool_pipe_signal(int jobid
,
70 void (*job_fn
)(void *private_data
),
71 void *job_private_data
,
74 struct pthreadpool_pipe
*pool
= private_data
;
78 written
= write(pool
->pipe_fds
[1], &jobid
, sizeof(jobid
));
79 } while ((written
== -1) && (errno
== EINTR
));
81 if (written
!= sizeof(jobid
)) {
88 int pthreadpool_pipe_destroy(struct pthreadpool_pipe
*pool
)
92 if (pool
->num_jobs
!= 0) {
96 ret
= pthreadpool_destroy(pool
->pool
);
101 close(pool
->pipe_fds
[0]);
102 pool
->pipe_fds
[0] = -1;
104 close(pool
->pipe_fds
[1]);
105 pool
->pipe_fds
[1] = -1;
111 static int pthreadpool_pipe_reinit(struct pthreadpool_pipe
*pool
)
113 pid_t pid
= getpid();
117 if (pid
== pool
->pid
) {
121 signal_fd
= pool
->pipe_fds
[0];
123 close(pool
->pipe_fds
[0]);
124 pool
->pipe_fds
[0] = -1;
126 close(pool
->pipe_fds
[1]);
127 pool
->pipe_fds
[1] = -1;
129 ret
= pipe(pool
->pipe_fds
);
134 ret
= dup2(pool
->pipe_fds
[0], signal_fd
);
139 pool
->pipe_fds
[0] = signal_fd
;
145 int pthreadpool_pipe_add_job(struct pthreadpool_pipe
*pool
, int job_id
,
146 void (*fn
)(void *private_data
),
151 ret
= pthreadpool_pipe_reinit(pool
);
156 ret
= pthreadpool_add_job(pool
->pool
, job_id
, fn
, private_data
);
166 int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe
*pool
)
168 return pool
->pipe_fds
[0];
171 int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe
*pool
, int *jobids
,
174 ssize_t to_read
, nread
, num_jobs
;
175 pid_t pid
= getpid();
177 if (pool
->pid
!= pid
) {
181 to_read
= sizeof(int) * num_jobids
;
184 nread
= read(pool
->pipe_fds
[0], jobids
, to_read
);
185 } while ((nread
== -1) && (errno
== EINTR
));
190 if ((nread
% sizeof(int)) != 0) {
194 num_jobs
= nread
/ sizeof(int);
196 if (num_jobs
> pool
->num_jobs
) {
199 pool
->num_jobs
-= num_jobs
;