tevent: add tevent_req_defer_callback()
[Samba/gebeck_regimport.git] / source3 / lib / pthreadpool / tests.c
blob667ee01784e74feb29e304ef66d90e8cbef5113d
1 #include <stdio.h>
2 #include <string.h>
3 #include <poll.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <pthread.h>
7 #include <unistd.h>
8 #include "pthreadpool.h"
/*
 * Smoke test: set up a pool with pthreadpool_init(1, ...) and tear it
 * down again with pthreadpool_destroy() without queueing any job.
 *
 * Both calls report failure as a non-zero error number, which is
 * rendered with strerror().
 *
 * Returns 0 on success, -1 (after printing a diagnostic to stderr) if
 * either call fails.
 */
static int test_init(void)
{
	struct pthreadpool *p;
	int ret;

	ret = pthreadpool_init(1, &p);
	if (ret != 0) {
		fprintf(stderr, "pthreadpool_init failed: %s\n",
			strerror(ret));
		return -1;
	}

	ret = pthreadpool_destroy(p);
	if (ret != 0) {
		/* Name the call that actually failed. */
		fprintf(stderr, "pthreadpool_destroy failed: %s\n",
			strerror(ret));
		return -1;
	}

	return 0;
}
/*
 * Job body used by the tests: block for a number of milliseconds by
 * calling poll() with no file descriptors.
 *
 * ptr must point to an int holding the timeout handed to poll().
 * Anything but a clean timeout (poll() returning 0) is logged to
 * stderr; nothing is returned to the caller.
 */
