8 #include "pthreadpool.h"
10 static int test_init(void)
12 struct pthreadpool
*p
;
15 ret
= pthreadpool_init(1, &p
);
17 fprintf(stderr
, "pthreadpool_init failed: %s\n",
21 ret
= pthreadpool_destroy(p
);
23 fprintf(stderr
, "pthreadpool_init failed: %s\n",
30 static void test_sleep(void *ptr
)
32 int *ptimeout
= (int *)ptr
;
34 ret
= poll(NULL
, 0, *ptimeout
);
36 fprintf(stderr
, "poll returned %d (%s)\n",
37 ret
, strerror(errno
));
41 static int test_jobs(int num_threads
, int num_jobs
)
44 struct pthreadpool
*p
;
48 finished
= (char *)calloc(1, num_jobs
);
49 if (finished
== NULL
) {
50 fprintf(stderr
, "calloc failed\n");
54 ret
= pthreadpool_init(num_threads
, &p
);
56 fprintf(stderr
, "pthreadpool_init failed: %s\n",
61 for (i
=0; i
<num_jobs
; i
++) {
62 ret
= pthreadpool_add_job(p
, i
, test_sleep
, &timeout
);
64 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
70 for (i
=0; i
<num_jobs
; i
++) {
72 ret
= pthreadpool_finished_job(p
, &jobid
);
73 if ((ret
!= 0) || (jobid
>= num_jobs
)) {
74 fprintf(stderr
, "invalid job number %d\n", jobid
);
80 for (i
=0; i
<num_jobs
; i
++) {
81 if (finished
[i
] != 1) {
82 fprintf(stderr
, "finished[%d] = %d\n",
88 ret
= pthreadpool_destroy(p
);
90 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
99 static int test_busydestroy(void)
101 struct pthreadpool
*p
;
106 ret
= pthreadpool_init(1, &p
);
108 fprintf(stderr
, "pthreadpool_init failed: %s\n",
112 ret
= pthreadpool_add_job(p
, 1, test_sleep
, &timeout
);
114 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
118 ret
= pthreadpool_destroy(p
);
120 fprintf(stderr
, "Could destroy a busy pool\n");
124 pfd
.fd
= pthreadpool_signal_fd(p
);
125 pfd
.events
= POLLIN
|POLLERR
;
129 ret
= pthreadpool_destroy(p
);
131 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
138 struct threaded_state
{
140 struct pthreadpool
*p
;
146 static void *test_threaded_worker(void *p
)
148 struct threaded_state
*state
= (struct threaded_state
*)p
;
151 for (i
=0; i
<state
->num_jobs
; i
++) {
152 int ret
= pthreadpool_add_job(state
->p
, state
->start_job
+ i
,
153 test_sleep
, &state
->timeout
);
155 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
163 static int test_threaded_addjob(int num_pools
, int num_threads
, int poolsize
,
166 struct pthreadpool
**pools
;
167 struct threaded_state
*states
;
168 struct threaded_state
*state
;
175 states
= calloc(num_threads
, sizeof(struct threaded_state
));
176 if (states
== NULL
) {
177 fprintf(stderr
, "calloc failed\n");
181 finished
= calloc(num_threads
* num_jobs
, 1);
182 if (finished
== NULL
) {
183 fprintf(stderr
, "calloc failed\n");
187 pools
= calloc(num_pools
, sizeof(struct pthreadpool
*));
189 fprintf(stderr
, "calloc failed\n");
193 pfds
= calloc(num_pools
, sizeof(struct pollfd
));
195 fprintf(stderr
, "calloc failed\n");
199 for (i
=0; i
<num_pools
; i
++) {
200 ret
= pthreadpool_init(poolsize
, &pools
[i
]);
202 fprintf(stderr
, "pthreadpool_init failed: %s\n",
206 pfds
[i
].fd
= pthreadpool_signal_fd(pools
[i
]);
207 pfds
[i
].events
= POLLIN
|POLLHUP
;
212 for (i
=0; i
<num_threads
; i
++) {
215 state
->p
= pools
[poolnum
];
216 poolnum
= (poolnum
+ 1) % num_pools
;
218 state
->num_jobs
= num_jobs
;
220 state
->start_job
= i
* num_jobs
;
222 ret
= pthread_create(&state
->tid
, NULL
, test_threaded_worker
,
225 fprintf(stderr
, "pthread_create failed: %s\n",
237 fprintf(stderr
, "fork failed: %s\n", strerror(errno
));
241 for (i
=0; i
<num_pools
; i
++) {
242 ret
= pthreadpool_destroy(pools
[i
]);
244 fprintf(stderr
, "pthreadpool_destroy failed: "
245 "%s\n", strerror(ret
));
253 for (i
=0; i
<num_threads
; i
++) {
254 ret
= pthread_join(states
[i
].tid
, NULL
);
256 fprintf(stderr
, "pthread_join(%d) failed: %s\n",
264 while (received
< num_threads
*num_jobs
) {
267 ret
= poll(pfds
, num_pools
, 1000);
269 fprintf(stderr
, "poll failed: %s\n",
274 fprintf(stderr
, "\npoll timed out\n");
278 for (j
=0; j
<num_pools
; j
++) {
281 if ((pfds
[j
].revents
& (POLLIN
|POLLHUP
)) == 0) {
285 ret
= pthreadpool_finished_job(pools
[j
], &jobid
);
286 if ((ret
!= 0) || (jobid
>= num_jobs
* num_threads
)) {
287 fprintf(stderr
, "invalid job number %d\n",
291 finished
[jobid
] += 1;
296 for (i
=0; i
<num_threads
*num_jobs
; i
++) {
297 if (finished
[i
] != 1) {
298 fprintf(stderr
, "finished[%d] = %d\n",
304 for (i
=0; i
<num_pools
; i
++) {
305 ret
= pthreadpool_destroy(pools
[i
]);
307 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
327 fprintf(stderr
, "test_init failed\n");
331 ret
= test_jobs(10, 10000);
333 fprintf(stderr
, "test_jobs failed\n");
337 ret
= test_busydestroy();
339 fprintf(stderr
, "test_busydestroy failed\n");
344 * Test 10 threads adding jobs on a single pool
346 ret
= test_threaded_addjob(1, 10, 5, 5000);
348 fprintf(stderr
, "test_jobs failed\n");
353 * Test 10 threads on 3 pools to verify our fork handling
356 ret
= test_threaded_addjob(3, 10, 5, 5000);
358 fprintf(stderr
, "test_jobs failed\n");