taskd.pl: launder validated ref-change arguments
[girocco.git] / taskd / taskd.pl
blob6592bf8a759a743d37ca2e8b28cb62ab6d84d626
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
5 # taskd is Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
31 use strict;
32 use warnings;
34 use Getopt::Long;
35 use Pod::Usage;
36 use Socket;
37 use Errno;
38 use Fcntl;
39 use POSIX qw(:sys_wait_h :fcntl_h);
40 use File::Basename;
41 use File::Spec ();
42 use Cwd qw(realpath);
44 use lib "__BASEDIR__";
45 use Girocco::Config;
46 use Girocco::Notify;
47 use Girocco::Project;
48 use Girocco::User;
49 use Girocco::Util qw(noFatalsToBrowser get_git human_duration);
50 BEGIN {noFatalsToBrowser}
51 use Girocco::ExecUtil;
53 use constant SOCKFDENV => "GIROCCO_TASKD_SOCKET_FD";
# Throttle Classes Defaults
# Note that any same-named classes in @Girocco::Config::throttle_classes
# will override (completely replacing the entire hash) these ones.
my @throttle_defaults = (
	{ name => "ref-change", maxproc => 0, maxjobs => 1, interval => 2 },
	{ name => "clone",      maxproc => 0, maxjobs => 2, interval => 5 },
	{
		name => "snapshot",
		# maxproc and maxjobs are intentionally left unset so that the
		# Throttle::GetClassInfo defaults apply:
		#   maxproc => max(5, cpucount + maxjobs)
		#   maxjobs => max(1, int(cpucount / 4))
		interval => 5
	},
);

# Options
my ($quiet, $progress, $syslog, $stderr, $inetd);
my ($idle_timeout, $daemon, $max_lifetime, $same_pid);
my $abbrev = 8;         # hex digits shown for abbreviated revisions
my $showff = 1;         # show '..'/'...' fast-forward indicators
my $statusintv = 60;
my $idleintv = 3600;
my $maxspawn = 8;

$| = 1; # autoflush the currently selected handle (STDOUT)

# Runtime state
my $progname = basename($0);
my $children = 0;       # live child count (bumped by spawn, dropped by REAPER)
my $idlestart = time;   # when the child count last dropped to zero
my $idlestatus = 0;
# Number of online CPUs as reported by Girocco::Util::online_cpus.
# The answer is cached in a package variable and only recomputed while
# the cached value is false.
sub cpucount {
	use Girocco::Util "online_cpus";
	our $online_cpus_result;
	$online_cpus_result ||= online_cpus;
	return $online_cpus_result;
}
# Log a message on STDOUT prefixed by "[<localtime>] <progname> <pid>: ".
# When STDOUT is tied (to OStream) the prefixed copy is routed to stderr
# only and the bare message to syslog only, via $OStream::only.
sub logmsg {
	my $msg = "@_\n";
	my $stamped = "[" . localtime() . "] $progname $$: " . $msg;
	if (!tied(*STDOUT)) {
		print($stamped);
	} else {
		$OStream::only = 2; # STDERR only
		print($stamped);
		$OStream::only = 1; # syslog only
		print($msg);
		$OStream::only = 0; # back to default
	}
}
# Progress message on STDERR (only when $progress is set), prefixed by
# "[<localtime>] <progname> <pid>: ".  When STDERR is tied (to OStream)
# the prefixed copy goes to stderr only and the bare message to syslog
# only, selected through $OStream::only.
sub statmsg {
	$progress or return;
	my $msg = "@_\n";
	my $stamped = "[" . localtime() . "] $progname $$: " . $msg;
	if (!tied(*STDERR)) {
		print STDERR $stamped;
	} else {
		$OStream::only = 2; # STDERR only
		print STDERR $stamped;
		$OStream::only = 1; # syslog only
		print STDERR $msg;
		$OStream::only = 0; # back to default
	}
}
# True if numeric file descriptor $fd is currently open.
# Returns undef for an undefined or negative descriptor; otherwise 1 or ''.
sub isfdopen {
	my ($fd) = @_;
	return undef if !defined($fd) || $fd < 0;
	# dup() only succeeds on an open descriptor; drop the copy right away
	my $copy = POSIX::dup($fd);
	my $is_open = defined($copy);
	POSIX::close($copy) if $is_open;
	return $is_open;
}
# Clear the descriptor flags of handle $fd (dropping FD_CLOEXEC) so it
# survives exec.  Dies if fcntl fails.
sub setnoncloexec {
	my ($handle) = @_;
	return fcntl($handle, F_SETFD, 0) || die "fcntl failed: $!";
}
# Set FD_CLOEXEC on handle $fd so it is closed across exec.  Dies if
# fcntl fails.
sub setcloexec {
	my ($handle) = @_;
	return fcntl($handle, F_SETFD, FD_CLOEXEC) || die "fcntl failed: $!";
}
# Add O_NONBLOCK to the file status flags of handle $fd, keeping the
# other flags.  Dies if either fcntl call fails.
sub setnonblock {
	my ($handle) = @_;
	my $current = fcntl($handle, F_GETFL, 0);
	die "fcntl failed: $!" unless defined($current);
	return fcntl($handle, F_SETFL, $current | O_NONBLOCK) || die "fcntl failed: $!";
}
# Remove O_NONBLOCK from the file status flags of handle $fd, keeping the
# other flags.  Dies if either fcntl call fails.
sub setblock {
	my ($handle) = @_;
	my $current = fcntl($handle, F_GETFL, 0);
	die "fcntl failed: $!" unless defined($current);
	return fcntl($handle, F_SETFL, $current & ~O_NONBLOCK) || die "fcntl failed: $!";
}
168 package Throttle;
171 ## Throttle protocol
173 ## 1) Process needing throttle services acquire a control file descriptor
174 ## a) Either as a result of a fork + exec (the write end of a pipe)
175 ## b) Or by connecting to the taskd socket (not yet implemented)
177 ## 2) The process requesting throttle services will be referred to
178 ## as the supplicant or just "supp" for short.
180 ## 3) The supp first completes any needed setup which may include
181 ## gathering data it needs to perform the action -- if that fails
182 ## then there's no need for any throttling.
## 4) The supp writes a throttle request to the control descriptor in
## this format:
## throttle <pid> <class> <text>\n
## where <text> is a non-empty, single-word description (e.g. a project
## name); for example if the supp's pid was 1234 and it was requesting
## throttle control as a member of the mail class for project "foo" it
## would write this message:
## throttle 1234 mail foo\n
190 ## Note that if the control descriptor happens to be a pipe rather than a
## socket, the message should be preceded by another "\n" just to be safe.
192 ## If the control descriptor is a socket, not a pipe, the message may be
193 ## preceded by a "\n" but that's not recommended.
195 ## 5) For supplicants with a control descriptor that is a pipe
196 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
197 ## If the control descriptor is a socket (getsockname succeeds) then
198 ## protocol (5b) should be used.
200 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
201 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
202 ## to write a "keepalive\n" message to the control descriptor. If that
203 ## fails, the controller has gone away and it may make its own decision
204 ## whether or not to proceed at that point. If, on the other hand, it
205 ## receives a SIGTERM, the process limit for its class has been reached
206 ## and it should abort without performing its action. If it receives
207 ## SIGUSR1, it may proceed without writing anything more to the control
## descriptor, and MAY even close the control descriptor. Finally, a
209 ## SIGUSR2 indicates rejection of the throttle request for some other reason
210 ## such as unrecognized class name or invalid pid in which case the supp may
211 ## make its own decision how to proceed.
## 5b) The supp now enters a read wait on the socket -- it need accommodate no
214 ## more than 512 bytes and if a '\n' does not appear within that number of
215 ## bytes the read should be considered failed. Otherwise the read should
216 ## be retried until either a full line has been read or the socket is
## closed from the other end. If the line read is "proceed\n" then it may
218 ## proceed without reading or writing anything more to the control
219 ## descriptor, but MUST keep the control descriptor open and not call
220 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
221 ## should be retried) constitutes failure. If a full line starting with at
222 ## least one alpha character was read but it was not "proceed" then it
223 ## should abort without performing its action. For any other failure it
224 ## may make its own decision whether or not to proceed as the controller has
225 ## gone away.
227 ## 6) The supp now performs its throttled action.
229 ## 7) The supp now closes its control descriptor (if it hasn't already in the
230 ## case of (5a)) and exits -- in the case of a socket, the other end receives
231 ## notification that the socket has been closed (read EOF). In the case of
232 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
## of the other end of the pipe, so it will not reach EOF by the supp's
234 ## exit in that case).
# Per-class bookkeeping.  Keys are (lower-cased) class names, values are
# hash refs with these fields:
# 'maxproc'      => integer; maximum number of allowed supplicants (queued
#                   plus concurrently active); 0 means no limit.
# 'maxjobs'      => integer; how many supplicants may proceed at once;
#                   0 means unlimited, though concurrency is still bounded
#                   by 'maxproc' (if > 0) whatever 'maxjobs' says.
# 'total'        => integer; number of pids of this class currently in %pid.
# 'active'       => integer; number of currently running supplicants, i.e.
#                   (entries of %pid with this class) minus (entries of
#                   this class still in @queue).
# 'interval'     => integer; minimum seconds between 'proceed' responses
#                   (or SIGUSR1 signals) to members of this class.
# 'lastqueue'    => time; last time a supplicant was successfully queued.
# 'lastproceed'  => time; last time a supplicant was allowed to proceed.
# 'lastthrottle' => time; last time a supplicant was throttled.
# 'lastdied'     => time; last time a supplicant of this class was removed.
my %classes = ();

