taskd.pl: make sure no status messages are skipped
[girocco.git] / taskd / taskd.pl
blob460329738269560d845c3516e0e845330dae07f5
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
# taskd is the Girocco mirroring servant; it processes requests for clones
# of given URLs received over its socket.
#
# When a request is received, a new process is spawned that sets up
# the repository and reports further progress
# to .clonelog within the repository. In case the clone fails,
# .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Based on perlipc example.
30 use strict;
31 use warnings;
33 use Getopt::Long;
34 use Pod::Usage;
35 use Socket;
36 use Errno;
37 use POSIX ":sys_wait_h";
38 use File::Basename;
40 use lib dirname($0);
41 use Girocco::Config;
42 use Girocco::Notify;
43 use Girocco::Project;
44 use Girocco::User;
45 use Girocco::Util qw(noFatalsToBrowser);
46 BEGIN {noFatalsToBrowser}
# Command-line option state; the values are filled in by GetOptions
# further down in this file.
my ($quiet, $progress, $syslog, $stderr, $inetd, $idle_timeout);
my $abbrev = 8;	# how many leading characters of each revision are logged

# Turn on autoflush for the currently selected output handle.
$| = 1;

# Process-wide bookkeeping: our name for log prefixes, the number of live
# children and the time the current idle period started.
my ($progname, $children, $idlestart) = (basename($0), 0, time);
# Print a log line built from the arguments (joined with $") to STDOUT.
#
# The line is prefixed with "[localtime] progname pid: ".  When STDOUT is
# tied, the message is printed twice while $OStream::only is toggled: once
# with the prefix while the flag is 2 and once without the prefix while the
# flag is 1; the flag is then put back to 0.
sub logmsg {
	my $stamp = localtime;
	my $prefix = "[$stamp] $progname $$: ";
	my $text = "@_";
	return print "$prefix$text\n" unless tied *STDOUT;
	$OStream::only = 2; # STDERR only
	print "$prefix$text\n";
	$OStream::only = 1; # syslog only
	print "$text\n";
	$OStream::only = 0; # back to default
}
# Print a status line built from the arguments (joined with $") to STDERR.
#
# Does nothing unless $progress is set.  The line is prefixed with
# "[localtime] progname pid: ".  When STDERR is tied, the message is printed
# twice while $OStream::only is toggled: with the prefix while the flag is 2,
# without it while the flag is 1; the flag is then put back to 0.
sub statmsg {
	$progress or return;
	my $prefix = sprintf('[%s] %s %s: ', scalar(localtime), $progname, $$);
	my $body = join($", @_);
	if (!tied *STDERR) {
		print STDERR "$prefix$body\n";
	} else {
		$OStream::only = 2; # STDERR only
		print STDERR "$prefix$body\n";
		$OStream::only = 1; # syslog only
		print STDERR "$body\n";
		$OStream::only = 0; # back to default
	}
}
# Format a number of seconds as a compact string such as "42s", "3m7s",
# "2h0m5s" or "1d1h1m1s".
#
# The argument is truncated to a whole number first.  An undefined or
# negative argument is returned unchanged.  Smaller units are always shown
# once a larger unit is present (e.g. 3600 gives "1h0m0s").
sub duration {
	my ($total) = @_;
	return $total unless defined($total) && $total >= 0;
	my $left = int($total);
	my $text = '';
	# Peel off seconds, minutes and hours in turn; stop at the first unit
	# that holds everything that is left
	foreach my $unit (['s', 60], ['m', 60], ['h', 24]) {
		my ($suffix, $base) = @$unit;
		$text = ($left % $base) . $suffix . $text;
		return $text if $left < $base;
		$left = int($left / $base);
	}
	return $left . 'd' . $text;
}
# SIGCHLD handler: reap every exited child without blocking.
#
# For each reaped child the live-children counter is decremented (the idle
# clock is restarted when it reaches zero) and a "reaped PID" line is logged,
# followed by the raw wait status when that is non-zero.  The handler
# re-installs itself before returning.
sub REAPER {
	# waitpid overwrites both $! and $?; keep the interrupted code's values
	local ($!, $?);
	while ((my $waitedpid = waitpid(-1, WNOHANG)) > 0) {
		$idlestart = time if !--$children;
		logmsg "reaped $waitedpid" . ($? ? " with exit $?" : '');
	}
	$SIG{CHLD} = \&REAPER;  # loathe sysV
}

$SIG{CHLD} = \&REAPER; # Apollo 440
# Fork a child that runs the given code reference with STDIN attached to
# the Client socket.
#
# Parent: logs "begat PID" (or "cannot fork: ..." when fork fails) and
# returns nothing.  Child: resets SIGCHLD to its default disposition, dups
# the Client handle onto STDIN for reading and writing, then exits with the
# code reference's return value.
sub spawn {
	my ($task) = @_;

	my $kid = fork;
	unless (defined $kid) {
		logmsg "cannot fork: $!";
		return;
	}
	if ($kid != 0) {
		# Still the parent: account for the new child and go back to work
		$idlestart = time if !++$children;
		logmsg "begat $kid";
		return;
	}

	# Child from here on
	$SIG{CHLD} = 'DEFAULT';

	local *STDIN;
	open STDIN, "+<&Client" or die "can't dup client to stdin";
	exit $task->();
}
# Handle a "clone" request for the named project.
#
# Dies unless the project exists, loads, has clone_in_progress set and does
# not have clone_logged set.  Otherwise STDOUT and STDERR are pointed at the
# repository's .clonelog, STDIN at /dev/null, and the process is replaced by
# taskd/clone.sh with "NAME.git" as its argument; it only returns by dying.
sub clone {
	my ($name) = @_;
	die "no such project: $name" unless Girocco::Project::does_exist($name, 1);
	my $project = Girocco::Project->load($name);
	die "failed to load project $name" unless $project;
	die "project $name is not marked for cloning" unless $project->{clone_in_progress};
	die "project $name is already being cloned" if $project->{clone_logged};
	statmsg "cloning $name";
	my $logfile = "$Girocco::Config::reporoot/$name.git/.clonelog";
	open STDOUT, '>', $logfile or die "cannot open clonelog: $!";
	open STDERR, ">&STDOUT";
	open STDIN, '<', '/dev/null';
	my $script = "$Girocco::Config::basedir/taskd/clone.sh";
	exec $script, "$name.git" or die "exec failed: $!";
}
# Handle a single "ref-change" request.
#
# $arg holds "USERNAME PROJECT OLDREV NEWREV REF" separated by whitespace.
# Returns 0 without doing anything when a field is missing or false, when
# either revision is not 40 lowercase hex digits, when REF does not start
# with "refs/", or when OLDREV equals NEWREV.  Dies when the project (or the
# user, unless the user name is wrapped in % signs) does not exist or cannot
# be loaded.  Otherwise reports the change via statmsg, detaches STDIN and
# hands the change to Girocco::Notify::ref_change; returns 0.
sub ref_change {
	my ($arg) = @_;
	my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
	return 0 unless $username && $name && $oldrev && $newrev && $ref;
	return 0 unless $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/};
	return 0 if $newrev eq $oldrev;

	die "no such project: $name" unless Girocco::Project::does_exist($name, 1);
	my $project = Girocco::Project->load($name);
	die "failed to load project $name" unless $project;

	# User names of the form %...% are not looked up; $userobj stays undefined
	my $userobj;
	if ($username && $username !~ /^%.*%$/) {
		die "no such user: $username" unless Girocco::User::does_exist($username, 1);
		$userobj = Girocco::User->load($username);
		die "failed to load user $username" unless $userobj;
	}

	my $from = substr($oldrev, 0, $abbrev);
	my $to = substr($newrev, 0, $abbrev);
	statmsg "ref-change $username $name ($ref: $from -> $to)";
	open STDIN, '<', '/dev/null';
	Girocco::Notify::ref_change($project, $userobj, $ref, $oldrev, $newrev);
	return 0;
}
# Handle a "ref-changes" request: a batch of ref updates for one project.
#
# $arg holds "USERNAME PROJECT"; the changes themselves are read from STDIN,
# one "OLDREV NEWREV REF" line each, until end of file.  Returns 0 without
# notifying anything when USERNAME or PROJECT is missing, when no changes
# were read, or as soon as one line is malformed (missing field, revision
# that is not 40 lowercase hex digits, REF not starting with "refs/", or
# OLDREV equal to NEWREV).  Dies when the project (or the user, unless the
# user name is wrapped in % signs) does not exist or cannot be loaded.
# Otherwise each change is reported via statmsg and passed to
# Girocco::Notify::ref_change, with a one second sleep after each; returns 0.
sub ref_changes {
	my ($arg) = @_;
	my ($username, $name) = split(/\s+/, $arg);
	$username && $name or return 0;

	Girocco::Project::does_exist($name, 1) or die "no such project: $name";
	my $proj = Girocco::Project->load($name);
	$proj or die "failed to load project $name";

	# User names of the form %...% are not looked up; $user stays undefined
	my $user;
	if ($username && $username !~ /^%.*%$/) {
		Girocco::User::does_exist($username, 1) or die "no such user: $username";
		$user = Girocco::User->load($username);
		$user or die "failed to load user $username";
	}

	my @changes = ();
	while (my $change = <STDIN>) {
		my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
		# A blank or short line leaves fields undefined; reject it here so the
		# pattern matches below never see undef and emit warnings
		defined($oldrev) && defined($newrev) && defined($ref) or return 0;
		$oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or return 0;
		$newrev ne $oldrev or return 0;
		push(@changes, [$oldrev, $newrev, $ref]);
	}
	return 0 unless @changes;
	# Everything has been read; detach from the client before notifying
	open STDIN, '<', '/dev/null';
	foreach my $change (@changes) {
		my ($oldrev, $newrev, $ref) = @$change;
		statmsg "ref-change $username $name ($ref: @{[substr($oldrev,0,$abbrev)]} -> @{[substr($newrev,0,$abbrev)]})";
		Girocco::Notify::ref_change($proj, $user, $ref, $oldrev, $newrev);
		sleep 1;
	}
	return 0;
}
212 ## -------
213 ## OStream
214 ## -------
package OStream;