static void test_sleep(void *ptr)
{
	const int timeout_ms = *(int *)ptr;
	int rc = poll(NULL, 0, timeout_ms);

	if (rc == 0) {
		return;
	}

	fprintf(stderr, "poll returned %d (%s)\n",
		rc, strerror(errno));
}
41 static int test_jobs(int num_threads, int num_jobs)
43 char *finished;
44 struct pthreadpool *p;
45 int timeout = 1;
46 int i, ret;
48 finished = (char *)calloc(1, num_jobs);
49 if (finished == NULL) {
50 fprintf(stderr, "calloc failed\n");
51 return -1;
54 ret = pthreadpool_init(num_threads, &p);
55 if (ret != 0) {
56 fprintf(stderr, "pthreadpool_init failed: %s\n",
57 strerror(ret));
58 return -1;
61 for (i=0; i<num_jobs; i++) {
62 ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
63 if (ret != 0) {
64 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
65 strerror(ret));
66 return -1;
70 for (i=0; i<num_jobs; i++) {
71 ret = pthreadpool_finished_job(p);
72 if ((ret < 0) || (ret >= num_jobs)) {
73 fprintf(stderr, "invalid job number %d\n", ret);
74 return -1;
76 finished[ret] += 1;
79 for (i=0; i<num_jobs; i++) {
80 if (finished[i] != 1) {
81 fprintf(stderr, "finished[%d] = %d\n",
82 i, finished[i]);
83 return -1;
87 ret = pthreadpool_destroy(p);
88 if (ret != 0) {
89 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
90 strerror(ret));
91 return -1;
94 free(finished);
95 return 0;
98 static int test_busydestroy(void)
100 struct pthreadpool *p;
101 int timeout = 50;
102 struct pollfd pfd;
103 int ret;
105 ret = pthreadpool_init(1, &p);
106 if (ret != 0) {
107 fprintf(stderr, "pthreadpool_init failed: %s\n",
108 strerror(ret));
109 return -1;
111 ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
112 if (ret != 0) {
113 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
114 strerror(ret));
115 return -1;
117 ret = pthreadpool_destroy(p);
118 if (ret != EBUSY) {
119 fprintf(stderr, "Could destroy a busy pool\n");
120 return -1;
123 pfd.fd = pthreadpool_signal_fd(p);
124 pfd.events = POLLIN|POLLERR;
126 poll(&pfd, 1, -1);
128 ret = pthreadpool_destroy(p);
129 if (ret != 0) {
130 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
131 strerror(ret));
132 return -1;
134 return 0;
/*
 * Per-thread bookkeeping for test_threaded_addjob(): one instance is
 * handed to each test_threaded_worker() thread.
 */
struct threaded_state {
	/* Thread handle filled in by pthread_create(), used for joining. */
	pthread_t tid;
	/* Pool this thread submits its jobs to. */
	struct pthreadpool *p;
	/* Job id of the first job submitted by this thread. */
	int start_job;
	/* Number of consecutive job ids this thread submits. */
	int num_jobs;
	/* Value every job passes to poll() as its timeout. */
	int timeout;
};
145 static void *test_threaded_worker(void *p)
147 struct threaded_state *state = (struct threaded_state *)p;
148 int i;
150 for (i=0; i<state->num_jobs; i++) {
151 int ret = pthreadpool_add_job(state->p, state->start_job + i,
152 test_sleep, &state->timeout);
153 if (ret != 0) {
154 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
155 strerror(ret));
156 return NULL;
159 return NULL;
162 static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
163 int num_jobs)
165 struct pthreadpool **pools;
166 struct threaded_state *states;
167 struct threaded_state *state;
168 struct pollfd *pfds;
169 char *finished;
170 pid_t child;
171 int i, ret, poolnum;
172 int received;
174 states = calloc(num_threads, sizeof(struct threaded_state));
175 if (states == NULL) {
176 fprintf(stderr, "calloc failed\n");
177 return -1;
180 finished = calloc(num_threads * num_jobs, 1);
181 if (finished == NULL) {
182 fprintf(stderr, "calloc failed\n");
183 return -1;
186 pools = calloc(num_pools, sizeof(struct pthreadpool *));
187 if (pools == NULL) {
188 fprintf(stderr, "calloc failed\n");
189 return -1;
192 pfds = calloc(num_pools, sizeof(struct pollfd));
193 if (pfds == NULL) {
194 fprintf(stderr, "calloc failed\n");
195 return -1;
198 for (i=0; i<num_pools; i++) {
199 ret = pthreadpool_init(poolsize, &pools[i]);
200 if (ret != 0) {
201 fprintf(stderr, "pthreadpool_init failed: %s\n",
202 strerror(ret));
203 return -1;
205 pfds[i].fd = pthreadpool_signal_fd(pools[i]);
206 pfds[i].events = POLLIN|POLLHUP;
209 poolnum = 0;
211 for (i=0; i<num_threads; i++) {
212 state = &states[i];
214 state->p = pools[poolnum];
215 poolnum = (poolnum + 1) % num_pools;
217 state->num_jobs = num_jobs;
218 state->timeout = 1;
219 state->start_job = i * num_jobs;
221 ret = pthread_create(&state->tid, NULL, test_threaded_worker,
222 state);
223 if (ret != 0) {
224 fprintf(stderr, "pthread_create failed: %s\n",
225 strerror(ret));
226 return -1;
230 if (random() % 1) {
231 poll(NULL, 0, 1);
234 child = fork();
235 if (child < 0) {
236 fprintf(stderr, "fork failed: %s\n", strerror(errno));
237 return -1;
239 if (child == 0) {
240 for (i=0; i<num_pools; i++) {
241 ret = pthreadpool_destroy(pools[i]);
242 if (ret != 0) {
243 fprintf(stderr, "pthreadpool_destroy failed: "
244 "%s\n", strerror(ret));
245 exit(1);
248 /* child */
249 exit(0);
252 for (i=0; i<num_threads; i++) {
253 ret = pthread_join(states[i].tid, NULL);
254 if (ret != 0) {
255 fprintf(stderr, "pthread_join(%d) failed: %s\n",
256 i, strerror(ret));
257 return -1;
261 received = 0;
263 while (received < num_threads*num_jobs) {
264 int j;
266 ret = poll(pfds, num_pools, 1000);
267 if (ret == -1) {
268 fprintf(stderr, "poll failed: %s\n",
269 strerror(errno));
270 return -1;
272 if (ret == 0) {
273 fprintf(stderr, "\npoll timed out\n");
274 break;
277 for (j=0; j<num_pools; j++) {
279 if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
280 continue;
283 ret = pthreadpool_finished_job(pools[j]);
284 if ((ret < 0) || (ret >= num_jobs * num_threads)) {
285 fprintf(stderr, "invalid job number %d\n",
286 ret);
287 return -1;
289 finished[ret] += 1;
290 received += 1;
294 for (i=0; i<num_threads*num_jobs; i++) {
295 if (finished[i] != 1) {
296 fprintf(stderr, "finished[%d] = %d\n",
297 i, finished[i]);
298 return -1;
302 for (i=0; i<num_pools; i++) {
303 ret = pthreadpool_destroy(pools[i]);
304 if (ret != 0) {
305 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
306 strerror(ret));
307 return -1;
311 free(pfds);
312 free(pools);
313 free(states);
314 free(finished);
316 return 0;
/*
 * Run all pthreadpool tests in sequence and stop at the first failure.
 *
 * Exit status is 0 with "success" printed to stdout when every test
 * passes, 1 with the failing test named on stderr otherwise.
 */
int main(void)
{
	int ret;

	ret = test_init();
	if (ret != 0) {
		fprintf(stderr, "test_init failed\n");
		return 1;
	}

	ret = test_jobs(10, 10000);
	if (ret != 0) {
		fprintf(stderr, "test_jobs failed\n");
		return 1;
	}

	ret = test_busydestroy();
	if (ret != 0) {
		fprintf(stderr, "test_busydestroy failed\n");
		return 1;
	}

	/*
	 * Test 10 threads adding jobs on a single pool
	 */
	ret = test_threaded_addjob(1, 10, 5, 5000);
	if (ret != 0) {
		fprintf(stderr, "test_threaded_addjob failed\n");
		return 1;
	}

	/*
	 * Test 10 threads on 3 pools to verify our fork handling
	 * works right.
	 */
	ret = test_threaded_addjob(3, 10, 5, 5000);
	if (ret != 0) {
		fprintf(stderr, "test_threaded_addjob failed\n");
		return 1;
	}

	printf("success\n");
	return 0;
}