# Tracked supplicants.  Keys are pid numbers, values are array refs:
# [0] => class name (key into %classes)
# [1] => state: 0 while queued, otherwise the time() it started running
# [2] => descriptive text (e.g. project name)
my %pid = ();

# Global minimum number of seconds between any two proceed responses,
# whatever their class.  Because it spans classes it can effectively
# lengthen a class's own 'interval' by delaying its proceed notifications.
my $interval = 1;

# FIFO of pids awaiting notification once the next $interval elapses,
# provided the interval and maxjobs limits of the pid's class allow it.
my @queue = ();

# Most recent times of: a successful AddSupplicant, a proceed notification,
# a throttle, and a removal.
my $lastqueue = 0;
my $lastproceed = 0;
my $lastthrottle = 0;
my $lastdied = 0;

# Lifetime counters of supplicants queued, allowed to proceed, throttled
# and removed.  $totalqueue - $totaldied should always equal the number of
# pids currently tracked in %pid (queued plus running).
my $totalqueue = 0;
my $totalproceed = 0;
my $totalthrottle = 0;
my $totaldied = 0;
# Returns an unordered list of the currently registered class names
# (the number of classes in scalar context).
sub GetClassList {
	my @names = keys(%classes);
	return @names;
}
# Numeric maximum of exactly two values; the first argument wins ties.
sub _max {
	my ($x, $y) = @_;
	return $x >= $y ? $x : $y;
}
# Parse $val as an optionally signed integer, or fall back to calling the
# $default code ref when $val is missing or not an integer; either way the
# result is clamped so it is never below $min.
sub _getnum {
	my ($floor, $candidate, $fallback) = @_;
	my $looks_integral = defined($candidate) && $candidate =~ /^[+-]?\d+$/;
	my $value = $looks_integral ? 0 + $candidate : $fallback->(@_);
	return _max($floor, $value);
}
# Look up (and optionally create) a throttle class.
# [0] => class name; must start with a letter and contain only letters,
#        digits, '.', '_', '+' and '-'; it is matched case-insensitively
#        (stored lower-cased).
# [1] => if true, create the class if it doesn't exist; if a hash ref it
#        may hold initial values for maxproc, maxjobs and interval.
#        Otherwise maxjobs defaults to max(cpu cores/4, 1), maxproc
#        defaults to max(5, number of cpu cores + maxjobs) and interval
#        defaults to 1.  Values are clamped to be at least 0.
# Returns a hash ref holding a copy of the class info on success; returns
# empty/undef for an invalid name, or for an unknown class when [1] is false.
sub GetClassInfo {
	my ($classname, $init) = @_;
	return unless defined($classname) && $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/;
	$classname = lc($classname);
	if (my $existing = $classes{$classname}) {
		return +{ %$existing };
	}
	return unless $init;
	$init = {} unless ref($init) eq 'HASH';
	my %fresh = ();
	$fresh{'maxjobs'} = _getnum(0, $init->{'maxjobs'}, sub { _max(1, int(::cpucount() / 4)) });
	# evaluated after maxjobs is known, since its default depends on it
	$fresh{'maxproc'} = _getnum(0, $init->{'maxproc'}, sub { _max(5, ::cpucount() + $fresh{'maxjobs'}) });
	$fresh{'interval'} = _getnum(0, $init->{'interval'}, sub { 1 });
	$fresh{$_} = 0 foreach qw(total active lastqueue lastproceed lastthrottle lastdied);
	$classes{$classname} = \%fresh;
	return +{ %fresh };
}
# [0] => pid to look up
# Returns () if the pid is not tracked, otherwise
# ($classname, $timestarted, $description) where $timestarted is 0 while
# the pid is still queued and otherwise the time() it started running.
sub GetPidInfo {
	my $lookup = shift;
	return exists($pid{$lookup}) ? @{$pid{$lookup}} : ();
}
# Returns the pids that are currently running (not queued), ordered by
# start time from oldest to newest.  May be an empty list.
sub GetRunningPids {
	my @running = grep { $pid{$_}[1] } keys(%pid);
	return sort { $pid{$a}[1] <=> $pid{$b}[1] } @running;
}
# Returns a hash ref with various statistics about the current state:
# 'interval'      => global minimum interval between proceeds
# 'active'        => how many pids are currently running, i.e. the tracked
#                    pids minus those still waiting in the queue
# 'queue'         => how many pids are currently queued
# 'lastqueue'     => time (epoch seconds) of last queue
# 'lastproceed'   => time (epoch seconds) of last proceed
# 'lastthrottle'  => time (epoch seconds) of last throttle
# 'lastdied'      => time (epoch seconds) of last removal
# 'totalqueue'    => lifetime total number of processes queued
# 'totalproceed'  => lifetime total number of processes proceeded
# 'totalthrottle' => lifetime total number of processes throttled
# 'totaldied'     => lifetime total number of removed processes
sub GetInfo {
	return {
		interval => $interval,
		active => scalar(keys(%pid)) - scalar(@queue),
		queue => scalar(@queue),
		lastqueue => $lastqueue,
		lastproceed => $lastproceed,
		lastthrottle => $lastthrottle,
		lastdied => $lastdied,
		totalqueue => $totalqueue,
		totalproceed => $totalproceed,
		totalthrottle => $totalthrottle,
		totaldied => $totaldied
	};
}
# Get or set the global minimum interval between proceeds.
# With no (or a non-integer) argument the interval is left unchanged; with
# a non-negative integer argument it is replaced.  Either way the value in
# effect before the call is returned.
sub Interval {
	my ($new) = @_;
	my $previous = $interval;
	if (defined($new) && $new =~ /^\d+$/) {
		$interval = 0 + $new;
	}
	return $previous;
}
sub RemoveSupplicant;