# Tied-handle class that buffers printed data into lines and sends each
# complete line to stderr (file descriptor 2), to syslog, or to both,
# depending on the mode given to tie:
#   mode <= 0   stderr only
#   mode == 1   syslog only
#   mode  > 1   syslog and stderr

# Set to 1 for only syslog output (if enabled by mode)
# Set to 2 for only stderr output (if enabled by mode)
our $only = 0; # This is a hack

use Carp 'croak';
use Sys::Syslog qw(:DEFAULT :macros);

# Write all of $data to file descriptor $fd, retrying after EINTR and after
# short writes.  Croaks on any other write error or when no progress is made.
sub writeall {
	use POSIX qw();
	use Errno;
	my ($fd, $data) = @_;
	my $offset = 0;
	my $remaining = length($data);
	while ($remaining) {
		my $bytes = POSIX::write(
			$fd,
			substr($data, $offset, $remaining),
			$remaining);
		next if !defined($bytes) && $!{EINTR};
		croak "POSIX::write failed: $!" unless defined $bytes;
		# POSIX::write reports zero bytes as the true string "0 but true",
		# so the count has to be compared numerically
		croak "POSIX::write wrote 0 bytes" unless $bytes > 0;
		$remaining -= $bytes;
		$offset += $bytes;
	}
}

