10 #include <sys/types.h>
11 #include <sys/socket.h>
12 #include <sys/select.h>
15 #include <netinet/in.h>
16 #include <netinet/ip.h>
17 #include <netinet/tcp.h>
18 #include <arpa/inet.h>
23 static int srvpid
, size
;
25 // Global timeout for reading a response in tests, in nanoseconds; 5 sec.
26 static int64 timeout
= 5000000000LL;
28 // Allocation pattern for wrapfalloc, which replaces falloc in tests.
29 // A zero value at the N-th element means that the N-th call to falloc
30 // should fail with ENOSPC.
31 static byte fallocpat
[3];
39 int r
= stat(path
, &s
);
44 wrapfalloc(int fd
, int size
)
48 printf("\nwrapfalloc: fd=%d size=%d\n", fd
, size
);
49 if (c
>= sizeof(fallocpat
) || !fallocpat
[c
++]) {
52 return rawfalloc(fd
, size
);
56 muststart(char *a0
, char *a1
, char *a2
, char *a3
, char *a4
)
65 printf("%s %s %s %s %s\n", a0
, a1
, a2
, a3
, a4
);
66 printf("start server pid=%d\n", srvpid
);
67 usleep(100000); // .1s; time for the child to bind to its port
73 execlp(a0
, a0
, a1
, a2
, a3
, a4
, NULL
);
77 mustdiallocal(int port
)
79 struct sockaddr_in addr
= {
80 .sin_family
= AF_INET
,
81 .sin_port
= htons(port
),
84 int r
= inet_aton("127.0.0.1", &addr
.sin_addr
);
91 int fd
= socket(AF_INET
, SOCK_STREAM
, 0);
97 // Fix of the benchmarking issue on Linux. See issue #430.
99 if (setsockopt(fd
, IPPROTO_TCP
, TCP_NODELAY
, &flags
, sizeof(int))) {
100 twarn("setting TCP_NODELAY on fd %d", fd
);
104 r
= connect(fd
, (struct sockaddr
*)&addr
, sizeof addr
);
114 mustdialunix(char *socket_file
)
116 struct sockaddr_un addr
;
117 const size_t maxlen
= sizeof(addr
.sun_path
);
118 addr
.sun_family
= AF_UNIX
;
119 snprintf(addr
.sun_path
, maxlen
, "%s", socket_file
);
121 int fd
= socket(AF_UNIX
, SOCK_STREAM
, 0);
127 int r
= connect(fd
, (struct sockaddr
*)&addr
, sizeof addr
);
137 exit_process(int signum
)
139 UNUSED_PARAMETER(signum
);
149 int r
= sigemptyset(&sa
.sa_mask
);
151 twarn("sigemptyset()");
155 // This is required to trigger gcov on exit. See issue #443.
156 sa
.sa_handler
= exit_process
;
157 r
= sigaction(SIGTERM
, &sa
, 0);
159 twarn("sigaction(SIGTERM)");
164 // Kill the srvpid (child process) with SIGTERM to give it a chance
165 // to write gcov data to the filesystem before ct kills it with SIGKILL.
166 // Do nothing in case of srvpid==0; child was already killed.
172 kill(srvpid
, SIGTERM
);
173 waitpid(srvpid
, 0, 0);
// SERVER() and SERVER_UNIX() record the calling test's name (__func__) in
// progname and then fork a test server. Each macro evaluates to the value
// returned by mustforksrv() or mustforksrv_unix() respectively.
177 #define SERVER() (progname=__func__, mustforksrv())
178 #define SERVER_UNIX() (progname=__func__, mustforksrv_unix())
180 // Forks the server, storing the pid in srvpid.
181 // The parent process returns the assigned port.
182 // The child process serves until it receives SIGTERM.
186 struct sockaddr_in addr
;
188 srv
.sock
.fd
= make_server_socket("127.0.0.1", "0");
189 if (srv
.sock
.fd
== -1) {
190 puts("mustforksrv failed");
194 socklen_t len
= sizeof(addr
);
195 int r
= getsockname(srv
.sock
.fd
, (struct sockaddr
*)&addr
, &len
);
196 if (r
== -1 || len
> sizeof(addr
)) {
197 puts("mustforksrv failed");
201 int port
= ntohs(addr
.sin_port
);
209 // On exit the parent (test) sends SIGTERM to the child.
211 printf("start server port=%d pid=%d\n", port
, srvpid
);
220 srv_acquire_wal(&srv
);
222 srvserve(&srv
); /* does not return */
223 exit(1); /* satisfy the compiler */
227 mustforksrv_unix(void)
229 static char path
[90];
231 snprintf(path
, sizeof(path
), "%s/socket", ctdir());
232 snprintf(name
, sizeof(name
), "unix:%s", path
);
233 srv
.sock
.fd
= make_server_socket(name
, NULL
);
234 if (srv
.sock
.fd
== -1) {
235 puts("mustforksrv_unix failed");
246 // On exit the parent (test) sends SIGTERM to the child.
248 printf("start server socket=%s\n", path
);
258 srv_acquire_wal(&srv
);
260 srvserve(&srv
); /* does not return */
261 exit(1); /* satisfy the compiler */
268 static char buf
[1024];
279 tv
.tv_sec
= timeout
/ 1000000000;
280 tv
.tv_usec
= (timeout
/1000) % 1000000;
281 int r
= select(fd
+1, &rfd
, NULL
, NULL
, &tv
);
286 fputs("timeout", stderr
);
292 fputs("unknown error", stderr
);
296 // TODO: try reading into a buffer to improve performance.
297 // See related issue #430.
303 if (i
>= sizeof(buf
)-1) {
304 fputs("response too big", stderr
);
310 if (p
== '\r' && c
== '\n') {
320 ckresp(int fd
, char *exp
)
322 char *line
= readline(fd
);
323 assertf(strcmp(exp
, line
) == 0, "\"%s\" != \"%s\"", exp
, line
);
327 ckrespsub(int fd
, char *sub
)
329 char *line
= readline(fd
);
330 assertf(strstr(line
, sub
), "\"%s\" not in \"%s\"", sub
, line
);
334 writefull(int fd
, char *s
, int n
)
348 mustsend(int fd
, char *s
)
350 writefull(fd
, s
, strlen(s
));
351 printf(">%d %s", fd
, s
);
360 int r
= stat(path
, &s
);
369 cttest_unknown_command()
372 int fd
= mustdiallocal(port
);
373 mustsend(fd
, "nont10knowncommand\r\n");
374 ckresp(fd
, "UNKNOWN_COMMAND\r\n");
378 cttest_too_long_commandline()
381 int fd
= mustdiallocal(port
);
383 for (i
= 0; i
< 10; i
++)
384 mustsend(fd
, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); // 50 bytes
385 mustsend(fd
, "\r\n");
386 ckresp(fd
, "BAD_FORMAT\r\n");
387 // Issue another command and check that response is not "UNKNOWN_COMMAND"
388 // as described in issue #337
389 mustsend(fd
, "put 0 0 1 1\r\n");
390 mustsend(fd
, "A\r\n");
391 ckresp(fd
, "INSERTED 1\r\n");
395 cttest_put_in_drain()
397 enter_drain_mode(SIGUSR1
);
399 int fd
= mustdiallocal(port
);
400 mustsend(fd
, "put 0 0 1 1\r\n");
401 mustsend(fd
, "x\r\n");
402 ckresp(fd
, "DRAINING\r\n");
409 int fd
= mustdiallocal(port
);
410 mustsend(fd
, "put 0 0 1 1\r\n");
411 mustsend(fd
, "a\r\n");
412 ckresp(fd
, "INSERTED 1\r\n");
414 mustsend(fd
, "peek 1\r\n");
415 ckresp(fd
, "FOUND 1 1\r\n");
420 cttest_peek_not_found()
423 int fd
= mustdiallocal(port
);
424 mustsend(fd
, "put 0 0 1 1\r\n");
425 mustsend(fd
, "a\r\n");
426 ckresp(fd
, "INSERTED 1\r\n");
428 mustsend(fd
, "peek 2\r\n");
429 ckresp(fd
, "NOT_FOUND\r\n");
430 mustsend(fd
, "peek 18446744073709551615\r\n"); // UINT64_MAX
431 ckresp(fd
, "NOT_FOUND\r\n");
435 cttest_peek_ok_unix()
437 char *name
= SERVER_UNIX();
438 int fd
= mustdialunix(name
);
439 mustsend(fd
, "put 0 0 1 1\r\n");
440 mustsend(fd
, "a\r\n");
441 ckresp(fd
, "INSERTED 1\r\n");
443 mustsend(fd
, "peek 1\r\n");
444 ckresp(fd
, "FOUND 1 1\r\n");
451 cttest_unix_auto_removal()
453 // Twice, to trigger autoremoval
460 cttest_peek_bad_format()
463 int fd
= mustdiallocal(port
);
464 mustsend(fd
, "peek 18446744073709551616\r\n"); // UINT64_MAX+1
465 ckresp(fd
, "BAD_FORMAT\r\n");
466 mustsend(fd
, "peek 184467440737095516160000000000000000000000000000\r\n");
467 ckresp(fd
, "BAD_FORMAT\r\n");
468 mustsend(fd
, "peek foo111\r\n");
469 ckresp(fd
, "BAD_FORMAT\r\n");
470 mustsend(fd
, "peek 111foo\r\n");
471 ckresp(fd
, "BAD_FORMAT\r\n");
475 cttest_peek_delayed()
478 int fd
= mustdiallocal(port
);
479 mustsend(fd
, "peek-delayed\r\n");
480 ckresp(fd
, "NOT_FOUND\r\n");
482 mustsend(fd
, "put 0 0 1 1\r\n");
483 mustsend(fd
, "A\r\n");
484 ckresp(fd
, "INSERTED 1\r\n");
485 mustsend(fd
, "put 0 99 1 1\r\n");
486 mustsend(fd
, "B\r\n");
487 ckresp(fd
, "INSERTED 2\r\n");
488 mustsend(fd
, "put 0 1 1 1\r\n");
489 mustsend(fd
, "C\r\n");
490 ckresp(fd
, "INSERTED 3\r\n");
492 mustsend(fd
, "peek-delayed\r\n");
493 ckresp(fd
, "FOUND 3 1\r\n");
496 mustsend(fd
, "delete 3\r\n");
497 ckresp(fd
, "DELETED\r\n");
499 mustsend(fd
, "peek-delayed\r\n");
500 ckresp(fd
, "FOUND 2 1\r\n");
503 mustsend(fd
, "delete 2\r\n");
504 ckresp(fd
, "DELETED\r\n");
506 mustsend(fd
, "peek-delayed\r\n");
507 ckresp(fd
, "NOT_FOUND\r\n");
511 cttest_peek_buried_kick()
514 int fd
= mustdiallocal(port
);
515 mustsend(fd
, "put 0 0 1 1\r\n");
516 mustsend(fd
, "A\r\n");
517 ckresp(fd
, "INSERTED 1\r\n");
519 // cannot bury unreserved job
520 mustsend(fd
, "bury 1 0\r\n");
521 ckresp(fd
, "NOT_FOUND\r\n");
522 mustsend(fd
, "peek-buried\r\n");
523 ckresp(fd
, "NOT_FOUND\r\n");
525 mustsend(fd
, "reserve-with-timeout 0\r\n");
526 ckresp(fd
, "RESERVED 1 1\r\n");
530 mustsend(fd
, "bury 1 0\r\n");
531 ckresp(fd
, "BURIED\r\n");
532 mustsend(fd
, "peek-buried\r\n");
533 ckresp(fd
, "FOUND 1 1\r\n");
536 // kick and verify the job is ready
537 mustsend(fd
, "kick 1\r\n");
538 ckresp(fd
, "KICKED 1\r\n");
539 mustsend(fd
, "peek-buried\r\n");
540 ckresp(fd
, "NOT_FOUND\r\n");
541 mustsend(fd
, "peek-ready\r\n");
542 ckresp(fd
, "FOUND 1 1\r\n");
545 // nothing is left to kick
546 mustsend(fd
, "kick 1\r\n");
547 ckresp(fd
, "KICKED 0\r\n");
551 cttest_touch_bad_format()
554 int fd
= mustdiallocal(port
);
555 mustsend(fd
, "touch a111\r\n");
556 ckresp(fd
, "BAD_FORMAT\r\n");
557 mustsend(fd
, "touch 111a\r\n");
558 ckresp(fd
, "BAD_FORMAT\r\n");
559 mustsend(fd
, "touch !@#!@#\r\n");
560 ckresp(fd
, "BAD_FORMAT\r\n");
564 cttest_touch_not_found()
567 int fd
= mustdiallocal(port
);
568 mustsend(fd
, "touch 1\r\n");
569 ckresp(fd
, "NOT_FOUND\r\n");
570 mustsend(fd
, "touch 100000000000000\r\n");
571 ckresp(fd
, "NOT_FOUND\r\n");
575 cttest_bury_bad_format()
578 int fd
= mustdiallocal(port
);
579 mustsend(fd
, "bury 111abc 2\r\n");
580 ckresp(fd
, "BAD_FORMAT\r\n");
581 mustsend(fd
, "bury 111\r\n");
582 ckresp(fd
, "BAD_FORMAT\r\n");
583 mustsend(fd
, "bury 111 222abc\r\n");
584 ckresp(fd
, "BAD_FORMAT\r\n");
588 cttest_kickjob_bad_format()
591 int fd
= mustdiallocal(port
);
592 mustsend(fd
, "kick-job a111\r\n");
593 ckresp(fd
, "BAD_FORMAT\r\n");
594 mustsend(fd
, "kick-job 111a\r\n");
595 ckresp(fd
, "BAD_FORMAT\r\n");
596 mustsend(fd
, "kick-job !@#!@#\r\n");
597 ckresp(fd
, "BAD_FORMAT\r\n");
601 cttest_kickjob_buried()
604 int fd
= mustdiallocal(port
);
605 mustsend(fd
, "put 0 0 1 1\r\n");
606 mustsend(fd
, "A\r\n");
607 ckresp(fd
, "INSERTED 1\r\n");
609 mustsend(fd
, "reserve\r\n");
610 ckresp(fd
, "RESERVED 1 1\r\n");
612 mustsend(fd
, "bury 1 0\r\n");
613 ckresp(fd
, "BURIED\r\n");
615 mustsend(fd
, "kick-job 100\r\n");
616 ckresp(fd
, "NOT_FOUND\r\n");
617 mustsend(fd
, "kick-job 1\r\n");
618 ckresp(fd
, "KICKED\r\n");
619 mustsend(fd
, "kick-job 1\r\n");
620 ckresp(fd
, "NOT_FOUND\r\n");
624 cttest_kickjob_delayed()
627 int fd
= mustdiallocal(port
);
628 // jid=1 - no delay, jid=2 - delay
629 mustsend(fd
, "put 0 0 1 1\r\n");
630 mustsend(fd
, "A\r\n");
631 ckresp(fd
, "INSERTED 1\r\n");
632 mustsend(fd
, "put 0 10 1 1\r\n");
633 mustsend(fd
, "B\r\n");
634 ckresp(fd
, "INSERTED 2\r\n");
636 mustsend(fd
, "kick-job 1\r\n");
637 ckresp(fd
, "NOT_FOUND\r\n");
638 mustsend(fd
, "kick-job 2\r\n");
639 ckresp(fd
, "KICKED\r\n");
640 mustsend(fd
, "kick-job 2\r\n");
641 ckresp(fd
, "NOT_FOUND\r\n");
650 int fd
= mustdiallocal(port
);
651 mustsend(fd
, "put 0 0 0 1\r\n");
652 mustsend(fd
, "x\r\n");
653 ckresp(fd
, "INSERTED 1\r\n");
655 mustsend(fd
, "pause-tube default 1\r\n");
656 ckresp(fd
, "PAUSED\r\n");
657 mustsend(fd
, "reserve\r\n");
658 ckresp(fd
, "RESERVED 1 1\r\n");
660 assert(nanoseconds() - s
>= 1000000000); // 1s
667 int fd
= mustdiallocal(port
);
668 mustsend(fd
, "use x_y\r\n");
669 ckresp(fd
, "USING x_y\r\n");
676 int fd
= mustdiallocal(port
);
677 mustsend(fd
, "use a\r\nuse b\r\n");
678 ckresp(fd
, "USING a\r\n");
679 ckresp(fd
, "USING b\r\n");
685 job_data_size_limit
= 10;
687 int fd
= mustdiallocal(port
);
688 mustsend(fd
, "put 0 0 0 11\r\n");
689 mustsend(fd
, "delete 9999\r\n");
690 mustsend(fd
, "put 0 0 0 1\r\n");
691 mustsend(fd
, "x\r\n");
692 ckresp(fd
, "JOB_TOO_BIG\r\n");
693 ckresp(fd
, "INSERTED 1\r\n");
697 cttest_job_size_invalid()
699 job_data_size_limit
= JOB_DATA_SIZE_LIMIT_MAX
;
701 int fd
= mustdiallocal(port
);
702 mustsend(fd
, "put 0 0 0 4294967296\r\n");
703 mustsend(fd
, "put 0 0 0 10b\r\n");
704 mustsend(fd
, "put 0 0 0 --!@#$%^&&**()0b\r\n");
705 mustsend(fd
, "put 0 0 0 1\r\n");
706 mustsend(fd
, "x\r\n");
707 ckresp(fd
, "BAD_FORMAT\r\n");
708 ckresp(fd
, "BAD_FORMAT\r\n");
709 ckresp(fd
, "BAD_FORMAT\r\n");
710 ckresp(fd
, "INSERTED 1\r\n");
714 cttest_job_size_max_plus_1()
716 /* Verify that the server rejects a job larger than the maximum allowed. */
717 job_data_size_limit
= JOB_DATA_SIZE_LIMIT_MAX
;
719 int fd
= mustdiallocal(port
);
720 mustsend(fd
, "put 0 0 0 1073741825\r\n");
722 const int len
= 1024*1024;
724 memset(body
, 'a', len
);
728 for (i
=0; i
<JOB_DATA_SIZE_LIMIT_MAX
; i
+=len
) {
732 mustsend(fd
, "\r\n");
733 ckresp(fd
, "JOB_TOO_BIG\r\n");
737 cttest_delete_ready()
740 int fd
= mustdiallocal(port
);
741 mustsend(fd
, "put 0 0 0 0\r\n");
742 mustsend(fd
, "\r\n");
743 ckresp(fd
, "INSERTED 1\r\n");
744 mustsend(fd
, "delete 1\r\n");
745 ckresp(fd
, "DELETED\r\n");
749 cttest_delete_reserved_by_other()
752 int fd
= mustdiallocal(port
);
753 mustsend(fd
, "put 0 0 1 1\r\n");
754 mustsend(fd
, "a\r\n");
755 ckresp(fd
, "INSERTED 1\r\n");
757 int o
= mustdiallocal(port
);
758 mustsend(o
, "reserve\r\n");
759 ckresp(o
, "RESERVED 1 1\r\n");
762 mustsend(fd
, "delete 1\r\n");
763 ckresp(fd
, "NOT_FOUND\r\n");
767 cttest_delete_bad_format()
770 int fd
= mustdiallocal(port
);
771 mustsend(fd
, "delete 18446744073709551616\r\n"); // UINT64_MAX+1
772 ckresp(fd
, "BAD_FORMAT\r\n");
773 mustsend(fd
, "delete 184467440737095516160000000000000000000000000000\r\n");
774 ckresp(fd
, "BAD_FORMAT\r\n");
775 mustsend(fd
, "delete foo111\r\n");
776 ckresp(fd
, "BAD_FORMAT\r\n");
777 mustsend(fd
, "delete 111foo\r\n");
778 ckresp(fd
, "BAD_FORMAT\r\n");
785 int fd
= mustdiallocal(port
);
786 mustsend(fd
, "use abc\r\n");
787 ckresp(fd
, "USING abc\r\n");
788 mustsend(fd
, "put 999999 0 0 0\r\n");
789 mustsend(fd
, "\r\n");
790 ckresp(fd
, "INSERTED 1\r\n");
791 mustsend(fd
, "use def\r\n");
792 ckresp(fd
, "USING def\r\n");
793 mustsend(fd
, "put 99 0 0 0\r\n");
794 mustsend(fd
, "\r\n");
795 ckresp(fd
, "INSERTED 2\r\n");
796 mustsend(fd
, "watch abc\r\n");
797 ckresp(fd
, "WATCHING 2\r\n");
798 mustsend(fd
, "watch def\r\n");
799 ckresp(fd
, "WATCHING 3\r\n");
800 mustsend(fd
, "reserve\r\n");
801 ckresp(fd
, "RESERVED 2 0\r\n");
805 cttest_negative_delay()
808 int fd
= mustdiallocal(port
);
809 mustsend(fd
, "put 512 -1 100 0\r\n");
810 ckresp(fd
, "BAD_FORMAT\r\n");
813 /* TODO: add more edge cases tests for delay and ttr */
816 cttest_garbage_priority()
819 int fd
= mustdiallocal(port
);
820 mustsend(fd
, "put -1kkdj9djjkd9 0 100 1\r\n");
821 mustsend(fd
, "a\r\n");
822 ckresp(fd
, "BAD_FORMAT\r\n");
826 cttest_negative_priority()
829 int fd
= mustdiallocal(port
);
830 mustsend(fd
, "put -1 0 100 1\r\n");
831 mustsend(fd
, "a\r\n");
832 ckresp(fd
, "BAD_FORMAT\r\n");
836 cttest_max_priority()
839 int fd
= mustdiallocal(port
);
840 mustsend(fd
, "put 4294967295 0 100 1\r\n");
841 mustsend(fd
, "a\r\n");
842 ckresp(fd
, "INSERTED 1\r\n");
846 cttest_too_big_priority()
849 int fd
= mustdiallocal(port
);
850 mustsend(fd
, "put 4294967296 0 100 1\r\n");
851 mustsend(fd
, "a\r\n");
852 ckresp(fd
, "BAD_FORMAT\r\n");
856 cttest_omit_time_left()
859 int fd
= mustdiallocal(port
);
860 mustsend(fd
, "put 0 0 5 1\r\n");
861 mustsend(fd
, "a\r\n");
862 ckresp(fd
, "INSERTED 1\r\n");
863 mustsend(fd
, "stats-job 1\r\n");
864 ckrespsub(fd
, "OK ");
865 ckrespsub(fd
, "\ntime-left: 0\n");
872 int fd
= mustdiallocal(port
);
873 mustsend(fd
, "put 0 1 1 0\r\n");
874 mustsend(fd
, "\r\n");
875 ckresp(fd
, "INSERTED 1\r\n");
879 cttest_delayed_to_ready()
882 int fd
= mustdiallocal(port
);
883 mustsend(fd
, "put 0 1 1 0\r\n");
884 mustsend(fd
, "\r\n");
885 ckresp(fd
, "INSERTED 1\r\n");
887 mustsend(fd
, "stats-tube default\r\n");
888 ckrespsub(fd
, "OK ");
889 ckrespsub(fd
, "\ncurrent-jobs-ready: 0\n");
891 mustsend(fd
, "stats-tube default\r\n");
892 ckrespsub(fd
, "OK ");
893 ckrespsub(fd
, "\ncurrent-jobs-delayed: 1\n");
895 mustsend(fd
, "stats-tube default\r\n");
896 ckrespsub(fd
, "OK ");
897 ckrespsub(fd
, "\ntotal-jobs: 1\n");
899 usleep(1010000); // 1.01 sec
901 // check that after 1 sec the delayed job is ready again
903 mustsend(fd
, "stats-tube default\r\n");
904 ckrespsub(fd
, "OK ");
905 ckrespsub(fd
, "\ncurrent-jobs-ready: 1\n");
907 mustsend(fd
, "stats-tube default\r\n");
908 ckrespsub(fd
, "OK ");
909 ckrespsub(fd
, "\ncurrent-jobs-delayed: 0\n");
911 mustsend(fd
, "stats-tube default\r\n");
912 ckrespsub(fd
, "OK ");
913 ckrespsub(fd
, "\ntotal-jobs: 1\n");
917 cttest_statsjob_ck_format()
920 int fd
= mustdiallocal(port
);
921 mustsend(fd
, "stats-job 111ABC\r\n");
922 ckresp(fd
, "BAD_FORMAT\r\n");
923 mustsend(fd
, "stats-job 111 222\r\n");
924 ckresp(fd
, "BAD_FORMAT\r\n");
925 mustsend(fd
, "stats-job 111\r\n");
926 ckresp(fd
, "NOT_FOUND\r\n");
933 int fd
= mustdiallocal(port
);
934 mustsend(fd
, "use tubea\r\n");
935 ckresp(fd
, "USING tubea\r\n");
936 mustsend(fd
, "put 0 0 0 1\r\n");
937 mustsend(fd
, "x\r\n");
938 ckresp(fd
, "INSERTED 1\r\n");
939 mustsend(fd
, "delete 1\r\n");
940 ckresp(fd
, "DELETED\r\n");
942 mustsend(fd
, "stats-tube tubea\r\n");
943 ckrespsub(fd
, "OK ");
944 ckrespsub(fd
, "\nname: \"tubea\"\n");
945 mustsend(fd
, "stats-tube tubea\r\n");
946 ckrespsub(fd
, "OK ");
947 ckrespsub(fd
, "\ncurrent-jobs-urgent: 0\n");
948 mustsend(fd
, "stats-tube tubea\r\n");
949 ckrespsub(fd
, "OK ");
950 ckrespsub(fd
, "\ncurrent-jobs-ready: 0\n");
951 mustsend(fd
, "stats-tube tubea\r\n");
952 ckrespsub(fd
, "OK ");
953 ckrespsub(fd
, "\ncurrent-jobs-reserved: 0\n");
954 mustsend(fd
, "stats-tube tubea\r\n");
955 ckrespsub(fd
, "OK ");
956 ckrespsub(fd
, "\ncurrent-jobs-delayed: 0\n");
957 mustsend(fd
, "stats-tube tubea\r\n");
958 ckrespsub(fd
, "OK ");
959 ckrespsub(fd
, "\ncurrent-jobs-buried: 0\n");
960 mustsend(fd
, "stats-tube tubea\r\n");
961 ckrespsub(fd
, "OK ");
962 ckrespsub(fd
, "\ntotal-jobs: 1\n");
963 mustsend(fd
, "stats-tube tubea\r\n");
964 ckrespsub(fd
, "OK ");
965 ckrespsub(fd
, "\ncurrent-using: 1\n");
966 mustsend(fd
, "stats-tube tubea\r\n");
967 ckrespsub(fd
, "OK ");
968 ckrespsub(fd
, "\ncurrent-watching: 0\n");
969 mustsend(fd
, "stats-tube tubea\r\n");
970 ckrespsub(fd
, "OK ");
971 ckrespsub(fd
, "\ncurrent-waiting: 0\n");
972 mustsend(fd
, "stats-tube tubea\r\n");
973 ckrespsub(fd
, "OK ");
974 ckrespsub(fd
, "\ncmd-delete: 1\n");
975 mustsend(fd
, "stats-tube tubea\r\n");
976 ckrespsub(fd
, "OK ");
977 ckrespsub(fd
, "\ncmd-pause-tube: 0\n");
978 mustsend(fd
, "stats-tube tubea\r\n");
979 ckrespsub(fd
, "OK ");
980 ckrespsub(fd
, "\npause: 0\n");
981 mustsend(fd
, "stats-tube tubea\r\n");
982 ckrespsub(fd
, "OK ");
983 ckrespsub(fd
, "\npause-time-left: 0\n");
985 mustsend(fd
, "stats-tube default\r\n");
986 ckrespsub(fd
, "OK ");
987 ckrespsub(fd
, "\nname: \"default\"\n");
988 mustsend(fd
, "stats-tube default\r\n");
989 ckrespsub(fd
, "OK ");
990 ckrespsub(fd
, "\ncurrent-jobs-urgent: 0\n");
991 mustsend(fd
, "stats-tube default\r\n");
992 ckrespsub(fd
, "OK ");
993 ckrespsub(fd
, "\ncurrent-jobs-ready: 0\n");
994 mustsend(fd
, "stats-tube default\r\n");
995 ckrespsub(fd
, "OK ");
996 ckrespsub(fd
, "\ncurrent-jobs-reserved: 0\n");
997 mustsend(fd
, "stats-tube default\r\n");
998 ckrespsub(fd
, "OK ");
999 ckrespsub(fd
, "\ncurrent-jobs-delayed: 0\n");
1000 mustsend(fd
, "stats-tube default\r\n");
1001 ckrespsub(fd
, "OK ");
1002 ckrespsub(fd
, "\ncurrent-jobs-buried: 0\n");
1003 mustsend(fd
, "stats-tube default\r\n");
1004 ckrespsub(fd
, "OK ");
1005 ckrespsub(fd
, "\ntotal-jobs: 0\n");
1006 mustsend(fd
, "stats-tube default\r\n");
1007 ckrespsub(fd
, "OK ");
1008 ckrespsub(fd
, "\ncurrent-using: 0\n");
1009 mustsend(fd
, "stats-tube default\r\n");
1010 ckrespsub(fd
, "OK ");
1011 ckrespsub(fd
, "\ncurrent-watching: 1\n");
1012 mustsend(fd
, "stats-tube default\r\n");
1013 ckrespsub(fd
, "OK ");
1014 ckrespsub(fd
, "\ncurrent-waiting: 0\n");
1015 mustsend(fd
, "stats-tube default\r\n");
1016 ckrespsub(fd
, "OK ");
1017 ckrespsub(fd
, "\ncmd-delete: 0\n");
1018 mustsend(fd
, "stats-tube default\r\n");
1019 ckrespsub(fd
, "OK ");
1020 ckrespsub(fd
, "\ncmd-pause-tube: 0\n");
1021 mustsend(fd
, "stats-tube default\r\n");
1022 ckrespsub(fd
, "OK ");
1023 ckrespsub(fd
, "\npause: 0\n");
1024 mustsend(fd
, "stats-tube default\r\n");
1025 ckrespsub(fd
, "OK ");
1026 ckrespsub(fd
, "\npause-time-left: 0\n");
1032 int port
= SERVER();
1033 int fd
= mustdiallocal(port
);
1034 mustsend(fd
, "put 0 0 120 1\r\n");
1035 mustsend(fd
, "a\r\n");
1036 ckresp(fd
, "INSERTED 1\r\n");
1037 mustsend(fd
, "put 0 0 4294 1\r\n");
1038 mustsend(fd
, "a\r\n");
1039 ckresp(fd
, "INSERTED 2\r\n");
1040 mustsend(fd
, "put 0 0 4295 1\r\n");
1041 mustsend(fd
, "a\r\n");
1042 ckresp(fd
, "INSERTED 3\r\n");
1043 mustsend(fd
, "put 0 0 4296 1\r\n");
1044 mustsend(fd
, "a\r\n");
1045 ckresp(fd
, "INSERTED 4\r\n");
1046 mustsend(fd
, "put 0 0 4297 1\r\n");
1047 mustsend(fd
, "a\r\n");
1048 ckresp(fd
, "INSERTED 5\r\n");
1049 mustsend(fd
, "put 0 0 5000 1\r\n");
1050 mustsend(fd
, "a\r\n");
1051 ckresp(fd
, "INSERTED 6\r\n");
1052 mustsend(fd
, "put 0 0 21600 1\r\n");
1053 mustsend(fd
, "a\r\n");
1054 ckresp(fd
, "INSERTED 7\r\n");
1055 mustsend(fd
, "stats-job 1\r\n");
1056 ckrespsub(fd
, "OK ");
1057 ckrespsub(fd
, "\nttr: 120\n");
1058 mustsend(fd
, "stats-job 2\r\n");
1059 ckrespsub(fd
, "OK ");
1060 ckrespsub(fd
, "\nttr: 4294\n");
1061 mustsend(fd
, "stats-job 3\r\n");
1062 ckrespsub(fd
, "OK ");
1063 ckrespsub(fd
, "\nttr: 4295\n");
1064 mustsend(fd
, "stats-job 4\r\n");
1065 ckrespsub(fd
, "OK ");
1066 ckrespsub(fd
, "\nttr: 4296\n");
1067 mustsend(fd
, "stats-job 5\r\n");
1068 ckrespsub(fd
, "OK ");
1069 ckrespsub(fd
, "\nttr: 4297\n");
1070 mustsend(fd
, "stats-job 6\r\n");
1071 ckrespsub(fd
, "OK ");
1072 ckrespsub(fd
, "\nttr: 5000\n");
1073 mustsend(fd
, "stats-job 7\r\n");
1074 ckrespsub(fd
, "OK ");
1075 ckrespsub(fd
, "\nttr: 21600\n");
1081 int port
= SERVER();
1082 int fd
= mustdiallocal(port
);
1083 mustsend(fd
, "put 0 0 0 1\r\n");
1084 mustsend(fd
, "a\r\n");
1085 ckresp(fd
, "INSERTED 1\r\n");
1086 mustsend(fd
, "stats-job 1\r\n");
1087 ckrespsub(fd
, "OK ");
1088 ckrespsub(fd
, "\nttr: 1\n");
1094 int port
= SERVER();
1095 int fd
= mustdiallocal(port
);
1096 mustsend(fd
, "put 0 0 1 0\r\n");
1097 mustsend(fd
, "\r\n");
1098 ckresp(fd
, "INSERTED 1\r\n");
1102 cttest_reserve_with_timeout_2conns()
1106 job_data_size_limit
= 10;
1108 int port
= SERVER();
1109 fd0
= mustdiallocal(port
);
1110 fd1
= mustdiallocal(port
);
1111 mustsend(fd0
, "watch foo\r\n");
1112 ckresp(fd0
, "WATCHING 2\r\n");
1113 mustsend(fd0
, "reserve-with-timeout 1\r\n");
1114 mustsend(fd1
, "watch foo\r\n");
1115 ckresp(fd1
, "WATCHING 2\r\n");
1116 timeout
= 1100000000; // 1.1s
1117 ckresp(fd0
, "TIMED_OUT\r\n");
1121 cttest_reserve_ttr_deadline_soon()
1123 int port
= SERVER();
1124 int fd
= mustdiallocal(port
);
1126 mustsend(fd
, "put 0 0 1 1\r\n");
1127 mustsend(fd
, "a\r\n");
1128 ckresp(fd
, "INSERTED 1\r\n");
1130 mustsend(fd
, "reserve-with-timeout 1\r\n");
1131 ckresp(fd
, "RESERVED 1 1\r\n");
1132 ckresp(fd
, "a\r\n");
1134 // After 0.2s the job should still be reserved.
1136 mustsend(fd
, "stats-job 1\r\n");
1137 ckrespsub(fd
, "OK ");
1138 ckrespsub(fd
, "\nstate: reserved\n");
1140 mustsend(fd
, "reserve-with-timeout 1\r\n");
1141 ckresp(fd
, "DEADLINE_SOON\r\n");
1143 // Job should be reserved; last "reserve" took less than 1s.
1144 mustsend(fd
, "stats-job 1\r\n");
1145 ckrespsub(fd
, "OK ");
1146 ckrespsub(fd
, "\nstate: reserved\n");
1148 // We don't want to process the job, so release it and check that it's ready.
1149 mustsend(fd
, "release 1 0 0\r\n");
1150 ckresp(fd
, "RELEASED\r\n");
1151 mustsend(fd
, "stats-job 1\r\n");
1152 ckrespsub(fd
, "OK ");
1153 ckrespsub(fd
, "\nstate: ready\n");
1157 cttest_reserve_job_ttr_deadline_soon()
1159 int port
= SERVER();
1160 int fd
= mustdiallocal(port
);
1162 mustsend(fd
, "put 0 5 1 1\r\n");
1163 mustsend(fd
, "a\r\n");
1164 ckresp(fd
, "INSERTED 1\r\n");
1166 mustsend(fd
, "stats-job 1\r\n");
1167 ckrespsub(fd
, "OK ");
1168 ckrespsub(fd
, "\nstate: delayed\n");
1170 mustsend(fd
, "reserve-job 1\r\n");
1171 ckresp(fd
, "RESERVED 1 1\r\n");
1172 ckresp(fd
, "a\r\n");
1174 // After 0.1s the job should still be reserved.
1176 mustsend(fd
, "stats-job 1\r\n");
1177 ckrespsub(fd
, "OK ");
1178 ckrespsub(fd
, "\nstate: reserved\n");
1180 // Reservation made with reserve-job should behave the same way as other
1181 // reserve commands, e.g. produce "deadline soon" message, and get released
1183 mustsend(fd
, "reserve-with-timeout 1\r\n");
1184 ckresp(fd
, "DEADLINE_SOON\r\n");
1186 // Job should be reserved; last "reserve" took less than 1s.
1187 mustsend(fd
, "stats-job 1\r\n");
1188 ckrespsub(fd
, "OK ");
1189 ckrespsub(fd
, "\nstate: reserved\n");
1191 // We are not able to process the job in time. Check that it gets released.
1192 // The job was in delayed state. It becomes ready when it gets auto-released.
1193 usleep(1000000); // 1.0s
1195 mustsend(fd
, "put 0 0 1 1\r\n");
1196 mustsend(fd
, "B\r\n");
1197 ckresp(fd
, "INSERTED 2\r\n");
1198 // check that ID=1 gets released
1199 mustsend(fd
, "stats-job 1\r\n");
1200 ckrespsub(fd
, "OK ");
1201 ckrespsub(fd
, "\nstate: ready\n");
1205 cttest_reserve_job_already_reserved()
1207 int port
= SERVER();
1208 int fd
= mustdiallocal(port
);
1210 mustsend(fd
, "put 0 0 1 1\r\n");
1211 mustsend(fd
, "A\r\n");
1212 ckresp(fd
, "INSERTED 1\r\n");
1214 mustsend(fd
, "reserve-job 1\r\n");
1215 ckresp(fd
, "RESERVED 1 1\r\n");
1216 ckresp(fd
, "A\r\n");
1218 // Job should not be reserved twice.
1219 mustsend(fd
, "reserve-job 1\r\n");
1220 ckresp(fd
, "NOT_FOUND\r\n");
1224 cttest_reserve_job_ready()
1226 int port
= SERVER();
1227 int fd
= mustdiallocal(port
);
1229 mustsend(fd
, "put 0 0 1 1\r\n");
1230 mustsend(fd
, "A\r\n");
1231 ckresp(fd
, "INSERTED 1\r\n");
1232 mustsend(fd
, "put 0 0 1 1\r\n");
1233 mustsend(fd
, "B\r\n");
1234 ckresp(fd
, "INSERTED 2\r\n");
1236 mustsend(fd
, "reserve-job 2\r\n");
1237 ckresp(fd
, "RESERVED 2 1\r\n");
1238 ckresp(fd
, "B\r\n");
1240 // Non-existing job.
1241 mustsend(fd
, "reserve-job 3\r\n");
1242 ckresp(fd
, "NOT_FOUND\r\n");
1244 // id=1 was not reserved.
1245 mustsend(fd
, "release 1 1 0\r\n");
1246 ckresp(fd
, "NOT_FOUND\r\n");
1248 mustsend(fd
, "release 2 1 0\r\n");
1249 ckresp(fd
, "RELEASED\r\n");
1253 cttest_reserve_job_delayed()
1255 int port
= SERVER();
1256 int fd
= mustdiallocal(port
);
1258 mustsend(fd
, "put 0 100 1 1\r\n");
1259 mustsend(fd
, "A\r\n");
1260 ckresp(fd
, "INSERTED 1\r\n");
1261 mustsend(fd
, "put 0 100 1 1\r\n");
1262 mustsend(fd
, "B\r\n");
1263 ckresp(fd
, "INSERTED 2\r\n");
1264 mustsend(fd
, "put 0 100 1 1\r\n");
1265 mustsend(fd
, "C\r\n");
1266 ckresp(fd
, "INSERTED 3\r\n");
1268 mustsend(fd
, "reserve-job 2\r\n");
1269 ckresp(fd
, "RESERVED 2 1\r\n");
1270 ckresp(fd
, "B\r\n");
1272 mustsend(fd
, "release 2 1 0\r\n");
1273 ckresp(fd
, "RELEASED\r\n");
1275 // verify that job was released in ready state.
1276 mustsend(fd
, "stats-job 2\r\n");
1277 ckrespsub(fd
, "OK ");
1278 ckrespsub(fd
, "\nstate: ready\n");
1282 cttest_reserve_job_buried()
1284 int port
= SERVER();
1285 int fd
= mustdiallocal(port
);
1287 // put, reserve and bury
1288 mustsend(fd
, "put 0 0 1 1\r\n");
1289 mustsend(fd
, "A\r\n");
1290 ckresp(fd
, "INSERTED 1\r\n");
1291 mustsend(fd
, "reserve-job 1\r\n");
1292 ckresp(fd
, "RESERVED 1 1\r\n");
1293 ckresp(fd
, "A\r\n");
1294 mustsend(fd
, "bury 1 1\r\n");
1295 ckresp(fd
, "BURIED\r\n");
1297 // put, reserve and bury
1298 mustsend(fd
, "put 0 0 1 1\r\n");
1299 mustsend(fd
, "B\r\n");
1300 ckresp(fd
, "INSERTED 2\r\n");
1301 mustsend(fd
, "reserve-job 2\r\n");
1302 ckresp(fd
, "RESERVED 2 1\r\n");
1303 ckresp(fd
, "B\r\n");
1304 mustsend(fd
, "bury 2 1\r\n");
1305 ckresp(fd
, "BURIED\r\n");
1308 mustsend(fd
, "reserve-job 2\r\n");
1309 ckresp(fd
, "RESERVED 2 1\r\n");
1310 ckresp(fd
, "B\r\n");
1311 mustsend(fd
, "reserve-job 1\r\n");
1312 ckresp(fd
, "RESERVED 1 1\r\n");
1313 ckresp(fd
, "A\r\n");
1315 // release back and check if jobs are ready.
1316 mustsend(fd
, "release 1 1 0\r\n");
1317 ckresp(fd
, "RELEASED\r\n");
1318 mustsend(fd
, "release 2 1 0\r\n");
1319 ckresp(fd
, "RELEASED\r\n");
1320 mustsend(fd
, "stats-job 1\r\n");
1321 ckrespsub(fd
, "OK ");
1322 ckrespsub(fd
, "\nstate: ready\n");
1323 mustsend(fd
, "stats-job 2\r\n");
1324 ckrespsub(fd
, "OK ");
1325 ckrespsub(fd
, "\nstate: ready\n");
1330 cttest_release_bad_format()
1332 int port
= SERVER();
1333 int fd
= mustdiallocal(port
);
1336 mustsend(fd
, "release 18446744073709551616 1 1\r\n"); // UINT64_MAX+1
1337 ckresp(fd
, "BAD_FORMAT\r\n");
1338 mustsend(fd
, "release 184467440737095516160000000000000000000000000000 1 1\r\n");
1339 ckresp(fd
, "BAD_FORMAT\r\n");
1340 mustsend(fd
, "release foo111\r\n");
1341 ckresp(fd
, "BAD_FORMAT\r\n");
1342 mustsend(fd
, "release 111foo\r\n");
1343 ckresp(fd
, "BAD_FORMAT\r\n");
1346 mustsend(fd
, "release 18446744073709551615 abc 1\r\n");
1347 ckresp(fd
, "BAD_FORMAT\r\n");
1350 mustsend(fd
, "release 18446744073709551615 1 abc\r\n");
1351 ckresp(fd
, "BAD_FORMAT\r\n");
1355 cttest_release_not_found()
1357 int port
= SERVER();
1358 int fd
= mustdiallocal(port
);
1359 mustsend(fd
, "release 1 1 1\r\n");
1360 ckresp(fd
, "NOT_FOUND\r\n");
1364 cttest_close_releases_job()
1366 int port
= SERVER();
1367 int cons
= mustdiallocal(port
);
1368 int prod
= mustdiallocal(port
);
1369 mustsend(cons
, "reserve-with-timeout 1\r\n");
1371 mustsend(prod
, "put 0 0 100 1\r\n");
1372 mustsend(prod
, "a\r\n");
1373 ckresp(prod
, "INSERTED 1\r\n");
1375 ckresp(cons
, "RESERVED 1 1\r\n");
1376 ckresp(cons
, "a\r\n");
1378 mustsend(prod
, "stats-job 1\r\n");
1379 ckrespsub(prod
, "OK ");
1380 ckrespsub(prod
, "\nstate: reserved\n");
1382 // Closed consumer connection should make the job ready sooner than ttr=100.
1385 // Job should be released in less than 1s. The release is not instant;
1386 // we do not make guarantees about how soon jobs should be released.
1387 mustsend(prod
, "reserve-with-timeout 1\r\n");
1388 ckresp(prod
, "RESERVED 1 1\r\n");
1389 ckresp(prod
, "a\r\n");
1393 cttest_quit_releases_job()
1395 // This test is similar to the close_releases_job test, except that
1396 // connection is not closed, but command quit is sent.
1397 int port
= SERVER();
1398 int cons
= mustdiallocal(port
);
1399 int prod
= mustdiallocal(port
);
1400 mustsend(cons
, "reserve-with-timeout 1\r\n");
1402 mustsend(prod
, "put 0 0 100 1\r\n");
1403 mustsend(prod
, "a\r\n");
1404 ckresp(prod
, "INSERTED 1\r\n");
1406 ckresp(cons
, "RESERVED 1 1\r\n");
1407 ckresp(cons
, "a\r\n");
1409 mustsend(prod
, "stats-job 1\r\n");
1410 ckrespsub(prod
, "OK ");
1411 ckrespsub(prod
, "\nstate: reserved\n");
1413 // Quitting consumer should make the job ready sooner than ttr=100.
1414 mustsend(cons
, "quit\r\n");
1416 // Job should be released in less than 1s. The release is not instant;
1417 // we do not make guarantees about how soon jobs should be released.
1418 mustsend(prod
, "reserve-with-timeout 1\r\n");
1419 ckresp(prod
, "RESERVED 1 1\r\n");
1420 ckresp(prod
, "a\r\n");
1424 cttest_unpause_tube()
1428 int port
= SERVER();
1429 fd0
= mustdiallocal(port
);
1430 fd1
= mustdiallocal(port
);
1432 mustsend(fd0
, "put 0 0 0 0\r\n");
1433 mustsend(fd0
, "\r\n");
1434 ckresp(fd0
, "INSERTED 1\r\n");
1436 mustsend(fd0
, "pause-tube default 86400\r\n");
1437 ckresp(fd0
, "PAUSED\r\n");
1439 mustsend(fd1
, "reserve\r\n");
1441 mustsend(fd0
, "pause-tube default 0\r\n");
1442 ckresp(fd0
, "PAUSED\r\n");
1444 // ckresp will time out if this takes too long, so the
1445 // test will not pass.
1446 ckresp(fd1
, "RESERVED 1 0\r\n");
1447 ckresp(fd1
, "\r\n");
// Test body: exercises watch, use, list-tubes, list-tube-used,
// list-tubes-watched and ignore on a single connection.
// NOTE(review): the function header for this body (listing lines 1449-1452)
// is not present in this view — confirm against the complete file.
1453 int port
= SERVER();
1454 int fd0
= mustdiallocal(port
);
// Watching "w" in addition to the initial tube gives a watch count of 2.
1456 mustsend(fd0
, "watch w\r\n");
1457 ckresp(fd0
, "WATCHING 2\r\n");
1459 mustsend(fd0
, "use u\r\n");
1460 ckresp(fd0
, "USING u\r\n");
// Only the "OK " prefix of the reply is checked by the lines visible here.
// NOTE(review): listing lines 1464-1469 are absent; presumably they check
// the returned tube list — confirm.
1462 mustsend(fd0
, "list-tubes\r\n");
1463 ckrespsub(fd0
, "OK ");
1470 mustsend(fd0
, "list-tube-used\r\n");
1471 ckresp(fd0
, "USING u\r\n");
// NOTE(review): listing lines 1475-1479 are absent after this check.
1473 mustsend(fd0
, "list-tubes-watched\r\n");
1474 ckrespsub(fd0
, "OK ");
// Dropping "default" leaves one watched tube.
1480 mustsend(fd0
, "ignore default\r\n");
1481 ckresp(fd0
, "WATCHING 1\r\n");
// NOTE(review): listing lines 1485-1488 are absent after this check.
1483 mustsend(fd0
, "list-tubes-watched\r\n");
1484 ckrespsub(fd0
, "OK ");
// Ignoring the only remaining watched tube is refused.
1489 mustsend(fd0
, "ignore w\r\n");
1490 ckresp(fd0
, "NOT_IGNORED\r\n");
// Tube name of exactly 200 characters (two runs of one hundred digits).
// The tests below use it to probe the tube-name length limit.
#define STRING_LEN_200 \
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" \
    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"
// Test: "use" accepts a 200-character tube name and answers BAD_FORMAT
// when one more character is appended to it.
// NOTE(review): this listing carries the original line numbers and omits
// the return-type and brace lines (1497, 1499, 1508) — confirm against the
// complete file.
1498 cttest_use_tube_long()
1500 int port
= SERVER();
1501 int fd0
= mustdiallocal(port
);
1502 // 200 chars is okay
1503 mustsend(fd0
, "use " STRING_LEN_200
"\r\n");
1504 ckresp(fd0
, "USING " STRING_LEN_200
"\r\n");
1505 // 201 chars is too much
1506 mustsend(fd0
, "use " STRING_LEN_200
"Z\r\n");
1507 ckresp(fd0
, "BAD_FORMAT\r\n");
// Test: a pause-tube command built from a 200-character tube name and the
// delay 4294967295 (2^32 - 1) is accepted with PAUSED.
// NOTE(review): this listing carries the original line numbers and omits
// the return-type and brace lines (1510, 1512, 1519) — confirm against the
// complete file.
1511 cttest_longest_command()
1513 int port
= SERVER();
1514 int fd0
= mustdiallocal(port
);
// Select the 200-character tube first so that it exists when it is paused.
1515 mustsend(fd0
, "use " STRING_LEN_200
"\r\n");
1516 ckresp(fd0
, "USING " STRING_LEN_200
"\r\n");
1517 mustsend(fd0
, "pause-tube " STRING_LEN_200
" 4294967295\r\n");
1518 ckresp(fd0
, "PAUSED\r\n");
// Test: with the WAL directory set to ctdir() and job_data_size_limit set
// to 10, a job with a zero-length body can be inserted.
// NOTE(review): listing lines 1525 and 1529-1531 are absent from this view;
// the name suggests a server exit/restart step on those lines — confirm
// against the complete file.
1522 cttest_binlog_empty_exit()
1524 srv
.wal
.dir
= ctdir();
1526 job_data_size_limit
= 10;
1528 int port
= SERVER();
1532 int fd
= mustdiallocal(port
);
1533 mustsend(fd
, "put 0 0 0 0\r\n");
1534 mustsend(fd
, "\r\n");
1535 ckresp(fd
, "INSERTED 1\r\n");
// Test: with the WAL directory set to ctdir(), a job is inserted, reserved
// and then buried, and the server answers BURIED.
// NOTE(review): listing lines 1542 and 1552 are absent from this view;
// line 1552 presumably reads the (empty) body of the reserved job —
// confirm against the complete file.
1539 cttest_binlog_bury()
1541 srv
.wal
.dir
= ctdir();
1543 job_data_size_limit
= 10;
1545 int port
= SERVER();
1546 int fd
= mustdiallocal(port
);
// Job 1: priority 0, delay 0, ttr 100, zero-length body.
1547 mustsend(fd
, "put 0 0 100 0\r\n");
1548 mustsend(fd
, "\r\n");
1549 ckresp(fd
, "INSERTED 1\r\n");
1550 mustsend(fd
, "reserve\r\n");
1551 ckresp(fd
, "RESERVED 1 0\r\n");
1553 mustsend(fd
, "bury 1 0\r\n");
1554 ckresp(fd
, "BURIED\r\n");
// Test: a job inserted on one connection can be deleted by id on a second
// connection that is dialed later, with the WAL directory set to ctdir().
// NOTE(review): listing lines 1561 and 1569-1572 are absent from this view;
// the reconnect below suggests the server is restarted on those lines —
// confirm against the complete file.
1558 cttest_binlog_basic()
1560 srv
.wal
.dir
= ctdir();
1562 job_data_size_limit
= 10;
1564 int port
= SERVER();
1565 int fd
= mustdiallocal(port
);
1566 mustsend(fd
, "put 0 0 100 0\r\n");
1567 mustsend(fd
, "\r\n");
1568 ckresp(fd
, "INSERTED 1\r\n");
// New connection; job 1 must still be known to the server.
1573 fd
= mustdiallocal(port
);
1574 mustsend(fd
, "delete 1\r\n");
1575 ckresp(fd
, "DELETED\r\n");
// Test: keeps inserting jobs with 50-byte bodies until <ctdir>/binlog.2
// exists, then asserts that binlog.1 and binlog.2 are both exactly `size`
// bytes long, `size` being the value stored in srv.wal.filesize.
// NOTE(review): i and gotsize are used without a visible declaration, the
// value given to size is not visible, and the end of the while loop
// (listing lines 1598-1601) is absent — confirm against the complete file.
1579 cttest_binlog_size_limit()
1585 srv
.wal
.dir
= ctdir();
1587 srv
.wal
.filesize
= size
;
1588 srv
.wal
.syncrate
= 0;
1589 srv
.wal
.wantsync
= 1;
1591 int port
= SERVER();
1592 int fd
= mustdiallocal(port
);
1593 char *b2
= fmtalloc("%s/binlog.2", ctdir());
// Loop until the second binlog file shows up on disk.
1594 while (!exist(b2
)) {
// Expected reply for this iteration; i is pre-incremented to form the id.
1595 char *exp
= fmtalloc("INSERTED %d\r\n", ++i
);
1596 mustsend(fd
, "put 0 0 100 50\r\n");
1597 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1602 char *b1
= fmtalloc("%s/binlog.1", ctdir());
1603 gotsize
= filesize(b1
);
1604 assertf(gotsize
== size
, "binlog.1 %d != %d", gotsize
, size
);
1605 gotsize
= filesize(b2
);
1606 assertf(gotsize
== size
, "binlog.2 %d != %d", gotsize
, size
);
// Test: inserts 96 jobs with 22-byte bodies and then deletes jobs 1..96,
// with srv.wal.filesize set to `size`.
// NOTE(review): i is used without a visible declaration, the value given
// to size is not visible, and the tails of both loops (listing lines
// 1629-1631, 1634 and 1636 onwards) are absent — confirm against the
// complete file.
1612 cttest_binlog_allocation()
1617 srv
.wal
.dir
= ctdir();
1619 srv
.wal
.filesize
= size
;
1620 srv
.wal
.syncrate
= 0;
1621 srv
.wal
.wantsync
= 1;
1623 int port
= SERVER();
1624 int fd
= mustdiallocal(port
);
1625 for (i
= 1; i
<= 96; i
++) {
// Expected reply for the i-th put.
1626 char *exp
= fmtalloc("INSERTED %d\r\n", i
);
1627 mustsend(fd
, "put 0 0 120 22\r\n");
1628 mustsend(fd
, "job payload xxxxxxxxxx\r\n");
1632 for (i
= 1; i
<= 96; i
++) {
// Here exp holds the command text for deleting job i; the statement that
// uses it is not visible in this listing.
1633 char *exp
= fmtalloc("delete %d\r\n", i
);
1635 ckresp(fd
, "DELETED\r\n");
// Test: puts two jobs in tube "test", releases job 1 and deletes job 2,
// then on a second connection expects job 1 to be reservable and job 2 to
// be NOT_FOUND.
// NOTE(review): listing lines 1644 and 1670-1673 are absent from this view;
// the reconnect below suggests the server is restarted there so that state
// is read back from the binlog — confirm against the complete file.
1641 cttest_binlog_read()
1643 srv
.wal
.dir
= ctdir();
1645 srv
.wal
.syncrate
= 0;
1646 srv
.wal
.wantsync
= 1;
1648 int port
= SERVER();
1649 int fd
= mustdiallocal(port
);
1650 mustsend(fd
, "use test\r\n");
1651 ckresp(fd
, "USING test\r\n");
// Two jobs with 4-byte bodies: "test" (id 1) and "tes1" (id 2).
1652 mustsend(fd
, "put 0 0 120 4\r\n");
1653 mustsend(fd
, "test\r\n");
1654 ckresp(fd
, "INSERTED 1\r\n");
1655 mustsend(fd
, "put 0 0 120 4\r\n");
1656 mustsend(fd
, "tes1\r\n");
1657 ckresp(fd
, "INSERTED 2\r\n");
1658 mustsend(fd
, "watch test\r\n");
1659 ckresp(fd
, "WATCHING 2\r\n");
1660 mustsend(fd
, "reserve\r\n");
1661 ckresp(fd
, "RESERVED 1 4\r\n");
1662 ckresp(fd
, "test\r\n");
// Job 1 goes back to the tube with priority 1 and delay 1.
1663 mustsend(fd
, "release 1 1 1\r\n");
1664 ckresp(fd
, "RELEASED\r\n");
1665 mustsend(fd
, "reserve\r\n");
1666 ckresp(fd
, "RESERVED 2 4\r\n");
1667 ckresp(fd
, "tes1\r\n");
1668 mustsend(fd
, "delete 2\r\n");
1669 ckresp(fd
, "DELETED\r\n");
// Second connection: job 1 is still there, job 2 is gone.
1674 fd
= mustdiallocal(port
);
1675 mustsend(fd
, "watch test\r\n");
1676 ckresp(fd
, "WATCHING 2\r\n");
1677 mustsend(fd
, "reserve\r\n");
1678 ckresp(fd
, "RESERVED 1 4\r\n");
1679 ckresp(fd
, "test\r\n");
1680 mustsend(fd
, "delete 1\r\n");
1681 ckresp(fd
, "DELETED\r\n");
1682 mustsend(fd
, "delete 2\r\n");
1683 ckresp(fd
, "NOT_FOUND\r\n");
// Test: installs wrapfalloc as falloc, then checks that four puts succeed,
// the fifth answers OUT_OF_MEMORY, and four more puts succeed with ids 6-9
// (id 5 is never reported as inserted). All eight jobs are deleted at the
// end.
// NOTE(review): listing lines 1688-1689, 1691-1693 and 1695 are absent;
// the fallocpat pattern and the value of size are presumably set there —
// confirm against the complete file.
1687 cttest_binlog_disk_full()
1690 falloc
= &wrapfalloc
;
1694 srv
.wal
.dir
= ctdir();
1696 srv
.wal
.filesize
= size
;
1697 srv
.wal
.syncrate
= 0;
1698 srv
.wal
.wantsync
= 1;
1700 int port
= SERVER();
1701 int fd
= mustdiallocal(port
);
// Jobs 1-4, each with a 50-byte body, are accepted.
1702 mustsend(fd
, "put 0 0 100 50\r\n");
1703 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1704 ckresp(fd
, "INSERTED 1\r\n");
1705 mustsend(fd
, "put 0 0 100 50\r\n");
1706 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1707 ckresp(fd
, "INSERTED 2\r\n");
1708 mustsend(fd
, "put 0 0 100 50\r\n");
1709 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1710 ckresp(fd
, "INSERTED 3\r\n");
1711 mustsend(fd
, "put 0 0 100 50\r\n");
1712 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1713 ckresp(fd
, "INSERTED 4\r\n");
// The fifth put is refused.
1715 mustsend(fd
, "put 0 0 100 50\r\n");
1716 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1717 ckresp(fd
, "OUT_OF_MEMORY\r\n");
// Later puts succeed again; the reported ids continue at 6.
1719 mustsend(fd
, "put 0 0 100 50\r\n");
1720 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1721 ckresp(fd
, "INSERTED 6\r\n");
1722 mustsend(fd
, "put 0 0 100 50\r\n");
1723 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1724 ckresp(fd
, "INSERTED 7\r\n");
1725 mustsend(fd
, "put 0 0 100 50\r\n");
1726 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1727 ckresp(fd
, "INSERTED 8\r\n");
1728 mustsend(fd
, "put 0 0 100 50\r\n");
1729 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1730 ckresp(fd
, "INSERTED 9\r\n");
// Every inserted job (1-4 and 6-9) can be deleted.
1732 mustsend(fd
, "delete 1\r\n");
1733 ckresp(fd
, "DELETED\r\n");
1734 mustsend(fd
, "delete 2\r\n");
1735 ckresp(fd
, "DELETED\r\n");
1736 mustsend(fd
, "delete 3\r\n");
1737 ckresp(fd
, "DELETED\r\n");
1738 mustsend(fd
, "delete 4\r\n");
1739 ckresp(fd
, "DELETED\r\n");
1740 mustsend(fd
, "delete 6\r\n");
1741 ckresp(fd
, "DELETED\r\n");
1742 mustsend(fd
, "delete 7\r\n");
1743 ckresp(fd
, "DELETED\r\n");
1744 mustsend(fd
, "delete 8\r\n");
1745 ckresp(fd
, "DELETED\r\n");
1746 mustsend(fd
, "delete 9\r\n");
1747 ckresp(fd
, "DELETED\r\n");
// Test: installs wrapfalloc as falloc, then checks that eight puts succeed,
// the ninth answers OUT_OF_MEMORY, and jobs 1-8 can all still be deleted
// afterwards.
// NOTE(review): listing lines 1752-1753, 1755-1757, 1759 and 1797-1799 are
// absent; the fallocpat pattern, the value of size and the use of b1 are
// presumably on those lines — confirm against the complete file.
1751 cttest_binlog_disk_full_delete()
1754 falloc
= &wrapfalloc
;
1758 srv
.wal
.dir
= ctdir();
1760 srv
.wal
.filesize
= size
;
1761 srv
.wal
.syncrate
= 0;
1762 srv
.wal
.wantsync
= 1;
1764 int port
= SERVER();
1765 int fd
= mustdiallocal(port
);
// Jobs 1-8, each with a 50-byte body, are accepted.
1766 mustsend(fd
, "put 0 0 100 50\r\n");
1767 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1768 ckresp(fd
, "INSERTED 1\r\n");
1769 mustsend(fd
, "put 0 0 100 50\r\n");
1770 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1771 ckresp(fd
, "INSERTED 2\r\n");
1772 mustsend(fd
, "put 0 0 100 50\r\n");
1773 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1774 ckresp(fd
, "INSERTED 3\r\n");
1775 mustsend(fd
, "put 0 0 100 50\r\n");
1776 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1777 ckresp(fd
, "INSERTED 4\r\n");
1778 mustsend(fd
, "put 0 0 100 50\r\n");
1779 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1780 ckresp(fd
, "INSERTED 5\r\n");
1782 mustsend(fd
, "put 0 0 100 50\r\n");
1783 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1784 ckresp(fd
, "INSERTED 6\r\n");
1785 mustsend(fd
, "put 0 0 100 50\r\n");
1786 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1787 ckresp(fd
, "INSERTED 7\r\n");
1788 mustsend(fd
, "put 0 0 100 50\r\n");
1789 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1790 ckresp(fd
, "INSERTED 8\r\n");
// The ninth put is refused.
1792 mustsend(fd
, "put 0 0 100 50\r\n");
1793 mustsend(fd
, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\r\n");
1794 ckresp(fd
, "OUT_OF_MEMORY\r\n");
// Path of the first binlog file; the statements that use it are not
// visible in this listing.
1796 char *b1
= fmtalloc("%s/binlog.1", ctdir());
// Deleting still works for every job after the refused put.
1800 mustsend(fd
, "delete 1\r\n");
1801 ckresp(fd
, "DELETED\r\n");
1802 mustsend(fd
, "delete 2\r\n");
1803 ckresp(fd
, "DELETED\r\n");
1804 mustsend(fd
, "delete 3\r\n");
1805 ckresp(fd
, "DELETED\r\n");
1806 mustsend(fd
, "delete 4\r\n");
1807 ckresp(fd
, "DELETED\r\n");
1808 mustsend(fd
, "delete 5\r\n");
1809 ckresp(fd
, "DELETED\r\n");
1810 mustsend(fd
, "delete 6\r\n");
1811 ckresp(fd
, "DELETED\r\n");
1812 mustsend(fd
, "delete 7\r\n");
1813 ckresp(fd
, "DELETED\r\n");
1814 mustsend(fd
, "delete 8\r\n");
1815 ckresp(fd
, "DELETED\r\n");
// Test body: starts an external "beanstalkd-1.4.6" binary with -b ctdir()
// on a pseudo-random port, inserts two jobs into tube "test", checks every
// stats-job field of both jobs, terminates that server, and then repeats
// the same stats-job checks on a new connection.
// NOTE(review): the function header (listing lines 1819-1822, including the
// declaration of portstr) and lines 1825-1827, 1914 and 1917-1918 are
// absent from this view; the step that starts the replacement server
// before the second mustdiallocal is presumably on those lines — confirm
// against the complete file.
// Skip path: taken when the old binary is not found by `which`.
1823 if (system("which beanstalkd-1.4.6") != 0) {
1824 puts("beanstalkd 1.4.6 not found, skipping");
1828 progname
= __func__
;
// Port in the range 1024..65535 (0xfbff + 1024 == 65535).
1829 int port
= (rand() & 0xfbff) + 1024;
1830 sprintf(portstr
, "%d", port
);
1831 muststart("beanstalkd-1.4.6", "-b", ctdir(), "-p", portstr
);
1832 int fd
= mustdiallocal(port
);
1833 mustsend(fd
, "use test\r\n");
1834 ckresp(fd
, "USING test\r\n");
// Job 1: priority 1, delay 2, ttr 3, 4-byte body.
1835 mustsend(fd
, "put 1 2 3 4\r\n");
1836 mustsend(fd
, "test\r\n");
1837 ckresp(fd
, "INSERTED 1\r\n");
// Job 2: priority 4, delay 3, ttr 2, 1-byte body.
1838 mustsend(fd
, "put 4 3 2 1\r\n");
1839 mustsend(fd
, "x\r\n");
1840 ckresp(fd
, "INSERTED 2\r\n");
// One stats-job request per checked field of job 1.
1842 mustsend(fd
, "stats-job 1\r\n");
1843 ckrespsub(fd
, "OK ");
1844 ckrespsub(fd
, "\nid: 1\n");
1845 mustsend(fd
, "stats-job 1\r\n");
1846 ckrespsub(fd
, "OK ");
1847 ckrespsub(fd
, "\ntube: test\n");
1848 mustsend(fd
, "stats-job 1\r\n");
1849 ckrespsub(fd
, "OK ");
1850 ckrespsub(fd
, "\nstate: delayed\n");
1851 mustsend(fd
, "stats-job 1\r\n");
1852 ckrespsub(fd
, "OK ");
1853 ckrespsub(fd
, "\npri: 1\n");
1854 mustsend(fd
, "stats-job 1\r\n");
1855 ckrespsub(fd
, "OK ");
1856 ckrespsub(fd
, "\ndelay: 2\n");
1857 mustsend(fd
, "stats-job 1\r\n");
1858 ckrespsub(fd
, "OK ");
1859 ckrespsub(fd
, "\nttr: 3\n");
1860 mustsend(fd
, "stats-job 1\r\n");
1861 ckrespsub(fd
, "OK ");
1862 ckrespsub(fd
, "\nreserves: 0\n");
1863 mustsend(fd
, "stats-job 1\r\n");
1864 ckrespsub(fd
, "OK ");
1865 ckrespsub(fd
, "\ntimeouts: 0\n");
1866 mustsend(fd
, "stats-job 1\r\n");
1867 ckrespsub(fd
, "OK ");
1868 ckrespsub(fd
, "\nreleases: 0\n");
1869 mustsend(fd
, "stats-job 1\r\n");
1870 ckrespsub(fd
, "OK ");
1871 ckrespsub(fd
, "\nburies: 0\n");
1872 mustsend(fd
, "stats-job 1\r\n");
1873 ckrespsub(fd
, "OK ");
1874 ckrespsub(fd
, "\nkicks: 0\n");
// Same field-by-field check for job 2.
1876 mustsend(fd
, "stats-job 2\r\n");
1877 ckrespsub(fd
, "OK ");
1878 ckrespsub(fd
, "\nid: 2\n");
1879 mustsend(fd
, "stats-job 2\r\n");
1880 ckrespsub(fd
, "OK ");
1881 ckrespsub(fd
, "\ntube: test\n");
1882 mustsend(fd
, "stats-job 2\r\n");
1883 ckrespsub(fd
, "OK ");
1884 ckrespsub(fd
, "\nstate: delayed\n");
1885 mustsend(fd
, "stats-job 2\r\n");
1886 ckrespsub(fd
, "OK ");
1887 ckrespsub(fd
, "\npri: 4\n");
1888 mustsend(fd
, "stats-job 2\r\n");
1889 ckrespsub(fd
, "OK ");
1890 ckrespsub(fd
, "\ndelay: 3\n");
1891 mustsend(fd
, "stats-job 2\r\n");
1892 ckrespsub(fd
, "OK ");
1893 ckrespsub(fd
, "\nttr: 2\n");
1894 mustsend(fd
, "stats-job 2\r\n");
1895 ckrespsub(fd
, "OK ");
1896 ckrespsub(fd
, "\nreserves: 0\n");
1897 mustsend(fd
, "stats-job 2\r\n");
1898 ckrespsub(fd
, "OK ");
1899 ckrespsub(fd
, "\ntimeouts: 0\n");
1900 mustsend(fd
, "stats-job 2\r\n");
1901 ckrespsub(fd
, "OK ");
1902 ckrespsub(fd
, "\nreleases: 0\n");
1903 mustsend(fd
, "stats-job 2\r\n");
1904 ckrespsub(fd
, "OK ");
1905 ckrespsub(fd
, "\nburies: 0\n");
1906 mustsend(fd
, "stats-job 2\r\n");
1907 ckrespsub(fd
, "OK ");
1908 ckrespsub(fd
, "\nkicks: 0\n");
// Stop the external server and wait for it to exit.
1910 kill(srvpid
, SIGTERM
);
1911 waitpid(srvpid
, NULL
, 0);
// WAL settings for the server used in the second half of the test.
1913 srv
.wal
.dir
= ctdir();
1915 srv
.wal
.syncrate
= 0;
1916 srv
.wal
.wantsync
= 1;
1919 fd
= mustdiallocal(port
);
// Both jobs must report exactly the same field values as before.
1921 mustsend(fd
, "stats-job 1\r\n");
1922 ckrespsub(fd
, "OK ");
1923 ckrespsub(fd
, "\nid: 1\n");
1924 mustsend(fd
, "stats-job 1\r\n");
1925 ckrespsub(fd
, "OK ");
1926 ckrespsub(fd
, "\ntube: test\n");
1927 mustsend(fd
, "stats-job 1\r\n");
1928 ckrespsub(fd
, "OK ");
1929 ckrespsub(fd
, "\nstate: delayed\n");
1930 mustsend(fd
, "stats-job 1\r\n");
1931 ckrespsub(fd
, "OK ");
1932 ckrespsub(fd
, "\npri: 1\n");
1933 mustsend(fd
, "stats-job 1\r\n");
1934 ckrespsub(fd
, "OK ");
1935 ckrespsub(fd
, "\ndelay: 2\n");
1936 mustsend(fd
, "stats-job 1\r\n");
1937 ckrespsub(fd
, "OK ");
1938 ckrespsub(fd
, "\nttr: 3\n");
1939 mustsend(fd
, "stats-job 1\r\n");
1940 ckrespsub(fd
, "OK ");
1941 ckrespsub(fd
, "\nreserves: 0\n");
1942 mustsend(fd
, "stats-job 1\r\n");
1943 ckrespsub(fd
, "OK ");
1944 ckrespsub(fd
, "\ntimeouts: 0\n");
1945 mustsend(fd
, "stats-job 1\r\n");
1946 ckrespsub(fd
, "OK ");
1947 ckrespsub(fd
, "\nreleases: 0\n");
1948 mustsend(fd
, "stats-job 1\r\n");
1949 ckrespsub(fd
, "OK ");
1950 ckrespsub(fd
, "\nburies: 0\n");
1951 mustsend(fd
, "stats-job 1\r\n");
1952 ckrespsub(fd
, "OK ");
1953 ckrespsub(fd
, "\nkicks: 0\n");
1955 mustsend(fd
, "stats-job 2\r\n");
1956 ckrespsub(fd
, "OK ");
1957 ckrespsub(fd
, "\nid: 2\n");
1958 mustsend(fd
, "stats-job 2\r\n");
1959 ckrespsub(fd
, "OK ");
1960 ckrespsub(fd
, "\ntube: test\n");
1961 mustsend(fd
, "stats-job 2\r\n");
1962 ckrespsub(fd
, "OK ");
1963 ckrespsub(fd
, "\nstate: delayed\n");
1964 mustsend(fd
, "stats-job 2\r\n");
1965 ckrespsub(fd
, "OK ");
1966 ckrespsub(fd
, "\npri: 4\n");
1967 mustsend(fd
, "stats-job 2\r\n");
1968 ckrespsub(fd
, "OK ");
1969 ckrespsub(fd
, "\ndelay: 3\n");
1970 mustsend(fd
, "stats-job 2\r\n");
1971 ckrespsub(fd
, "OK ");
1972 ckrespsub(fd
, "\nttr: 2\n");
1973 mustsend(fd
, "stats-job 2\r\n");
1974 ckrespsub(fd
, "OK ");
1975 ckrespsub(fd
, "\nreserves: 0\n");
1976 mustsend(fd
, "stats-job 2\r\n");
1977 ckrespsub(fd
, "OK ");
1978 ckrespsub(fd
, "\ntimeouts: 0\n");
1979 mustsend(fd
, "stats-job 2\r\n");
1980 ckrespsub(fd
, "OK ");
1981 ckrespsub(fd
, "\nreleases: 0\n");
1982 mustsend(fd
, "stats-job 2\r\n");
1983 ckrespsub(fd
, "OK ");
1984 ckrespsub(fd
, "\nburies: 0\n");
1985 mustsend(fd
, "stats-job 2\r\n");
1986 ckrespsub(fd
, "OK ");
1987 ckrespsub(fd
, "\nkicks: 0\n");
// Shared benchmark body: stores the WAL settings from the arguments in
// srv.wal, sets job_data_size_limit to JOB_DATA_SIZE_LIMIT_MAX, starts a
// server and performs n put/delete round trips.
//   n           - number of put/delete iterations
//   size        - job body length; also fills `body` with that many 'a's
//                 (this parameter shadows the file-scope `size`)
//   walsize     - stored in srv.wal.filesize
//   sync        - stored in srv.wal.wantsync
//   syncrate_ms - multiplied by 1000000 and stored in srv.wal.syncrate
//                 (presumably milliseconds converted to nanoseconds)
// NOTE(review): listing lines 1992-1993, 1995, 1999-2000, 2005, 2007-2008,
// 2010-2011, 2013-2014, 2018 and 2020 onwards are absent; body and i are
// used without a visible declaration, and the statements that send `put`,
// `body` and `buf` are not visible — confirm against the complete file.
1991 bench_put_delete_size(int n
, int size
, int walsize
, int sync
, int64 syncrate_ms
)
1994 srv
.wal
.dir
= ctdir();
1996 srv
.wal
.filesize
= walsize
;
1997 srv
.wal
.syncrate
= syncrate_ms
* 1000000;
1998 srv
.wal
.wantsync
= sync
;
2001 job_data_size_limit
= JOB_DATA_SIZE_LIMIT_MAX
;
2002 int port
= SERVER();
2003 int fd
= mustdiallocal(port
);
// buf holds the per-iteration delete command, put the put command line.
2004 char buf
[50], put
[50];
2006 memset(body
, 'a', size
);
2009 sprintf(put
, "put 0 0 0 %d\r\n", size
);
2012 for (i
= 0; i
< n
; i
++) {
2015 mustsend(fd
, "\r\n");
// Only the prefix is checked, so the reported job id is not compared.
2016 ckrespsub(fd
, "INSERTED ");
// The delete command uses i + 1 as the job id.
2017 sprintf(buf
, "delete %d\r\n", i
+ 1);
2019 ckresp(fd
, "DELETED\r\n");
// Benchmark: n put/delete round trips with 8-byte job bodies; walsize, sync
// and syncrate_ms are all passed as 0.
// NOTE(review): the return-type and brace lines are absent from this listing.
2025 ctbench_put_delete_0008(int n
)
2027 bench_put_delete_size(n
, 8, 0, 0, 0);
// Benchmark: n put/delete round trips with 1024-byte job bodies; walsize,
// sync and syncrate_ms are all passed as 0.
// NOTE(review): the return-type and brace lines are absent from this listing.
2031 ctbench_put_delete_1024(int n
)
2033 bench_put_delete_size(n
, 1024, 0, 0, 0);
// Benchmark: n put/delete round trips with 8192-byte job bodies; walsize,
// sync and syncrate_ms are all passed as 0.
// NOTE(review): the return-type and brace lines are absent from this listing.
2037 ctbench_put_delete_8192(int n
)
2039 bench_put_delete_size(n
, 8192, 0, 0, 0);
// Benchmark: n put/delete round trips with 81920-byte job bodies; walsize,
// sync and syncrate_ms are all passed as 0.
// NOTE(review): the return-type and brace lines are absent from this listing.
2043 ctbench_put_delete_81920(int n
)
2045 bench_put_delete_size(n
, 81920, 0, 0, 0);
// Benchmark: n put/delete round trips with 1024-byte job bodies, walsize
// 512000, sync 1 and syncrate_ms 0.
// NOTE(review): the return-type and brace lines are absent from this listing.
2049 ctbench_put_delete_wal_1024_fsync_000ms(int n
)
2051 bench_put_delete_size(n
, 1024, 512000, 1, 0);
// Benchmark: n put/delete round trips with 1024-byte job bodies, walsize
// 512000, sync 1 and syncrate_ms 50.
// NOTE(review): the return-type and brace lines are absent from this listing.
2055 ctbench_put_delete_wal_1024_fsync_050ms(int n
)
2057 bench_put_delete_size(n
, 1024, 512000, 1, 50);
// Benchmark: n put/delete round trips with 1024-byte job bodies, walsize
// 512000, sync 1 and syncrate_ms 200.
// NOTE(review): the return-type and brace lines are absent from this listing.
2061 ctbench_put_delete_wal_1024_fsync_200ms(int n
)
2063 bench_put_delete_size(n
, 1024, 512000, 1, 200);
// Benchmark: n put/delete round trips with 1024-byte job bodies, walsize
// 512000, sync 0 and syncrate_ms 0.
// NOTE(review): the return-type and brace lines are absent from this listing.
2067 ctbench_put_delete_wal_1024_no_fsync(int n
)
2069 bench_put_delete_size(n
, 1024, 512000, 0, 0);
// Benchmark: n put/delete round trips with 8192-byte job bodies, walsize
// 512000, sync 1 and syncrate_ms 0.
// NOTE(review): the return-type and brace lines are absent from this listing.
2073 ctbench_put_delete_wal_8192_fsync_000ms(int n
)
2075 bench_put_delete_size(n
, 8192, 512000, 1, 0);
// Benchmark: n put/delete round trips with 8192-byte job bodies, walsize
// 512000, sync 1 and syncrate_ms 50.
// NOTE(review): the return-type and brace lines are absent from this listing.
2079 ctbench_put_delete_wal_8192_fsync_050ms(int n
)
2081 bench_put_delete_size(n
, 8192, 512000, 1, 50);
// Benchmark: n put/delete round trips with 8192-byte job bodies, walsize
// 512000, sync 1 and syncrate_ms 200.
// NOTE(review): the return-type and brace lines are absent from this listing.
2085 ctbench_put_delete_wal_8192_fsync_200ms(int n
)
2087 bench_put_delete_size(n
, 8192, 512000, 1, 200);
// Benchmark: n put/delete round trips with 8192-byte job bodies, walsize
// 512000, sync 0 and syncrate_ms 0.
// NOTE(review): the return-type and brace lines are absent from this listing.
2091 ctbench_put_delete_wal_8192_no_fsync(int n
)
2093 bench_put_delete_size(n
, 8192, 512000, 0, 0);