taskd.pl: avoid missed connects during graceful restart
[girocco.git] / taskd / taskd.pl
blob271d70a367bae6787ab267d6c7682b3c86ead352
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
5 # taskd is Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
31 use strict;
32 use warnings;
34 use Getopt::Long;
35 use Pod::Usage;
36 use Socket;
37 use Errno;
38 use Fcntl;
39 use POSIX ":sys_wait_h";
40 use File::Basename;
42 use lib dirname($0);
43 use Girocco::Config;
44 use Girocco::Notify;
45 use Girocco::Project;
46 use Girocco::User;
47 use Girocco::Util qw(noFatalsToBrowser get_git);
48 BEGIN {noFatalsToBrowser}
49 use Girocco::ExecUtil;
51 use constant SOCKFDENV => "GIROCCO_TASKD_SOCKET_FD";
53 # Throttle Classes Defaults
54 # Note that any same-named classes in @Girocco::Config::throttle_classes
55 # will override (completely replacing the entire hash) these ones.
56 my @throttle_defaults = (
58 name => "ref-change",
59 maxproc => 0,
60 maxjobs => 1,
61 interval => 1
64 name => "clone",
65 maxproc => 0,
66 maxjobs => 2,
67 interval => 5
70 name => "snapshot",
71 #maxproc => max(5, cpucount + maxjobs), # this is the default
72 #maxjobs => max(1, int(cpucount / 4)) , # this is the default
73 interval => 5
77 # Options
78 my $quiet;
79 my $progress;
80 my $syslog;
81 my $stderr;
82 my $inetd;
83 my $idle_timeout;
84 my $abbrev = 8;
85 my $showff = 1;
86 my $same_pid;
87 my $statusintv = 60;
88 my $idleintv = 3600;
89 my $maxspawn = 8;
91 $| = 1;
93 my $progname = basename($0);
94 my $children = 0;
95 my $idlestart = time;
96 my $idlestatus = 0;
# Return the number of online CPUs as reported by Girocco::Util::online_cpus.
# The answer is cached in a package variable once it is a true value, so the
# lookup is only repeated while it keeps yielding a false result.
sub cpucount {
    use Girocco::Util "online_cpus";
    our $online_cpus_result;
    return $online_cpus_result ||= online_cpus();
}
# Print a log line built from the arguments to the currently selected handle.
# The line is prefixed with "[timestamp] progname pid: ".  When STDOUT is
# tied, the message is emitted twice by steering $OStream::only: once with
# the prefix (stderr copy only) and once without it (syslog copy only).
sub logmsg {
    my $text = "@_";
    my $stamped = '[' . scalar(localtime) . "] $progname $$: " . $text . "\n";
    if (!tied *STDOUT) {
        print $stamped;
    } else {
        $OStream::only = 2; # STDERR only
        print $stamped;
        $OStream::only = 1; # syslog only
        print $text . "\n";
        $OStream::only = 0; # back to default
    }
}
# Print a status line to STDERR, but only when $progress is enabled.
# The line is prefixed with "[timestamp] progname pid: ".  When STDERR is
# tied, the message is emitted twice by steering $OStream::only: once with
# the prefix (stderr copy only) and once without it (syslog copy only).
sub statmsg {
    return unless $progress;
    my $text = "@_";
    my $stamped = '[' . scalar(localtime) . "] $progname $$: " . $text . "\n";
    if (!tied *STDERR) {
        print STDERR $stamped;
    } else {
        $OStream::only = 2; # STDERR only
        print STDERR $stamped;
        $OStream::only = 1; # syslog only
        print STDERR $text . "\n";
        $OStream::only = 0; # back to default
    }
}
# Format a number of seconds as a compact duration such as "59s", "1m0s",
# "1h1m1s" or "1d1h1m1s".  Fractions are truncated.  An undefined or
# negative argument is handed back unchanged.
sub duration {
    my $secs = shift;
    return $secs unless defined($secs) && $secs >= 0;
    my $left = int($secs);
    my $text = '';
    # peel off seconds, minutes and hours in turn; whatever remains is days
    for my $unit ([60, 's'], [60, 'm'], [24, 'h']) {
        my ($base, $suffix) = @$unit;
        $text = ($left % $base) . $suffix . $text;
        return $text if $left < $base;
        $left = int($left / $base);
    }
    return $left . 'd' . $text;
}
# Test whether a numeric file descriptor is currently open by trying to
# duplicate it; the duplicate is closed again straight away.
# Returns undef for an undefined or negative descriptor, otherwise a
# true/false value telling whether POSIX::dup succeeded.
sub isfdopen {
    my ($fd) = @_;
    defined($fd) && $fd >= 0 or return undef;
    my $dupfd = POSIX::dup($fd);
    my $isopen = defined($dupfd);
    POSIX::close($dupfd) if $isopen;
    return $isopen;
}
# Clear the descriptor flags (F_SETFD with 0) on the given handle, which
# turns off close-on-exec.  Dies if fcntl fails.
sub setnoncloexec {
    my ($fh) = @_;
    my $ok = fcntl($fh, F_SETFD, 0);
    $ok or die "fcntl failed: $!";
    return $ok;
}
# Set the descriptor flags of the given handle to FD_CLOEXEC so that it is
# closed across exec.  Dies if fcntl fails.
sub setcloexec {
    my ($fh) = @_;
    my $ok = fcntl($fh, F_SETFD, FD_CLOEXEC);
    $ok or die "fcntl failed: $!";
    return $ok;
}
# Add O_NONBLOCK to the file status flags of the given handle, leaving the
# other status flags as they were.  Dies if either fcntl call fails.
sub setnonblock {
    my ($fh) = @_;
    my $mode = fcntl($fh, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($mode);
    my $ok = fcntl($fh, F_SETFL, $mode | O_NONBLOCK);
    $ok or die "fcntl failed: $!";
    return $ok;
}
# Remove O_NONBLOCK from the file status flags of the given handle, leaving
# the other status flags as they were.  Dies if either fcntl call fails.
sub setblock {
    my ($fh) = @_;
    my $mode = fcntl($fh, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($mode);
    my $ok = fcntl($fh, F_SETFL, $mode & ~O_NONBLOCK);
    $ok or die "fcntl failed: $!";
    return $ok;
}
180 package Throttle;
183 ## Throttle protocol
185 ## 1) Process needing throttle services acquire a control file descriptor
186 ## a) Either as a result of a fork + exec (the write end of a pipe)
187 ## b) Or by connecting to the taskd socket (not yet implemented)
189 ## 2) The process requesting throttle services will be referred to
190 ## as the supplicant or just "supp" for short.
192 ## 3) The supp first completes any needed setup which may include
193 ## gathering data it needs to perform the action -- if that fails
194 ## then there's no need for any throttling.
196 ## 4) The supp writes a throttle request to the control descriptor in
197 ## this format:
198 ## throttle <pid> <class>\n
199 ## for example if the supp's pid was 1234 and it was requesting throttle
200 ## control as a member of the mail class it would write this message:
201 ## throttle 1234 mail\n
202 ## Note that if the control descriptor happens to be a pipe rather than a
203 ## socket, the message should be preceded by another "\n" just to be safe.
204 ## If the control descriptor is a socket, not a pipe, the message may be
205 ## preceded by a "\n" but that's not recommended.
207 ## 5) For supplicants with a control descriptor that is a pipe
208 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
209 ## If the control descriptor is a socket (getsockname succeeds) then
210 ## protocol (5b) should be used.
212 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
213 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
214 ## to write a "keepalive\n" message to the control descriptor. If that
215 ## fails, the controller has gone away and it may make its own decision
216 ## whether or not to proceed at that point. If, on the other hand, it
217 ## receives a SIGTERM, the process limit for its class has been reached
218 ## and it should abort without performing its action. If it receives
219 ## SIGUSR1, it may proceed without writing anything more to the control
220 ## descriptor, and MAY even close the control descriptor. Finally, a
221 ## SIGUSR2 indicates rejection of the throttle request for some other reason
222 ## such as unrecognized class name or invalid pid in which case the supp may
223 ## make its own decision how to proceed.
225 ## 5b) The supp now enters a read wait on the socket -- it need accommodate no
226 ## more than 512 bytes and if a '\n' does not appear within that number of
227 ## bytes the read should be considered failed. Otherwise the read should
228 ## be retried until either a full line has been read or the socket is
229 ## closed from the other end. If the lone read is "proceed\n" then it may
230 ## proceed without reading or writing anything more to the control
231 ## descriptor, but MUST keep the control descriptor open and not call
232 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
233 ## should be retried) constitutes failure. If a full line starting with at
234 ## least one alpha character was read but it was not "proceed" then it
235 ## should abort without performing its action. For any other failure it
236 ## may make its own decision whether or not to proceed as the controller has
237 ## gone away.
239 ## 6) The supp now performs its throttled action.
241 ## 7) The supp now closes its control descriptor (if it hasn't already in the
242 ## case of (5a)) and exits -- in the case of a socket, the other end receives
243 ## notification that the socket has been closed (read EOF). In the case of
244 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
245 ## of the other end of the pipe, so it will not reach EOF by the supp's
246 ## exit in that case).
249 # keys are class names, values are hash refs with these fields:
250 # 'maxproc' => integer; maximum number of allowed supplicants (the sum of how
251 # many may be queued waiting plus how many may be
252 # concurrently active) with 0 meaning no limit.
253 # 'maxjobs' => integer; how many supplicants may proceed simultaneously a value
254 # of 0 is unlimited but the number of concurrent
255 # supplicants will always be limited to no more than
256 # the 'maxproc' value (if > 0) no matter what the
257 # 'maxjobs' value is.
258 # 'total' -> integer; the total number of pids belonging to this class that
259 # can currently be found in %pid.
260 # 'active' -> integer; the number of currently active supplicants which should
261 # be the same as (the number of elements of %pid with a
262 # matching class name) - (number of my class in @queue).
263 # 'interval' -> integer; minimum number of seconds between 'proceed' responses
264 # or SIGUSR1 signals to members of this class.
265 # 'lastqueue' -> time; last time a supplicant was successfully queued.
266 # 'lastproceed' => time; last time a supplicant was allowed to proceed.
267 # 'lastthrottle' => time; last time a supplicant was throttled
268 # 'lastdied' => time; last time a supplicant in this class died/exited/etc.
269 my %classes = ();
271 # keys are pid numbers, values are array refs with these elements:
272 # [0] => name of class (key to classes hash)
273 # [1] => supplicant state (0 => queued, non-zero => time it started running)
274 # [2] => descriptive text (e.g. project name)
275 my %pid = ();
277 # minimum number of seconds between any two proceed responses no matter what
278 # class. this takes priority in that it can effectively increase the
279 # class's 'interval' value by delaying proceed notifications if the minimum
280 # interval has not yet elapsed.
281 my $interval = 1;
283 # fifo of pids awaiting notification as soon as the next $interval elapses
284 # provided interval and maxjobs requirements are satisfied
285 # for the class of the pid that will next be triggered.
286 my @queue = ();
288 # time of most recent successful call to AddSupplicant
289 my $lastqueue = 0;
291 # time of most recent proceed notification
292 my $lastproceed = 0;
294 # time of most recent throttle
295 my $lastthrottle = 0;
297 # time of most recent removal
298 my $lastdied = 0;
300 # lifetime count of how many have been queued
301 my $totalqueue = 0;
303 # lifetime count of how many have been allowed to proceed
304 my $totalproceed = 0;
306 # lifetime count of how many have been throttled
307 my $totalthrottle = 0;
309 # lifetime count of how many have died
310 # It should always be true that $totalqueue - $totaldied == scalar(keys(%pid))
311 my $totaldied = 0;
# Returns the names of all currently registered classes, in no particular
# order (in scalar context: how many there are).
sub GetClassList {
    my @names = keys(%classes);
    return @names;
}
# Return the numerically larger of the two arguments.
sub _max {
    return $_[0] if $_[0] >= $_[1];
    return $_[1];
}

# _getnum($min, $val, $default)
# [0] => lower bound for the result
# [1] => candidate value; used only if it is an optionally signed integer
# [2] => code ref called (with no arguments) to supply a value otherwise
# Returns the chosen value, raised to $min if it is smaller.
sub _getnum {
    my ($min, $val, $default) = @_;
    my $ans;
    if (defined($val) && $val =~ /^[+-]?\d+$/) {
        $ans = 0 + $val;
    } else {
        # call with an explicit empty argument list; the bare "&$default;"
        # form would silently hand our own @_ to the callback
        $ans = $default->();
    }
    return _max($min, $ans);
}
# Look up (and optionally create) a throttle class.
# [0] => name of class to find; must start with a letter and contain only
#        letters, digits and . _ + - (it is lower-cased before use)
# [1] => if true, create the class when it doesn't exist; if it is a hashref
#        it supplies initial values for maxproc, maxjobs and interval.
#        Missing or non-integer values fall back to
#        maxjobs = max(1, int(cpu count / 4)),
#        maxproc = max(5, cpu count + maxjobs) and interval = 1.
# Returns a hash ref holding a copy of the class info on success and
# nothing if the name is invalid or the class is unknown and not created.
sub GetClassInfo {
    my ($classname, $init) = @_;
    return unless defined($classname) && $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/;
    my $key = lc($classname);
    my $known = $classes{$key};
    return { %$known } if $known;
    return unless $init;
    my $opts = ref($init) eq 'HASH' ? $init : {};
    my %newclass = ();
    # maxjobs must be settled first: the maxproc default depends on it
    $newclass{'maxjobs'} = _getnum(0, $opts->{'maxjobs'}, sub{_max(1, int(::cpucount() / 4))});
    $newclass{'maxproc'} = _getnum(0, $opts->{'maxproc'}, sub{_max(5, ::cpucount() + $newclass{'maxjobs'})});
    $newclass{'interval'} = _getnum(0, $opts->{'interval'}, sub{1});
    $newclass{$_} = 0
        foreach qw(total active lastqueue lastproceed lastthrottle lastdied);
    $classes{$key} = \%newclass;
    return { %newclass };
}
# [0] => pid to look up
# Returns () if the pid is not registered, otherwise the registered
# ($classname, $timestarted, $description) where $timestarted is 0 while
# the pid is still queued and a time() value once it has been started.
sub GetPidInfo {
    my ($wanted) = @_;
    exists $pid{$wanted} or return ();
    my $entry = $pid{$wanted};
    return @$entry;
}
# Returns the pids whose start time is set (i.e. that are running rather
# than queued), ordered from oldest start time to newest.  The result may
# be empty.
sub GetRunningPids {
    my @running = grep { $pid{$_}->[1] } keys(%pid);
    return sort { $pid{$a}->[1] <=> $pid{$b}->[1] } @running;
}
# Returns a hash ref with various information about the current state
# 'interval' => global minimum interval between proceeds
# 'active' => how many registered pids are not waiting in the queue
#             (number of entries in %pid minus the queue length)
# 'queue' => how many pids are currently queued
# 'lastqueue' => time (epoch seconds) of last queue
# 'lastproceed' => time (epoch seconds) of last proceed
# 'lastthrottle' => time (epoch seconds) of last throttle
# 'lastdied' => time (epoch seconds) of last removal
# 'totalqueue' => lifetime total number of processes queued
# 'totalproceed' => lifetime total number of processes proceeded
# 'totalthrottle' => lifetime total number of processes throttled
# 'totaldied' => lifetime total number of removed processes
sub GetInfo {
    my $queued = scalar(@queue);
    my %state = (
        interval => $interval,
        active => scalar(keys(%pid)) - $queued,
        queue => $queued,
        lastqueue => $lastqueue,
        lastproceed => $lastproceed,
        lastthrottle => $lastthrottle,
        lastdied => $lastdied,
        totalqueue => $totalqueue,
        totalproceed => $totalproceed,
        totalthrottle => $totalthrottle,
        totaldied => $totaldied,
    );
    return \%state;
}
# Get or set the global minimum interval between proceeds.
# With no argument the current value is returned.  With an argument made up
# only of digits the interval is set to it; any other argument is ignored.
# In every case the value in effect before the call is returned.
sub Interval {
    my $previous = $interval;
    if (defined($_[0]) && $_[0] =~ /^\d+$/) {
        $interval = 0 + $_[0];
    }
    return $previous;
}
420 sub RemoveSupplicant;
# Perform queue service (i.e. send SIGUSR1 to any eligible queued process)
# Returns minimum interval until next proceed is possible
# Returns undef if there's nothing waiting to proceed or
# the 'maxjobs' limits have been reached for all queued items (in which
# case it won't be possible to proceed until one of them exits, hence undef)
# This is called automatically by AddSupplicant and RemoveSupplicant
#
# For each class only its first queued pid is considered.  Whenever the
# queue is modified the whole scan is restarted from the top.
sub ServiceQueue {
    RETRY: while (1) {
        return undef unless @queue; # if there's nothing queued, nothing to do
        my $now = time;
        my $min = _max(0, $interval - ($now - $lastproceed));
        my $classmin = undef;
        my $classchecked = 0;
        my %seenclass = ();
        my $classcount = scalar(keys(%classes));
        for (my $i=0; $i <= $#queue && $classchecked < $classcount; ++$i) {
            my $pid = $queue[$i];
            my $procinfo = $pid{$pid};
            if (!$procinfo) {
                # Stale queue entry with no %pid record.  RemoveSupplicant
                # returns early for pids it does not know and would leave
                # the entry in place (retrying forever), so drop it here.
                splice(@queue, $i, 1);
                next RETRY;
            }
            my $classinfo = $classes{$$procinfo[0]};
            if (!$classinfo) {
                RemoveSupplicant($pid, 1);
                next RETRY;
            }
            if (!$seenclass{$$procinfo[0]}) {
                $seenclass{$$procinfo[0]} = 1;
                ++$classchecked;
                if (!$classinfo->{'maxjobs'} || $classinfo->{'active'} < $classinfo->{'maxjobs'}) {
                    my $cmin = _max(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
                    if (!$cmin && !$min) {
                        # both the global and the class interval have
                        # elapsed: mark the pid as running and signal it
                        $now = time;
                        $$procinfo[1] = $now;
                        splice(@queue, $i, 1);
                        ++$totalproceed;
                        $lastproceed = $now;
                        $classinfo->{'lastproceed'} = $now;
                        ++$classinfo->{'active'};
                        kill("USR1", $pid) or RemoveSupplicant($pid, 1);
                        next RETRY;
                    }
                    # remember the shortest remaining class interval
                    $classmin = $cmin unless defined($classmin) && $classmin < $cmin;
                }
            }
        }
        return defined($classmin) ? _max($min, $classmin) : undef;
    }
}
# $1 => pid to add (must not already be in %pid)
# $2 => class name (must exist; matched case-insensitively, the same way
#       GetClassInfo lower-cases the names it registers and looks up)
# $3 => optional descriptive text stored with the pid (defaults to '')
# $4 => if true, do not call ServiceQueue after queuing
# Returns -1 if no such class or pid already present or invalid
# Returns 0 if added successfully (and possibly already SIGUSR1'd)
# Return 1 if throttled and cannot be added
sub AddSupplicant {
    my ($pid, $classname, $text, $noservice) = @_;
    return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
    $pid += 0;
    kill(0, $pid) or return -1;
    return -1 unless defined($classname);
    # %classes is keyed by lower-cased names, so normalize before the
    # lookup and store the normalized name for later lookups
    $classname = lc($classname);
    my $classinfo = $classes{$classname};
    return -1 unless $classinfo;
    return -1 if $pid{$pid};
    $text = '' unless defined($text);
    my $now = time;
    if ($classinfo->{'maxproc'} && $classinfo->{'total'} >= $classinfo->{'maxproc'}) {
        ++$totalthrottle;
        $lastthrottle = $now;
        $classinfo->{'lastthrottle'} = $now;
        return 1;
    }
    ++$totalqueue;
    $lastqueue = $now;
    $pid{$pid} = [$classname, 0, $text];
    ++$classinfo->{'total'};
    $classinfo->{'lastqueue'} = $now;
    push(@queue, $pid);
    ServiceQueue() unless $noservice;
    return 0;
}
# $1 => pid to remove (died, killed, exited normally, doesn't matter)
# $2 => if true, do not call ServiceQueue after the removal
# Returns 0 if removed
# Returns -1 if unknown pid or other error during removal
sub RemoveSupplicant {
    my ($pid, $noservice) = @_;
    return -1 unless defined($pid) && $pid =~ /^\d+$/;
    $pid += 0;
    my $pidinfo = $pid{$pid};
    $pidinfo or return -1;
    my $now = time;
    $lastdied = $now;
    ++$totaldied;
    delete $pid{$pid};
    # a pid that never started running is still sitting in the queue
    @queue = grep { $_ != $pid } @queue unless $$pidinfo[1];
    my $classinfo = $classes{$$pidinfo[0]};
    if (!$classinfo) {
        # the class is gone so there are no counters to adjust; honor
        # $noservice here exactly as on the normal path below
        ServiceQueue() unless $noservice;
        return -1;
    }
    --$classinfo->{'active'} if $$pidinfo[1];
    --$classinfo->{'total'};
    $classinfo->{'lastdied'} = $now;
    ServiceQueue() unless $noservice;
    return 0;
}
533 # Instance Methods
535 package main;
538 ## ---------
539 ## Functions
540 ## ---------
543 my @reapedpids = ();
544 my %signame = (
545 # http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
546 1 => 'SIGHUP',
547 2 => 'SIGINT',
548 3 => 'SIGQUIT',
549 6 => 'SIGABRT',
550 9 => 'SIGKILL',
551 14 => 'SIGALRM',
552 15 => 'SIGTERM',
# SIGCHLD handler: reap every child that has exited without blocking.
# For each one the child counter is decremented (restarting the idle clock
# when it reaches zero), a "reaped" line describing the exit code or
# terminating signal is logged, and the pid is appended to @reapedpids.
sub REAPER {
    # waitpid overwrites both $! and $?; keep the values the interrupted
    # code may be about to inspect
    local ($!, $?);
    my $waitedpid;
    while (($waitedpid = waitpid(-1, WNOHANG)) > 0) {
        my $code = $? & 0xffff;
        $idlestart = time if !--$children;
        my $codemsg = '';
        if (!($code & 0xff)) {
            # low byte clear: normal exit, status is in the high byte
            $codemsg = " with exit code ".($code >> 8) if $code;
        } elsif ($code & 0x7f) {
            my $signum = ($code & 0x7f);
            $codemsg = " with signal ".
                ($signame{$signum}?$signame{$signum}:$signum);
        }
        logmsg("reaped $waitedpid$codemsg");
        push(@reapedpids, $waitedpid);
    }
    $SIG{CHLD} = \&REAPER;  # loathe sysV
}
575 $SIG{CHLD} = \&REAPER; # Apollo 440
577 my ($piperead, $pipewrite);
# Fork a child to run the given code ref.
# In the parent: a fork failure is logged; otherwise the child is counted,
# the idle status is reset and "begat <pid>" is logged.  Returns nothing.
# In the child: the Server handle (unless it is fd 0) and the read end of
# the control pipe are closed, SIGCHLD gets a no-op handler, the Client
# handle is duplicated onto STDIN and closed, and the process exits with
# the code ref's return value.
sub spawn {
    my ($job) = @_;

    my $kid = fork;
    unless (defined $kid) {
        logmsg("cannot fork: $!");
        return;
    }
    if ($kid) {
        # I'm the parent
        $idlestart = time if !++$children;
        $idlestatus = 0;
        logmsg("begat $kid");
        return;
    }

    # I'm the child
    close(Server) unless fileno(Server) == 0;
    close($piperead);
    $SIG{'CHLD'} = sub {};

    open STDIN, "+<&Client" or die "can't dup client to stdin";
    close(Client);
    exit $job->();
}
# Ask the controller, via the control pipe, for permission to proceed as a
# member of the given throttle class, and wait for its answer signal while
# writing a keepalive after every wakeup (at least every 30 seconds).
# $1 => throttle class name
# $2 => descriptive text sent along with the request
# returns:
#   < 0: error (unknown class, SIGUSR2 received or control pipe gone)
#   = 0: proceed (SIGUSR1 received)
#   > 0: throttled (SIGTERM received)
sub request_throttle {
    use POSIX qw(sigprocmask sigsuspend SIG_SETMASK);
    my ($classname, $text) = @_;

    return -1 unless Throttle::GetClassInfo($classname); # no such throttle class

    my ($denied, $granted, $rejected, $pipegone) = (0, 0, 0, 0);
    my $nosigs = POSIX::SigSet->new;
    my $allsigs = POSIX::SigSet->new;
    $nosigs->emptyset();
    $allsigs->fillset();
    $SIG{'TERM'} = sub {$denied = 1};
    $SIG{'USR1'} = sub {$granted = 1};
    $SIG{'USR2'} = sub {$rejected = 1};
    $SIG{'PIPE'} = sub {$pipegone = 1};
    $SIG{'ALRM'} = sub {};

    # After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
    print $pipewrite "\nthrottle $$ $classname $text\n";
    my $saved = POSIX::SigSet->new;
    # signals stay blocked except while suspended and while writing
    sigprocmask(SIG_SETMASK, $allsigs, $saved);
    while (!($pipegone || $denied || $granted || $rejected)) {
        alarm(30);
        sigsuspend($nosigs);
        alarm(0);
        sigprocmask(SIG_SETMASK, $nosigs, $saved);
        print $pipewrite "\nkeepalive $$\n";
        sigprocmask(SIG_SETMASK, $allsigs, $saved);
    }
    sigprocmask(SIG_SETMASK, $nosigs, $saved);
    $SIG{$_} = "DEFAULT" foreach qw(TERM USR1 USR2 ALRM PIPE);

    return 1 if $denied;
    return 0 if $granted;
    return -1;
}
# Start cloning the named project.
# Dies if the project does not exist, cannot be loaded, is not marked for
# cloning, is already being cloned, or the request is throttled.  Otherwise
# STDOUT and STDERR are redirected to the project's .clonelog, STDIN to
# /dev/null, and the process execs taskd/clone.sh for the repository.
sub clone {
    my ($name) = @_;
    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj;
    eval {$proj = Girocco::Project->load($name)};
    if (!$proj && Girocco::Project::does_exist($name, 1)) {
        # If the .clone_in_progress file exists, but the .clonelog does not
        # and neither does the .clone_failed, be helpful and touch the
        # .clone_failed file so that the mirror can be restarted
        my $projdir = $Girocco::Config::reporoot."/$name.git";
        if (-d "$projdir" && -f "$projdir/.clone_in_progress" && ! -f "$projdir/.clonelog" && ! -f "$projdir/.clone_failed") {
            # best effort only; a lexical handle keeps this from touching
            # any package-global filehandle
            if (open(my $failfh, '>', "$projdir/.clone_failed")) {
                close($failfh);
            }
        }
    }
    $proj or die "failed to load project $name";
    $proj->{clone_in_progress} or die "project $name is not marked for cloning";
    $proj->{clone_logged} and die "project $name is already being cloned";
    request_throttle("clone", $name) <= 0 or die "cloning $name aborted (throttled)";
    statmsg("cloning $name");
    open STDOUT, '>', "$Girocco::Config::reporoot/$name.git/.clonelog" or die "cannot open clonelog: $!";
    open STDERR, ">&STDOUT";
    open STDIN, '<', '/dev/null';
    exec "$Girocco::Config::basedir/taskd/clone.sh", "$name.git" or die "exec failed: $!";
}
# Choose the separator shown between the old and new revision of a ref.
# $1 => git dir, $2 => old revision, $3 => new revision
# Returns ' -> ' when $showff is off or no git dir is given, and '..' when
# either revision is missing or all zeros or both are the same.  Otherwise
# git rev-list is asked for a commit reachable from the old revision but
# not from the new one: '...' if there is one, '..' if not.  In that last
# case a list-context caller gets ($indicator, 1) to show that git was run.
sub ref_indicator {
    return ' -> ' unless $showff && defined($_[0]);
    my ($git_dir, $old, $new) = @_;
    my $comparable = defined($old) && defined($new)
        && $old !~ /^0+$/ && $new !~ /^0+$/ && $old ne $new;
    return '..' unless $comparable;
    # In many cases `git merge-base` is slower than this even if using the
    # `--is-ancestor` option available since Git 1.8.0, but it's never faster
    my $found = get_git("--git-dir=$git_dir", "rev-list", "-n", "1", "^$new^0", "$old^0", "--");
    my $ans = $found ? '...' : '..';
    return wantarray ? ($ans, 1) : $ans;
}
# Handle a single ref-change command.
# $1 => "username project oldrev newrev ref" separated by whitespace
# Returns 0 without doing anything when a field is missing, a revision is
# not 40 lowercase hex digits, the ref does not start with refs/, or old
# and new revision are equal.  Dies if the project or user cannot be found
# or loaded, or if the request is throttled.  Otherwise a status message is
# written and, if the project has notifications, they are sent.
sub ref_change {
    my ($arg) = @_;
    my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
    return 0 unless $username && $name && $oldrev && $newrev && $ref;
    return 0 unless $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/};
    return 0 if $newrev eq $oldrev;

    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj = Girocco::Project->load($name);
    $proj or die "failed to load project $name";
    my $has_notify = $proj->has_notify;
    my $type = $has_notify ? "notify" : "change";

    my $user;
    if ($username && $username !~ /^%.*%$/) {
        Girocco::User::does_exist($username, 1) or die "no such user: $username";
        $user = Girocco::User->load($username);
        $user or die "failed to load user $username";
    } elsif ($username eq "%$name%") {
        # a %...% name equal to the project's own is shown as "-"
        $username = "-";
    }

    request_throttle("ref-change", $name) <= 0 or die "ref-change $name aborted (throttled)";
    my $ind = ref_indicator($proj->{path}, $oldrev, $newrev);
    my $oldabbr = substr($oldrev, 0, $abbrev);
    my $newabbr = substr($newrev, 0, $abbrev);
    statmsg("ref-$type $username $name ($ref: $oldabbr$ind$newabbr)");
    open STDIN, '<', '/dev/null';
    Girocco::Notify::ref_change($proj, $user, $ref, $oldrev, $newrev) if $has_notify;
    return 0;
}
# Handle a batch of ref changes for one project.
# $1 => "username project" separated by whitespace
# The changes are then read from STDIN, one "oldrev newrev ref" per line.
# Lines that are incomplete or malformed are skipped.  For refs/heads/*
# refs the old revision is remembered in %oldheads, which is passed on to
# the notifier.  Returns 0; dies if the project or user cannot be found or
# loaded, or if the request is throttled.
sub ref_changes {
    my ($arg) = @_;
    my ($username, $name) = split(/\s+/, $arg);
    $username && $name or return 0;

    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj = Girocco::Project->load($name);
    $proj or die "failed to load project $name";
    my $has_notify = $proj->has_notify;
    my $type = $has_notify ? "notify" : "change";

    my $user;
    if ($username && $username !~ /^%.*%$/) {
        Girocco::User::does_exist($username, 1) or die "no such user: $username";
        $user = Girocco::User->load($username);
        $user or die "failed to load user $username";
    } elsif ($username eq "%$name%") {
        $username = "-";
    }

    my @changes = ();
    my %oldheads = ();
    my %deletedheads = ();
    while (my $change = <STDIN>) {
        my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
        # a blank or short line leaves fields undefined; skip it before the
        # pattern matches below would operate on undef
        defined($oldrev) && defined($newrev) && defined($ref) or next;
        $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or next;
        if ($ref =~ m{^refs/heads/.}) {
            if ($oldrev =~ /^0{40}$/) {
                delete $oldheads{$ref};
                $deletedheads{$ref} = 1;
            } elsif ($newrev ne $oldrev || (!exists($oldheads{$ref}) && !$deletedheads{$ref})) {
                $oldheads{$ref} = $oldrev;
            }
        }
        $newrev ne $oldrev or next;
        push(@changes, [$oldrev, $newrev, $ref]);
    }
    return 0 unless @changes;
    open STDIN, '<', '/dev/null';
    request_throttle("ref-change", $name) <= 0 or die "ref-changes $name aborted (throttled)";
    # callback that reports one change; it pauses for a second whenever the
    # caller says mail.sh ran or ref_indicator had to run git
    my $statproc = sub {
        my ($old, $new, $ref, $ran_mail_sh) = @_;
        my ($ind, $ran_git) = ref_indicator($proj->{path}, $old, $new);
        statmsg("ref-$type $username $name ($ref: @{[substr($old,0,$abbrev)]}$ind@{[substr($new,0,$abbrev)]})");
        sleep 1 if $ran_mail_sh || $ran_git;
    };
    if ($has_notify) {
        Girocco::Notify::ref_changes($proj, $user, $statproc, \%oldheads, @changes);
    } else {
        $statproc->(@$_) foreach @changes;
    }
    return 0;
}
# Service a throttle request that arrived over the client socket (STDIN).
# $1 => "pid class text" separated by whitespace
# Returns 0 if the request is invalid (bad pid, no such process, unknown
# class or missing text).  Otherwise this process registers itself with the
# controller through the control pipe on the supplicant's behalf, waits for
# the controller's signal while watching the socket, answers "throttled" or
# "proceed" on the socket and, after "proceed", stays alive until the
# supplicant closes the socket.  It then exits with status 0.
sub throttle {
    my ($arg) = @_;
    my ($pid, $classname, $text) = split(/\s+/, $arg);
    # the whole token must be digits, as process_pipe_msg also requires;
    # a missing token is rejected as well
    defined($pid) && $pid =~ /^\d+$/ or return 0; # invalid pid
    $pid += 0;
    $pid > 0 or return 0; # invalid pid
    kill(0, $pid) || $!{EPERM} or return 0; # no such process
    Throttle::GetClassInfo($classname) or return 0; # no such throttle class
    defined($text) && $text ne '' or return 0; # no text no service

    my $throttled = 0;
    my $proceed = 0;
    my $error = 0;
    my $controldead = 0;
    my $suppdead = 0;
    # the signal handlers write to this pipe so that a pending select()
    # wakes up as soon as a signal has been handled
    my ($waker, $wakew);
    pipe($waker, $wakew) or die "pipe failed: $!";
    select((select($wakew),$|=1)[0]);
    setnonblock($wakew);
    $SIG{'TERM'} = sub {$throttled = 1; syswrite($wakew, '!')};
    $SIG{'USR1'} = sub {$proceed = 1; syswrite($wakew, '!')};
    $SIG{'USR2'} = sub {$error = 1; syswrite($wakew, '!')};
    $SIG{'PIPE'} = sub {$controldead = 1; syswrite($wakew, '!')};
    select((select(STDIN),$|=1)[0]);

    logmsg("throttle $pid $classname $text request");
    # After writing we can expect a SIGTERM or SIGUSR1
    print $pipewrite "\nthrottle $$ $classname $text\n";

    # NOTE: the only way to detect the socket close is to read all the
    # data until EOF is reached -- recv can be used to peek.
    my $v = '';
    vec($v, fileno(STDIN), 1) = 1;
    vec($v, fileno($waker), 1) = 1;
    setnonblock(\*STDIN);
    setnonblock($waker);
    until ($controldead || $throttled || $proceed || $error || $suppdead) {
        my ($r, $e);
        select($r=$v, undef, $e=$v, 30);
        my ($bytes, $discard);
        do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
        do {$bytes = sysread(STDIN, $discard, 4096)} while (defined($bytes) && $bytes > 0);
        # anything other than "would block" on the socket (EOF or another
        # error) means the supplicant is gone
        $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
        print $pipewrite "\nkeepalive $$\n";
    }
    setblock(\*STDIN);

    if ($throttled && !$suppdead) {
        print STDIN "throttled\n";
        logmsg("throttle $pid $classname $text throttled");
    } elsif ($proceed && !$suppdead) {
        print STDIN "proceed\n";
        logmsg("throttle $pid $classname $text proceed");
        $SIG{'TERM'} = 'DEFAULT';
        # Stay alive until the child dies which we detect by EOF on STDIN
        setnonblock(\*STDIN);
        until ($controldead || $suppdead) {
            my ($r, $e);
            select($r=$v, undef, $e=$v, 30);
            my ($bytes, $discard);
            do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
            do {$bytes = sysread(STDIN, $discard, 512)} while (defined($bytes) && $bytes > 0);
            $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
            print $pipewrite "\nkeepalive $$\n";
        }
        setblock(\*STDIN);
    } else {
        my $prefix = '';
        $prefix = "control" if $controldead && !$suppdead;
        logmsg("throttle $pid $classname $text ${prefix}died");
    }
    exit 0;
}
# Process one message line read from the control pipe.
# $1 => the message: "throttle <pid> <class> <text>" or "keepalive <pid>"
# A throttle request is handed to Throttle::AddSupplicant.  The requesting
# pid is sent SIGUSR2 when the class or text is missing, the class is
# unknown, or the add fails, and SIGTERM when it is throttled.  Keepalive
# messages are ignored and anything else is reported on STDERR.
# Always returns 0.
sub process_pipe_msg {
    my ($act, $pid, $cls, $text) = split(/\s+/, $_[0]);
    # an empty message yields no fields at all; treat it like any other
    # unknown message instead of comparing undef
    $act = '' unless defined($act);
    if ($act eq "throttle") {
        defined($pid) && $pid =~ /^\d+$/ or return 0;
        $pid += 0;
        $pid > 0 or return 0; # invalid pid
        kill(0, $pid) or return 0; # invalid pid
        defined($cls) && $cls ne "" or kill('USR2', $pid), return 0;
        defined($text) && $text ne "" or kill('USR2', $pid), return 0;
        Throttle::GetClassInfo($cls) or kill('USR2', $pid), return 0;
        # the AddSupplicant call could send SIGUSR1 before it returns
        my $result = Throttle::AddSupplicant($pid, $cls, $text);
        kill('USR2', $pid), return 0 if $result < 0;
        kill('TERM', $pid), return 0 if $result > 0;
        # $pid was added to class $cls and will receive SIGUSR1 when
        # it's time for it to proceed
        return 0;
    } elsif ($act eq "keepalive") {
        # nothing to do although we could verify pid is valid and
        # still registered and send a SIGUSR2 if not, but
        # really keepalive should just be ignored.
        return 0;
    }
    print STDERR "discarding unknown pipe message \"$_[0]\"\n";
    return 0;
}
876 ## -------
877 ## OStream
878 ## -------
881 package OStream;
883 # Set to 1 for only syslog output (if enabled by mode)
884 # Set to 2 for only stderr output (if enabled by mode)
885 our $only = 0; # This is a hack
887 use Carp 'croak';
888 use Sys::Syslog qw(:DEFAULT :macros);
# Write all of $data to the numeric file descriptor $fd, retrying after
# partial writes and after EINTR.  Croaks on any other write error or if a
# write makes no progress.
sub writeall {
    use POSIX qw();
    use Errno;
    my ($fd, $data) = @_;
    my $offset = 0;
    my $remaining = length($data);
    while ($remaining) {
        my $bytes = POSIX::write(
            $fd,
            substr($data, $offset, $remaining),
            $remaining);
        next if !defined($bytes) && $!{EINTR};
        croak("POSIX::write failed: $!") unless defined $bytes;
        # POSIX::write reports a zero-byte write as the true string
        # "0 but true", so compare numerically rather than testing truth
        croak("POSIX::write wrote 0 bytes") unless $bytes > 0;
        $remaining -= $bytes;
        $offset += $bytes;
    }
}
# Emit one line to the outputs enabled for this object.
# The line goes to stderr as-is when the 'stderr' field is set and $only is
# not 1.  One trailing newline is then stripped and, if anything is left,
# the line goes to syslog at LOG_NOTICE when the 'syslog' field is set and
# $only is not 2.
sub dumpline {
    use POSIX qw(STDERR_FILENO);
    my ($self, $line) = @_;
    defined($only) or $only = 0;
    if ($self->{'stderr'} && $only != 1) {
        writeall(STDERR_FILENO, $line);
    }
    chop($line) if substr($line, -1, 1) eq "\n";
    return unless length($line);
    syslog(LOG_NOTICE, "%s", $line) if $self->{'syslog'} && $only != 2;
}
# Constructor used by tie.
# $1 => class name (defaults to 'OStream')
# $2 => mode: <= 0 stderr only, 1 syslog only, > 1 both
# $3 => syslog identity passed to openlog
# $4 => syslog facility (defaults to LOG_USER)
# When syslog output is enabled the log socket type is set to "native",
# falling back to "unix", and the log is opened; croaks if openlog fails.
sub TIEHANDLE {
    my ($class, $mode, $syslogname, $syslogfacility) = @_;
    $class ||= 'OStream';
    defined($syslogfacility) or $syslogfacility = LOG_USER;
    my $self = {
        'syslog' => $mode > 0,
        'stderr' => $mode <= 0 || $mode > 1,
        'lastline' => '',
    };
    if ($self->{'syslog'}) {
        # Some Sys::Syslog have a stupid default setlogsock order
        eval {Sys::Syslog::setlogsock("native"); 1;} or
        eval {Sys::Syslog::setlogsock("unix");};
        openlog($syslogname, "ndelay,pid", $syslogfacility)
            or croak("Sys::Syslog::openlog failed: $!");
    }
    return bless $self, $class;
}
# Fixed-answer tied-handle methods: BINMODE and CLOSE return 1, FILENO
# returns undef and EOF returns 0.
sub BINMODE {
    return 1;
}

sub FILENO {
    return undef;
}

sub EOF {
    return 0;
}

sub CLOSE {
    return 1;
}
# Tied-handle printf: format the arguments with sprintf and pass the
# resulting string to PRINT, returning whatever PRINT returns.
sub PRINTF {
    my ($self, $template, @args) = @_;
    my $text = sprintf($template, @args);
    return $self->PRINT($text);
}
# Tied-handle print: append the arguments to any partial line held over
# from earlier calls, pass every complete line (newline included) to
# dumpline, and keep what follows the last newline in 'lastline' for the
# next call.  Always returns 1.
sub PRINT {
    my $self = shift;
    my $data = join('', $self->{'lastline'}, @_);
    my $pos = 0;
    # $data is left untouched while scanning so that $pos always indexes
    # the same string; every complete line in the text is emitted now
    while ((my $idx = index($data, "\n", $pos)) >= 0) {
        ++$idx;
        my $line = substr($data, $pos, $idx - $pos);
        $pos = $idx;
        $self->dumpline($line);
    }
    $self->{'lastline'} = substr($data, $pos);
    return 1;
}
# Destructor: flush any buffered partial line (one that never received its
# terminating newline) through dumpline, then close the syslog connection.
sub DESTROY {
    my ($self) = @_;
    my $pending = $self->{'lastline'};
    $self->dumpline($pending) if length($pending);
    closelog();
}
# syswrite on the tied handle: WRITE($self, $scalar, $length, $offset)
#
#   $scalar - buffer to write from (undef is treated as '')
#   $length - maximum number of characters to write (default: whole buffer)
#   $offset - start position in the buffer; negative counts from the end
#             (default: 0)
#
# Passes the selected part of the buffer to PRINT and returns the number of
# characters consumed.  Croaks on a negative length or on an offset that
# lies outside the buffer.
sub WRITE {
    my $self = shift;
    my ($scalar, $length, $offset) = @_;
    $scalar = '' if !defined($scalar);
    my $avail = length($scalar);
    $length = $avail if !defined($length);
    croak "OStream::WRITE invalid length $length"
        if $length < 0;
    $offset = 0 if !defined($offset);
    $offset += $avail if $offset < 0;
    # The offset must be validated against the size of the buffer, not
    # against the requested length: writing 3 characters starting at
    # offset 6 of an 11 character buffer is perfectly valid.
    croak "OStream::WRITE invalid write offset"
        if $offset < 0 || $offset > $avail;
    my $max = $avail - $offset;
    $length = $max if $length > $max;
    $self->PRINT(substr($scalar, $offset, $length));
    return $length;
}
990 ## ----
991 ## main
992 ## ----
995 package main;
997 # returns pid of process that will schedule jobd.pl restart on success
998 # returns 0 if fork or other system call failed with error in $!
999 # returns undef if jobd.pl does not currently appear to be running (no lockfile)
# schedule_jobd_restart($newpg)
#
# Double-forks a detached helper ("grandchild") that, after a delay of about
# five seconds, writes "restart\n" into the jobd lock file.
#
#   $newpg - when true the grandchild tries to move itself into a new
#            process group (setpgid(0, 0), attempted up to four times)
#
# Two pipes are used:
#   $read2/$write2 - the intermediate child reports either the grandchild's
#                    pid or ":<errno>" (second fork failed) to the parent
#   $read/$write   - the parent can write "1" to tell the grandchild NOT to
#                    request the restart; plain EOF (or "0") lets it proceed
#
# While the forks are in progress $SIG{CHLD} is replaced with a no-op
# handler; the previous handler is put back on every return path taken in
# the parent.
1000 sub schedule_jobd_restart {
1001 use POSIX qw(_exit setpgid);
1002 my $newpg = shift;
1003 my $jdlf = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
# no lock file: jobd does not appear to be running, nothing to schedule
1004 return undef unless -f $jdlf;
1005 my $oldsigchld = $SIG{'CHLD'};
1006 defined($oldsigchld) or $oldsigchld = sub {};
1007 my ($read, $write, $read2, $write2);
1008 pipe($read, $write) or return 0;
# enable autoflush on the write end so a print is delivered immediately
1009 select((select($write),$|=1)[0]);
1010 if (!pipe($read2, $write2)) {
# keep the pipe error in $! for the caller while cleaning up
1011 local $!;
1012 close $write;
1013 close $read;
1014 return 0;
1016 select((select($write2),$|=1)[0]);
1017 $SIG{'CHLD'} = sub {};
# first fork: up to three attempts, one second apart
1018 my $retries = 3;
1019 my $child;
1020 while (!defined($child) && $retries--) {
1021 $child = fork;
1022 sleep 1 unless defined($child) || !$retries;
1024 if (!defined($child)) {
# keep the fork error in $! for the caller while cleaning up
1025 local $!;
1026 close $write2;
1027 close $read2;
1028 close $write;
1029 close $read;
1030 $SIG{'CHLD'} = $oldsigchld;
1031 return 0;
1033 # double fork the child
1034 if (!$child) {
# intermediate child: fork the grandchild, report the outcome, then exit
1035 close $read2;
1036 my $retries2 = 3;
1037 my $child2;
1038 while (!defined($child2) && $retries2--) {
1039 $child2 = fork;
1040 sleep 1 unless defined($child2) || !$retries2;
1042 if (!defined($child2)) {
# second fork failed: send ":<errno>" up to the parent (255 if errno is 0)
1043 my $ec = 0 + $!;
1044 $ec = 255 unless $ec;
1045 print $write2 ":$ec";
1046 close $write2;
1047 _exit 127;
1049 if ($child2) {
1050 # pass new child pid up to parent and exit
1051 print $write2 $child2;
1052 close $write2;
1053 _exit 0;
1054 } else {
1055 # this is the grandchild
1056 close $write2;
1058 } else {
# original parent: wait for the intermediate child's report
1059 close $write2;
1060 my $result = <$read2>;
1061 close $read2;
1062 chomp $result if defined($result);
1063 if (!defined($result) || $result !~ /^:?\d+$/) {
1064 # something's wrong with the child -- kill it
1065 kill(9, $child) && waitpid($child, 0);
1066 my $oldsigpipe = $SIG{'PIPE'};
1067 # make sure the grandchild, if any,
1068 # doesn't run the success proc
1069 $SIG{'PIPE'} = sub {};
1070 print $write 1;
1071 close $write;
1072 close $read;
1073 $SIG{'PIPE'} = defined($oldsigpipe) ?
1074 $oldsigpipe : 'DEFAULT';
# no meaningful errno is available for this failure, report 255
1075 $! = 255;
1076 $SIG{'CHLD'} = $oldsigchld;
1077 return 0;
1079 if ($result =~ /^:(\d+)$/) {
1080 # fork failed in child, there is no grandchild
1081 my $ec = $1;
1082 waitpid($child, 0);
1083 close $write;
1084 close $read;
# hand the intermediate child's fork errno back to the caller via $!
1085 $! = $ec;
1086 $SIG{'CHLD'} = $oldsigchld;
1087 return 0;
1089 # reap the child and set $child to grandchild's pid
1090 waitpid($child, 0);
1091 $child = $result;
# $child is still 0 only in the grandchild (inherited from the first fork)
1093 if (!$child) {
1094 # grandchild that actually initiates the jobd.pl restart
1095 close $write;
# $wait is the remaining delay in seconds before acting
1096 my $wait = 5;
1097 open STDIN, '<', '/dev/null';
1098 open STDOUT, '>', '/dev/null';
1099 open STDERR, '>', '/dev/null';
1100 chdir "/";
1101 if ($newpg) {
# each failed setpgid attempt sleeps one second and takes that second
# off $wait, so the total delay stays at about five seconds
1102 my $makepg = sub {
1103 my $result = setpgid(0, 0);
1104 if (!defined($result)) {
1105 --$wait;
1106 sleep 1;
1108 $result;
1110 my $result = &$makepg;
1111 defined($result) or $result = &$makepg;
1112 defined($result) or $result = &$makepg;
1113 defined($result) or $result = &$makepg;
1115 sleep $wait;
# EOF (undef) or "0" on the control pipe means go ahead; the parent
# writes "1" on its failure path to veto the restart request
1116 my $result = <$read>;
1117 close $read;
1118 chomp $result if defined($result);
1119 if (!defined($result) || $result eq 0) {
# overwrite the start of the lock file with the request and cut off
# whatever followed it
1120 open JDLF, '+<', $jdlf or _exit(1);
1121 select((select(JDLF),$|=1)[0]);
1122 print JDLF "restart\n";
1123 truncate JDLF, tell(JDLF);
1124 close JDLF;
1126 _exit(0);
# parent success path: closing $write gives the grandchild its EOF
1128 close $write;
1129 close $read;
1130 $SIG{'CHLD'} = $oldsigchld;
1131 return $child;
# cancel_jobd_restart($restarter)
#
# Kill a pending jobd restarter process created by schedule_jobd_restart.
#
#   $restarter - pid of the restarter; undef or 0 means there is none
#
# Returns nothing if no pid was given, -1 if the process no longer exists
# and the pid if it was killed.  Dies if the process exists but cannot be
# killed.
sub cancel_jobd_restart {
    my $restarter = shift;
    return unless defined($restarter) && $restarter != 0;
    return -1 unless kill(0, $restarter);
    unless (kill(9, $restarter)) {
        # The restarter may exit on its own between the probe above and
        # this kill; that is the same "already gone" outcome, not an error
        # worth taking the whole daemon down for.
        return -1 if $!{ESRCH};
        die "failed to kill jobd restarter process (pid $restarter): $!\n";
    }
    # we must not waitpid because $restarter was doubly forked and will
    # NOT send us a SIGCHLD when it terminates
    return $restarter;
}
# Startup and command line processing.
#
# The ExecUtil object is created before anything else is changed;
# presumably it records how taskd was started so it can be re-executed
# for a graceful restart -- confirm against Girocco::ExecUtil.
my $reexec = Girocco::ExecUtil->new;
chdir "/";
close(DATA) if fileno(DATA);

my $sfac;               # syslog facility name given with --syslog, if any
my ($stiv, $idiv);      # raw --status-interval / --idle-status-interval
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|?|h'                  => sub {pod2usage(-verbose => 2, -exitval => 0)},
    'quiet|q'                   => \$quiet,
    'no-quiet'                  => sub {$quiet = 0},
    'progress|P'                => \$progress,
    'inetd|i'                   => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
    'idle-timeout|t=i'          => \$idle_timeout,
    'syslog|s:s'                => \$sfac,
    'no-syslog'                 => sub {$syslog = 0; $sfac = undef;},
    'stderr'                    => \$stderr,
    'abbrev=i'                  => \$abbrev,
    'show-fast-forward-info'    => \$showff,
    'no-show-fast-forward-info' => sub {$showff = 0},
    'same-pid'                  => \$same_pid,
    'status-interval=i'         => \$stiv,
    'idle-status-interval=i'    => \$idiv,
) || pod2usage(2);

# Derived settings: naming a facility turns syslog on, not being quiet
# turns progress on, and a non-positive --abbrev becomes 128.
defined($sfac) and $syslog = 1;
$quiet or $progress = 1;
$abbrev > 0 or $abbrev = 128;

if (defined($idle_timeout)) {
    $idle_timeout =~ /^\d+$/
        or die "--idle-timeout must be a whole number";
    $inetd
        or die "--idle-timeout may not be used without --inetd";
}
# Both intervals are given in minutes on the command line and kept in
# seconds internally.
if (defined($stiv)) {
    $stiv =~ /^\d+$/
        or die "--status-interval must be a whole number";
    $statusintv = $stiv * 60;
}
if (defined($idiv)) {
    $idiv =~ /^\d+$/
        or die "--idle-status-interval must be a whole number";
    $idleintv = $idiv * 60;
}
# Output setup.
#
# In inetd mode fd 0/1 belong to the listening socket, so STDOUT is first
# pointed at STDERR.  With syslog enabled STDERR is tied to an OStream
# (mode 1 = syslog only, mode 2 = syslog and stderr).  Finally STDOUT is
# either discarded (--quiet) or, in inetd mode, aliased to STDERR so that
# non-error output follows the same path.
open STDOUT, '>&STDERR' if $inetd;
if ($syslog) {
    use Sys::Syslog qw();
    my $mode = 1;
    ++$mode if $stderr;
    $sfac = "user" unless defined($sfac) && $sfac ne "";
    my $ofac = $sfac;
    $sfac = uc($sfac);
    $sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
    # The name comes from the command line and is interpolated into a
    # string eval below, so only accept a plain constant name here.
    die "invalid syslog facility: $ofac"
        unless $sfac =~ /^LOG_[A-Z0-9_]+\z/;
    my $facility;
    # LOG_* names that exist in Sys::Syslog but are not facilities
    my %badfac = map({("LOG_$_" => 1)}
        (qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
    eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac";
    # a facility value only uses bits 3-7 and its number is at most 23
    die "invalid syslog facility: $ofac"
        if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
    tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
}
if ($quiet) {
    open STDOUT, '>', '/dev/null';
} elsif ($inetd) {
    *STDOUT = *STDERR;
}
# Listening socket setup.
#
# The socket is either inherited -- fd 0 in inetd mode, or the descriptor
# named in the SOCKFDENV environment variable (set by the graceful restart
# code before re-executing) -- or created fresh at the standard location
# inside the chroot.
my $NAME;       # filesystem path of the unix socket being served

my $restart_file = $Girocco::Config::chroot.'/etc/taskd.restart';
my $restart_active = 1;
# fetch the inherited descriptor number and remove the variable from the
# environment in one step
my $resumefd = delete $ENV{(SOCKFDENV)};
if (defined($resumefd) && !isfdopen($resumefd)) {
    warn "ignoring invalid ".SOCKFDENV." environment value\n";
    undef $resumefd;
}
if ($inetd || defined($resumefd)) {
    my $fdopen = defined($resumefd) ? $resumefd : 0;
    open Server, "<&=$fdopen" or die "open: $!";
    setcloexec(\*Server) if $fdopen > $^F;

    # The inherited descriptor must be an unconnected AF_UNIX stream socket.
    my $sockname = getsockname(Server);
    $sockname or die "getsockname: $!";
    die "socket already connected! must be 'wait' socket" if getpeername(Server);
    $!{ENOTCONN} or die "getpeername: $!";
    my $st = getsockopt(Server, SOL_SOCKET, SO_TYPE);
    $st or die "getsockopt(SOL_SOCKET, SO_TYPE): $!";
    my $socktype = unpack('i', $st);
    die "stream socket required" unless defined($socktype) && $socktype == SOCK_STREAM;
    die "AF_UNIX socket required" unless sockaddr_family($sockname) == AF_UNIX;

    # The restart file is only honoured when serving the standard socket.
    $NAME = unpack_sockaddr_un($sockname);
    my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
    if ($NAME ne $expected) {
        $restart_active = 0;
        warn "listening on \"$NAME\" but expected \"$expected\", restart file disabled";
    }

    # Make sure user and group can both read and write the socket.
    my $mode = (stat($NAME))[2];
    $mode or die "stat: $!";
    $mode &= 07777;
    if (($mode & 0660) != 0660) {
        chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
    }
} else {
    $NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
    my $uaddr = sockaddr_un($NAME);

    socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
    # remove any stale socket file left behind so that bind can succeed
    unlink($NAME);
    bind(Server, $uaddr) or die "bind failed: $!";
    listen(Server, SOMAXCONN) or die "listen failed: $!";
    chmod(0666, $NAME) or die "chmod failed: $!";
}
# Register every throttle class that has a name, the configured classes
# first and the built-in defaults after them.
for my $spec (@Girocco::Config::throttle_classes, @throttle_defaults) {
    my $cname = $spec->{"name"} or next;
    Throttle::GetClassInfo($cname, $spec);
}
# Return the smaller of two numbers (the first one when they are equal).
sub _min {
    my ($lhs, $rhs) = @_;
    return $lhs if $lhs <= $rhs;
    return $rhs;
}
# State used by the service loop.

# Control pipe: the read end is non-blocking and is polled by the service
# loop; the write end is unbuffered.
pipe($piperead, $pipewrite) or die "pipe failed: $!";
setnonblock($piperead);
{
    my $prev = select($pipewrite);
    $| = 1;
    select($prev);
}
my $pipebuff = '';

# select() bit sets: control pipe only, and control pipe plus the
# listening socket.
my $fdset_pipe = '';
vec($fdset_pipe, fileno($piperead), 1) = 1;
my $fdset_both = $fdset_pipe;
vec($fdset_both, fileno(Server), 1) = 1;

# Timers, all relative to the same starting instant.
my $t = time;
my $penalty = 0;
my $penaltytime = $t;
my $nextwakeup = $t + 60;
my $nextstatus = $statusintv ? $t + $statusintv : undef;

# Start without a stale restart request; if the file cannot be removed the
# restart file mechanism is switched off.
if ($restart_active && !(unlink($restart_file) || $!{ENOENT})) {
    $restart_active = 0;
    statmsg "restart file disabled could not unlink \"$restart_file\": $!";
}
statmsg "listening on $NAME";
# Main service loop.
#
# Each pass waits in select() until the control pipe or the listening
# socket needs attention.  While waiting it also emits periodic STATUS
# messages, lets the throttle queue run, forgets reaped children, acts on
# a graceful restart request once idle and enforces the idle timeout.
# After the wait, pending control pipe messages are processed and at most
# one new connection is accepted and handed to a spawned child.
while (1) {
    my ($rout, $eout, $nfound);
    do {
        my $wait;
        my $now = time;
        # Age the spawn penalty: one unit is forgiven per elapsed second.
        my $adjustpenalty = sub {
            if ($penaltytime < $now) {
                my $credit = $now - $penaltytime;
                $penalty = $penalty > $credit ? $penalty - $credit : 0;
                $penaltytime = $now;
            }
        };
        if (defined($nextstatus) && $now >= $nextstatus) {
            # Skip the report if we are idle, an idle report has already
            # been made and the idle status interval has not yet elapsed.
            unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
                my $statmsg = "STATUS: $children active";
                my @running = ();
                if ($children) {
                    my @stats = ();
                    my $cnt = 0;
                    foreach my $cls (sort(Throttle::GetClassList())) {
                        my $inf = Throttle::GetClassInfo($cls);
                        if ($inf->{'total'}) {
                            $cnt += $inf->{'total'};
                            push(@stats, substr(lc($cls),0,1)."=".
                                $inf->{'total'}.'/'.$inf->{'active'});
                        }
                    }
                    # children not accounted for by any throttle class
                    push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
                    $statmsg .= " (".join(" ",@stats).")" if @stats;
                    foreach (Throttle::GetRunningPids()) {
                        my ($cls, $ts, $desc) = Throttle::GetPidInfo($_);
                        next unless $ts;
                        push(@running, "[${cls}::$desc] ".duration($now-$ts));
                    }
                }
                my $idlesecs;
                $statmsg .= ", idle " . duration($idlesecs)
                    if !$children && ($idlesecs = $now - $idlestart) >= 2;
                statmsg $statmsg;
                statmsg "STATUS: currently running: ".join(", ", @running)
                    if @running;
                $idlestatus = $now if !$children;
            }
            $nextstatus += $statusintv while $nextstatus <= $now;
        }
        # Wait no longer than the next minute boundary or the next time
        # the throttle queue wants service, whichever comes first.
        $nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
        $wait = _min($wait, (Throttle::ServiceQueue()||60));
        &$adjustpenalty; # this prevents ignoring accept when we shouldn't
        my $fdset;
        if ($penalty <= $maxspawn) {
            $fdset = $fdset_both;
        } else {
            # Too many recent connections: leave the listening socket out
            # of the set until enough penalty has aged away.
            $fdset = $fdset_pipe;
            $wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
        }
        $nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
        logmsg("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR} || $!{EAGAIN};
        my $reaped;
        Throttle::RemoveSupplicant($reaped) while ($reaped = shift(@reapedpids));
        $now = time;
        &$adjustpenalty; # this prevents banking credits for elapsed time
        # Graceful restart: only when nothing is running, nothing is
        # pending on the descriptors and the restart file exists.
        if (!$children && !$nfound && $restart_active && -e $restart_file) {
            $SIG{CHLD} = sub {};
            my $restarter = schedule_jobd_restart($inetd);
            if (defined($restarter) && !$restarter) {
                statmsg "RESTART: restart requested; retrying failed scheduling of jobd restart: $!";
                sleep 2; # *cough*
                # The retry must ask for the same process group handling
                # as the first attempt did.
                $restarter = schedule_jobd_restart($inetd);
                if (!defined($restarter)) {
                    statmsg "RESTART: restart requested; reschedule skipped jobd no longer running";
                } elsif (defined($restarter) && !$restarter) {
                    statmsg "RESTART: restart requested; retry of jobd restart scheduling failed, skipping jobd restart: $!";
                    $restarter = undef;
                }
            }
            if ($inetd) {
                statmsg "RESTART: restart requested; now exiting for inetd restart";
                statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
                sleep 2; # *cough*
                exit 0;
            } else {
                statmsg "RESTART: restart requested; now restarting";
                statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
                # Keep the listening socket open across the exec and tell
                # the new instance which descriptor it is.
                setnoncloexec(\*Server);
                $reexec->setenv(SOCKFDENV, fileno(Server));
                $reexec->reexec($same_pid);
                # Only reached if the re-exec failed: undo the preparations
                # and carry on serving.
                setcloexec(\*Server) if fileno(Server) > $^F;
                statmsg "RESTART: continuing after failed restart: $!";
                chdir "/";
                cancel_jobd_restart($restarter) if $restarter;
                statmsg "RESTART: scheduled jobd restart has been cancelled" if $restarter;
                $SIG{CHLD} = \&REAPER;
            }
        }
        if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
            statmsg "idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
            exit 0;
        }
    } while $nfound < 1;
    my $reout = $rout | $eout;
    # Control pipe: the doubled braces create a bare block so that "last"
    # and "redo" below act on this section only.
    if (vec($reout, fileno($piperead), 1)) {{
        my $nloff = -1;
        {
            my $bytes;
            do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
                while (!defined($bytes) && $!{EINTR});
            # nothing more to read right now
            last if !defined($bytes) && $!{EAGAIN};
            die "sysread failed: $!" unless defined $bytes;
            # since we always keep a copy of $pipewrite open EOF is fatal
            die "sysread returned EOF on pipe read" unless $bytes;
            $nloff = index($pipebuff, "\n", 0);
            if ($nloff < 0 && length($pipebuff) >= 512) {
                $pipebuff = '';
                print STDERR "discarding 512 bytes of control pipe data with no \\n found\n";
            }
            redo unless $nloff >= 0;
        }
        last unless $nloff >= 0;
        # Hand over every complete line; a trailing partial line stays in
        # $pipebuff for the next read.
        do {
            my $msg = substr($pipebuff, 0, $nloff);
            substr($pipebuff, 0, $nloff + 1) = '';
            $nloff = index($pipebuff, "\n", 0);
            process_pipe_msg($msg) if length($msg);
        } while $nloff >= 0;
        redo;
    }}
    next unless vec($reout, fileno(Server), 1);
    unless (accept(Client, Server)) {
        logmsg "accept failed: $!" unless $!{EINTR};
        next;
    }
    logmsg "connection on $NAME";
    ++$penalty;
    # The request is handled in a spawned child; the handler reads the
    # command line from STDIN (presumably spawn connects the client socket
    # to it -- confirm against spawn).
    spawn sub {
        my $inp = <STDIN>;
        # tolerate one leading empty line
        $inp = <STDIN> if defined($inp) && $inp eq "\n";
        chomp $inp if defined($inp);
        # ignore empty and "nop" connects
        defined($inp) && $inp ne "" && $inp ne "nop" or exit 0;
        my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
        defined($arg) or $arg = '';
        if ($cmd eq 'ref-changes') {
            ref_changes($arg);
        } elsif ($cmd eq 'clone') {
            clone($arg);
        } elsif ($cmd eq 'ref-change') {
            ref_change($arg);
        } elsif ($cmd eq 'throttle') {
            throttle($arg);
        } else {
            statmsg "ignoring unknown command: $cmd";
            exit 3;
        }
    };
    close Client;
}
1441 ## -------------
1442 ## Documentation
1443 ## -------------
1446 __END__
1448 =head1 NAME
1450 taskd.pl - Perform Girocco service tasks
1452 =head1 SYNOPSIS
1454 taskd.pl [options]
1456 Options:
1457 -h | --help detailed instructions
1458 -q | --quiet run quietly
1459 --no-quiet do not run quietly
1460 -P | --progress show occasional status updates
1461 -i | --inetd run as inetd unix stream wait service
1462 implies --quiet --syslog
1463 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1464 requires --inetd
1465 -s | --syslog[=facility] send messages to syslog instead of
1466 stderr but see --stderr
1467 enabled by --inetd
1468 --no-syslog do not send messages to syslog
1469 --stderr always send messages to stderr too
1470 --abbrev=n abbreviate hashes to n (default is 8)
1471 --show-fast-forward-info show fast-forward info (default is on)
1472 --no-show-fast-forward-info disable showing fast-forward info
1473 --same-pid keep same pid during graceful restart
1474 --status-interval=MINUTES status update interval (default 1)
1475 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1477 =head1 OPTIONS
1479 =over 8
1481 =item B<--help>
1483 Print the full description of taskd.pl's options.
1485 =item B<--quiet>
1487 Suppress non-error messages, e.g. for use when running this task as an inetd
1488 service. Enabled by default by --inetd.
1490 =item B<--no-quiet>
1492 Enable non-error messages. When running in --inetd mode these messages are
1493 sent to STDERR instead of STDOUT.
1495 =item B<--progress>
1497 Show information about the current status of the task operation occasionally.
1498 This is automatically enabled if --quiet is not given.
1500 =item B<--inetd>
1502 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1503 stream socket ready to have accept called on it. To be useful, the unix socket
1504 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1505 will be issued if the socket is not in the expected location. Socket file
1506 permissions will be adjusted if necessary and, if they cannot be, taskd.pl will
1507 die. The --inetd option also enables the --quiet and --syslog options but
1508 --no-quiet and --no-syslog may be used to alter that.
1510 The correct specification for the inetd socket is a "unix" protocol "stream"
1511 socket in "wait" mode with user and group writable permissions (0660). An
1512 attempt will be made to alter the socket's file mode if needed and, if that
1513 cannot be accomplished, taskd.pl will die.
1515 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1516 in wait mode and will die if the passed in socket is already connected.
1518 Note that while *BSD's inetd happily supports unix sockets (and so does
1519 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1520 However, systemd does seem to.
1522 =item B<--idle-timeout=SECONDS>
1524 Only permitted when running in --inetd mode. After SECONDS of inactivity
1525 (i.e. all outstanding tasks have completed and no new requests have come in)
1526 exit normally. The default is no timeout at all (a SECONDS value of 0).
1527 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1529 =item B<--syslog[=facility]>
1531 Normally error output is sent to STDERR. With this option it's sent to
1532 syslog instead. Note that when running in --inetd mode non-error output is
1533 also affected by this option as it's sent to STDERR in that case. If
1534 not specified, the default for facility is LOG_USER. Facility names are
1535 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1536 with the LOG_NOTICE priority.
1538 =item B<--no-syslog>
1540 Send error message output to STDERR but not syslog.
1542 =item B<--stderr>
1544 Always send error message output to STDERR. If --syslog is in effect then
1545 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1546 messages as well.
1548 =item B<--abbrev=n>
1550 Abbreviate displayed hash values to only the first n hexadecimal characters.
1551 The default is 8 characters. Set to 0 for no abbreviation at all.
1553 =item B<--show-fast-forward-info>
1555 Instead of showing ' -> ' in ref-change/ref-notify update messages, show either
1556 '..' for a fast-forward, creation or deletion or '...' for non-fast-forward.
1557 This requires running an extra git command for each ref update that is not a
1558 creation or deletion in order to determine whether or not it's a fast forward.
1560 =item B<--no-show-fast-forward-info>
1562 Disable showing of fast-forward information for ref-change/ref-notify update
1563 messages. Instead just show a ' -> ' indicator.
1565 =item B<--same-pid>
1567 When performing a graceful restart, keep the same pid rather than switching to
1568 a new one.
1570 =item B<--status-interval=MINUTES>
1572 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1573 status updates are shown at each MINUTES interval. Setting the interval to 0
1574 disables them entirely even with --progress.
1576 =item B<--idle-status-interval=IDLEMINUTES>
1578 Two consecutive "idle" status updates with no intervening activity will not be
1579 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1580 Setting the interval to 0 prevents any consecutive idle updates (with no
1581 activity between them) from appearing at all.
1583 =back
1585 =head1 DESCRIPTION
1587 taskd.pl is Girocco's service request servant; it listens for service requests
1588 such as new clone requests and ref update notifications and spawns a task to
1589 perform the requested action.
1591 =cut