lei/store: stop shard workers + cat-file on idle
[public-inbox.git] / t / xap_helper.t
blob0f474608a3e83ccf841d7b8e22c1505ccfc97396
1 #!perl -w
2 # Copyright (C) all contributors <meta@public-inbox.org>
3 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
4 use v5.12;
5 use PublicInbox::TestCommon;
6 require_mods(qw(DBD::SQLite Xapian +SCM_RIGHTS)); # TODO: FIFO support?
7 use PublicInbox::Spawn qw(spawn);
8 use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM);
9 require PublicInbox::AutoReap;
10 use PublicInbox::IPC;
11 require PublicInbox::XapClient;
12 use autodie;
13 my ($tmp, $for_destroy) = tmpdir();
15 my $fi_data = './t/git.fast-import-data';
16 open my $fi_fh, '<', $fi_data;
17 open my $dh, '<', '.';
18 my $crepo = create_coderepo 'for-cindex', sub {
19         my ($d) = @_;
20         xsys_e([qw(git init -q --bare)]);
21         xsys_e([qw(git fast-import --quiet)], undef, { 0 => $fi_fh });
22         chdir($dh);
23         run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j1), '-g', $d])
24                 or xbail '-cindex internal';
25         run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j3 -d),
26                 "$d/cidx-ext", '-g', $d]) or xbail '-cindex "external"';
28 $dh = $fi_fh = undef;
30 my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
31                         tmpdir => "$tmp/v2", sub {
32         my ($im) = @_;
33         for my $f (qw(t/data/0001.patch t/data/binary.patch
34                         t/data/message_embed.eml
35                         t/solve/0001-simple-mod.patch
36                         t/solve/0002-rename-with-modifications.patch
37                         t/solve/bare.patch)) {
38                 $im->add(eml_load($f)) or BAIL_OUT;
39         }
42 my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
43 my @ibx_shard_args = map { ('-d', $_) } @ibx_idx;
44 my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
45 my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
46 is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
47 is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int);
49 my $doreq = sub {
50         my ($s, @arg) = @_;
51         my $err = ref($arg[-1]) ? pop(@arg) : \*STDERR;
52         pipe(my $x, my $y);
53         my $buf = join("\0", @arg, '');
54         my @fds = (fileno($y), fileno($err));
55         my $n = $PublicInbox::IPC::send_cmd->($s, \@fds, $buf, 0) //
56                 xbail "send: $!";
57         my $exp = length($buf);
58         $exp == $n or xbail "req @arg sent short ($n != $exp)";
59         $x;
62 local $SIG{PIPE} = 'IGNORE';
63 my $env = { PERL5LIB => join(':', @INC) };
64 my $test = sub {
65         my (@cmd) = @_;
66         socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
67         my $pid = spawn(\@cmd, $env, { 0 => $y });
68         my $ar = PublicInbox::AutoReap->new($pid);
69         diag "$cmd[-1] running pid=$pid";
70         close $y;
71         my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
72         my %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
73         is($info{has_threadid}, '1', 'has_threadid true for inbox');
74         like($info{pid}, qr/\A\d+\z/, 'got PID from inbox inspect');
76         $r = $doreq->($s, qw(test_inspect -d), $int[0]);
77         my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
78         is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
79         is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
81         my @dump = (qw(dump_ibx -A XDFID), @ibx_shard_args, qw(13 rt:0..));
82         $r = $doreq->($s, @dump);
83         my @res;
84         while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
85         is(grep(/\n\z/s, @res), scalar(@res), 'line buffered');
87         pipe(my $err_rd, my $err_wr);
88         $r = $doreq->($s, @dump, $err_wr);
89         close $err_wr;
90         my $res = do { local $/; <$r> };
91         is(join('', @res), $res, 'got identical response w/ error pipe');
92         my $stats = do { local $/; <$err_rd> };
93         is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported') or
94                 diag "res=$res";
96         return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid;
98         # test worker management:
99         kill('TERM', $cinfo{pid});
100         my $tries = 0;
101         do {
102                 $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
103                 %info = map { split(/=/, $_, 2) }
104                         split(/ /, do { local $/; <$r> });
105         } while ($info{pid} == $cinfo{pid} && ++$tries < 10);
106         isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
108         my %pids;
109         $tries = 0;
110         my @ins = ($s, qw(test_inspect -d), $ibx_idx[0]);
111         kill('TTIN', $pid);
112         until (scalar(keys %pids) >= 2 || ++$tries > 100) {
113                 tick;
114                 my @r = map { $doreq->(@ins) } (0..100);
115                 for my $fh (@r) {
116                         my $buf = do { local $/; <$fh> } // die "read: $!";
117                         $buf =~ /\bpid=(\d+)/ and $pids{$1} = undef;
118                 }
119         }
120         is(scalar keys %pids, 2, 'have two pids') or
121                 diag 'pids='.explain(\%pids);
123         kill('TTOU', $pid);
124         %pids = ();
125         my $delay = $tries * 0.11 * ($ENV{VALGRIND} ? 10 : 1);
126         $tries = 0;
127         diag 'waiting '.$delay.'s for SIGTTOU';
128         tick($delay);
129         until (scalar(keys %pids) == 1 || ++$tries > 100) {
130                 %pids = ();
131                 my @r = map { $doreq->(@ins) } (0..100);
132                 for my $fh (@r) {
133                         my $buf = do { local $/; <$fh> } // die "read: $!";
134                         $buf =~ /\bpid=(\d+)/ and $pids{$1} = undef;
135                 }
136         }
137         is(scalar keys %pids, 1, 'have one pid') or diag explain(\%pids);
138         is($info{pid}, (keys %pids)[0], 'kept oldest PID after TTOU');
140         wantarray ? ($ar, $s) : $ar;
143 my @NO_CXX = (1);
144 unless ($ENV{TEST_XH_CXX_ONLY}) {
145         my $ar = $test->($^X, qw[-w -MPublicInbox::XapHelper -e
146                         PublicInbox::XapHelper::start('-j0')]);
147         ($ar, my $s) = $test->($^X, qw[-w -MPublicInbox::XapHelper -e
148                         PublicInbox::XapHelper::start('-j1')]);
149         no_pollerfd($ar->{pid});
151 SKIP: {
152         my $cmd = eval {
153                 require PublicInbox::XapHelperCxx;
154                 PublicInbox::XapHelperCxx::cmd();
155         };
156         skip "XapHelperCxx build: $@", 1 if $@;
158         @NO_CXX = $ENV{TEST_XH_CXX_ONLY} ? (0) : (0, 1);
159         my $ar = $test->(@$cmd, '-j0');
160         $ar = $test->(@$cmd, '-j1');
163 require PublicInbox::CodeSearch;
164 my $cs_int = PublicInbox::CodeSearch->new("$crepo/public-inbox-cindex");
165 my $root2id_file = "$tmp/root2id";
166 my @id2root;
168         open my $fh, '>', $root2id_file;
169         my $i = -1;
170         for ($cs_int->all_terms('G')) {
171                 print $fh $_, "\0", ++$i, "\0";
172                 $id2root[$i] = $_;
173         }
174         close $fh;
177 my $ar;
178 for my $n (@NO_CXX) {
179         local $ENV{PI_NO_CXX} = $n;
180         my $xhc = PublicInbox::XapClient::start_helper('-j0');
181         pipe(my $err_r, my $err_w);
183         # git patch-id --stable <t/data/0001.patch | awk '{print $1}'
184         my $dfid = '91ee6b761fc7f47cad9f2b09b10489f313eb5b71';
185         my $mid = '20180720072141.GA15957@example';
186         my $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFID -A Q),
187                                 (map { ('-d', $_) } @ibx_idx),
188                                 9, "mid:$mid");
189         close $err_w;
190         my $res = do { local $/; <$r> };
191         is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
192         my $err = do { local $/; <$err_r> };
193         is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})");
195         pipe($err_r, $err_w);
196         $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID),
197                         (map { ('-d', $_) } @int),
198                         $root2id_file, 'dt:19700101'.'000000..');
199         close $err_w;
200         my @res = <$r>;
201         is(scalar(@res), 5, 'got expected rows');
202         is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
203                 'entries match format');
204         $err = do { local $/; <$err_r> };
205         is $err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})";
207         $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args,
208                                 'dfn:lib/PublicInbox/Search.pm');
209         chomp((my $hdr, @res) = readline($r));
210         is $hdr, 'mset.size=1', "got expected header via mset ($xhc->{impl}";
211         is scalar(@res), 1, 'got one result';
212         @res = split /\0/, $res[0];
213         {
214                 my $doc = $v2->search->xdb->get_document($res[0]);
215                 my @q = PublicInbox::Search::xap_terms('Q', $doc);
216                 is_deeply \@q, [ $mid ], 'docid usable';
217         }
218         ok $res[1] > 0 && $res[1] <= 100, 'pct > 0 && <= 100';
219         is $res[2], 'XDFID'.$dfid, 'XDFID result matches';
220         is $res[3], 'Q'.$mid, 'Q (msgid) mset result matches';
221         is scalar(@res), 4, 'only 4 columns in result';
223         $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args,
224                                 'dt:19700101'.'000000..');
225         chomp(($hdr, @res) = readline($r));
226         is $hdr, 'mset.size=6',
227                 "got expected header via multi-result mset ($xhc->{impl}";
228         is(scalar(@res), 6, 'got 6 rows');
229         for my $r (@res) {
230                 my ($docid, $pct, @rest) = split /\0/, $r;
231                 my $doc = $v2->search->xdb->get_document($docid);
232                 ok $pct > 0 && $pct <= 100,
233                         "pct > 0 && <= 100 #$docid ($xhc->{impl})";
234                 my %terms;
235                 for (@rest) {
236                         s/\A([A-Z]+)// or xbail 'no prefix=', \@rest;
237                         push @{$terms{$1}}, $_;
238                 }
239                 while (my ($pfx, $vals) = each %terms) {
240                         @$vals = sort @$vals;
241                         my @q = PublicInbox::Search::xap_terms($pfx, $doc);
242                         is_deeply $vals, \@q,
243                                 "#$docid $pfx as expected ($xhc->{impl})";
244                 }
245         }
246         my $nr;
247         for my $i (7, 8, 39, 40) {
248                 pipe($err_r, $err_w);
249                 $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A),
250                                 "XDFPOST$i", (map { ('-d', $_) } @int),
251                                 $root2id_file, 'dt:19700101'.'000000..');
252                 close $err_w;
253                 @res = <$r>;
254                 my @err = <$err_r>;
255                 if (defined $nr) {
256                         is scalar(@res), $nr,
257                                 "got expected results ($xhc->{impl})";
258                 } else {
259                         $nr //= scalar @res;
260                         ok $nr, "got initial results ($xhc->{impl})";
261                 }
262                 my @oids = (join('', @res) =~ /^([a-f0-9]+) /gms);
263                 is_deeply [grep { length == $i } @oids], \@oids,
264                         "all OIDs match expected length ($xhc->{impl})";
265                 my ($nr_out) = ("@err" =~ /nr_out=(\d+)/);
266                 is $nr_out, scalar(@oids), "output count matches $xhc->{impl}"
267                         or diag explain(\@res, \@err);
268         }
269         pipe($err_r, $err_w);
270         $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFPOST7),
271                         @ibx_shard_args, qw(13 rt:0..));
272         close $err_w;
273         @res = <$r>;
274         my @err = <$err_r>;
275         my ($nr_out) = ("@err" =~ /nr_out=(\d+)/);
276         my @oids = (join('', @res) =~ /^([a-f0-9]{7}) /gms);
277         is $nr_out, scalar(@oids), "output count matches $xhc->{impl}" or
278                 diag explain(\@res, \@err);
281 done_testing;