lei/store: stop shard workers + cat-file on idle
[public-inbox.git] / t / cmd_ipc.t
blobc973c6f01ff9f0bc1872f8b9cdf3ec609d5fb0c4
1 #!perl -w
2 # Copyright (C) all contributors <meta@public-inbox.org>
3 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
4 use v5.12;
5 use PublicInbox::TestCommon;
6 use autodie;
7 use Socket qw(AF_UNIX SOCK_STREAM SOCK_SEQPACKET);
8 pipe(my $r, my $w);
9 my ($send, $recv);
10 require_ok 'PublicInbox::Spawn';
11 require POSIX;
13 my $do_test = sub { SKIP: {
14         my ($type, $flag, $desc) = @_;
15         my ($s1, $s2);
16         my $src = 'some payload' x 40;
17         socketpair($s1, $s2, AF_UNIX, $type, 0);
18         my $sfds = [ fileno($r), fileno($w), fileno($s1) ];
19         $send->($s1, $sfds, $src, $flag);
20         my (@fds) = $recv->($s2, my $buf, length($src) + 1);
21         is($buf, $src, 'got buffer payload '.$desc);
22         my ($r1, $w1, $s1a);
23         my $opens = sub {
24                 ok(open($r1, '<&=', $fds[0]), 'opened received $r');
25                 ok(open($w1, '>&=', $fds[1]), 'opened received $w');
26                 ok(open($s1a, '+>&=', $fds[2]), 'opened received $s1');
27         };
28         $opens->();
29         my @exp = stat $r;
30         my @cur = stat $r1;
31         is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$r dev/ino matches');
32         @exp = stat $w;
33         @cur = stat $w1;
34         is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$w dev/ino matches');
35         @exp = stat $s1;
36         @cur = stat $s1a;
37         is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$s1 dev/ino matches');
38         if ($type == SOCK_SEQPACKET) {
39                 $r1 = $w1 = $s1a = undef;
40                 $src = (',' x 1023) . '-' .('.' x 1024);
41                 $send->($s1, $sfds, $src, $flag);
42                 (@fds) = $recv->($s2, $buf, 1024);
43                 is($buf, (',' x 1023) . '-', 'silently truncated buf');
44                 $opens->();
45                 $r1 = $w1 = $s1a = undef;
47                 $s2->blocking(0);
48                 @fds = $recv->($s2, $buf, length($src) + 1);
49                 ok($!{EAGAIN}, "EAGAIN set by ($desc)");
50                 is($buf, '', "recv buffer emptied on EAGAIN ($desc)");
51                 is_deeply(\@fds, [ undef ], "EAGAIN $desc");
52                 $s2->blocking(1);
54                 if ('test ALRM') {
55                         my $alrm = 0;
56                         local $SIG{ALRM} = sub { $alrm++ };
57                         my $tgt = $$;
58                         my $pid = fork;
59                         if ($pid == 0) {
60                                 # need to loop since Perl signals are racy
61                                 # (the interpreter doesn't self-pipe)
62                                 my $n = 3;
63                                 while (tick(0.01 * $n) && --$n) {
64                                         kill('ALRM', $tgt)
65                                 }
66                                 close $s1;
67                                 POSIX::_exit(1);
68                         }
69                         close $s1;
70                         @fds = $recv->($s2, $buf, length($src) + 1);
71                         waitpid($pid, 0);
72                         is_deeply(\@fds, [], "EINTR->EOF $desc");
73                         ok($alrm, 'SIGALRM hit');
74                 }
76                 @fds = $recv->($s2, $buf, length($src) + 1);
77                 is_deeply(\@fds, [], "no FDs on EOF $desc");
78                 is($buf, '', "buffer cleared on EOF ($desc)");
80                 socketpair($s1, $s2, AF_UNIX, $type, 0);
81                 $s1->blocking(0);
82                 my $nsent = 0;
83                 my $srclen = length($src);
84                 while (defined(my $n = $send->($s1, $sfds, $src, $flag))) {
85                         $nsent += $n;
86                         fail "sent $n bytes of $srclen" if $srclen != $n;
87                 }
88                 ok($!{EAGAIN} || $!{ETOOMANYREFS} || $!{EMSGSIZE},
89                         "hit EAGAIN || ETOOMANYREFS || EMSGSIZE on send $desc")
90                         or diag "send failed with: $! (nsent=$nsent)";
91                 ok($nsent > 0, 'sent some bytes');
93                 socketpair($s1, $s2, AF_UNIX, $type, 0);
94                 is($send->($s1, [], $src, $flag), length($src), 'sent w/o FDs');
95                 $buf = 'nope';
96                 @fds = $recv->($s2, $buf, length($src));
97                 is(scalar(@fds), 0, 'no FDs received');
98                 is($buf, $src, 'recv w/o FDs');
100                 my $nr = 2 * 1024 * 1024;
101                 while (1) {
102                         vec(my $vec = '', $nr - 1, 8) = 1;
103                         my $n = $send->($s1, [], $vec, $flag);
104                         if (defined($n)) {
105                                 $n == length($vec) or
106                                         fail "short send: $n != ".length($vec);
107                                 diag "sent $nr, retrying with more";
108                                 $nr += 2 * 1024 * 1024;
109                         } else {
110                                 ok($!{EMSGSIZE} || $!{ENOBUFS},
111                                         'got EMSGSIZE or ENOBUFS') or
112                                         diag "$nr bytes fails with: $!";
113                                 last;
114                         }
115                 }
116         }
117 } };
119 my $send_ic = PublicInbox::Spawn->can('send_cmd4');
120 my $recv_ic = PublicInbox::Spawn->can('recv_cmd4');
121 SKIP: {
122         ($send_ic && $recv_ic) or skip 'Inline::C not installed/enabled', 12;
123         $send = $send_ic;
124         $recv = $recv_ic;
125         $do_test->(SOCK_STREAM, 0, 'Inline::C stream');
126         $do_test->(SOCK_SEQPACKET, 0, 'Inline::C seqpacket');
129 SKIP: {
130         require_mods('Socket::MsgHdr', 13);
131         require_ok 'PublicInbox::CmdIPC4';
132         $send = PublicInbox::CmdIPC4->can('send_cmd4');
133         $recv = PublicInbox::CmdIPC4->can('recv_cmd4');
134         $do_test->(SOCK_STREAM, 0, 'MsgHdr stream');
135         $do_test->(SOCK_SEQPACKET, 0, 'MsgHdr seqpacket');
136         SKIP: {
137                 ($send_ic && $recv_ic) or
138                         skip 'Inline::C not installed/enabled', 12;
139                 $recv = $recv_ic;
140                 $do_test->(SOCK_STREAM, 0, 'Inline::C -> MsgHdr stream');
141                 $do_test->(SOCK_SEQPACKET, 0, 'Inline::C -> MsgHdr seqpacket');
142         }
145 SKIP: {
146         require_ok 'PublicInbox::Syscall';
147         $send = PublicInbox::Syscall->can('send_cmd4') or
148                 skip "send_cmd4 not defined for $^O arch", 1;
149         $recv = PublicInbox::Syscall->can('recv_cmd4') or
150                 skip "recv_cmd4 not defined for $^O arch", 1;
151         $do_test->(SOCK_STREAM, 0, 'pure Perl stream');
152         $do_test->(SOCK_SEQPACKET, 0, 'pure Perl seqpacket');
155 done_testing;