# Perform queue service, i.e. send SIGUSR1 to queued processes that are
# eligible to proceed.  Only the first queued pid of each class is a
# candidate; the scan restarts after every proceed or removal.
# Returns the minimum number of seconds until the next proceed is possible.
# Returns undef if there's nothing waiting to proceed or the 'maxjobs'
# limits have been reached for all queued classes (in which case nothing
# can proceed until one of them exits, hence undef).
# This is called automatically by AddSupplicant and RemoveSupplicant.
sub ServiceQueue {
	RETRY: while (@queue) {
		my $now = time;
		# seconds still to wait before the global $interval permits a proceed
		my $min = _max(0, $interval - ($now - $lastproceed));
		my $classmin = undef;
		my $classchecked = 0;
		my %seenclass = ();
		my $classcount = scalar(keys(%classes));
		for (my $i = 0; $i <= $#queue && $classchecked < $classcount; ++$i) {
			my $pid = $queue[$i];
			my $procinfo = $pid{$pid};
			if (!$procinfo) {
				# Orphaned queue entry with no %pid record.  RemoveSupplicant
				# returns -1 for such a pid without touching @queue, so
				# delegating to it would retry forever; drop it here.
				splice(@queue, $i, 1);
				next RETRY;
			}
			my $classinfo = $classes{$$procinfo[0]};
			if (!$classinfo) {
				# Unknown class: forget the supplicant entirely, and make
				# sure its entry really left @queue so we cannot spin.
				RemoveSupplicant($pid, 1);
				splice(@queue, $i, 1) if $i <= $#queue && $queue[$i] == $pid;
				next RETRY;
			}
			if (!$seenclass{$$procinfo[0]}) {
				$seenclass{$$procinfo[0]} = 1;
				++$classchecked;
				if (!$classinfo->{'maxjobs'} || $classinfo->{'active'} < $classinfo->{'maxjobs'}) {
					my $cmin = _max(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
					if (!$cmin && !$min) {
						$now = time;
						$$procinfo[1] = $now;
						splice(@queue, $i, 1);
						++$totalproceed;
						$lastproceed = $now;
						$classinfo->{'lastproceed'} = $now;
						++$classinfo->{'active'};
						kill("USR1", $pid) or RemoveSupplicant($pid, 1);
						next RETRY;
					}
					$classmin = $cmin unless defined($classmin) && $classmin < $cmin;
				}
			}
		}
		return defined($classmin) ? _max($min, $classmin) : undef;
	}
	return undef; # nothing queued, nothing to do
}
# Register a supplicant process with a throttle class and queue it.
# $_[0] => pid to add (a live process that is not already tracked)
# $_[1] => class name (must exist; matched case-insensitively, the same way
#          GetClassInfo normalizes class names)
# $_[2] => optional descriptive text (e.g. project name), defaults to ''
# $_[3] => if true, do not run ServiceQueue after queueing
# Returns -1 if no such class, pid already present or invalid
# Returns 0 if added successfully (and possibly already SIGUSR1'd)
# Returns 1 if throttled (class 'maxproc' reached) and not added
sub AddSupplicant {
	my ($pid, $classname, $text, $noservice) = @_;
	return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
	$pid += 0;
	kill(0, $pid) or return -1;
	return -1 unless defined($classname);
	# %classes keys are stored lower-cased by GetClassInfo; without this a
	# name accepted there in mixed case would be rejected here.
	$classname = lc($classname);
	my $classinfo = $classes{$classname};
	return -1 unless $classinfo;
	return -1 if $pid{$pid};
	$text = '' unless defined($text);
	my $now = time;
	if ($classinfo->{'maxproc'} && $classinfo->{'total'} >= $classinfo->{'maxproc'}) {
		++$totalthrottle;
		$lastthrottle = $now;
		$classinfo->{'lastthrottle'} = $now;
		return 1;
	}
	++$totalqueue;
	$lastqueue = $now;
	$pid{$pid} = [$classname, 0, $text];
	++$classinfo->{'total'};
	$classinfo->{'lastqueue'} = $now;
	push(@queue, $pid);
	ServiceQueue() unless $noservice;
	return 0;
}
# Forget a supplicant (died, killed, exited normally, doesn't matter).
# $_[0] => pid to remove
# $_[1] => if true, do not run ServiceQueue afterwards (ServiceQueue itself
#          passes this to avoid re-entering itself)
# Returns 0 if removed
# Returns -1 if unknown pid or other error during removal
sub RemoveSupplicant {
	my ($pid, $noservice) = @_;
	return -1 unless defined($pid) && $pid =~ /^\d+$/;
	$pid += 0;
	my $pidinfo = delete $pid{$pid};
	$pidinfo or return -1;
	my $now = time;
	$lastdied = $now;
	++$totaldied;
	# A supplicant that never started (state 0) is still in @queue
	@queue = grep { $_ != $pid } @queue if !$$pidinfo[1];
	my $classinfo = $classes{$$pidinfo[0]};
	if (!$classinfo) {
		# Honor $noservice here too; ignoring it made ServiceQueue recurse
		ServiceQueue() unless $noservice;
		return -1;
	}
	--$classinfo->{'active'} if $$pidinfo[1];
	--$classinfo->{'total'};
	$classinfo->{'lastdied'} = $now;
	ServiceQueue() unless $noservice;
	return 0;
}
521 # Instance Methods
523 package main;
526 ## ---------
527 ## Functions
528 ## ---------
# pids collected by REAPER, in the order they were reaped
my @reapedpids = ();
# Names for the signal numbers listed by the POSIX trap utility page
my %signame = (
	# http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
	1  => 'SIGHUP',
	2  => 'SIGINT',
	3  => 'SIGQUIT',
	6  => 'SIGABRT',
	9  => 'SIGKILL',
	14 => 'SIGALRM',
	15 => 'SIGTERM',
);

# SIGCHLD handler: reap every exited child without blocking, log how it
# ended and record its pid in @reapedpids.  When the live child count
# reaches zero, $idlestart is set to the current time.
sub REAPER {
	local $!;
	my $waitedpid;
	while (($waitedpid = waitpid(-1, WNOHANG)) > 0) {
		my $status = $? & 0xffff;
		$idlestart = time if !--$children;
		my $signum = $status & 0x7f;
		my $codemsg = '';
		if (($status & 0xff) == 0) {
			# normal exit; only mention a non-zero exit code
			$codemsg = " with exit code " . ($status >> 8) if $status;
		} elsif ($signum) {
			my $label = $signame{$signum} ? $signame{$signum} : $signum;
			$codemsg = " with signal " . $label;
		}
		logmsg "reaped $waitedpid$codemsg";
		push(@reapedpids, $waitedpid);
	}
	$SIG{CHLD} = \&REAPER; # loathe sysV
}

# Install REAPER as the SIGCHLD handler.
sub set_sigchld_reaper() {
	$SIG{CHLD} = \&REAPER; # Apollo 440
}
# Read and write ends of the control pipe (children close the read end
# and send throttle/keepalive messages on the write end).
my ($piperead, $pipewrite);

# Fork a child to serve the connection on the Client handle.  The parent
# bumps the child count, logs the new pid and returns; the child closes
# Server (unless it is fd 0) and the control pipe's read end, makes Client
# its STDIN and exits with the value returned by $coderef.
sub spawn {
	my $coderef = shift;

	my $pid = fork;
	if (!defined($pid)) {
		logmsg "cannot fork: $!";
		return;
	}
	if ($pid) {
		# I'm the parent
		$idlestart = time if !++$children;
		$idlestatus = 0;
		logmsg "begat $pid";
		return;
	}

	# I'm the child
	close(Server) unless fileno(Server) == 0;
	close($piperead);
	$SIG{'CHLD'} = sub {};

	open(STDIN, "+<&Client") or die "can't dup client to stdin";
	close(Client);
	exit($coderef->());
}
# Ask the controlling taskd (over $pipewrite) for permission to run as a
# member of throttle class $classname, described by $text, then wait for
# the verdict, which arrives as a signal: SIGUSR1 = proceed,
# SIGTERM = throttled, SIGUSR2 = request rejected; SIGPIPE means the
# control pipe is gone.  A keepalive is written at least every 30 seconds
# while waiting.
# returns:
#   < 0: error (unknown class, rejected request or controller gone)
#   = 0: proceed
#   > 0: throttled
sub request_throttle {
	use POSIX qw(sigprocmask sigsuspend SIG_SETMASK);
	my $classname = shift;
	my $text = shift;

	Throttle::GetClassInfo($classname)
		or return -1; # no such throttle class

	my $throttled = 0;
	my $proceed = 0;
	my $error = 0;
	my $controldead = 0;
	my $setempty = POSIX::SigSet->new;
	my $setfull = POSIX::SigSet->new;
	my $savedmask = POSIX::SigSet->new;
	$setempty->emptyset();
	$setfull->fillset();
	$SIG{'TERM'} = sub {$throttled = 1};
	$SIG{'USR1'} = sub {$proceed = 1};
	$SIG{'USR2'} = sub {$error = 1};
	$SIG{'PIPE'} = sub {$controldead = 1};
	$SIG{'ALRM'} = sub {};

	# After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
	print $pipewrite "\nthrottle $$ $classname $text\n";
	# Block every signal so none can slip in between testing the flags and
	# calling sigsuspend (which unblocks them atomically while it waits).
	# Remember the mask in effect on entry so it can be put back afterwards.
	sigprocmask(SIG_SETMASK, $setfull, $savedmask);
	until ($controldead || $throttled || $proceed || $error) {
		alarm(30);
		sigsuspend($setempty);
		alarm(0);
		# Unblock while writing so a signal raised by the write (e.g.
		# SIGPIPE) is delivered to its handler.
		sigprocmask(SIG_SETMASK, $setempty);
		print $pipewrite "\nkeepalive $$\n";
		sigprocmask(SIG_SETMASK, $setfull);
	}
	# Restore the signal mask that was in effect on entry
	sigprocmask(SIG_SETMASK, $savedmask);
	$SIG{'TERM'} = "DEFAULT";
	$SIG{'USR1'} = "DEFAULT";
	$SIG{'USR2'} = "DEFAULT";
	$SIG{'ALRM'} = "DEFAULT";
	$SIG{'PIPE'} = "DEFAULT";

	my $result = -1;
	if ($throttled) {
		$result = 1;
	} elsif ($proceed) {
		$result = 0;
	}
	return $result;
}
# Child-side handler for a clone request for project $name: check that the
# project exists, loads and is marked for (but not already being) cloned,
# wait for the "clone" throttle, then redirect stdin from the null device
# and stdout/stderr to the project's .clonelog and exec taskd/clone.sh.
# Never returns normally: it either execs or dies.
sub clone {
	my ($name) = @_;
	# Pass the name by reference, as ref_change and ref_changes do, so that
	# does_exist can hand back the validated (presumably laundered) name.
	# NOTE(review): confirm Girocco::Project::does_exist ref semantics.
	Girocco::Project::does_exist(\$name, 1) or die "no such project: $name";
	my $proj;
	eval {$proj = Girocco::Project->load($name)};
	if (!$proj && Girocco::Project::does_exist($name, 1)) {
		# If the .clone_in_progress file exists, but the .clonelog does not
		# and neither does the .clone_failed, be helpful and touch the
		# .clone_failed file so that the mirror can be restarted
		my $projdir = $Girocco::Config::reporoot."/$name.git";
		if (-d "$projdir" && -f "$projdir/.clone_in_progress" && ! -f "$projdir/.clonelog" && ! -f "$projdir/.clone_failed") {
			if (open(my $touch, '>', "$projdir/.clone_failed")) {
				close($touch);
			}
		}
	}
	$proj or die "failed to load project $name";
	$proj->{clone_in_progress} or die "project $name is not marked for cloning";
	$proj->{clone_logged} and die "project $name is already being cloned";
	request_throttle("clone", $name) <= 0 or die "cloning $name aborted (throttled)";
	statmsg "cloning $name";
	my $devnullfd = POSIX::open(File::Spec->devnull, O_RDWR);
	defined($devnullfd) && $devnullfd >= 0 or die "cannot open /dev/null: $!";
	POSIX::dup2($devnullfd, 0) or
		die "cannot dup2 STDIN_FILENO: $!";
	POSIX::close($devnullfd);
	# keep a copy of the original stderr so errors after the redirect
	# can still be reported there
	my $duperr;
	open $duperr, '>&2' or
		die "cannot dup STDERR_FILENO: $!";
	my $clonelogfd = POSIX::open("$Girocco::Config::reporoot/$name.git/.clonelog", O_WRONLY|O_TRUNC|O_CREAT, 0664);
	defined($clonelogfd) && $clonelogfd >= 0 or die "cannot open clonelog for writing: $!";
	POSIX::dup2($clonelogfd, 1) or
		die "cannot dup2 STDOUT_FILENO: $!";
	if (!POSIX::dup2($clonelogfd, 2)) {
		# capture errno before the restoring dup2 can change it
		my $err = $!;
		POSIX::dup2(fileno($duperr), 2);
		die "cannot dup2 STDERR_FILENO: $err";
	}
	POSIX::close($clonelogfd);
	exec("$Girocco::Config::basedir/taskd/clone.sh", "$name.git") or do {
		# capture errno before the restoring dup2 can change it
		my $err = $!;
		POSIX::dup2(fileno($duperr), 2);
		die "exec failed: $err";
	};
}
# Separator shown between abbreviated old and new revisions:
#   ' -> ' when fast-forward display ($showff) is off or no git dir given;
#   '..'   when the pair cannot be compared (undefined, all-zero or equal
#          revisions) or rev-list finds no commit in $old missing from $new;
#   '...'  otherwise (presumably a non-fast-forward update).
# Only for the git-based answer does list context also get a second true
# element, signalling that git was run.
sub ref_indicator {
	return ' -> ' if !$showff || !defined($_[0]);
	my ($git_dir, $old, $new) = @_;
	my $comparable = defined($old) && defined($new)
		&& $old !~ /^0+$/ && $new !~ /^0+$/ && $old ne $new;
	return '..' if !$comparable;
	# In many cases `git merge-base` is slower than this even if using the
	# `--is-ancestor` option available since Git 1.8.0, but it's never faster
	my $unmerged = get_git("--git-dir=$git_dir", "rev-list", "-n", "1", "^$new^0", "$old^0", "--");
	my $ans = $unmerged ? '...' : '..';
	return wantarray ? ($ans, 1) : $ans;
}
# Handle a single ref-change request.  $arg holds whitespace separated
# fields "<username> <project> <oldrev> <newrev> <ref>".  The revisions
# and ref are validated and laundered through regex captures; malformed
# or no-op requests are ignored.  Dies if the project or user can't be
# loaded or the request is throttled.  Notifications are only sent when
# the project has_notify.  Returns 0.
sub ref_change {
	my ($arg) = @_;
	my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
	return 0 unless $username && $name && $oldrev && $newrev && $ref;
	return 0 unless $oldrev =~ /^([0-9a-f]{40,})$/;
	$oldrev = $1;
	return 0 unless $newrev =~ /^([0-9a-f]{40,})$/;
	$newrev = $1;
	return 0 unless $ref =~ m{^(refs/.+)$};
	$ref = $1;
	return 0 if $newrev eq $oldrev;
	# unless single-level refs are wanted, require refs/<category>/<name>
	return 0 unless $Girocco::Config::notify_single_level
		|| $ref =~ m(^refs/[^/]+/[^/]);

	Girocco::Project::does_exist(\$name, 1) or die "no such project: $name";
	my $proj = Girocco::Project->load($name)
		or die "failed to load project $name";
	my $has_notify = $proj->has_notify;
	my $type = $has_notify ? "notify" : "change";

	my $user;
	if ($username && $username !~ /^%.*%$/) {
		Girocco::User::does_exist($username, 1) or die "no such user: $username";
		$user = Girocco::User->load($username)
			or die "failed to load user $username";
	} elsif ($username eq "%$name%") {
		$username = "-";
	}

	request_throttle("ref-change", $name) <= 0
		or die "ref-change $name aborted (throttled)";
	my $ind = ref_indicator($proj->{path}, $oldrev, $newrev);
	my $oldabbr = substr($oldrev, 0, $abbrev);
	my $newabbr = substr($newrev, 0, $abbrev);
	statmsg "ref-$type $username $name ($ref: $oldabbr$ind$newabbr)";
	open STDIN, '<', File::Spec->devnull;
	if ($has_notify) {
		Girocco::Notify::ref_changes($proj, $user, [$oldrev, $newrev, $ref]);
	}
	return 0;
}
# Handle a batched ref-changes request.  $arg holds "<username> <project>";
# the individual changes are then read from STDIN, one per line, as
# "<oldrev> <newrev> <ref>", until a line whose first field is "done" or
# EOF.  Malformed lines are skipped.  Dies if the project or user can't be
# loaded or the request is throttled.  Returns 0.
sub ref_changes {
	my ($arg) = @_;
	my ($username, $name) = split(/\s+/, $arg);
	$username && $name or return 0;

	Girocco::Project::does_exist(\$name, 1) or die "no such project: $name";
	my $proj = Girocco::Project->load($name);
	$proj or die "failed to load project $name";
	my $has_notify = $proj->has_notify;
	my $type = $has_notify ? "notify" : "change";

	my $user;
	if ($username && $username !~ /^%.*%$/) {
		Girocco::User::does_exist($username, 1) or die "no such user: $username";
		$user = Girocco::User->load($username);
		$user or die "failed to load user $username";
	} elsif ($username eq "%$name%") {
		$username = "-";
	}

	my @changes = ();
	my %oldheads = ();
	my %deletedheads = ();
	while (my $change = <STDIN>) {
		my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
		# A blank line yields no fields; skip it instead of comparing undef
		# (which warns under "use warnings").
		defined($oldrev) or next;
		$oldrev ne "done" or last;
		# fewer than three fields cannot be a valid change line
		defined($ref) or next;
		$oldrev =~ /^([0-9a-f]{40,})$/ or next; $oldrev = $1;
		$newrev =~ /^([0-9a-f]{40,})$/ or next; $newrev = $1;
		$ref =~ m{^(refs/.+)$} or next; $ref = $1;
		$Girocco::Config::notify_single_level || $ref =~ m(^refs/[^/]+/[^/]) or next;
		if ($ref =~ m{^refs/heads/.}) {
			if ($oldrev =~ /^0{40,}$/) {
				delete $oldheads{$ref};
				$deletedheads{$ref} = 1;
			} elsif ($newrev ne $oldrev || (!exists($oldheads{$ref}) && !$deletedheads{$ref})) {
				$oldheads{$ref} = $oldrev;
			}
		}
		$newrev ne $oldrev or next;
		push(@changes, [$oldrev, $newrev, $ref]);
	}
	return 0 unless @changes;
	open STDIN, '<', File::Spec->devnull;
	request_throttle("ref-change", $name) <= 0 or die "ref-changes $name aborted (throttled)";
	my $statproc = sub {
		my ($old, $new, $ref, $ran_mail_sh) = @_;
		my ($ind, $ran_git) = ref_indicator($proj->{path}, $old, $new);
		statmsg "ref-$type $username $name ($ref: @{[substr($old,0,$abbrev)]}$ind@{[substr($new,0,$abbrev)]})";
		# pause between reports after external commands have been run
		if ($ran_mail_sh) {
			sleep 2;
		} elsif ($ran_git) {
			sleep 1;
		}
	};
	if ($has_notify) {
		Girocco::Notify::ref_changes($proj, $user, $statproc, \%oldheads, @changes);
	} else {
		&$statproc(@$_) foreach @changes;
	}
	return 0;
}
# Handle a "throttle <pid> <class> <text>" request that arrived over the
# socket on STDIN.  The request is forwarded to the controlling taskd over
# $pipewrite (as this process), and the verdict is awaited while watching
# for the supplicant to hang up.  "throttled\n" or "proceed\n" is relayed
# back over STDIN; after a proceed this process stays alive, sending
# keepalives, until the supplicant closes its end.  Invalid requests
# return 0; otherwise the process exits.
sub throttle {
	my ($arg) = @_;
	my ($pid, $classname, $text) = split(/\s+/, $arg);
	# Require the whole field to be digits (as process_pipe_msg does) and
	# launder it through the capture; a prefix match would accept "12x".
	defined($pid) && $pid =~ /^(\d+)$/ or return 0; # invalid pid
	$pid = 0 + $1;
	$pid > 0 or return 0; # invalid pid
	kill(0, $pid) || $!{EPERM} or return 0; # no such process
	Throttle::GetClassInfo($classname) or return 0; # no such throttle class
	defined($text) && $text ne '' or return 0; # no text no service

	my $throttled = 0;
	my $proceed = 0;
	my $error = 0;
	my $controldead = 0;
	my $suppdead = 0;
	# Self-pipe: the signal handlers write a byte to $wakew so the select()
	# calls below wake up promptly once a verdict arrives.
	my ($waker, $wakew);
	pipe($waker, $wakew) or die "pipe failed: $!";
	select((select($wakew),$|=1)[0]);
	setnonblock($wakew);
	$SIG{'TERM'} = sub {$throttled = 1; syswrite($wakew, '!')};
	$SIG{'USR1'} = sub {$proceed = 1; syswrite($wakew, '!')};
	$SIG{'USR2'} = sub {$error = 1; syswrite($wakew, '!')};
	$SIG{'PIPE'} = sub {$controldead = 1; syswrite($wakew, '!')};
	select((select(STDIN),$|=1)[0]);

	logmsg "throttle $pid $classname $text request";
	# After writing we can expect a SIGTERM or SIGUSR1
	print $pipewrite "\nthrottle $$ $classname $text\n";

	# NOTE: the only way to detect the socket close is to read all the
	# data until EOF is reached -- recv can be used to peek.
	my $v = '';
	vec($v, fileno(STDIN), 1) = 1;
	vec($v, fileno($waker), 1) = 1;
	setnonblock(\*STDIN);
	setnonblock($waker);
	until ($controldead || $throttled || $proceed || $error || $suppdead) {
		my ($r, $e);
		select($r=$v, undef, $e=$v, 30);
		my ($bytes, $discard);
		do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
		do {$bytes = sysread(STDIN, $discard, 4096)} while (defined($bytes) && $bytes > 0);
		# anything but "would block" (EOF or an error) means the
		# supplicant has gone away
		$suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
		print $pipewrite "\nkeepalive $$\n";
	}
	setblock(\*STDIN);

	if ($throttled && !$suppdead) {
		print STDIN "throttled\n";
		logmsg "throttle $pid $classname $text throttled";
	} elsif ($proceed && !$suppdead) {
		print STDIN "proceed\n";
		logmsg "throttle $pid $classname $text proceed";
		$SIG{'TERM'} = 'DEFAULT';
		# Stay alive until the child dies which we detect by EOF on STDIN
		setnonblock(\*STDIN);
		until ($controldead || $suppdead) {
			my ($r, $e);
			select($r=$v, undef, $e=$v, 30);
			my ($bytes, $discard);
			do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
			do {$bytes = sysread(STDIN, $discard, 512)} while (defined($bytes) && $bytes > 0);
			$suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
			print $pipewrite "\nkeepalive $$\n";
		}
		setblock(\*STDIN);
	} else {
		my $prefix = '';
		$prefix = "control" if $controldead && !$suppdead;
		logmsg "throttle $pid $classname $text ${prefix}died";
	}
	exit 0;
}
# Handle one message received by the controller over the control pipe.
# "throttle <pid> <class> <text>" registers <pid> with the throttle class:
# the supplicant gets SIGUSR2 if the request is malformed or rejected and
# SIGTERM if it is throttled (SIGUSR1 comes from the queue service when it
# may proceed).  "keepalive ..." messages are ignored; anything else is
# reported on STDERR and discarded.  Always returns 0.
sub process_pipe_msg {
	my $msg = defined($_[0]) ? $_[0] : '';
	my ($act, $pid, $cls, $text) = split(/\s+/, $msg);
	# an empty message yields no fields; treat it as an unknown action
	# rather than comparing undef
	$act = '' unless defined($act);
	if ($act eq "throttle") {
		defined($pid) && $pid =~ /^(\d+)$/ or return 0;
		$pid = 0 + $1;
		$pid > 0 or return 0; # invalid pid
		kill(0, $pid) or return 0; # invalid pid
		if (!defined($cls) || $cls eq "" || !defined($text) || $text eq ""
		    || !Throttle::GetClassInfo($cls)) {
			# malformed request or unknown class
			kill('USR2', $pid);
			return 0;
		}
		# the AddSupplicant call could send SIGUSR1 before it returns
		my $result = Throttle::AddSupplicant($pid, $cls, $text);
		kill('USR2', $pid), return 0 if $result < 0;
		kill('TERM', $pid), return 0 if $result > 0;
		# $pid was added to class $cls and will receive SIGUSR1 when
		# it's time for it to proceed
		return 0;
	}
	if ($act eq "keepalive") {
		# Nothing to do.  We could check that the pid is still tracked by
		# Throttle and send SIGUSR2 if not, but keepalives are simply
		# ignored.
		return 0;
	}
	print STDERR "discarding unknown pipe message \"$msg\"\n";
	return 0;
}
890 ## -------
891 ## OStream
892 ## -------
895 package OStream;
# Routing override read by dumpline (a deliberate hack):
#   0 => route according to the tie mode
#   1 => syslog output only (if enabled by the mode)
#   2 => stderr output only (if enabled by the mode)
our $only = 0;
901 use Carp 'croak';
902 use Sys::Syslog qw(:DEFAULT :macros);
# Write all of $data to the numeric file descriptor $fd using POSIX::write,
# retrying after EINTR and continuing after partial writes.  Croaks on any
# other write error or if a write makes no progress.
sub writeall {
	my ($fd, $data) = @_;
	my $offset = 0;
	my $remaining = length($data);
	while ($remaining) {
		my $bytes = POSIX::write(
			$fd,
			substr($data, $offset, $remaining),
			$remaining);
		next if !defined($bytes) && $!{EINTR};
		croak "POSIX::write failed: $!" unless defined $bytes;
		# POSIX::write reports a zero-byte write as "0 but true", which is a
		# true value, so check it numerically; otherwise this guard could
		# never fire and the loop would spin without making progress.
		croak "POSIX::write wrote 0 bytes" unless $bytes > 0;
		$remaining -= $bytes;
		$offset += $bytes;
	}
}
# Emit one line: raw (newline included) on stderr when the stderr mode is
# on and $only isn't 1, and without its trailing newline to syslog at
# LOG_NOTICE when the syslog mode is on and $only isn't 2.  Empty lines
# are never sent to syslog.
sub dumpline {
	use POSIX qw(STDERR_FILENO);
	my ($self, $line) = @_;
	$only = 0 if !defined($only);
	if ($self->{'stderr'} && $only != 1) {
		writeall(STDERR_FILENO, $line);
	}
	chop($line) if substr($line, -1, 1) eq "\n";
	length($line) or return;
	syslog(LOG_NOTICE, "%s", $line) if $self->{'syslog'} && $only != 2;
}
# Tie constructor: OStream->TIEHANDLE($mode, $syslogname, $syslogfacility).
# $mode <= 0 => stderr only, 1 => syslog only, > 1 => both.  The syslog
# facility defaults to LOG_USER.  Croaks if openlog fails.
sub TIEHANDLE {
	my $class = shift || 'OStream';
	my ($mode, $syslogname, $syslogfacility) = @_;
	$syslogfacility = LOG_USER unless defined($syslogfacility);
	my $self = {
		'syslog'   => $mode > 0,
		'stderr'   => ($mode <= 0 || $mode > 1),
		'lastline' => '',
	};
	if ($self->{'syslog'}) {
		# Some Sys::Syslog have a stupid default setlogsock order
		eval {Sys::Syslog::setlogsock("native"); 1;} or
			eval {Sys::Syslog::setlogsock("unix");};
		openlog($syslogname, "ndelay,pid", $syslogfacility)
			or croak "Sys::Syslog::openlog failed: $!";
	}
	return bless($self, $class);
}
951 sub BINMODE {return 1}
952 sub FILENO {return undef}
953 sub EOF {return 0}
954 sub CLOSE {return 1}
956 sub PRINTF {
957 my $self = shift;
958 my $template = shift;
959 return $self->PRINT(sprintf $template, @_);
962 sub PRINT {
963 my $self = shift;
964 my $data = join('', $self->{'lastline'}, @_);
965 my $pos = 0;
966 while ((my $idx = index($data, "\n", $pos)) >= 0) {
967 ++$idx;
968 my $line = substr($data, $pos, $idx - $pos);
969 substr($data, $pos, $idx - $pos) = '';
970 $pos = $idx;
971 $self->dumpline($line);
973 $self->{'lastline'} = $data;
974 return 1;
977 sub DESTROY {
978 my $self = shift;
979 $self->dumpline($self->{'lastline'})
980 if length($self->{'lastline'});
981 closelog;
984 sub WRITE {
985 my $self = shift;
986 my ($scalar, $length, $offset) = @_;
987 $scalar = '' if !defined($scalar);
988 $length = length($scalar) if !defined($length);
989 croak "OStream::WRITE invalid length $length"
990 if $length < 0;
991 $offset = 0 if !defined($offset);
992 $offset += length($scalar) if $offset < 0;
993 croak "OStream::WRITE invalid write offset"
994 if $offset < 0 || $offset > $length;
995 my $max = length($scalar) - $offset;
996 $length = $max if $length > $max;
997 $self->PRINT(substr($scalar, $offset, $length));
998 return $length;
## ----
## main
## ----

# Switch back to package main for the helper subs and the top-level
# taskd program that follow.
package main;
# returns pid of process that will schedule jobd.pl restart on success
# returns 0 if fork or other system call failed with error in $!
# returns undef if jobd.pl does not currently appear to be running (no lockfile)
#
# $newpg: when true, the restarter process tries (up to four attempts, one
# second apart) to put itself into a new process group via setpgid(0, 0).
#
# How it works: we fork a child which forks again and exits (double fork),
# reporting the grandchild's pid back to us over the $read2/$write2 pipe
# (or ":errno" if its fork failed).  The grandchild redirects stdin, stdout
# and stderr to the null device, chdirs to "/", sleeps about 5 seconds
# (one second less per failed setpgid attempt) and then reads from $read.
# If that read hits EOF or returns "0" it writes "restart\n" into the jobd
# lock file; any other value (we send "1" when we give up on the child)
# makes it exit without touching the lock file.  On success we just close
# our end of $write, so the restart proceeds unless the returned pid is
# killed first (see cancel_jobd_restart).
sub schedule_jobd_restart {
	use POSIX qw(_exit setpgid dup2 :fcntl_h);
	my $devnull = File::Spec->devnull;
	my $newpg = shift;
	my $jdlf = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
	return undef unless -f $jdlf;
	my $oldsigchld = $SIG{'CHLD'};
	defined($oldsigchld) or $oldsigchld = sub {};
	# $read/$write: us -> grandchild (EOF means "go ahead")
	# $read2/$write2: child -> us (grandchild pid or ":errno")
	my ($read, $write, $read2, $write2);
	pipe($read, $write) or return 0;
	select((select($write),$|=1)[0]);
	if (!pipe($read2, $write2)) {
		# keep pipe()'s error in $! for our caller across the closes
		local $!;
		close $write;
		close $read;
		return 0;
	}
	select((select($write2),$|=1)[0]);
	# Presumably keeps the installed SIGCHLD handler from reaping the
	# child before our own waitpid below; restored before every later return
	$SIG{'CHLD'} = sub {};
	my $retries = 3;
	my $child;
	while (!defined($child) && $retries--) {
		$child = fork;
		sleep 1 unless defined($child) || !$retries;
	}
	if (!defined($child)) {
		local $!;
		close $write2;
		close $read2;
		close $write;
		close $read;
		$SIG{'CHLD'} = $oldsigchld;
		return 0;
	}
	# double fork the child
	if (!$child) {
		# intermediate child: fork the grandchild, report its pid, exit
		close $read2;
		my $retries2 = 3;
		my $child2;
		while (!defined($child2) && $retries2--) {
			$child2 = fork;
			sleep 1 unless defined($child2) || !$retries2;
		}
		if (!defined($child2)) {
			my $ec = 0 + $!;
			$ec = 255 unless $ec;
			print $write2 ":$ec";
			close $write2;
			_exit 127;
		}
		if ($child2) {
			# pass new child pid up to parent and exit
			print $write2 $child2;
			close $write2;
			_exit 0;
		} else {
			# this is the grandchild
			close $write2;
		}
	} else {
		close $write2;
		my $result = <$read2>;
		close $read2;
		chomp $result if defined($result);
		if (!defined($result) || $result !~ /^:?\d+$/) {
			# something's wrong with the child -- kill it
			kill(9, $child) && waitpid($child, 0);
			my $oldsigpipe = $SIG{'PIPE'};
			# make sure the grandchild, if any,
			# doesn't run the success proc
			$SIG{'PIPE'} = sub {};
			print $write 1;
			close $write;
			close $read;
			$SIG{'PIPE'} = defined($oldsigpipe) ?
				$oldsigpipe : 'DEFAULT';
			$! = 255;
			$SIG{'CHLD'} = $oldsigchld;
			return 0;
		}
		if ($result =~ /^:(\d+)$/) {
			# fork failed in child, there is no grandchild
			my $ec = $1;
			waitpid($child, 0);
			close $write;
			close $read;
			$! = $ec;
			$SIG{'CHLD'} = $oldsigchld;
			return 0;
		}
		# reap the child and set $child to grandchild's pid
		waitpid($child, 0);
		$child = $result;
	}
	if (!$child) {
		# grandchild that actually initiates the jobd.pl restart
		close $write;
		my $wait = 5;
		my $ufd = POSIX::open($devnull, O_RDWR);
		if (defined($ufd)) {
			dup2($ufd, 0) unless $ufd == 0;
			dup2($ufd, 1) unless $ufd == 1;
			dup2($ufd, 2) unless $ufd == 2;
			POSIX::close($ufd) unless $ufd == 0 || $ufd == 1 || $ufd == 2;
		}
		chdir "/";
		if ($newpg) {
			# each failed setpgid attempt costs one second of $wait
			my $makepg = sub {
				my $result = setpgid(0, 0);
				if (!defined($result)) {
					--$wait;
					sleep 1;
				}
				$result;
			};
			my $result = &$makepg;
			defined($result) or $result = &$makepg;
			defined($result) or $result = &$makepg;
			defined($result) or $result = &$makepg;
		}
		sleep $wait;
		# EOF (every writer closed without sending anything) or "0"
		# means go ahead; anything else cancels the restart
		my $result = <$read>;
		close $read;
		chomp $result if defined($result);
		if (!defined($result) || $result eq 0) {
			open JDLF, '+<', $jdlf or _exit(1);
			select((select(JDLF),$|=1)[0]);
			print JDLF "restart\n";
			truncate JDLF, tell(JDLF);
			close JDLF;
		}
		_exit(0);
	}
	close $write;
	close $read;
	$SIG{'CHLD'} = $oldsigchld;
	return $child;
}
# cancel_jobd_restart($restarter)
# Abort a restart scheduled by schedule_jobd_restart by sending SIGKILL to
# the restarter pid it returned.
# Returns nothing for an undefined or zero pid, -1 when that process can no
# longer be signalled (e.g. it has already exited), otherwise the pid that
# was killed.  Dies if the SIGKILL itself fails.
sub cancel_jobd_restart {
	my ($pid) = @_;
	return if !defined($pid) || $pid == 0;
	kill(0, $pid) or return -1;
	unless (kill(9, $pid)) {
		die "failed to kill jobd restarter process (pid $pid): $!\n";
	}
	# No waitpid here: the restarter was double-forked, so it is not our
	# child and will NOT send us a SIGCHLD when it terminates.
	return $pid;
}
my $reexec = Girocco::ExecUtil->new;
# Resolve our own path before the chdir below: pod2usage reads the POD from
# $0 by default, which may be a relative path that no longer resolves.
my $realpath0 = realpath($0);
chdir "/";
close(DATA) if fileno(DATA);
my $sfac;
Getopt::Long::Configure('bundling');
my ($stiv, $idiv);
my $parse_res = GetOptions(
	'help|?|h' => sub {
		pod2usage(-verbose => 2, -exitval => 0, -input => $realpath0)},
	'quiet|q' => \$quiet,
	'no-quiet' => sub {$quiet = 0},
	'progress|P' => \$progress,
	'inetd|i' => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
	'idle-timeout|t=i' => \$idle_timeout,
	'daemon' => sub {$daemon = 1; $syslog = 1; $quiet = 1;},
	'max-lifetime=i' => \$max_lifetime,
	'syslog|s:s' => \$sfac,
	'no-syslog' => sub {$syslog = 0; $sfac = undef;},
	'stderr' => \$stderr,
	'abbrev=i' => \$abbrev,
	'show-fast-forward-info' => \$showff,
	'no-show-fast-forward-info' => sub {$showff = 0},
	'same-pid' => \$same_pid,
	'no-same-pid' => sub {$same_pid = 0},
	'status-interval=i' => \$stiv,
	'idle-status-interval=i' => \$idiv,
) || pod2usage(-exitval => 2, -input => $realpath0);
# --same-pid defaults to on unless --daemon was given
$same_pid = !$daemon unless defined($same_pid);
$syslog = 1 if defined($sfac);
$progress = 1 unless $quiet;
# 0 (or negative) means "do not abbreviate"; 128 is longer than any
# full SHA-1 or SHA-256 hex object id
$abbrev = 128 unless $abbrev > 0;
# Pass -input like the other pod2usage calls (see $realpath0 above);
# with -msg and no -exitval, pod2usage still exits with status 2
pod2usage(-msg => "--inetd and --daemon are incompatible", -input => $realpath0)
	if ($inetd && $daemon);
if (defined($idle_timeout)) {
	die "--idle-timeout must be a whole number\n" unless $idle_timeout =~ /^\d+$/;
	die "--idle-timeout may not be used without --inetd\n" unless $inetd;
}
if (defined($max_lifetime)) {
	die "--max-lifetime must be a whole number\n" unless $max_lifetime =~ /^\d+$/;
	$max_lifetime += 0;
}
defined($max_lifetime) or $max_lifetime = 604800; # 1 week
# The interval options are given in minutes but kept internally in seconds
if (defined($stiv)) {
	die "--status-interval must be a whole number\n" unless $stiv =~ /^\d+$/;
	$statusintv = $stiv * 60;
}
if (defined($idiv)) {
	die "--idle-status-interval must be a whole number\n" unless $idiv =~ /^\d+$/;
	$idleintv = $idiv * 60;
}
# Detach stdin unless inetd handed us the listen socket on fd 0
open STDIN, '<'.File::Spec->devnull or die "could not redirect STDIN to /dev/null\n" unless $inetd;
open STDOUT, '>&STDERR' if $inetd;
if ($syslog) {
	use Sys::Syslog qw();
	# OStream mode: 1 = syslog only, 2 = syslog and stderr
	my $mode = 1;
	++$mode if $stderr;
	$sfac = "user" unless defined($sfac) && $sfac ne "";
	my $ofac = $sfac;
	$sfac = uc($sfac);
	$sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
	my $facility;
	# LOG_* constants that are not facilities
	my %badfac = map({("LOG_$_" => 1)}
		(qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
	# $sfac is interpolated into a string eval below; only accept a bare
	# constant name so a crafted --syslog value cannot inject Perl code
	die "invalid syslog facility: $ofac\n" unless $sfac =~ /^LOG_[A-Z][A-Z0-9]*\z/;
	eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac\n";
	# facilities are multiples of 8 in the range 0..(23<<3)
	die "invalid syslog facility: $ofac\n"
		if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
	tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
}
if ($quiet) {
	open STDOUT, '>', File::Spec->devnull;
} elsif ($inetd) {
	*STDOUT = *STDERR;
}
# Path and inode of the listen socket.  Both are passed (as "fd:inode" in
# SOCKFDENV) when re-exec'ing for a graceful restart in the main loop.
my ($NAME, $INO);

set_sigchld_reaper;
# Existence of this file requests a graceful restart (checked in main loop)
my $restart_file = $Girocco::Config::chroot.'/etc/taskd.restart';
my $restart_active = 1;
# SOCKFDENV holds "FD" or "FD:INODE" for an already-listening socket left
# open across a graceful-restart re-exec.  Remove it from the environment
# so children never see it, then validate it.  In the double-brace bare
# block, each "last" abandons the value (leaving $resumefd undef).
my $resumefd = $ENV{(SOCKFDENV)};
delete $ENV{(SOCKFDENV)};
if (defined($resumefd)) {{
	unless ($resumefd =~ /^(\d+)(?::(-?\d+))?$/) {
		warn "ignoring invalid ".SOCKFDENV." environment value (\"$resumefd\") -- bad format\n";
		$resumefd = undef;
		last;
	}
	my $resumeino;
	($resumefd, $resumeino) = ($1, $2);
	$resumefd += 0;
	unless (isfdopen($resumefd)) {
		warn "ignoring invalid ".SOCKFDENV." environment value -- fd \"$resumefd\" not open\n";
		$resumefd = undef;
		last;
	}
	# In --inetd mode only the fd is checked.  Otherwise the fd must be
	# bound to the configured socket path and that path's inode must
	# match the one recorded in SOCKFDENV; on failure the fd is closed.
	unless ($inetd) {
		unless (defined($resumeino)) {
			warn "ignoring invalid ".SOCKFDENV." environment value (\"$resumefd\") -- missing inode\n";
			POSIX::close($resumefd);
			$resumefd = undef;
			last;
		}
		$resumeino += 0;
		my $sockloc = $Girocco::Config::chroot.'/etc/taskd.socket';
		my $slinode = (stat($sockloc))[1];
		unless (defined($slinode) && -S _) {
			warn "ignoring ".SOCKFDENV." environment value; socket file does not exist: $sockloc\n";
			POSIX::close($resumefd);
			$resumefd = undef;
			last;
		}
		# dup the fd just long enough to ask which path it is bound to
		open Test, "<&$resumefd" or die "open: $!";
		my $sockname = getsockname Test;
		my $sockpath;
		$sockpath = unpack_sockaddr_un $sockname if $sockname && sockaddr_family($sockname) == AF_UNIX;
		close Test;
		if (!defined($resumeino) || !defined($sockpath) || $resumeino != $slinode || realpath($sockloc) ne realpath($sockpath)) {
			warn "ignoring ".SOCKFDENV." environment value; does not match socket file: $sockloc\n";
			POSIX::close($resumefd);
			$resumefd = undef;
		}
		# if $resumefd was rejected just above, the fresh socket
		# created later overwrites $INO
		$INO = $resumeino;
	}
}}
# Obtain the listening socket "Server": either adopt an inherited one (fd 0
# under --inetd, or the fd validated from SOCKFDENV) after checking it is an
# unconnected AF_UNIX stream socket, or create, bind and listen on the
# configured socket path ourselves.
if ($inetd || defined($resumefd)) {
	my $fdopen = defined($resumefd) ? $resumefd : 0;
	# "<&=" wraps the existing fd rather than dup'ing it
	open Server, "<&=$fdopen" or die "open: $!";
	# mark it close-on-exec like Perl does for its own fds above $^F
	setcloexec(\*Server) if $fdopen > $^F;
	my $sockname = getsockname Server;
	die "getsockname: $!" unless $sockname;
	# a listening ('wait' mode) socket has no peer
	die "socket already connected! must be 'wait' socket\n" if getpeername Server;
	die "getpeername: $!" unless $!{ENOTCONN};
	my $st = getsockopt Server, SOL_SOCKET, SO_TYPE;
	die "getsockopt(SOL_SOCKET, SO_TYPE): $!" unless $st;
	my $socktype = unpack('i', $st);
	die "stream socket required\n" unless defined $socktype && $socktype == SOCK_STREAM;
	die "AF_UNIX socket required\n" unless sockaddr_family($sockname) == AF_UNIX;
	$NAME = unpack_sockaddr_un $sockname;
	my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
	if (realpath($NAME) ne realpath($expected)) {
		$restart_active = 0;
		warn "listening on \"$NAME\" but expected \"$expected\", restart file disabled\n";
	}
	# ensure the socket file is at least user and group read/write (0660)
	my $mode = (stat($NAME))[2];
	die "stat: $!" unless $mode;
	$mode &= 07777;
	if (($mode & 0660) != 0660) {
		chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
	}
} else {
	$NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
	my $uaddr = sockaddr_un($NAME);

	socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
	die "already exists but not a socket: $NAME\n" if -e $NAME && ! -S _;
	if (-e _) {
		# Do not unlink another instance's active listen socket!
		# A successful connect (or EPROTOTYPE, i.e. something of another
		# socket type is listening there) means the socket is live.
		socket(my $sfd, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
		connect($sfd, $uaddr) || $!{EPROTOTYPE} and
			die "Live socket '$NAME' exists. Please make sure no other instance of taskd is running.\n";
		close($sfd);
		unlink($NAME);
	}
	bind(Server, $uaddr) or die "bind failed: $!";
	listen(Server, SOMAXCONN) or die "listen failed: $!";
	chmod 0666, $NAME or die "chmod failed: $!";
	# remember the inode so a graceful restart can verify the socket file
	$INO = (stat($NAME))[1] or die "stat failed: $!";
}
# Register each throttle class with the Throttle package: configured classes
# first, then the built-in defaults.  Entries whose name is false (missing,
# empty or "0") are skipped.
foreach my $cfg (@Girocco::Config::throttle_classes, @throttle_defaults) {
	my $name = $cfg->{"name"};
	next unless $name;
	Throttle::GetClassInfo($name, $cfg);
}
# _min($lhs, $rhs): numerically smaller of two values (the first on a tie)
sub _min {
	my ($lhs, $rhs) = @_;
	return $lhs if $lhs <= $rhs;
	return $rhs;
}
# Control pipe: messages written to $pipewrite are read back (non-blocking)
# from $piperead by the main select loop and handed to process_pipe_msg.
pipe($piperead, $pipewrite) or die "pipe failed: $!";
setnonblock($piperead);
{
	# autoflush the write end
	my $prevfh = select($pipewrite);
	$| = 1;
	select($prevfh);
}
my $pipebuff = '';
# $fdset_pipe watches only the control pipe; $fdset_both adds the listen
# socket (used when we are not being throttled for spawning too fast)
my $fdset_pipe = '';
vec($fdset_pipe, fileno($piperead), 1) = 1;
my $fdset_both = $fdset_pipe;
vec($fdset_both, fileno(Server), 1) = 1;
my $penalty = 0;
my $t = time;
my ($penaltytime, $nextwakeup) = ($t, $t + 60);
my $nextstatus = $statusintv ? $t + $statusintv : undef;
# Remove any stale restart request; if that is impossible, stop honoring it
if ($restart_active && !unlink($restart_file) && !$!{ENOENT}) {
	$restart_active = 0;
	statmsg "restart file disabled could not unlink \"$restart_file\": $!";
}
if ($daemon) {
	daemon(1, 1) or die "failed to daemonize: $!\n";
}
my $starttime = time;
my $endtime = 0;
$endtime = $starttime + $max_lifetime if $max_lifetime;
statmsg "listening on $NAME";
# Main server loop: wait (select) for control-pipe messages or new client
# connections, emitting periodic status lines and handling graceful
# restart / idle timeout while nothing is running; then drain the control
# pipe and accept and spawn a handler for at most one new connection.
while (1) {
	my ($rout, $eout, $nfound);
	do {
		my $wait;
		my $now = time;
		# Reduce the spawn penalty by one per second elapsed since it was
		# last adjusted (never below zero).
		my $adjustpenalty = sub {
			if ($penaltytime < $now) {
				my $credit = $now - $penaltytime;
				$penalty = $penalty > $credit ? $penalty - $credit : 0;
				$penaltytime = $now;
			}
		};
		if (defined($nextstatus) && $now >= $nextstatus) {
			# suppress back-to-back idle status lines unless
			# $idleintv has elapsed since the last one
			unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
				my $statmsg = "STATUS: $children active";
				my @running = ();
				if ($children) {
					my @stats = ();
					my $cnt = 0;
					foreach my $cls (sort(Throttle::GetClassList())) {
						my $inf = Throttle::GetClassInfo($cls);
						if ($inf->{'total'}) {
							$cnt += $inf->{'total'};
							push(@stats, substr(lc($cls),0,1)."=".
								$inf->{'total'}.'/'.$inf->{'active'});
						}
					}
					push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
					$statmsg .= " (".join(" ",@stats).")" if @stats;
					foreach (Throttle::GetRunningPids()) {
						my ($cls, $ts, $desc) = Throttle::GetPidInfo($_);
						next unless $ts;
						push(@running, "[${cls}::$desc] ".human_duration($now-$ts));
					}
				}
				my $idlesecs;
				$statmsg .= ", idle " . human_duration($idlesecs)
					if !$children && ($idlesecs = $now - $idlestart) >= 2;
				statmsg $statmsg;
				statmsg "STATUS: currently running: ".join(", ", @running)
					if @running;
				$idlestatus = $now if !$children;
			}
			$nextstatus += $statusintv while $nextstatus <= $now;
		}
		# wake at least once a minute, sooner if the throttle queue asks
		$nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
		$wait = _min($wait, (Throttle::ServiceQueue()||60));
		$adjustpenalty->(); # this prevents ignoring accept when we shouldn't
		my $fdset;
		if ($penalty <= $maxspawn) {
			$fdset = $fdset_both;
		} else {
			# spawning too fast: ignore the listen socket until the
			# penalty has decayed back down to $maxspawn
			$fdset = $fdset_pipe;
			$wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
		}
		$nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
		logmsg("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR} || $!{EAGAIN};
		my $reaped;
		Throttle::RemoveSupplicant($reaped) while ($reaped = shift(@reapedpids));
		$now = time;
		$adjustpenalty->(); # this prevents banking credits for elapsed time
		# Graceful restart only when completely idle
		if (!$children && !$nfound && $restart_active && (($endtime && $now >= $endtime) || -e $restart_file)) {
			statmsg "RESTART: restart requested; max lifetime ($max_lifetime) exceeded" if $endtime && $now >= $endtime;
			$SIG{CHLD} = sub {};
			my $restarter = schedule_jobd_restart($inetd);
			if (defined($restarter) && !$restarter) {
				statmsg "RESTART: restart requested; retrying failed scheduling of jobd restart: $!";
				sleep 2; # *cough*
				# retry with the same new-process-group setting as the first attempt
				$restarter = schedule_jobd_restart($inetd);
				if (!defined($restarter)) {
					statmsg "RESTART: restart requested; reschedule skipped jobd no longer running";
				} elsif (!$restarter) {
					statmsg "RESTART: restart requested; retry of jobd restart scheduling failed, skipping jobd restart: $!";
					$restarter = undef;
				}
			}
			if ($inetd) {
				statmsg "RESTART: restart requested; now exiting for inetd restart";
				statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
				sleep 2; # *cough*
				exit 0;
			} else {
				statmsg "RESTART: restart requested; now restarting";
				statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
				# keep the listen socket open across exec and tell
				# the new image where to find it
				setnoncloexec(\*Server);
				$reexec->setenv(SOCKFDENV, fileno(Server).":$INO");
				$reexec->reexec($same_pid);
				# only reached if the re-exec failed
				setcloexec(\*Server) if fileno(Server) > $^F;
				statmsg "RESTART: continuing after failed restart: $!";
				chdir "/";
				cancel_jobd_restart($restarter) if $restarter;
				statmsg "RESTART: scheduled jobd restart has been cancelled" if $restarter;
				set_sigchld_reaper;
			}
		}
		if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
			statmsg "idle timeout (@{[human_duration($idle_timeout)]}) exceeded now exiting";
			exit 0;
		}
	} while $nfound < 1;
	my $reout = $rout | $eout;
	# Drain the control pipe, dispatching each complete "\n"-terminated
	# message; a partial trailing message stays in $pipebuff.
	if (vec($reout, fileno($piperead), 1)) {{
		my $nloff = -1;
		{
			my $bytes;
			do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
				while (!defined($bytes) && $!{EINTR});
			last if !defined($bytes) && $!{EAGAIN};
			die "sysread failed: $!" unless defined $bytes;
			# since we always keep a copy of $pipewrite open EOF is fatal
			die "sysread returned EOF on pipe read" unless $bytes;
			$nloff = index($pipebuff, "\n", 0);
			if ($nloff < 0 && length($pipebuff) >= 512) {
				$pipebuff = '';
				print STDERR "discarding 512 bytes of control pipe data with no \\n found\n";
			}
			redo unless $nloff >= 0;
		}
		last unless $nloff >= 0;
		do {
			my $msg = substr($pipebuff, 0, $nloff);
			substr($pipebuff, 0, $nloff + 1) = '';
			$nloff = index($pipebuff, "\n", 0);
			process_pipe_msg($msg) if length($msg);
		} while $nloff >= 0;
		redo;
	}}
	next unless vec($reout, fileno(Server), 1);
	unless (accept(Client, Server)) {
		logmsg "accept failed: $!" unless $!{EINTR};
		next;
	}
	logmsg "connection on $NAME";
	++$penalty;
	spawn sub {
		my $inp = <STDIN>;
		$inp = <STDIN> if defined($inp) && $inp eq "\n";
		chomp $inp if defined($inp);
		# ignore empty and "nop" connects
		defined($inp) && $inp ne "" && $inp ne "nop" or exit 0;
		my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
		# input that does not even look like a command word
		unless (defined($cmd)) {
			statmsg "ignoring malformed command";
			exit 3;
		}
		defined($arg) or $arg = '';
		if ($cmd eq 'ref-changes') {
			ref_changes($arg);
		} elsif ($cmd eq 'clone') {
			clone($arg);
		} elsif ($cmd eq 'ref-change') {
			statmsg "processing obsolete ref-change message (please switch to ref-changes)";
			ref_change($arg);
		} elsif ($cmd eq 'throttle') {
			throttle($arg);
		} else {
			statmsg "ignoring unknown command: $cmd";
			exit 3;
		}
	};
	close Client;
}
1525 ## -------------
1526 ## Documentation
1527 ## -------------
1530 __END__
1532 =head1 NAME
1534 taskd.pl - Perform Girocco service tasks
1536 =head1 SYNOPSIS
1538 taskd.pl [options]
1540 Options:
1541 -h | --help detailed instructions
1542 -q | --quiet run quietly
1543 --no-quiet do not run quietly
1544 -P | --progress show occasional status updates
1545 -i | --inetd run as inetd unix stream wait service
1546 implies --quiet --syslog
1547 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1548 requires --inetd
1549 --daemon become a background daemon
1550 implies --quiet --syslog
1551 --max-lifetime=SECONDS how long before graceful restart
1552 default is 1 week, 0 disables
1553 -s | --syslog[=facility] send messages to syslog instead of
1554 stderr but see --stderr
1555 enabled by --inetd
1556 --no-syslog do not send message to syslog
1557 --stderr always send messages to stderr too
1558 --abbrev=n abbreviate hashes to n (default is 8)
1559 --show-fast-forward-info show fast-forward info (default is on)
1560 --no-show-fast-forward-info disable showing fast-forward info
1561 --same-pid keep same pid during graceful restart
1562 --no-same-pid do not keep same pid on graceful rstrt
1563 --status-interval=MINUTES status update interval (default 1)
1564 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1566 =head1 DESCRIPTION
1568 taskd.pl is Girocco's service request servant; it listens for service requests
1569 such as new clone requests and ref update notifications and spawns a task to
1570 perform the requested action.
1572 =head1 OPTIONS
1574 =over 8
1576 =item B<--help>
1578 Print the full description of taskd.pl's options.
1580 =item B<--quiet>
1582 Suppress non-error messages, e.g. for use when running this task as an inetd
1583 service. Enabled by default by --inetd.
1585 =item B<--no-quiet>
1587 Enable non-error messages. When running in --inetd mode these messages are
1588 sent to STDERR instead of STDOUT.
1590 =item B<--progress>
1592 Show information about the current status of the task operation occasionally.
1593 This is automatically enabled if --quiet is not given.
1595 =item B<--inetd>
1597 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1598 stream socket ready to have accept called on it. To be useful, the unix socket
1599 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1600 will be issued if the socket is not in the expected location. Socket file
permissions will be adjusted if necessary and, if they cannot be, taskd.pl will
1602 die. The --inetd option also enables the --quiet and --syslog options but
1603 --no-quiet and --no-syslog may be used to alter that.
1605 The correct specification for the inetd socket is a "unix" protocol "stream"
1606 socket in "wait" mode with user and group writable permissions (0660). An
1607 attempt will be made to alter the socket's file mode if needed and if that
cannot be accomplished, taskd.pl will die.
1610 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1611 in wait mode and will die if the passed in socket is already connected.
1613 Note that while *BSD's inetd happily supports unix sockets (and so does
1614 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1615 However, systemd does seem to.
1617 =item B<--idle-timeout=SECONDS>
1619 Only permitted when running in --inetd mode. After SECONDS of inactivity
1620 (i.e. all outstanding tasks have completed and no new requests have come in)
1621 exit normally. The default is no timeout at all (a SECONDS value of 0).
1622 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1624 =item B<--daemon>
1626 Fork and become a background daemon. Implies B<--syslog> and B<--quiet> (which
1627 can be altered by subsequent B<--no-syslog> and/or B<--no-quiet> options).
1628 Also implies B<--no-same-pid>, but since graceful restarts work by re-exec'ing
1629 taskd.pl with all of its original arguments, using B<--same-pid> won't really
1630 be effective with B<--daemon> since although it will cause the graceful restart
1631 exec to happen from the same pid, when the B<--daemon> option is subsequently
1632 processed it will end up in a new pid anyway.
1634 =item B<--max-lifetime=SECONDS>
1636 After taskd has been running for SECONDS of realtime, it will behave as though
1637 a graceful restart has been requested. A graceful restart takes place the
1638 next time taskd becomes idle (which may require up to 60 seconds to notice).
1639 If jobd is running when a graceful restart occurs, then jobd will also receive
1640 a graceful restart request at that time. The default value is 1 week (604800),
1641 set to 0 to disable.
1643 =item B<--syslog[=facility]>
1645 Normally error output is sent to STDERR. With this option it's sent to
1646 syslog instead. Note that when running in --inetd mode non-error output is
1647 also affected by this option as it's sent to STDERR in that case. If
1648 not specified, the default for facility is LOG_USER. Facility names are
1649 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1650 with the LOG_NOTICE priority.
1652 =item B<--no-syslog>
1654 Send error message output to STDERR but not syslog.
1656 =item B<--stderr>
1658 Always send error message output to STDERR. If --syslog is in effect then
1659 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1660 messages as well.
1662 =item B<--abbrev=n>
1664 Abbreviate displayed hash values to only the first n hexadecimal characters.
1665 The default is 8 characters. Set to 0 for no abbreviation at all.
1667 =item B<--show-fast-forward-info>
1669 Instead of showing ' -> ' in ref-change/ref-notify update messages, show either
1670 '..' for a fast-forward, creation or deletion or '...' for non-fast-forward.
1671 This requires running an extra git command for each ref update that is not a
1672 creation or deletion in order to determine whether or not it's a fast forward.
1674 =item B<--no-show-fast-forward-info>
1676 Disable showing of fast-forward information for ref-change/ref-notify update
1677 messages. Instead just show a ' -> ' indicator.
1679 =item B<--same-pid>
1681 When performing a graceful restart, perform the graceful restart exec from
1682 the same pid rather than switching to a new one. This is implied when
1683 I<--daemon> is I<NOT> used.
1685 =item B<--no-same-pid>
1687 When performing a graceful restart, perform the graceful restart exec after
1688 switching to a new pid. This is implied when I<--daemon> I<IS> used.
1690 =item B<--status-interval=MINUTES>
1692 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1693 status updates are shown at each MINUTES interval. Setting the interval to 0
1694 disables them entirely even with --progress.
1696 =item B<--idle-status-interval=IDLEMINUTES>
1698 Two consecutive "idle" status updates with no intervening activity will not be
1699 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1700 Setting the interval to 0 prevents any consecutive idle updates (with no
1701 activity between them) from appearing at all.
1703 =back
1705 =cut