10 #include "pthreadpool.h"
12 static int test_init(void)
14 struct pthreadpool
*p
;
17 ret
= pthreadpool_init(1, &p
);
19 fprintf(stderr
, "pthreadpool_init failed: %s\n",
23 ret
= pthreadpool_destroy(p
);
25 fprintf(stderr
, "pthreadpool_init failed: %s\n",
32 static void test_sleep(void *ptr
)
34 int *ptimeout
= (int *)ptr
;
36 ret
= poll(NULL
, 0, *ptimeout
);
38 fprintf(stderr
, "poll returned %d (%s)\n",
39 ret
, strerror(errno
));
43 static int test_jobs(int num_threads
, int num_jobs
)
46 struct pthreadpool
*p
;
50 finished
= (char *)calloc(1, num_jobs
);
51 if (finished
== NULL
) {
52 fprintf(stderr
, "calloc failed\n");
56 ret
= pthreadpool_init(num_threads
, &p
);
58 fprintf(stderr
, "pthreadpool_init failed: %s\n",
63 for (i
=0; i
<num_jobs
; i
++) {
64 ret
= pthreadpool_add_job(p
, i
, test_sleep
, &timeout
);
66 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
72 for (i
=0; i
<num_jobs
; i
++) {
74 ret
= pthreadpool_finished_job(p
, &jobid
);
75 if ((ret
!= 0) || (jobid
>= num_jobs
)) {
76 fprintf(stderr
, "invalid job number %d\n", jobid
);
82 for (i
=0; i
<num_jobs
; i
++) {
83 if (finished
[i
] != 1) {
84 fprintf(stderr
, "finished[%d] = %d\n",
90 ret
= pthreadpool_destroy(p
);
92 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
101 static int test_busydestroy(void)
103 struct pthreadpool
*p
;
108 ret
= pthreadpool_init(1, &p
);
110 fprintf(stderr
, "pthreadpool_init failed: %s\n",
114 ret
= pthreadpool_add_job(p
, 1, test_sleep
, &timeout
);
116 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
120 ret
= pthreadpool_destroy(p
);
122 fprintf(stderr
, "Could destroy a busy pool\n");
126 pfd
.fd
= pthreadpool_signal_fd(p
);
127 pfd
.events
= POLLIN
|POLLERR
;
131 ret
= pthreadpool_destroy(p
);
133 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
140 struct threaded_state
{
142 struct pthreadpool
*p
;
148 static void *test_threaded_worker(void *p
)
150 struct threaded_state
*state
= (struct threaded_state
*)p
;
153 for (i
=0; i
<state
->num_jobs
; i
++) {
154 int ret
= pthreadpool_add_job(state
->p
, state
->start_job
+ i
,
155 test_sleep
, &state
->timeout
);
157 fprintf(stderr
, "pthreadpool_add_job failed: %s\n",
165 static int test_threaded_addjob(int num_pools
, int num_threads
, int poolsize
,
168 struct pthreadpool
**pools
;
169 struct threaded_state
*states
;
170 struct threaded_state
*state
;
177 states
= calloc(num_threads
, sizeof(struct threaded_state
));
178 if (states
== NULL
) {
179 fprintf(stderr
, "calloc failed\n");
183 finished
= calloc(num_threads
* num_jobs
, 1);
184 if (finished
== NULL
) {
185 fprintf(stderr
, "calloc failed\n");
189 pools
= calloc(num_pools
, sizeof(struct pthreadpool
*));
191 fprintf(stderr
, "calloc failed\n");
195 pfds
= calloc(num_pools
, sizeof(struct pollfd
));
197 fprintf(stderr
, "calloc failed\n");
201 for (i
=0; i
<num_pools
; i
++) {
202 ret
= pthreadpool_init(poolsize
, &pools
[i
]);
204 fprintf(stderr
, "pthreadpool_init failed: %s\n",
208 pfds
[i
].fd
= pthreadpool_signal_fd(pools
[i
]);
209 pfds
[i
].events
= POLLIN
|POLLHUP
;
214 for (i
=0; i
<num_threads
; i
++) {
217 state
->p
= pools
[poolnum
];
218 poolnum
= (poolnum
+ 1) % num_pools
;
220 state
->num_jobs
= num_jobs
;
222 state
->start_job
= i
* num_jobs
;
224 ret
= pthread_create(&state
->tid
, NULL
, test_threaded_worker
,
227 fprintf(stderr
, "pthread_create failed: %s\n",
239 fprintf(stderr
, "fork failed: %s\n", strerror(errno
));
243 for (i
=0; i
<num_pools
; i
++) {
244 ret
= pthreadpool_destroy(pools
[i
]);
246 fprintf(stderr
, "pthreadpool_destroy failed: "
247 "%s\n", strerror(ret
));
255 for (i
=0; i
<num_threads
; i
++) {
256 ret
= pthread_join(states
[i
].tid
, NULL
);
258 fprintf(stderr
, "pthread_join(%d) failed: %s\n",
266 while (received
< num_threads
*num_jobs
) {
269 ret
= poll(pfds
, num_pools
, 1000);
271 fprintf(stderr
, "poll failed: %s\n",
276 fprintf(stderr
, "\npoll timed out\n");
280 for (j
=0; j
<num_pools
; j
++) {
283 if ((pfds
[j
].revents
& (POLLIN
|POLLHUP
)) == 0) {
287 ret
= pthreadpool_finished_job(pools
[j
], &jobid
);
288 if ((ret
!= 0) || (jobid
>= num_jobs
* num_threads
)) {
289 fprintf(stderr
, "invalid job number %d\n",
293 finished
[jobid
] += 1;
298 for (i
=0; i
<num_threads
*num_jobs
; i
++) {
299 if (finished
[i
] != 1) {
300 fprintf(stderr
, "finished[%d] = %d\n",
306 for (i
=0; i
<num_pools
; i
++) {
307 ret
= pthreadpool_destroy(pools
[i
]);
309 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
323 static int test_fork(void)
325 struct pthreadpool
*p
;
329 ret
= pthreadpool_init(1, &p
);
331 fprintf(stderr
, "pthreadpool_init failed: %s\n",
335 ret
= pthreadpool_destroy(p
);
337 fprintf(stderr
, "pthreadpool_destroy failed: %s\n",
344 perror("fork failed");
350 waited
= wait(&status
);
352 perror("wait failed");
355 if (waited
!= child
) {
356 fprintf(stderr
, "expected child %d, got %d\n",
357 (int)child
, (int)waited
);
369 fprintf(stderr
, "test_init failed\n");
375 fprintf(stderr
, "test_fork failed\n");
379 ret
= test_jobs(10, 10000);
381 fprintf(stderr
, "test_jobs failed\n");
385 ret
= test_busydestroy();
387 fprintf(stderr
, "test_busydestroy failed\n");
392 * Test 10 threads adding jobs on a single pool
394 ret
= test_threaded_addjob(1, 10, 5, 5000);
396 fprintf(stderr
, "test_jobs failed\n");
401 * Test 10 threads on 3 pools to verify our fork handling
404 ret
= test_threaded_addjob(3, 10, 5, 5000);
406 fprintf(stderr
, "test_jobs failed\n");