# Emit one line.  It goes to stderr as-is when stderr output is enabled and
# $only is not 1; with one trailing newline removed, and only if something
# is left, it goes to syslog at LOG_NOTICE when syslog output is enabled and
# $only is not 2.
sub dumpline {
	use POSIX qw(STDERR_FILENO);
	my ($self, $line) = @_;
	$only = 0 unless defined($only);
	writeall(STDERR_FILENO, $line) if $self->{'stderr'} && $only != 1;
	substr($line, -1, 1) = '' if substr($line, -1, 1) eq "\n";
	return unless length($line);
	syslog(LOG_NOTICE, "%s", $line) if $self->{'syslog'} && $only != 2;
}

# tie HANDLE, 'OStream', $mode, $syslogname, $syslogfacility
#
# $syslogfacility defaults to LOG_USER.  When the mode enables syslog the
# log is opened immediately (croaks if openlog fails).
sub TIEHANDLE {
	my $class = shift || 'OStream';
	my $mode = shift;
	my $syslogname = shift;
	my $syslogfacility = shift;
	defined($syslogfacility) or $syslogfacility = LOG_USER;
	my $self = {};
	$self->{'syslog'} = $mode > 0;
	$self->{'stderr'} = $mode <= 0 || $mode > 1;
	$self->{'lastline'} = '';	# data printed since the last newline
	if ($self->{'syslog'}) {
		# Some Sys::Syslog have a stupid default setlogsock order
		eval {Sys::Syslog::setlogsock("native"); 1;} or
		eval {Sys::Syslog::setlogsock("unix");};
		openlog($syslogname, "ndelay,pid", $syslogfacility)
			or croak "Sys::Syslog::openlog failed: $!";
	}
	return bless $self, $class;
}

