pthreadpool: test cancelling and freeing pending pthreadpool_tevent jobs/pools

/*
 * Unix SMB/CIFS implementation.
 * cmocka tests for thread pool implementation
 * Copyright (C) Christof Schmitt 2017
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "config.h"
#include <errno.h>
#include <pthread.h>
#include <setjmp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>

#include <talloc.h>
#include <tevent.h>
#include <pthreadpool_tevent.h>

#include <cmocka.h>
#include <poll.h>

#ifdef HAVE_VALGRIND_HELGRIND_H
#include <valgrind/helgrind.h>
#endif
#ifndef ANNOTATE_BENIGN_RACE_SIZED
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
#endif
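
/*
 * The fixture below holds three pools with different thread limits:
 * upool allows up to UINT_MAX worker threads, opool is limited to a
 * single worker thread and spool has a limit of 0, i.e. it never
 * creates a thread and runs jobs synchronously in the caller.
 */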
struct pthreadpool_tevent_test {
	struct tevent_context *ev;
	struct pthreadpool_tevent *upool;
	struct pthreadpool_tevent *spool;
	struct pthreadpool_tevent *opool;
};

static int setup_pthreadpool_tevent(void **state)
{
	struct pthreadpool_tevent_test *t;
	int ret;
	size_t max_threads;

	t = talloc_zero(NULL, struct pthreadpool_tevent_test);
	assert_non_null(t);

	t->ev = tevent_context_init(t);
	assert_non_null(t->ev);

	ret = pthreadpool_tevent_init(t->ev, UINT_MAX, &t->upool);
	assert_int_equal(ret, 0);

	max_threads = pthreadpool_tevent_max_threads(t->upool);
	assert_int_equal(max_threads, UINT_MAX);

	ret = pthreadpool_tevent_init(t->ev, 1, &t->opool);
	assert_int_equal(ret, 0);

	max_threads = pthreadpool_tevent_max_threads(t->opool);
	assert_int_equal(max_threads, 1);

	ret = pthreadpool_tevent_init(t->ev, 0, &t->spool);
	assert_int_equal(ret, 0);

	max_threads = pthreadpool_tevent_max_threads(t->spool);
	assert_int_equal(max_threads, 0);

	*state = t;

	return 0;
}

static int teardown_pthreadpool_tevent(void **state)
{
	struct pthreadpool_tevent_test *t = *state;

	TALLOC_FREE(t);

	return 0;
}
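
/*
 * pthread_create() is intercepted via the linker's symbol wrapping
 * (the test binary is expected to be linked with something like
 * -Wl,--wrap,pthread_create): calls to pthread_create() end up in
 * __wrap_pthread_create(), which returns a value injected via
 * will_return()/mock_type() and only falls through to the real
 * __real_pthread_create() when that value is 0.
 */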
int __wrap_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
			  void *(*start_routine) (void *), void *arg);
int __real_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
			  void *(*start_routine) (void *), void *arg);

int __wrap_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
			  void *(*start_routine) (void *), void *arg)
{
	int error;

	error = mock_type(int);
	if (error != 0) {
		return error;
	}

	return __real_pthread_create(thread, attr, start_routine, arg);
}

static void test_job_threadid(void *ptr)
{
	pthread_t *threadid = ptr;

	*threadid = pthread_self();
}
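
/*
 * Submit a single test_job_threadid() job to the given pool and wait
 * for it, then report via the out parameters whether the job ran at
 * all and whether it ran in the main thread (i.e. in the sync
 * fallback) rather than in a worker thread.
 */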
static int test_create_do(struct tevent_context *ev,
			  struct pthreadpool_tevent *pool,
			  bool *executed,
			  bool *in_main_thread)
{
	struct tevent_req *req;
	pthread_t zero_thread;
	pthread_t main_thread;
	pthread_t worker_thread;
	bool ok;
	int ret;

	*executed = false;
	*in_main_thread = false;

	memset(&zero_thread, 0, sizeof(zero_thread));
	main_thread = pthread_self();
	worker_thread = zero_thread;

	req = pthreadpool_tevent_job_send(
		ev, ev, pool, test_job_threadid, &worker_thread);
	if (req == NULL) {
		fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
		return ENOMEM;
	}

	ok = tevent_req_poll(req, ev);
	if (!ok) {
		ret = errno;
		fprintf(stderr, "tevent_req_poll failed: %s\n",
			strerror(ret));
		*executed = !pthread_equal(worker_thread, zero_thread);
		*in_main_thread = pthread_equal(worker_thread, main_thread);
		return ret;
	}

	ret = pthreadpool_tevent_job_recv(req);
	TALLOC_FREE(req);
	*executed = !pthread_equal(worker_thread, zero_thread);
	*in_main_thread = pthread_equal(worker_thread, main_thread);
	if (ret != 0) {
		fprintf(stderr, "pthreadpool_tevent_job_recv failed: %s\n",
			strerror(ret));
		return ret;
	}

	return 0;
}
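
/*
 * Check where jobs end up running: with a failing pthread_create()
 * the unlimited and one-thread pools report the error, the sync pool
 * falls back to running the job in the main thread, and once a worker
 * thread exists it is reused for follow-up jobs.
 */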
static void test_create(void **state)
{
	struct pthreadpool_tevent_test *t = *state;
	bool executed;
	bool in_main_thread;
	int ret;

	/*
	 * When pthreadpool cannot create the first worker thread,
	 * this job will run in the sync fallback in the main thread.
	 */
	will_return(__wrap_pthread_create, EAGAIN);
	ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
	assert_int_equal(ret, EAGAIN);
	assert_false(executed);
	assert_false(in_main_thread);

	/*
	 * The sync pool won't trigger pthread_create();
	 * it will be triggered by the one-thread pool.
	 */
	will_return(__wrap_pthread_create, EAGAIN);

	ret = test_create_do(t->ev, t->spool, &executed, &in_main_thread);
	assert_int_equal(ret, 0);
	assert_true(executed);
	assert_true(in_main_thread);

	ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
	assert_int_equal(ret, EAGAIN);
	assert_false(executed);
	assert_false(in_main_thread);

	/*
	 * When a thread can be created, the job will run in the worker thread.
	 */
	will_return(__wrap_pthread_create, 0);
	ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
	assert_int_equal(ret, 0);
	assert_true(executed);
	assert_false(in_main_thread);

	poll(NULL, 0, 10);

	/*
	 * The worker thread will still be active for a second; immediately
	 * running another job will also use the worker thread, even
	 * if a new thread cannot be created.
	 */
	ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
	assert_int_equal(ret, 0);
	assert_true(executed);
	assert_false(in_main_thread);

	/*
	 * When a thread can be created, the job will run in the worker thread.
	 */
	will_return(__wrap_pthread_create, 0);
	ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
	assert_int_equal(ret, 0);
	assert_true(executed);
	assert_false(in_main_thread);

	poll(NULL, 0, 10);

	/*
	 * The worker thread will still be active for a second; immediately
	 * running another job will also use the worker thread, even
	 * if a new thread cannot be created.
	 */
	ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
	assert_int_equal(ret, 0);
	assert_true(executed);
	assert_false(in_main_thread);
}
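
/*
 * State for the cancel/orphan tests below: each job owns one end of a
 * socketpair (fdj), the test owns the other (fdm). The job function
 * writes one byte to signal that it has started and then polls fdj in
 * sleep_msec steps until it is canceled, orphaned or sees activity on
 * the fd, counting its poll rounds and timeouts along the way.
 */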
struct test_cancel_job {
	int fdm; /* the main end of socketpair */
	int fdj; /* the job end of socketpair */
	bool started;
	bool canceled;
	bool orphaned;
	bool finished;
	size_t polls;
	size_t timeouts;
	int sleep_msec;
	struct tevent_req *req;
	bool completed;
	int ret;
};

static void test_cancel_job_done(struct tevent_req *req);

static int test_cancel_job_destructor(struct test_cancel_job *job)
{
	ANNOTATE_BENIGN_RACE_SIZED(&job->started,
				   sizeof(job->started),
				   "protected by pthreadpool_tevent code");
	if (job->started) {
		ANNOTATE_BENIGN_RACE_SIZED(&job->finished,
					   sizeof(job->finished),
					   "protected by pthreadpool_tevent code");
		assert_true(job->finished);
	}

	ANNOTATE_BENIGN_RACE_SIZED(&job->fdj,
				   sizeof(job->fdj),
				   "protected by pthreadpool_tevent code");

	if (job->fdm != -1) {
		close(job->fdm);
		job->fdm = -1;
	}
	if (job->fdj != -1) {
		close(job->fdj);
		job->fdj = -1;
	}

	return 0;
}

static struct test_cancel_job *test_cancel_job_create(TALLOC_CTX *mem_ctx)
{
	struct test_cancel_job *job = NULL;

	job = talloc(mem_ctx, struct test_cancel_job);
	if (job == NULL) {
		return NULL;
	}
	*job = (struct test_cancel_job) {
		.fdm = -1,
		.fdj = -1,
		.sleep_msec = 50,
	};

	talloc_set_destructor(job, test_cancel_job_destructor);
	return job;
}
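
/*
 * The job function running in the pool: it signals the main thread
 * over the socketpair once it has started, then keeps polling the fd
 * (sleep_msec at a time) until the job is canceled or orphaned, or
 * until the main thread creates activity on the fd.
 */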
static void test_cancel_job_fn(void *ptr)
{
	struct test_cancel_job *job = (struct test_cancel_job *)ptr;
	int fdj = -1;
	char c = 0;
	int ret;

	assert_non_null(job); /* make sure we abort without a job pointer */

	job->started = true;
	fdj = job->fdj;
	job->fdj = -1;

	if (!pthreadpool_tevent_current_job_continue()) {
		job->canceled = pthreadpool_tevent_current_job_canceled();
		job->orphaned = pthreadpool_tevent_current_job_orphaned();
		job->finished = true;
		close(fdj);
		return;
	}

	/*
	 * Notify the main thread that we started.
	 *
	 * A write of 1 byte should always work!
	 */
	ret = write(fdj, &c, 1);
	assert_int_equal(ret, 1);

	/*
	 * Loop until someone tried to cancel the job
	 * or it becomes orphaned.
	 *
	 * If there's some activity on the fd
	 * we finish directly.
	 */
	do {
		struct pollfd pfd = {
			.fd = fdj,
			.events = POLLIN,
		};

		job->polls += 1;

		ret = poll(&pfd, 1, job->sleep_msec);
		if (ret == 1) {
			job->finished = true;
			close(fdj);
			return;
		}
		assert_int_equal(ret, 0);

		job->timeouts += 1;

	} while (pthreadpool_tevent_current_job_continue());

	job->canceled = pthreadpool_tevent_current_job_canceled();
	job->orphaned = pthreadpool_tevent_current_job_orphaned();
	job->finished = true;
	close(fdj);
}

static void test_cancel_job_done(struct tevent_req *req)
{
	struct test_cancel_job *job =
		tevent_req_callback_data(req,
		struct test_cancel_job);

	job->ret = pthreadpool_tevent_job_recv(job->req);
	TALLOC_FREE(job->req);
	job->completed = true;
}

static void test_cancel_job_wait(struct test_cancel_job *job,
				 struct tevent_context *ev)
{
	/*
	 * We have to keep looping until
	 * test_cancel_job_done was triggered.
	 */
	while (!job->completed) {
		int ret;

		ret = tevent_loop_once(ev);
		assert_int_equal(ret, 0);
	}
}

struct test_cancel_state {
	struct test_cancel_job *job1;
	struct test_cancel_job *job2;
	struct test_cancel_job *job3;
	struct test_cancel_job *job4;
	struct test_cancel_job *job5;
	struct test_cancel_job *job6;
};
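
/*
 * Exercise cancelling and freeing of pending and running jobs on the
 * one-thread pool: job 1 runs while jobs 2 and 3 stay queued and are
 * freed resp. canceled, job 4 is orphaned by freeing its request while
 * it runs, job 5 exits normally, and job 6 is orphaned by freeing the
 * whole pool while the job is still running.
 */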
static void test_cancel_job(void **private_data)
{
	struct pthreadpool_tevent_test *t = *private_data;
	struct tevent_context *ev = t->ev;
	struct pthreadpool_tevent *pool = t->opool;
	struct test_cancel_state *state = NULL;
	int ret;
	bool ok;
	int fdpair[2] = { -1, -1 };
	char c = 0;

	state = talloc_zero(t, struct test_cancel_state);
	assert_non_null(state);
	state->job1 = test_cancel_job_create(state);
	assert_non_null(state->job1);
	state->job2 = test_cancel_job_create(state);
	assert_non_null(state->job2);
	state->job3 = test_cancel_job_create(state);
	assert_non_null(state->job3);

	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
	assert_int_equal(ret, 0);

	state->job1->fdm = fdpair[0];
	state->job1->fdj = fdpair[1];

	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);

	will_return(__wrap_pthread_create, 0);
	state->job1->req = pthreadpool_tevent_job_send(
		state->job1, ev, pool, test_cancel_job_fn, state->job1);
	assert_non_null(state->job1->req);
	tevent_req_set_callback(state->job1->req,
				test_cancel_job_done,
				state->job1);

	state->job2->req = pthreadpool_tevent_job_send(
		state->job2, ev, pool, test_cancel_job_fn, NULL);
	assert_non_null(state->job2->req);
	tevent_req_set_callback(state->job2->req,
				test_cancel_job_done,
				state->job2);

	state->job3->req = pthreadpool_tevent_job_send(
		state->job3, ev, pool, test_cancel_job_fn, NULL);
	assert_non_null(state->job3->req);
	tevent_req_set_callback(state->job3->req,
				test_cancel_job_done,
				state->job3);

	/*
	 * Wait for job 1 to start.
	 */
	ret = read(state->job1->fdm, &c, 1);
	assert_int_equal(ret, 1);

	/*
	 * We cancel job 3 and destroy job 2.
	 * Both should never be executed.
	 */
	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 2);
	TALLOC_FREE(state->job2->req);
	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
	ok = tevent_req_cancel(state->job3->req);
	assert_true(ok);
	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);

	/*
	 * Job 3 should complete as canceled, while
	 * job 1 is still running.
	 */
	test_cancel_job_wait(state->job3, ev);
	assert_int_equal(state->job3->ret, ECANCELED);
	assert_null(state->job3->req);
	assert_false(state->job3->started);

	/*
	 * Now job 1 is canceled while it is running;
	 * this should make it stop its loop.
	 */
	ok = tevent_req_cancel(state->job1->req);
	assert_false(ok);

	/*
	 * Job 1 completes. It got at least one sleep
	 * timeout loop and has state->job1->canceled set.
	 */
	test_cancel_job_wait(state->job1, ev);
	assert_int_equal(state->job1->ret, 0);
	assert_null(state->job1->req);
	assert_true(state->job1->started);
	assert_true(state->job1->finished);
	assert_true(state->job1->canceled);
	assert_false(state->job1->orphaned);
	assert_in_range(state->job1->polls, 1, 100);
	assert_int_equal(state->job1->timeouts, state->job1->polls);

	/*
	 * Now we create jobs 4 and 5.
	 * Both should execute.
	 * Job 4 is orphaned while running by a TALLOC_FREE().
	 * This should stop job 4 and let job 5 start.
	 * We do a "normal" exit in job 5 by creating some activity
	 * on the socketpair.
	 */

	state->job4 = test_cancel_job_create(state);
	assert_non_null(state->job4);

	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
	assert_int_equal(ret, 0);

	state->job4->fdm = fdpair[0];
	state->job4->fdj = fdpair[1];

	state->job4->req = pthreadpool_tevent_job_send(
		state->job4, ev, pool, test_cancel_job_fn, state->job4);
	assert_non_null(state->job4->req);
	tevent_req_set_callback(state->job4->req,
				test_cancel_job_done,
				state->job4);

	state->job5 = test_cancel_job_create(state);
	assert_non_null(state->job5);

	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
	assert_int_equal(ret, 0);

	state->job5->fdm = fdpair[0];
	state->job5->fdj = fdpair[1];

	state->job5->req = pthreadpool_tevent_job_send(
		state->job5, ev, pool, test_cancel_job_fn, state->job5);
	assert_non_null(state->job5->req);
	tevent_req_set_callback(state->job5->req,
				test_cancel_job_done,
				state->job5);

	/*
	 * Make sure job 5 can exit as soon as possible.
	 * It will never get a sleep/poll timeout.
	 */
	ret = write(state->job5->fdm, &c, 1);
	assert_int_equal(ret, 1);

	/*
	 * Wait for job 4 to start.
	 */
	ret = read(state->job4->fdm, &c, 1);
	assert_int_equal(ret, 1);

	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);

	/*
	 * Destroy the request so that job 4 is marked
	 * as orphaned.
	 */
	TALLOC_FREE(state->job4->req);

	/*
	 * Job 5 completes. It got no sleep timeout loop.
	 */
	test_cancel_job_wait(state->job5, ev);
	assert_int_equal(state->job5->ret, 0);
	assert_null(state->job5->req);
	assert_true(state->job5->started);
	assert_true(state->job5->finished);
	assert_false(state->job5->canceled);
	assert_false(state->job5->orphaned);
	assert_int_equal(state->job5->polls, 1);
	assert_int_equal(state->job5->timeouts, 0);

	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);

	/*
	 * Job 2 is still not executed, as we did a TALLOC_FREE()
	 * before it was scheduled.
	 */
	assert_false(state->job2->completed);
	assert_false(state->job2->started);

	/*
	 * Job 4 still wasn't completed, as we did a TALLOC_FREE()
	 * while it was running. But it was started and has
	 * orphaned set.
	 */
	assert_false(state->job4->completed);
	assert_true(state->job4->started);
	assert_true(state->job4->finished);
	assert_false(state->job4->canceled);
	assert_true(state->job4->orphaned);
	assert_in_range(state->job4->polls, 1, 100);
	assert_int_equal(state->job4->timeouts, state->job4->polls);

	/*
	 * Now we create job 6.
	 * We destroy the pool while it is executing.
	 */

	state->job6 = test_cancel_job_create(state);
	assert_non_null(state->job6);

	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
	assert_int_equal(ret, 0);

	state->job6->fdm = fdpair[0];
	state->job6->fdj = fdpair[1];

	state->job6->req = pthreadpool_tevent_job_send(
		state->job6, ev, pool, test_cancel_job_fn, state->job6);
	assert_non_null(state->job6->req);
	tevent_req_set_callback(state->job6->req,
				test_cancel_job_done,
				state->job6);

	/*
	 * Wait for job 6 to start.
	 */
	ret = read(state->job6->fdm, &c, 1);
	assert_int_equal(ret, 1);

	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);

	/*
	 * Destroy the pool so that the job is marked
	 * as orphaned.
	 */
	pool = NULL;
	TALLOC_FREE(t->opool);

	/*
	 * Wait until the job finished.
	 */
	ret = read(state->job6->fdm, &c, 1);
	assert_int_equal(ret, 0);

	/*
	 * Job 6 is still dangling around.
	 *
	 * We need to convince valgrind --tool={drd,helgrind}
	 * that the read above is good enough to be
	 * sure the job is finished and closed the other end of
	 * the socketpair.
	 */
	ANNOTATE_BENIGN_RACE_SIZED(state->job6,
				   sizeof(*state->job6),
				   "protected by thread fence");
	assert_non_null(state->job6->req);
	assert_true(tevent_req_is_in_progress(state->job6->req));
	assert_false(state->job6->completed);
	assert_true(state->job6->started);
	assert_true(state->job6->finished);
	assert_false(state->job6->canceled);
	assert_true(state->job6->orphaned);
	assert_in_range(state->job6->polls, 1, 100);
	assert_int_equal(state->job6->timeouts, state->job6->polls);

	TALLOC_FREE(state);
}

int main(int argc, char **argv)
{
	const struct CMUnitTest tests[] = {
		cmocka_unit_test_setup_teardown(test_create,
						setup_pthreadpool_tevent,
						teardown_pthreadpool_tevent),
		cmocka_unit_test_setup_teardown(test_cancel_job,
						setup_pthreadpool_tevent,
						teardown_pthreadpool_tevent),
	};

	cmocka_set_message_output(CM_OUTPUT_SUBUNIT);

	return cmocka_run_group_tests(tests, NULL, NULL);
}