8 #include "pthreadpool.h"
10 static int test_init(void)
12 struct pthreadpool
*p
;
15 ret
= pthreadpool_init(1, &p
);
17 fprintf(stderr
, "pthreadpool_init failed: %s\n",
21 ret
= pthreadpool_destroy(p
);
23 fprintf(stderr
, "pthreadpool_init failed: %s\n",
30 static void test_sleep(void *ptr
)
32 int *ptimeout
= (int *)ptr
;
34 ret
= poll(NULL
, 0, *ptimeout
);
36 fprintf(stderr
, "poll returned %d (%s)\n",
37 ret
, strerror(errno
));
41 static int test_jobs(int num_threads
, int num_jobs
)
44 struct pthreadpool
*p
;
48 finished
= (char *)calloc(1, num_jobs
);
49 if (finished
== NULL
) {
50 fprintf(stderr
, "calloc failed\n");
54 ret
= pthreadpool_init(num_threads
, &p
);
56 fprintf(stderr
, "pthreadpool_init failed: %s\n",
61 for (i
=0; i
<num_jobs
; i
++) {
62 ret
= pthreadpool_add_job(p
, i
, test_sleep
, &timeout
);
64 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
70 for (i
=0; i
<num_jobs
; i
++) {
71 ret
= pthreadpool_finished_job(p
);
72 if ((ret
< 0) || (ret
>= num_jobs
)) {
73 fprintf(stderr
, "invalid job number %d\n", ret
);
79 for (i
=0; i
<num_jobs
; i
++) {
80 if (finished
[i
] != 1) {
81 fprintf(stderr
, "finished[%d] = %d\n",
87 ret
= pthreadpool_destroy(p
);
89 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
98 static int test_busydestroy(void)
100 struct pthreadpool
*p
;
105 ret
= pthreadpool_init(1, &p
);
107 fprintf(stderr
, "pthreadpool_init failed: %s\n",
111 ret
= pthreadpool_add_job(p
, 1, test_sleep
, &timeout
);
113 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
117 ret
= pthreadpool_destroy(p
);
119 fprintf(stderr
, "Could destroy a busy pool\n");
123 pfd
.fd
= pthreadpool_signal_fd(p
);
124 pfd
.events
= POLLIN
|POLLERR
;
128 ret
= pthreadpool_destroy(p
);
130 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
137 struct threaded_state
{
139 struct pthreadpool
*p
;
145 static void *test_threaded_worker(void *p
)
147 struct threaded_state
*state
= (struct threaded_state
*)p
;
150 for (i
=0; i
<state
->num_jobs
; i
++) {
151 int ret
= pthreadpool_add_job(state
->p
, state
->start_job
+ i
,
152 test_sleep
, &state
->timeout
);
154 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
162 static int test_threaded_addjob(int num_pools
, int num_threads
, int poolsize
,
165 struct pthreadpool
**pools
;
166 struct threaded_state
*states
;
167 struct threaded_state
*state
;
174 states
= calloc(num_threads
, sizeof(struct threaded_state
));
175 if (states
== NULL
) {
176 fprintf(stderr
, "calloc failed\n");
180 finished
= calloc(num_threads
* num_jobs
, 1);
181 if (finished
== NULL
) {
182 fprintf(stderr
, "calloc failed\n");
186 pools
= calloc(num_pools
, sizeof(struct pthreadpool
*));
188 fprintf(stderr
, "calloc failed\n");
192 pfds
= calloc(num_pools
, sizeof(struct pollfd
));
194 fprintf(stderr
, "calloc failed\n");
198 for (i
=0; i
<num_pools
; i
++) {
199 ret
= pthreadpool_init(poolsize
, &pools
[i
]);
201 fprintf(stderr
, "pthreadpool_init failed: %s\n",
205 pfds
[i
].fd
= pthreadpool_signal_fd(pools
[i
]);
206 pfds
[i
].events
= POLLIN
|POLLHUP
;
211 for (i
=0; i
<num_threads
; i
++) {
214 state
->p
= pools
[poolnum
];
215 poolnum
= (poolnum
+ 1) % num_pools
;
217 state
->num_jobs
= num_jobs
;
219 state
->start_job
= i
* num_jobs
;
221 ret
= pthread_create(&state
->tid
, NULL
, test_threaded_worker
,
224 fprintf(stderr
, "pthread_create failed: %s\n",
236 fprintf(stderr
, "fork failed: %s\n", strerror(errno
));
240 for (i
=0; i
<num_pools
; i
++) {
241 ret
= pthreadpool_destroy(pools
[i
]);
243 fprintf(stderr
, "pthreadpool_destroy failed: "
244 "%s\n", strerror(ret
));
252 for (i
=0; i
<num_threads
; i
++) {
253 ret
= pthread_join(states
[i
].tid
, NULL
);
255 fprintf(stderr
, "pthread_join(%d) failed: %s\n",
263 while (received
< num_threads
*num_jobs
) {
266 ret
= poll(pfds
, num_pools
, 1000);
268 fprintf(stderr
, "poll failed: %s\n",
273 fprintf(stderr
, "\npoll timed out\n");
277 for (j
=0; j
<num_pools
; j
++) {
279 if ((pfds
[j
].revents
& (POLLIN
|POLLHUP
)) == 0) {
283 ret
= pthreadpool_finished_job(pools
[j
]);
284 if ((ret
< 0) || (ret
>= num_jobs
* num_threads
)) {
285 fprintf(stderr
, "invalid job number %d\n",
294 for (i
=0; i
<num_threads
*num_jobs
; i
++) {
295 if (finished
[i
] != 1) {
296 fprintf(stderr
, "finished[%d] = %d\n",
302 for (i
=0; i
<num_pools
; i
++) {
303 ret
= pthreadpool_destroy(pools
[i
]);
305 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
325 fprintf(stderr
, "test_init failed\n");
329 ret
= test_jobs(10, 10000);
331 fprintf(stderr
, "test_jobs failed\n");
335 ret
= test_busydestroy();
337 fprintf(stderr
, "test_busydestroy failed\n");
342 * Test 10 threads adding jobs on a single pool
344 ret
= test_threaded_addjob(1, 10, 5, 5000);
346 fprintf(stderr
, "test_jobs failed\n");
351 * Test 10 threads on 3 pools to verify our fork handling
354 ret
= test_threaded_addjob(3, 10, 5, 5000);
356 fprintf(stderr
, "test_jobs failed\n");