# Fixed answers for the remaining tied-handle operations
sub BINMODE {return 1}
sub FILENO {return undef}
sub EOF {return 0}
sub CLOSE {return 1}

# printf on the tied handle: format, then treat like PRINT.
sub PRINTF {
	my $self = shift;
	my $template = shift;
	return $self->PRINT(sprintf $template, @_);
}

# print on the tied handle.  The arguments are appended to any partial line
# left over from earlier calls; every complete line (through its newline) is
# passed to dumpline in order, and whatever follows the last newline is kept
# for the next call.  Always returns 1.
sub PRINT {
	my $self = shift;
	my $data = join('', $self->{'lastline'}, @_);
	my $pos = 0;
	# Walk the buffer without modifying it so that $pos always refers to
	# the start of the next unsent line
	while ((my $idx = index($data, "\n", $pos)) >= 0) {
		++$idx;
		$self->dumpline(substr($data, $pos, $idx - $pos));
		$pos = $idx;
	}
	$self->{'lastline'} = substr($data, $pos);
	return 1;
}

# Flush a pending partial line (if any) and close the syslog connection.
sub DESTROY {
	my $self = shift;
	$self->dumpline($self->{'lastline'})
		if length($self->{'lastline'});
	closelog;
}

# syswrite on the tied handle: WRITE(scalar, length, offset).
#
# length defaults to the whole scalar and offset to 0; a negative offset
# counts from the end of the scalar.  Croaks on a negative length or an
# offset outside the scalar.  At most the data available after the offset is
# written; returns the number of characters handed to PRINT.
sub WRITE {
	my $self = shift;
	my ($scalar, $length, $offset) = @_;
	$scalar = '' if !defined($scalar);
	$length = length($scalar) if !defined($length);
	croak "OStream::WRITE invalid length $length"
		if $length < 0;
	$offset = 0 if !defined($offset);
	$offset += length($scalar) if $offset < 0;
	# The offset is bounded by the size of the buffer, not by the
	# requested length
	croak "OStream::WRITE invalid write offset"
		if $offset < 0 || $offset > length($scalar);
	my $max = length($scalar) - $offset;
	$length = $max if $length > $max;
	$self->PRINT(substr($scalar, $offset, $length));
	return $length;
}
326 ## ----
327 ## main
328 ## ----
package main;

# Parse the command line.  $sfac receives the optional syslog facility name;
# giving --syslog at all (even without a value) turns syslog output on.
my $sfac;
Getopt::Long::Configure('bundling');
my @optspec = (
	'help|?|h'		=> sub {pod2usage(-verbose => 2, -exitval => 0)},
	'quiet|q'		=> \$quiet,
	'no-quiet'		=> sub {$quiet = 0},
	'progress|P'		=> \$progress,
	# --inetd switches on syslog and quiet as well
	'inetd|i'		=> sub {$inetd = 1; $syslog = 1; $quiet = 1;},
	'idle-timeout|t=i'	=> \$idle_timeout,
	'syslog|s:s'		=> \$sfac,
	'no-syslog'		=> sub {$syslog = 0; $sfac = undef;},
	'stderr'		=> \$stderr,
	'abbrev=i'		=> \$abbrev,
);
my $parse_res = GetOptions(@optspec) || pod2usage(2);

