2 * Unix SMB/CIFS implementation.
3 * cmocka tests for thread pool implementation
4 * Copyright (C) Christof Schmitt 2017
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
28 #include <sys/types.h>
29 #include <sys/socket.h>
33 #include <pthreadpool_tevent.h>
38 #ifdef HAVE_VALGRIND_HELGRIND_H
39 #include <valgrind/helgrind.h>
41 #ifndef ANNOTATE_BENIGN_RACE_SIZED
42 #define ANNOTATE_BENIGN_RACE_SIZED(address, size, describtion)
45 struct pthreadpool_tevent_test
{
46 struct tevent_context
*ev
;
47 struct pthreadpool_tevent
*upool
;
48 struct pthreadpool_tevent
*spool
;
49 struct pthreadpool_tevent
*opool
;
52 static int setup_pthreadpool_tevent(void **state
)
54 struct pthreadpool_tevent_test
*t
;
58 t
= talloc_zero(NULL
, struct pthreadpool_tevent_test
);
61 t
->ev
= tevent_context_init(t
);
62 assert_non_null(t
->ev
);
64 ret
= pthreadpool_tevent_init(t
->ev
, UINT_MAX
, &t
->upool
);
65 assert_int_equal(ret
, 0);
67 max_threads
= pthreadpool_tevent_max_threads(t
->upool
);
68 assert_int_equal(max_threads
, UINT_MAX
);
70 ret
= pthreadpool_tevent_init(t
->ev
, 1, &t
->opool
);
71 assert_int_equal(ret
, 0);
73 max_threads
= pthreadpool_tevent_max_threads(t
->opool
);
74 assert_int_equal(max_threads
, 1);
76 ret
= pthreadpool_tevent_init(t
->ev
, 0, &t
->spool
);
77 assert_int_equal(ret
, 0);
79 max_threads
= pthreadpool_tevent_max_threads(t
->spool
);
80 assert_int_equal(max_threads
, 0);
87 static int teardown_pthreadpool_tevent(void **state
)
89 struct pthreadpool_tevent_test
*t
= *state
;
96 int __wrap_pthread_create(pthread_t
*thread
, const pthread_attr_t
*attr
,
97 void *(*start_routine
) (void *), void *arg
);
98 int __real_pthread_create(pthread_t
*thread
, const pthread_attr_t
*attr
,
99 void *(*start_routine
) (void *), void *arg
);
101 int __wrap_pthread_create(pthread_t
*thread
, const pthread_attr_t
*attr
,
102 void *(*start_routine
) (void *), void *arg
)
106 error
= mock_type(int);
111 return __real_pthread_create(thread
, attr
, start_routine
, arg
);
114 static void test_job_threadid(void *ptr
)
116 pthread_t
*threadid
= ptr
;
118 *threadid
= pthread_self();
121 static int test_create_do(struct tevent_context
*ev
,
122 struct pthreadpool_tevent
*pool
,
124 bool *in_main_thread
)
126 struct tevent_req
*req
;
127 pthread_t zero_thread
;
128 pthread_t main_thread
;
129 pthread_t worker_thread
;
134 *in_main_thread
= false;
136 memset(&zero_thread
, 0, sizeof(zero_thread
));
137 main_thread
= pthread_self();
138 worker_thread
= zero_thread
;
140 req
= pthreadpool_tevent_job_send(
141 ev
, ev
, pool
, test_job_threadid
, &worker_thread
);
143 fprintf(stderr
, "pthreadpool_tevent_job_send failed\n");
147 ok
= tevent_req_poll(req
, ev
);
150 fprintf(stderr
, "tevent_req_poll failed: %s\n",
152 *executed
= !pthread_equal(worker_thread
, zero_thread
);
153 *in_main_thread
= pthread_equal(worker_thread
, main_thread
);
158 ret
= pthreadpool_tevent_job_recv(req
);
160 *executed
= !pthread_equal(worker_thread
, zero_thread
);
161 *in_main_thread
= pthread_equal(worker_thread
, main_thread
);
163 fprintf(stderr
, "tevent_req_recv failed: %s\n",
171 static void test_create(void **state
)
173 struct pthreadpool_tevent_test
*t
= *state
;
179 * When pthreadpool cannot create the first worker thread,
180 * this job will run in the sync fallback in the main thread.
182 will_return(__wrap_pthread_create
, EAGAIN
);
183 ret
= test_create_do(t
->ev
, t
->upool
, &executed
, &in_main_thread
);
184 assert_int_equal(ret
, EAGAIN
);
185 assert_false(executed
);
186 assert_false(in_main_thread
);
189 * The sync pool won't trigger pthread_create()
190 * It will be triggered by the one pool.
192 will_return(__wrap_pthread_create
, EAGAIN
);
194 ret
= test_create_do(t
->ev
, t
->spool
, &executed
, &in_main_thread
);
195 assert_int_equal(ret
, 0);
196 assert_true(executed
);
197 assert_true(in_main_thread
);
199 ret
= test_create_do(t
->ev
, t
->opool
, &executed
, &in_main_thread
);
200 assert_int_equal(ret
, EAGAIN
);
201 assert_false(executed
);
202 assert_false(in_main_thread
);
205 * When a thread can be created, the job will run in the worker thread.
207 will_return(__wrap_pthread_create
, 0);
208 ret
= test_create_do(t
->ev
, t
->upool
, &executed
, &in_main_thread
);
209 assert_int_equal(ret
, 0);
210 assert_true(executed
);
211 assert_false(in_main_thread
);
216 * Workerthread will still be active for a second; immediately
217 * running another job will also use the worker thread, even
218 * if a new thread cannot be created.
220 ret
= test_create_do(t
->ev
, t
->upool
, &executed
, &in_main_thread
);
221 assert_int_equal(ret
, 0);
222 assert_true(executed
);
223 assert_false(in_main_thread
);
226 * When a thread can be created, the job will run in the worker thread.
228 will_return(__wrap_pthread_create
, 0);
229 ret
= test_create_do(t
->ev
, t
->opool
, &executed
, &in_main_thread
);
230 assert_int_equal(ret
, 0);
231 assert_true(executed
);
232 assert_false(in_main_thread
);
237 * Workerthread will still be active for a second; immediately
238 * running another job will also use the worker thread, even
239 * if a new thread cannot be created.
241 ret
= test_create_do(t
->ev
, t
->opool
, &executed
, &in_main_thread
);
242 assert_int_equal(ret
, 0);
243 assert_true(executed
);
244 assert_false(in_main_thread
);
247 struct test_cancel_job
{
248 int fdm
; /* the main end of socketpair */
249 int fdj
; /* the job end of socketpair */
257 struct tevent_req
*req
;
262 static void test_cancel_job_done(struct tevent_req
*req
);
264 static int test_cancel_job_destructor(struct test_cancel_job
*job
)
266 ANNOTATE_BENIGN_RACE_SIZED(&job
->started
,
267 sizeof(job
->started
),
268 "protected by pthreadpool_tevent code");
270 ANNOTATE_BENIGN_RACE_SIZED(&job
->finished
,
271 sizeof(job
->finished
),
272 "protected by pthreadpool_tevent code");
273 assert_true(job
->finished
);
276 ANNOTATE_BENIGN_RACE_SIZED(&job
->fdj
,
278 "protected by pthreadpool_tevent code");
280 if (job
->fdm
!= -1) {
284 if (job
->fdj
!= -1) {
292 static struct test_cancel_job
*test_cancel_job_create(TALLOC_CTX
*mem_ctx
)
294 struct test_cancel_job
*job
= NULL
;
296 job
= talloc(mem_ctx
, struct test_cancel_job
);
300 *job
= (struct test_cancel_job
) {
306 talloc_set_destructor(job
, test_cancel_job_destructor
);
310 static void test_cancel_job_fn(void *ptr
)
312 struct test_cancel_job
*job
= (struct test_cancel_job
*)ptr
;
317 assert_non_null(job
); /* make sure we abort without a job pointer */
323 if (!pthreadpool_tevent_current_job_continue()) {
324 job
->canceled
= pthreadpool_tevent_current_job_canceled();
325 job
->orphaned
= pthreadpool_tevent_current_job_orphaned();
326 job
->finished
= true;
332 * Notify that we main thread
334 * write of 1 byte should always work!
336 ret
= write(fdj
, &c
, 1);
337 assert_int_equal(ret
, 1);
340 * loop until the job was tried to
341 * be canceled or becomes orphaned.
343 * If there's some activity on the fd
344 * we directly finish.
347 struct pollfd pfd
= {
354 ret
= poll(&pfd
, 1, job
->sleep_msec
);
356 job
->finished
= true;
360 assert_int_equal(ret
, 0);
364 } while (pthreadpool_tevent_current_job_continue());
366 job
->canceled
= pthreadpool_tevent_current_job_canceled();
367 job
->orphaned
= pthreadpool_tevent_current_job_orphaned();
368 job
->finished
= true;
372 static void test_cancel_job_done(struct tevent_req
*req
)
374 struct test_cancel_job
*job
=
375 tevent_req_callback_data(req
,
376 struct test_cancel_job
);
378 job
->ret
= pthreadpool_tevent_job_recv(job
->req
);
379 TALLOC_FREE(job
->req
);
380 job
->completed
= true;
383 static void test_cancel_job_wait(struct test_cancel_job
*job
,
384 struct tevent_context
*ev
)
387 * We have to keep looping until
388 * test_cancel_job_done was triggered
390 while (!job
->completed
) {
393 ret
= tevent_loop_once(ev
);
394 assert_int_equal(ret
, 0);
398 struct test_cancel_state
{
399 struct test_cancel_job
*job1
;
400 struct test_cancel_job
*job2
;
401 struct test_cancel_job
*job3
;
402 struct test_cancel_job
*job4
;
403 struct test_cancel_job
*job5
;
404 struct test_cancel_job
*job6
;
407 static void test_cancel_job(void **private_data
)
409 struct pthreadpool_tevent_test
*t
= *private_data
;
410 struct tevent_context
*ev
= t
->ev
;
411 struct pthreadpool_tevent
*pool
= t
->opool
;
412 struct test_cancel_state
*state
= NULL
;
415 int fdpair
[2] = { -1, -1 };
418 state
= talloc_zero(t
, struct test_cancel_state
);
419 assert_non_null(state
);
420 state
->job1
= test_cancel_job_create(state
);
421 assert_non_null(state
->job1
);
422 state
->job2
= test_cancel_job_create(state
);
423 assert_non_null(state
->job2
);
424 state
->job3
= test_cancel_job_create(state
);
425 assert_non_null(state
->job3
);
427 ret
= socketpair(AF_UNIX
, SOCK_STREAM
, 0, fdpair
);
428 assert_int_equal(ret
, 0);
430 state
->job1
->fdm
= fdpair
[0];
431 state
->job1
->fdj
= fdpair
[1];
433 assert_int_equal(pthreadpool_tevent_queued_jobs(pool
), 0);
435 will_return(__wrap_pthread_create
, 0);
436 state
->job1
->req
= pthreadpool_tevent_job_send(
437 state
->job1
, ev
, pool
, test_cancel_job_fn
, state
->job1
);
438 assert_non_null(state
->job1
->req
);
439 tevent_req_set_callback(state
->job1
->req
,
440 test_cancel_job_done
,
443 state
->job2
->req
= pthreadpool_tevent_job_send(
444 state
->job2
, ev
, pool
, test_cancel_job_fn
, NULL
);
445 assert_non_null(state
->job2
->req
);
446 tevent_req_set_callback(state
->job2
->req
,
447 test_cancel_job_done
,
450 state
->job3
->req
= pthreadpool_tevent_job_send(
451 state
->job3
, ev
, pool
, test_cancel_job_fn
, NULL
);
452 assert_non_null(state
->job3
->req
);
453 tevent_req_set_callback(state
->job3
->req
,
454 test_cancel_job_done
,
458 * Wait for the job 1 to start.
460 ret
= read(state
->job1
->fdm
, &c
, 1);
461 assert_int_equal(ret
, 1);
464 * We cancel job 3 and destroy job2.
465 * Both should never be executed.
467 assert_int_equal(pthreadpool_tevent_queued_jobs(pool
), 2);
468 TALLOC_FREE(state
->job2
->req
);
469 assert_int_equal(pthreadpool_tevent_queued_jobs(pool
), 1);
470 ok
= tevent_req_cancel(state
->job3
->req
);
472 assert_int_equal(pthreadpool_tevent_queued_jobs(pool
), 0);
475 * Job 3 should complete as canceled, while
476 * job 1 is still running.
478 test_cancel_job_wait(state
->job3
, ev
);
479 assert_int_equal(state
->job3
->ret
, ECANCELED
);
480 assert_null(state
->job3
->req
);
481 assert_false(state
->job3
->started
);
484 * Now job1 is canceled while it's running,
485 * this should let it stop it's loop.
487 ok
= tevent_req_cancel(state
->job1
->req
);
491 * Job 1 completes, It got at least one sleep
492 * timeout loop and has state->job1->canceled set.
494 test_cancel_job_wait(state
->job1
, ev
);
495 assert_int_equal(state
->job1
->ret
, 0);
496 assert_null(state
->job1
->req
);
497 assert_true(state
->job1
->started
);
498 assert_true(state
->job1
->finished
);
499 assert_true(state
->job1
->canceled
);
500 assert_false(state
->job1
->orphaned
);
501 assert_in_range(state
->job1
->polls
, 1, 100);
502 assert_int_equal(state
->job1
->timeouts
, state
->job1
->polls
);
505 * Now we create jobs 4 and 5
506 * Both should execute.
507 * Job 4 is orphaned while running by a TALLOC_FREE()
508 * This should stop job 4 and let job 5 start.
509 * We do a "normal" exit in job 5 by creating some activity
513 state
->job4
= test_cancel_job_create(state
);
514 assert_non_null(state
->job4
);
516 ret
= socketpair(AF_UNIX
, SOCK_STREAM
, 0, fdpair
);
517 assert_int_equal(ret
, 0);
519 state
->job4
->fdm
= fdpair
[0];
520 state
->job4
->fdj
= fdpair
[1];
522 state
->job4
->req
= pthreadpool_tevent_job_send(
523 state
->job4
, ev
, pool
, test_cancel_job_fn
, state
->job4
);
524 assert_non_null(state
->job4
->req
);
525 tevent_req_set_callback(state
->job4
->req
,
526 test_cancel_job_done
,
529 state
->job5
= test_cancel_job_create(state
);
530 assert_non_null(state
->job5
);
532 ret
= socketpair(AF_UNIX
, SOCK_STREAM
, 0, fdpair
);
533 assert_int_equal(ret
, 0);
535 state
->job5
->fdm
= fdpair
[0];
536 state
->job5
->fdj
= fdpair
[1];
538 state
->job5
->req
= pthreadpool_tevent_job_send(
539 state
->job5
, ev
, pool
, test_cancel_job_fn
, state
->job5
);
540 assert_non_null(state
->job5
->req
);
541 tevent_req_set_callback(state
->job5
->req
,
542 test_cancel_job_done
,
546 * Make sure job 5 can exit as soon as possible.
547 * It will never get a sleep/poll timeout.
549 ret
= write(state
->job5
->fdm
, &c
, 1);
550 assert_int_equal(ret
, 1);
553 * Wait for the job 4 to start
555 ret
= read(state
->job4
->fdm
, &c
, 1);
556 assert_int_equal(ret
, 1);
558 assert_int_equal(pthreadpool_tevent_queued_jobs(pool
), 1);
561 * destroy the request so that it's marked
564 TALLOC_FREE(state
->job4
->req
);
567 * Job 5 completes, It got no sleep timeout loop.
569 test_cancel_job_wait(state
->job5
, ev
);
570 assert_int_equal(state
->job5
->ret
, 0);
571 assert_null(state
->job5
->req
);
572 assert_true(state
->job5
->started
);
573 assert_true(state
->job5
->finished
);
574 assert_false(state
->job5
->canceled
);
575 assert_false(state
->job5
->orphaned
);
576 assert_int_equal(state
->job5
->polls
, 1);
577 assert_int_equal(state
->job5
->timeouts
, 0);
579 assert_int_equal(pthreadpool_tevent_queued_jobs(pool
), 0);
582 * Job 2 is still not executed as we did a TALLOC_FREE()
583 * before is was scheduled.
585 assert_false(state
->job2
->completed
);
586 assert_false(state
->job2
->started
);
589 * Job 4 is still wasn't completed as we did a TALLOC_FREE()
590 * while it is was running. but it was started and has
593 assert_false(state
->job4
->completed
);
594 assert_true(state
->job4
->started
);
595 assert_true(state
->job4
->finished
);
596 assert_false(state
->job4
->canceled
);
597 assert_true(state
->job4
->orphaned
);
598 assert_in_range(state
->job4
->polls
, 1, 100);
599 assert_int_equal(state
->job4
->timeouts
, state
->job4
->polls
);
602 * Now we create jobs 6
603 * We destroy the pool while it's executing.
606 state
->job6
= test_cancel_job_create(state
);
607 assert_non_null(state
->job6
);
609 ret
= socketpair(AF_UNIX
, SOCK_STREAM
, 0, fdpair
);
610 assert_int_equal(ret
, 0);
612 state
->job6
->fdm
= fdpair
[0];
613 state
->job6
->fdj
= fdpair
[1];
615 state
->job6
->req
= pthreadpool_tevent_job_send(
616 state
->job6
, ev
, pool
, test_cancel_job_fn
, state
->job6
);
617 assert_non_null(state
->job6
->req
);
618 tevent_req_set_callback(state
->job6
->req
,
619 test_cancel_job_done
,
623 * Wait for the job 6 to start
625 ret
= read(state
->job6
->fdm
, &c
, 1);
626 assert_int_equal(ret
, 1);
628 assert_int_equal(pthreadpool_tevent_queued_jobs(pool
), 0);
631 * destroy the request so that it's marked
635 TALLOC_FREE(t
->opool
);
638 * Wait until the job finished.
640 ret
= read(state
->job6
->fdm
, &c
, 1);
641 assert_int_equal(ret
, 0);
644 * Job 6 is still dangling arround.
646 * We need to convince valgrind --tool={drd,helgrind}
647 * that the read above is good enough to be
648 * sure the job is finished and closed the other end of
651 ANNOTATE_BENIGN_RACE_SIZED(state
->job6
,
652 sizeof(*state
->job6
),
653 "protected by thread fence");
654 assert_non_null(state
->job6
->req
);
655 assert_true(tevent_req_is_in_progress(state
->job6
->req
));
656 assert_false(state
->job6
->completed
);
657 assert_true(state
->job6
->started
);
658 assert_true(state
->job6
->finished
);
659 assert_false(state
->job6
->canceled
);
660 assert_true(state
->job6
->orphaned
);
661 assert_in_range(state
->job6
->polls
, 1, 100);
662 assert_int_equal(state
->job6
->timeouts
, state
->job4
->polls
);
667 int main(int argc
, char **argv
)
669 const struct CMUnitTest tests
[] = {
670 cmocka_unit_test_setup_teardown(test_create
,
671 setup_pthreadpool_tevent
,
672 teardown_pthreadpool_tevent
),
673 cmocka_unit_test_setup_teardown(test_cancel_job
,
674 setup_pthreadpool_tevent
,
675 teardown_pthreadpool_tevent
),
678 cmocka_set_message_output(CM_OUTPUT_SUBUNIT
);
680 return cmocka_run_group_tests(tests
, NULL
, NULL
);