# Derived settings
defined($sfac) and $syslog = 1;
$quiet or $progress = 1;
$abbrev > 0 or $abbrev = 128;	# 0 (or less) means no abbreviation
if (defined($idle_timeout)) {
	$idle_timeout =~ /^\d+$/ or die "--idle-timeout must be a whole number";
	$inetd or die "--idle-timeout may not be used without --inetd";
}
# Route STDERR through OStream when syslog output was requested.
# Mode 1 sends lines to syslog only, mode 2 (with --stderr) to both syslog
# and stderr.  The facility name is case-insensitive and may omit the
# leading "LOG_"; it defaults to "user".
if ($syslog) {
	use Sys::Syslog qw();
	my $mode = 1;
	++$mode if $stderr;
	$sfac = "user" unless defined($sfac) && $sfac ne "";
	my $ofac = $sfac;
	$sfac = uc($sfac);
	$sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
	# The name is spliced into a string eval below, so let nothing but a
	# plain constant name through
	die "invalid syslog facility: $ofac" unless $sfac =~ /^LOG_[A-Z0-9_]+\z/;
	my $facility;
	# LOG_* names exported by Sys::Syslog that are not facilities
	my %badfac = map({("LOG_$_" => 1)}
		(qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
	eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac";
	# A facility value only uses bits 3..7 and its number is at most 23
	die "invalid syslog facility: $ofac"
		if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
	tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
}
# Quiet mode discards STDOUT; otherwise in inetd mode STDOUT is made an
# alias of STDERR so non-error messages follow the same route
if ($quiet) {
	open STDOUT, '>', '/dev/null';
} elsif ($inetd) {
	*STDOUT = *STDERR;
}
# Path of the unix socket we accept connections on
my $NAME;

if (!$inetd) {
	# Standalone mode: create the listening socket ourselves, replacing
	# any stale socket file, and make it world read/writable
	$NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
	my $uaddr = sockaddr_un($NAME);

	socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
	unlink($NAME);
	bind(Server, $uaddr) or die "bind: $!";
	listen(Server, SOMAXCONN) or die "listen: $!";
	chmod 0666, $NAME or die "chmod: $!";
} else {
	# inetd mode: file descriptor 0 must be an unconnected AF_UNIX stream
	# socket that we can call accept on
	open Server, '<&=0' or die "open: $!";
	my $sockname = getsockname Server;
	$sockname or die "getsockname: $!";
	# getpeername has to fail, and specifically with ENOTCONN
	getpeername(Server) and die "socket already connected! must be 'wait' socket";
	$!{ENOTCONN} or die "getpeername: $!";
	my $st = getsockopt Server, SOL_SOCKET, SO_TYPE;
	$st or die "getsockopt(SOL_SOCKET, SO_TYPE): $!";
	my $socktype = unpack('i', $st);
	defined($socktype) && $socktype == SOCK_STREAM or die "stream socket required";
	sockaddr_family($sockname) == AF_UNIX or die "AF_UNIX socket required";
	$NAME = unpack_sockaddr_un $sockname;
	my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
	$NAME eq $expected or warn "listening on \"$NAME\" but expected \"$expected\"";
	# Make sure user and group can both read and write the socket file
	my $mode = (stat($NAME))[2];
	$mode or die "stat: $!";
	$mode &= 07777;
	if (($mode & 0660) != 0660) {
		chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\": $!";
	}
}
# Main service loop: wait for connections on Server, emit a STATUS line
# every 60 seconds, exit after --idle-timeout seconds without children or
# connections, and spawn one child per accepted connection.
my $fdset = '';
vec($fdset, fileno(Server), 1) = 1;
my $nextstatus = time + 60;
statmsg "listening on $NAME";
while (1) {
	my ($rout, $eout, $nfound);
	do {
		my $wait;
		my $now = time;
		if ($now >= $nextstatus) {
			my $statmsg = "STATUS: $children active";
			my $idlesecs;
			# Only mention idle time with no children and at least 2 seconds idle
			$statmsg .= ", idle " . duration($idlesecs)
				if !$children && ($idlesecs = $now - $idlestart) >= 2;
			statmsg $statmsg;
		}
		# Advance the next status time until it lies in the future; $wait
		# ends up as the positive number of seconds until then
		$nextstatus += 60, $now = time while ($wait = $nextstatus - $now) <= 0;
		$nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
		logmsg "select failed: $!" unless $nfound >= 0 || $!{EINTR};
		$now = time;
		if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
			statmsg "idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
			exit 0;
		}
	} while $nfound < 1;
	unless (accept(Client, Server)) {
		logmsg "accept failed: $!" unless $!{EINTR};
		next;
	}
	logmsg "connection on $NAME";
	spawn sub {
		# Runs in the child with STDIN connected to the client; the first
		# line is "COMMAND ARGS"
		my $inp = <STDIN>;
		chomp $inp if defined($inp);
		$inp or exit 0; # ignore empty connects
		my ($cmd, $arg) = $inp =~ /^([a-zA-Z0-9-]+)\s+(.*)$/;
		# A line that is not "COMMAND ARGS" leaves $cmd undefined; reject it
		# here instead of comparing undef below and emitting warnings
		defined($cmd) or die "malformed request";
		if ($cmd eq 'ref-changes') {
			ref_changes($arg);
		} elsif ($cmd eq 'clone') {
			clone($arg);
		} elsif ($cmd eq 'ref-change') {
			ref_change($arg);
		} else {
			die "unknown command: $cmd";
		}
	};
	# The child holds its own copy of the connection
	close Client;
	sleep 1;
}
460 ## -------------
461 ## Documentation
462 ## -------------
465 __END__
467 =head1 NAME
469 taskd.pl - Perform Girocco service tasks
471 =head1 SYNOPSIS
473 taskd.pl [options]
475 Options:
476 -h | --help detailed instructions
477 -q | --quiet run quietly
478 --no-quiet do not run quietly
479 -P | --progress show occasional status updates
480 -i | --inetd run as inetd unix stream wait service
481 implies --quiet --syslog
482 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
483 requires --inetd
484 -s | --syslog[=facility] send messages to syslog instead of
485 stderr but see --stderr
486 enabled by --inetd
     --no-syslog                            do not send messages to syslog
488 --stderr always send messages to stderr too
489 --abbrev=n abbreviate hashes to n (default is 8)
491 =head1 OPTIONS
493 =over 8
495 =item B<--help>
497 Print the full description of taskd.pl's options.
499 =item B<--quiet>
501 Suppress non-error messages, e.g. for use when running this task as an inetd
502 service. Enabled by default by --inetd.
504 =item B<--no-quiet>
506 Enable non-error messages. When running in --inetd mode these messages are
507 sent to STDERR instead of STDOUT.
509 =item B<--progress>
511 Show information about the current status of the task operation occasionally.
512 This is automatically enabled if --quiet is not given.
514 =item B<--inetd>
516 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
517 stream socket ready to have accept called on it. To be useful, the unix socket
518 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
will be issued if the socket is not in the expected location.  Socket file
permissions will be adjusted if necessary, and if they cannot be adjusted,
taskd.pl will die.  The --inetd option also enables the --quiet and --syslog
options, but --no-quiet and --no-syslog may be used to alter that.
524 The correct specification for the inetd socket is a "unix" protocol "stream"
525 socket in "wait" mode with user and group writable permissions (0660). An
526 attempt will be made to alter the socket's file mode if needed and if that
527 cannot be accomplished taskd.pl will die.
529 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
530 in wait mode and will die if the passed in socket is already connected.
532 Note that while *BSD's inetd happily supports unix sockets (and so does
533 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
534 However, systemd does seem to.
536 =item B<--idle-timeout=SECONDS>
538 Only permitted when running in --inetd mode. After SECONDS of inactivity
539 (i.e. all outstanding tasks have completed and no new requests have come in)
540 exit normally. The default is no timeout at all (a SECONDS value of 0).
541 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
543 =item B<--syslog[=facility]>
545 Normally error output is sent to STDERR. With this option it's sent to
546 syslog instead. Note that when running in --inetd mode non-error output is
547 also affected by this option as it's sent to STDERR in that case. If
548 not specified, the default for facility is LOG_USER. Facility names are
549 case-insensitive and the leading 'LOG_' is optional. Messages are logged
550 with the LOG_NOTICE priority.
552 =item B<--no-syslog>
554 Send error message output to STDERR but not syslog.
556 =item B<--stderr>
558 Always send error message output to STDERR. If --syslog is in effect then
559 a copy will also be sent to syslog. In --inetd mode this applies to non-error
560 messages as well.
562 =item B<--abbrev=n>
564 Abbreviate displayed hash values to only the first n hexadecimal characters.
565 The default is 8 characters. Set to 0 for no abbreviation at all.
567 =back
569 =head1 DESCRIPTION
571 taskd.pl is Girocco's service request servant; it listens for service requests
572 such as new clone requests and ref update notifications and spawns a task to
573 perform the requested action.
575 =cut