jobd/taskd: support --same-pid graceful restart option
[girocco.git] / taskd / taskd.pl
blob406c5cf51107a6c72f9e170407d2d0f0d88d7c88
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
5 # taskd is Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, a new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
31 use strict;
32 use warnings;
34 use Getopt::Long;
35 use Pod::Usage;
36 use Socket;
37 use Errno;
38 use Fcntl;
39 use POSIX ":sys_wait_h";
40 use File::Basename;
42 use lib dirname($0);
43 use Girocco::Config;
44 use Girocco::Notify;
45 use Girocco::Project;
46 use Girocco::User;
47 use Girocco::Util qw(noFatalsToBrowser get_git);
48 BEGIN {noFatalsToBrowser}
49 use Girocco::ExecUtil;
# Throttle Classes Defaults
# Each entry is a hash describing one throttle class by name.
# Note that any same-named classes in @Girocco::Config::throttle_classes
# will override (completely replacing the entire hash) these ones.
my @throttle_defaults = (
    {
        name => "ref-change",
        maxproc => 0,
        maxjobs => 1,
        interval => 1
    },
    {
        name => "clone",
        maxproc => 0,
        maxjobs => 2,
        interval => 5
    },
    {
        name => "snapshot",
        #maxproc => max(5, cpucount + maxjobs), # this is the default
        #maxjobs => max(1, int(cpucount / 4)) , # this is the default
        interval => 5
    },
);
75 # Options
76 my $quiet;
77 my $progress;
78 my $syslog;
79 my $stderr;
80 my $inetd;
81 my $idle_timeout;
82 my $abbrev = 8;
83 my $showff = 1;
84 my $same_pid;
85 my $statusintv = 60;
86 my $idleintv = 3600;
87 my $maxspawn = 8;
89 $| = 1;
91 my $progname = basename($0);
92 my $children = 0;
93 my $idlestart = time;
94 my $idlestatus = 0;
# Return the CPU count reported by Girocco::Util::online_cpus.
# The first true result is remembered in the package variable
# $online_cpus_result so that later calls do not repeat the lookup.
sub cpucount {
    use Girocco::Util "online_cpus";
    our $online_cpus_result;
    $online_cpus_result ||= online_cpus;
    return $online_cpus_result;
}
# Print a log line built from the arguments (joined as "@_") to STDOUT.
# When STDOUT is tied, the line is emitted twice by toggling
# $OStream::only: once with a "[time] progname pid: " header for the
# STDERR side and once bare for the syslog side.
sub logmsg {
    my $stamp = scalar localtime;
    my $hdr = "[$stamp] $progname $$: ";
    my $text = "@_";
    if (!tied *STDOUT) {
        print "$hdr$text\n";
    } else {
        $OStream::only = 2; # STDERR only
        print "$hdr$text\n";
        $OStream::only = 1; # syslog only
        print "$text\n";
        $OStream::only = 0; # back to default
    }
}
# Print a status line built from the arguments to STDERR, but only when
# $progress is set.  When STDERR is tied, the line is emitted twice by
# toggling $OStream::only: once with a "[time] progname pid: " header
# for the STDERR side and once bare for the syslog side.
sub statmsg {
    return unless $progress;
    my $stamp = scalar localtime;
    my $hdr = "[$stamp] $progname $$: ";
    my $text = "@_";
    if (!tied *STDERR) {
        print STDERR "$hdr$text\n";
    } else {
        $OStream::only = 2; # STDERR only
        print STDERR "$hdr$text\n";
        $OStream::only = 1; # syslog only
        print STDERR "$text\n";
        $OStream::only = 0; # back to default
    }
}
# Format a number of seconds as a compact string such as "1d2h3m4s".
# Leading zero-valued units are omitted ("59s", "1m0s", "1h0m0s").
# An undefined or negative argument is returned unchanged.
sub duration {
    my $secs = shift;
    return $secs unless defined($secs) && $secs >= 0;
    my $left = int($secs);
    my $text = '';
    # Peel off seconds, minutes and hours in turn; whatever remains is days.
    for my $unit (['s', 60], ['m', 60], ['h', 24]) {
        my ($suffix, $base) = @$unit;
        $text = ($left % $base) . $suffix . $text;
        return $text if $left < $base;
        $left = int($left / $base);
    }
    return $left . 'd' . $text;
}
# Turn on O_NONBLOCK for the given file handle, leaving its other
# status flags untouched.  Dies if either fcntl call fails.
sub setnonblock {
    my ($handle) = @_;
    my $mode = fcntl($handle, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($mode);
    fcntl($handle, F_SETFL, $mode | O_NONBLOCK) or die "fcntl failed: $!";
}
# Turn off O_NONBLOCK for the given file handle, leaving its other
# status flags untouched.  Dies if either fcntl call fails.
sub setblock {
    my ($handle) = @_;
    my $mode = fcntl($handle, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($mode);
    fcntl($handle, F_SETFL, $mode & ~O_NONBLOCK) or die "fcntl failed: $!";
}
160 package Throttle;
163 ## Throttle protocol
165 ## 1) Process needing throttle services acquire a control file descriptor
166 ## a) Either as a result of a fork + exec (the write end of a pipe)
167 ## b) Or by connecting to the taskd socket (not yet implemented)
169 ## 2) The process requesting throttle services will be referred to
170 ## as the supplicant or just "supp" for short.
172 ## 3) The supp first completes any needed setup which may include
173 ## gathering data it needs to perform the action -- if that fails
174 ## then there's no need for any throttling.
176 ## 4) The supp writes a throttle request to the control descriptor in
177 ## this format:
178 ## throttle <pid> <class>\n
179 ## for example if the supp's pid was 1234 and it was requesting throttle
180 ## control as a member of the mail class it would write this message:
181 ## throttle 1234 mail\n
182 ## Note that if the control descriptor happens to be a pipe rather than a
183 ## socket, the message should be preceded by another "\n" just to be safe.
184 ## If the control descriptor is a socket, not a pipe, the message may be
185 ## preceded by a "\n" but that's not recommended.
187 ## 5) For supplicants with a control descriptor that is a pipe
188 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
189 ## If the control descriptor is a socket (getsockname succeeds) then
190 ## protocol (5b) should be used.
192 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
193 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
194 ## to write a "keepalive\n" message to the control descriptor. If that
195 ## fails, the controller has gone away and it may make its own decision
196 ## whether or not to proceed at that point. If, on the other hand, it
197 ## receives a SIGTERM, the process limit for its class has been reached
198 ## and it should abort without performing its action. If it receives
199 ## SIGUSR1, it may proceed without writing anything more to the control
200 ## descriptor, and MAY even close the control descriptor. Finally, a
201 ## SIGUSR2 indicates rejection of the throttle request for some other reason
202 ## such as unrecognized class name or invalid pid in which case the supp may
203 ## make its own decision how to proceed.
205 ## 5b) The supp now enters a read wait on the socket -- it need accommodate no
206 ## more than 512 bytes and if a '\n' does not appear within that number of
207 ## bytes the read should be considered failed. Otherwise the read should
208 ## be retried until either a full line has been read or the socket is
209 ## closed from the other end. If the lone read is "proceed\n" then it may
210 ## proceed without reading or writing anything more to the control
211 ## descriptor, but MUST keep the control descriptor open and not call
212 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
213 ## should be retried) constitutes failure. If a full line starting with at
214 ## least one alpha character was read but it was not "proceed" then it
215 ## should abort without performing its action. For any other failure it
216 ## may make its own decision whether or not to proceed as the controller has
217 ## gone away.
219 ## 6) The supp now performs its throttled action.
221 ## 7) The supp now closes its control descriptor (if it hasn't already in the
222 ## case of (5a)) and exits -- in the case of a socket, the other end receives
223 ## notification that the socket has been closed (read EOF). In the case of
224 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
225 ## of the other end of the pipe, so it will not reach EOF by the supp's
226 ## exit in that case).
229 # keys are class names, values are hash refs with these fields:
230 # 'maxproc' => integer; maximum number of allowed supplicants (the sum of how
231 # many may be queued waiting plus how many may be
232 # concurrently active) with 0 meaning no limit.
233 # 'maxjobs' => integer; how many supplicants may proceed simultaneously a value
234 # of 0 is unlimited but the number of concurrent
235 # supplicants will always be limited to no more than
236 # the 'maxproc' value (if > 0) no matter what the
237 # 'maxjobs' value is.
238 # 'total' -> integer; the total number of pids belonging to this class that
239 # can currently be found in %pid.
240 # 'active' -> integer; the number of currently active supplicants which should
241 # be the same as (the number of elements of %pid with a
242 # matching class name) - (number of my class in @queue).
243 # 'interval' -> integer; minimum number of seconds between 'proceed' responses
244 # or SIGUSR1 signals to members of this class.
245 # 'lastqueue' -> time; last time a supplicant was successfully queued.
246 # 'lastproceed' => time; last time a supplicant was allowed to proceed.
247 # 'lastthrottle' => time; last time a supplicant was throttled
248 # 'lastdied' => time; last time a supplicant in this class died/exited/etc.
249 my %classes = ();
251 # keys are pid numbers, values are array refs with these elements:
252 # [0] => name of class (key to classes hash)
253 # [1] => supplicant state (0 => queued, non-zero => time it started running)
254 # [2] => descriptive text (e.g. project name)
255 my %pid = ();
257 # minimum number of seconds between any two proceed responses no matter what
258 # class. this takes priority in that it can effectively increase the
259 # class's 'interval' value by delaying proceed notifications if the minimum
260 # interval has not yet elapsed.
261 my $interval = 1;
263 # fifo of pids awaiting notification as soon as the next $interval elapses
264 # provided interval and maxjobs requirements are satisfied
265 # for the class of the pid that will next be triggered.
266 my @queue = ();
268 # time of most recent successful call to AddSupplicant
269 my $lastqueue = 0;
271 # time of most recent proceed notification
272 my $lastproceed = 0;
274 # time of most recent throttle
275 my $lastthrottle = 0;
277 # time of most recent removal
278 my $lastdied = 0;
280 # lifetime count of how many have been queued
281 my $totalqueue = 0;
283 # lifetime count of how many have been allowed to proceed
284 my $totalproceed = 0;
286 # lifetime count of how many have been throttled
287 my $totalthrottle = 0;
289 # lifetime count of how many have died
290 # It should always be true that $totalqueue - $totaldied == $currentlyactive
291 my $totaldied = 0;
# Returns the names of all currently registered throttle classes
# (the keys of %classes) in no particular order.
sub GetClassList {
    my @names = keys(%classes);
    return @names;
}
# Returns the numerically larger of its two arguments
# (the first one when they compare equal).
sub _max {
    my ($first, $second) = @_;
    return $first >= $second ? $first : $second;
}
# [0] => minimum acceptable result
# [1] => candidate value; used only if it looks like an optionally
#        signed decimal integer
# [2] => code ref invoked to supply a value when [1] is not usable
# Returns the chosen value, raised to the minimum if it is smaller.
sub _getnum {
    my ($min, $val, $default) = @_;
    # &$default without parens shares the current @_ with the callback
    my $ans = (defined($val) && $val =~ /^[+-]?\d+$/)
        ? 0 + $val
        : &$default;
    return $min >= $ans ? $min : $ans;
}
# [0] => name of class to find; must start with a letter and contain only
#        letters, digits and "._+-".  The name is lowercased before lookup.
# [1] => if true, create the class when it doesn't exist; if a hashref then
#        it supplies initial values for maxproc, maxjobs and interval.
#        Otherwise maxjobs defaults to max(cpu cores/4, 1), maxproc
#        defaults to max(5, number of cpu cores + maxjobs) and interval
#        defaults to 1.  All three are clamped to be at least 0.
# Returns a hash ref holding a COPY of the class's info on success, or
# nothing if the name is invalid or the class is missing and not created.
sub GetClassInfo {
    my ($classname, $init) = @_;
    return unless defined($classname) && $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/;
    $classname = lc($classname);
    if ($classes{$classname}) {
        # hand back a shallow copy so callers cannot alter the live record
        return { %{$classes{$classname}} };
    }
    return unless $init;
    $init = {} unless ref($init) eq 'HASH';
    my %newclass = ();
    $newclass{'maxjobs'} = _getnum(0, $init->{'maxjobs'}, sub{_max(1, int(::cpucount() / 4))});
    # the maxproc default depends on the maxjobs value computed just above
    $newclass{'maxproc'} = _getnum(0, $init->{'maxproc'}, sub{_max(5, ::cpucount() + $newclass{'maxjobs'})});
    $newclass{'interval'} = _getnum(0, $init->{'interval'}, sub{1});
    # counters and timestamps all start out at zero
    $newclass{$_} = 0 foreach qw(total active lastqueue lastproceed lastthrottle lastdied);
    $classes{$classname} = \%newclass;
    return { %newclass };
}
# [0] => pid to look up
# Returns () if the pid is not registered, otherwise the list
# ($classname, $timestarted, $description) where $timestarted is 0 while
# the pid is still queued and a time() value once it has started.
sub GetPidInfo {
    my ($wanted) = @_;
    return () unless exists $pid{$wanted};
    my $entry = $pid{$wanted};
    return @$entry;
}
# Returns the pids that are currently running (non-zero start time),
# ordered by start time from oldest to newest.  May return an empty list.
sub GetRunningPids {
    my @running = grep({ $pid{$_}->[1] } keys(%pid));
    return sort({ $pid{$a}->[1] <=> $pid{$b}->[1] } @running);
}
# Returns a hash ref with various information about the current state
#   'interval' => global minimum interval between proceeds
#   'active' => number of registered pids minus the number still queued
#   'queue' => how many pids are currently queued
#   'lastqueue' => time (epoch seconds) of last queue
#   'lastproceed' => time (epoch seconds) of last proceed
#   'lastthrottle' => time (epoch seconds) of last throttle
#   'lastdied' => time (epoch seconds) of last removal
#   'totalqueue' => lifetime total number of processes queued
#   'totalproceed' => lifetime total number of processes proceeded
#   'totalthrottle' => lifetime total number of processes throttled
#   'totaldied' => lifetime total number of removed processes
sub GetInfo {
    my $queued = scalar(@queue);
    my %state = (
        interval => $interval,
        active => scalar(keys(%pid)) - $queued,
        queue => $queued,
        lastqueue => $lastqueue,
        lastproceed => $lastproceed,
        lastthrottle => $lastthrottle,
        lastdied => $lastdied,
        totalqueue => $totalqueue,
        totalproceed => $totalproceed,
        totalthrottle => $totalthrottle,
        totaldied => $totaldied
    );
    return \%state;
}
# With no args returns the global interval.
# With one arg consisting only of digits, sets the global interval to it.
# In both cases the value in effect BEFORE the call is returned.
sub Interval {
    my ($new) = @_;
    my $previous = $interval;
    if (defined($new) && $new =~ /^\d+$/) {
        $interval = 0 + $new;
    }
    return $previous;
}
400 sub RemoveSupplicant;
# Perform queue service (i.e. send SIGUSR1 to any eligible queued process)
# Returns minimum interval (seconds) until next proceed is possible
# Returns undef if there's nothing waiting to proceed or
# the 'maxjobs' limits have been reached for all queued items (in which
# case it won't be possible to proceed until one of them exits, hence undef)
# This is called automatically by AddSupplicant and RemoveSupplicant
#
# Only the first queued pid of each class is considered per pass; every time
# the queue is modified the scan restarts from the top via RETRY.
sub ServiceQueue {
RETRY:
    return undef unless @queue; # if there's nothing queued, nothing to do
    my $now = time;
    # seconds still to wait before the global interval allows a proceed
    my $min = _max(0, $interval - ($now - $lastproceed));
    my $classmin = undef;
    my $classchecked = 0;
    my %seenclass = ();
    my $classcount = scalar(keys(%classes));
    for (my $i=0; $i <= $#queue && $classchecked < $classcount; ++$i) {
        my $pid = $queue[$i];
        my $procinfo = $pid{$pid};
        if (!$procinfo) {
            # Stale queue entry with no %pid record.  RemoveSupplicant
            # returns early for unknown pids WITHOUT touching @queue, so
            # drop the entry here; otherwise RETRY would loop forever.
            splice(@queue, $i, 1);
            goto RETRY;
        }
        my $classinfo = $classes{$$procinfo[0]};
        if (!$classinfo) {
            RemoveSupplicant($pid, 1);
            goto RETRY;
        }
        if (!$seenclass{$$procinfo[0]}) {
            $seenclass{$$procinfo[0]} = 1;
            ++$classchecked;
            # maxjobs of 0 means unlimited concurrent jobs for the class
            if (!$classinfo->{'maxjobs'} || $classinfo->{'active'} < $classinfo->{'maxjobs'}) {
                my $cmin = _max(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
                if (!$cmin && !$min) {
                    # both the class and global intervals have elapsed:
                    # mark the pid as running and tell it to proceed
                    $now = time;
                    $$procinfo[1] = $now;
                    splice(@queue, $i, 1);
                    ++$totalproceed;
                    $lastproceed = $now;
                    $classinfo->{'lastproceed'} = $now;
                    ++$classinfo->{'active'};
                    kill("USR1", $pid) or RemoveSupplicant($pid, 1);
                    goto RETRY;
                }
                # remember the smallest remaining class wait seen so far
                $classmin = $cmin unless defined($classmin) && $classmin < $cmin;
            }
        }
    }
    return defined($classmin) ? _max($min, $classmin) : undef;
}
# [0] => pid to add (must not already be in %pid)
# [1] => class name (must exist); matched case-insensitively, the same way
#        GetClassInfo normalizes names
# [2] => descriptive text (optional, defaults to '')
# [3] => if true, do not call ServiceQueue after queueing
# Returns -1 if no such class or pid already present or invalid
# Returns 0 if added successfully (and possibly already SIGUSR1'd)
# Returns 1 if throttled and cannot be added
sub AddSupplicant {
    my ($pid, $classname, $text, $noservice) = @_;
    return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
    $pid += 0;
    kill(0, $pid) or return -1;
    return -1 unless defined($classname);
    # %classes keys are always lowercase (see GetClassInfo), so normalize
    # here too; otherwise a name that GetClassInfo accepts could be rejected.
    $classname = lc($classname);
    my $classinfo = $classes{$classname};
    return -1 unless $classinfo;
    return -1 if $pid{$pid};
    $text = '' unless defined($text);
    my $now = time;
    # maxproc of 0 means no limit on queued + running supplicants
    if ($classinfo->{'maxproc'} && $classinfo->{'total'} >= $classinfo->{'maxproc'}) {
        ++$totalthrottle;
        $lastthrottle = $now;
        $classinfo->{'lastthrottle'} = $now;
        return 1;
    }
    ++$totalqueue;
    $lastqueue = $now;
    $pid{$pid} = [$classname, 0, $text]; # state 0 => queued
    ++$classinfo->{'total'};
    $classinfo->{'lastqueue'} = $now;
    push(@queue, $pid);
    ServiceQueue unless $noservice;
    return 0;
}
# [0] => pid to remove (died, killed, exited normally, doesn't matter)
# [1] => if true, do not call ServiceQueue afterwards (used by ServiceQueue
#        itself to avoid re-entering while it is scanning the queue)
# Returns 0 if removed
# Returns -1 if unknown pid or other error during removal
sub RemoveSupplicant {
    my ($pid, $noservice) = @_;
    return -1 unless defined($pid) && $pid =~ /^\d+$/;
    $pid += 0;
    my $pidinfo = $pid{$pid};
    $pidinfo or return -1;
    my $now = time;
    $lastdied = $now;
    ++$totaldied;
    delete $pid{$pid};
    if (!$$pidinfo[1]) {
        # still queued (start time 0): drop every occurrence from the fifo
        @queue = grep { $_ != $pid } @queue;
    }
    my $classinfo = $classes{$$pidinfo[0]};
    if (!$classinfo) {
        # class record vanished; the pid is gone but counters can't be fixed
        ServiceQueue unless $noservice;
        return -1;
    }
    --$classinfo->{'active'} if $$pidinfo[1];
    --$classinfo->{'total'};
    $classinfo->{'lastdied'} = $now;
    ServiceQueue unless $noservice;
    return 0;
}
513 # Instance Methods
515 package main;
518 ## ---------
519 ## Functions
520 ## ---------
# pids collected by REAPER, appended in the order they were reaped
my @reapedpids = ();
# signal number => name, used only to make REAPER's log lines readable
my %signame = (
    # http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
    1 => 'SIGHUP',
    2 => 'SIGINT',
    3 => 'SIGQUIT',
    6 => 'SIGABRT',
    9 => 'SIGKILL',
    14 => 'SIGALRM',
    15 => 'SIGTERM',
);

# SIGCHLD handler: reap every child that has exited without blocking,
# log how each one ended and append its pid to @reapedpids.  $! is
# localized so that the interrupted code does not see it change.
sub REAPER {
    local $!;
    my $waitedpid;
    while (($waitedpid = waitpid(-1, WNOHANG)) > 0) {
        my $code = $? & 0xffff;
        # when the last child goes away the idle period starts now
        $idlestart = time if !--$children;
        my $codemsg = '';
        if (!($code & 0xff)) {
            # low byte clear: normal exit, exit status is in the high byte
            $codemsg = " with exit code ".($code >> 8) if $code;
        } elsif ($code & 0x7f) {
            # low seven bits hold the terminating signal number
            my $signum = ($code & 0x7f);
            $codemsg = " with signal ".
                ($signame{$signum}?$signame{$signum}:$signum);
        }
        logmsg("reaped $waitedpid$codemsg");
        push(@reapedpids, $waitedpid);
    }
    $SIG{CHLD} = \&REAPER; # loathe sysV
}

$SIG{CHLD} = \&REAPER; # Apollo 440
my ($piperead, $pipewrite);

# Fork a child that runs the given code ref with the Client handle
# duplicated onto STDIN, then exits with the code ref's return value.
# The parent logs the outcome of the fork and returns nothing in every
# case; only the child continues past the fork check.
sub spawn {
    my ($coderef) = @_;

    my $pid = fork;
    unless (defined $pid) {
        logmsg("cannot fork: $!");
        return;
    }
    if ($pid) {
        # I'm the parent
        # NOTE(review): !++$children is only true when the count was -1
        # before the increment -- confirm whether that is intended
        $idlestart = time if !++$children;
        $idlestatus = 0;
        logmsg("begat $pid");
        return;
    }

    # child: drop handles it does not need and ignore SIGCHLD
    close(Server) unless fileno(Server) == 0;
    close($piperead);
    $SIG{'CHLD'} = sub {};

    open STDIN, "+<&Client" or die "can't dup client to stdin";
    close(Client);
    exit $coderef->();
}
# Ask the throttle controller (through $pipewrite) for permission to run
# as a member of the given class and wait for its answer, which arrives
# as a signal.  A keepalive message is written every time the wait wakes
# up (at least every 30 seconds because of the alarm).
# [0] => throttle class name
# [1] => descriptive text sent along with the request
# returns:
#   < 0: error (unknown class, SIGUSR2 or SIGPIPE received)
#   = 0: proceed (SIGUSR1 received)
#   > 0: throttled (SIGTERM received)
sub request_throttle {
    use POSIX qw(sigprocmask sigsuspend SIG_SETMASK);
    my ($classname, $text) = @_;

    Throttle::GetClassInfo($classname)
        or return -1; # no such throttle class

    my ($throttled, $proceed, $error, $controldead) = (0, 0, 0, 0);
    my $setempty = POSIX::SigSet->new;
    my $setfull = POSIX::SigSet->new;
    $setempty->emptyset();
    $setfull->fillset();
    $SIG{'TERM'} = sub {$throttled = 1};
    $SIG{'USR1'} = sub {$proceed = 1};
    $SIG{'USR2'} = sub {$error = 1};
    $SIG{'PIPE'} = sub {$controldead = 1};
    $SIG{'ALRM'} = sub {};

    # After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
    print $pipewrite "\nthrottle $$ $classname $text\n";
    my $old = POSIX::SigSet->new;
    # Keep all signals blocked except while parked in sigsuspend (and while
    # writing the keepalive) so the flag test and the wait cannot race.
    sigprocmask(SIG_SETMASK, $setfull, $old);
    until ($controldead || $throttled || $proceed || $error) {
        alarm(30);
        sigsuspend($setempty);
        alarm(0);
        sigprocmask(SIG_SETMASK, $setempty, $old);
        print $pipewrite "\nkeepalive $$\n";
        sigprocmask(SIG_SETMASK, $setfull, $old);
    }
    sigprocmask(SIG_SETMASK, $setempty, $old);
    $SIG{'TERM'} = "DEFAULT";
    $SIG{'USR1'} = "DEFAULT";
    $SIG{'USR2'} = "DEFAULT";
    $SIG{'ALRM'} = "DEFAULT";
    $SIG{'PIPE'} = "DEFAULT";

    # throttled takes precedence over proceed; anything else is an error
    return 1 if $throttled;
    return 0 if $proceed;
    return -1;
}
# Start the clone of project $name: validate the project state, obtain
# throttle permission in the "clone" class, redirect STDOUT/STDERR to the
# project's .clonelog and exec taskd/clone.sh.  Does not return on
# success (the process image is replaced); dies on any failure.
sub clone {
    my ($name) = @_;
    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj;
    eval {$proj = Girocco::Project->load($name)};
    if (!$proj && Girocco::Project::does_exist($name, 1)) {
        # If the .clone_in_progress file exists, but the .clonelog does not
        # and neither does the .clone_failed, be helpful and touch the
        # .clone_failed file so that the mirror can be restarted
        my $projdir = $Girocco::Config::reporoot."/$name.git";
        if (-d $projdir && -f "$projdir/.clone_in_progress" && ! -f "$projdir/.clonelog" && ! -f "$projdir/.clone_failed") {
            # best effort only; a lexical handle avoids a global bareword
            open(my $marker, '>', "$projdir/.clone_failed") and close($marker);
        }
    }
    $proj or die "failed to load project $name";
    $proj->{clone_in_progress} or die "project $name is not marked for cloning";
    $proj->{clone_logged} and die "project $name is already being cloned";
    # only a positive (throttled) answer aborts; errors (< 0) still proceed
    request_throttle("clone", $name) <= 0 or die "cloning $name aborted (throttled)";
    statmsg("cloning $name");
    open STDOUT, '>', "$Girocco::Config::reporoot/$name.git/.clonelog" or die "cannot open clonelog: $!";
    open STDERR, ">&STDOUT";
    open STDIN, '<', '/dev/null';
    exec "$Girocco::Config::basedir/taskd/clone.sh", "$name.git" or die "exec failed: $!";
}
# Pick the separator shown between the old and new revision in status
# messages.
# [0] => git dir, [1] => old revision, [2] => new revision
# Returns ' -> ' when $showff is off or no git dir is given, '..' when
# either revision is missing, all zeros, or both are equal, and otherwise
# '...' if the rev-list query below produces a true result, else '..'.
# In list context the last case returns ($indicator, 1) so the caller can
# tell that git was actually run.
sub ref_indicator {
    return ' -> ' unless $showff && defined($_[0]);
    my ($git_dir, $old, $new) = @_;
    if (!defined($old) || !defined($new) || $old =~ /^0+$/ || $new =~ /^0+$/ || $old eq $new) {
        return '..';
    }
    # In many cases `git merge-base` is slower than this even if using the
    # `--is-ancestor` option available since Git 1.8.0, but it's never faster
    # NOTE(review): presumably get_git returns the command's output, so a
    # true value means $old has a commit not reachable from $new -- confirm
    my $ans = get_git("--git-dir=$git_dir", "rev-list", "-n", "1", "^$new^0", "$old^0", "--") ? '...' : '..';
    return wantarray ? ($ans, 1) : $ans;
}
# Handle a single ref-change request.
# [0] => "username project oldrev newrev ref" (whitespace separated)
# Returns 0 both when the request is silently ignored (missing fields,
# malformed revisions or ref, or oldrev equal to newrev) and on success.
# Dies if the project or user cannot be found or loaded, or if the
# request is throttled.
sub ref_change {
    my ($arg) = @_;
    my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
    return 0 unless $username && $name && $oldrev && $newrev && $ref;
    return 0 unless $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/};
    return 0 if $newrev eq $oldrev;

    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj = Girocco::Project->load($name);
    $proj or die "failed to load project $name";
    my $has_notify = $proj->has_notify;
    my $type = $has_notify ? "notify" : "change";

    my $user;
    if ($username && $username !~ /^%.*%$/) {
        # an ordinary user name (not of the %...% form) must be loadable
        Girocco::User::does_exist($username, 1) or die "no such user: $username";
        $user = Girocco::User->load($username);
        $user or die "failed to load user $username";
    } elsif ($username eq "%$name%") {
        # %project% as the user name is shown as "-" in the status line
        $username = "-";
    }

    request_throttle("ref-change", $name) <= 0 or die "ref-change $name aborted (throttled)";
    my $ind = ref_indicator($proj->{path}, $oldrev, $newrev);
    my $oldabbr = substr($oldrev, 0, $abbrev);
    my $newabbr = substr($newrev, 0, $abbrev);
    statmsg("ref-$type $username $name ($ref: $oldabbr$ind$newabbr)");
    open STDIN, '<', '/dev/null';
    Girocco::Notify::ref_change($proj, $user, $ref, $oldrev, $newrev) if $has_notify;
    return 0;
}
# Handle a batch of ref changes for one project.
# [0] => "username project" (whitespace separated)
# The changes themselves are read from STDIN, one "oldrev newrev ref"
# line each; malformed lines are skipped.
# Returns 0 when ignored (missing fields or no usable changes) and on
# success.  Dies if the project or user cannot be found or loaded, or if
# the request is throttled.
sub ref_changes {
    my ($arg) = @_;
    my ($username, $name) = split(/\s+/, $arg);
    return 0 unless $username && $name;

    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj = Girocco::Project->load($name);
    $proj or die "failed to load project $name";
    my $has_notify = $proj->has_notify;
    my $type = $has_notify ? "notify" : "change";

    my $user;
    if ($username && $username !~ /^%.*%$/) {
        Girocco::User::does_exist($username, 1) or die "no such user: $username";
        $user = Girocco::User->load($username);
        $user or die "failed to load user $username";
    } elsif ($username eq "%$name%") {
        $username = "-";
    }

    my @changes = ();
    my %oldheads = ();     # refs/heads/* ref => old revision recorded below
    my %deletedheads = (); # refs/heads/* refs seen with an all-zero oldrev
    while (my $change = <STDIN>) {
        my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
        next unless $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/};
        if ($ref =~ m{^refs/heads/.}) {
            if ($oldrev =~ /^0{40}$/) {
                delete $oldheads{$ref};
                $deletedheads{$ref} = 1;
            } elsif ($newrev ne $oldrev || (!exists($oldheads{$ref}) && !$deletedheads{$ref})) {
                $oldheads{$ref} = $oldrev;
            }
        }
        # lines that change nothing still feed %oldheads above but are not
        # reported as changes
        next if $newrev eq $oldrev;
        push(@changes, [$oldrev, $newrev, $ref]);
    }
    return 0 unless @changes;
    open STDIN, '<', '/dev/null';
    request_throttle("ref-change", $name) <= 0 or die "ref-changes $name aborted (throttled)";
    # status callback: logs one change and pauses for a second whenever the
    # caller says it ran mail.sh or ref_indicator reports that it ran git
    my $statproc = sub {
        my ($old, $new, $ref, $ran_mail_sh) = @_;
        my ($ind, $ran_git) = ref_indicator($proj->{path}, $old, $new);
        my $oldabbr = substr($old, 0, $abbrev);
        my $newabbr = substr($new, 0, $abbrev);
        statmsg("ref-$type $username $name ($ref: $oldabbr$ind$newabbr)");
        sleep 1 if $ran_mail_sh || $ran_git;
    };
    if ($has_notify) {
        Girocco::Notify::ref_changes($proj, $user, $statproc, \%oldheads, @changes);
    } else {
        foreach my $entry (@changes) {
            $statproc->(@$entry);
        }
    }
    return 0;
}
# Act as throttle proxy for a supplicant connected on STDIN (a socket).
# [0] => "pid class text" (whitespace separated) as sent by the supplicant
# Returns 0 without doing anything if the request is invalid (bad pid, no
# such process, unknown class or missing text).  Otherwise registers THIS
# process with the controller through $pipewrite, relays the answer to the
# supplicant as "proceed\n" or "throttled\n", and finally exits 0.
sub throttle {
    my ($arg) = @_;
    my ($pid, $classname, $text) = split(/\s+/, $arg);
    # the pid must consist of digits only (anchored at both ends)
    defined($pid) && $pid =~ /^\d+$/ or return 0; # invalid pid
    $pid += 0;
    $pid > 0 or return 0; # invalid pid
    kill(0, $pid) || $!{EPERM} or return 0; # no such process
    Throttle::GetClassInfo($classname) or return 0; # no such throttle class
    defined($text) && $text ne '' or return 0; # no text no service

    my $throttled = 0;
    my $proceed = 0;
    my $error = 0;
    my $controldead = 0;
    my $suppdead = 0;
    # self-pipe: the signal handlers write a byte to wake up select below
    my ($waker, $wakew);
    pipe($waker, $wakew) or die "pipe failed: $!";
    select((select($wakew),$|=1)[0]);
    setnonblock($wakew);
    $SIG{'TERM'} = sub {$throttled = 1; syswrite($wakew, '!')};
    $SIG{'USR1'} = sub {$proceed = 1; syswrite($wakew, '!')};
    $SIG{'USR2'} = sub {$error = 1; syswrite($wakew, '!')};
    $SIG{'PIPE'} = sub {$controldead = 1; syswrite($wakew, '!')};
    select((select(STDIN),$|=1)[0]);

    logmsg("throttle $pid $classname $text request");
    # After writing we can expect a SIGTERM or SIGUSR1
    print $pipewrite "\nthrottle $$ $classname $text\n";

    # NOTE: the only way to detect the socket close is to read all the
    # data until EOF is reached -- recv can be used to peek.
    my $v = '';
    vec($v, fileno(STDIN), 1) = 1;
    vec($v, fileno($waker), 1) = 1;
    setnonblock(\*STDIN);
    setnonblock($waker);
    until ($controldead || $throttled || $proceed || $error || $suppdead) {
        my ($r, $e);
        select($r=$v, undef, $e=$v, 30);
        my ($bytes, $discard);
        do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
        do {$bytes = sysread(STDIN, $discard, 4096)} while (defined($bytes) && $bytes > 0);
        # anything but "would block" (i.e. EOF or a real error) on STDIN
        # means the supplicant has gone away
        $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
        print $pipewrite "\nkeepalive $$\n";
    }
    setblock(\*STDIN);

    if ($throttled && !$suppdead) {
        print STDIN "throttled\n";
        logmsg("throttle $pid $classname $text throttled");
    } elsif ($proceed && !$suppdead) {
        print STDIN "proceed\n";
        logmsg("throttle $pid $classname $text proceed");
        $SIG{'TERM'} = 'DEFAULT';
        # Stay alive until the child dies which we detect by EOF on STDIN
        setnonblock(\*STDIN);
        until ($controldead || $suppdead) {
            my ($r, $e);
            select($r=$v, undef, $e=$v, 30);
            my ($bytes, $discard);
            do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
            do {$bytes = sysread(STDIN, $discard, 512)} while (defined($bytes) && $bytes > 0);
            $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
            print $pipewrite "\nkeepalive $$\n";
        }
        setblock(\*STDIN);
    } else {
        my $prefix = '';
        $prefix = "control" if $controldead && !$suppdead;
        logmsg("throttle $pid $classname $text ${prefix}died");
    }
    exit 0;
}
# Process one message line received on the control pipe.
# [0] => message text: "throttle <pid> <class> <text>" or "keepalive ..."
# For a throttle message the pid is registered with Throttle::AddSupplicant;
# the pid is sent SIGUSR2 if the request is rejected (missing class or text,
# unknown class, AddSupplicant error) or SIGTERM if it is throttled.
# Messages with an invalid or dead pid are silently dropped, and unknown
# messages are reported on STDERR.
# Always returns 0.
sub process_pipe_msg {
    my ($act, $pid, $cls, $text) = split(/\s+/, $_[0]);
    # an empty message yields no fields at all; treat it as unknown
    $act = '' unless defined($act);
    if ($act eq "throttle") {
        defined($pid) && $pid =~ /^\d+$/ or return 0;
        $pid += 0;
        $pid > 0 or return 0; # invalid pid
        kill(0, $pid) or return 0; # invalid pid
        my $reject = sub { kill('USR2', $pid); return 0 };
        return $reject->() unless defined($cls) && $cls ne "";
        return $reject->() unless defined($text) && $text ne "";
        return $reject->() unless Throttle::GetClassInfo($cls);
        # the AddSupplicant call could send SIGUSR1 before it returns
        my $result = Throttle::AddSupplicant($pid, $cls, $text);
        return $reject->() if $result < 0;
        if ($result > 0) {
            kill('TERM', $pid);
            return 0;
        }
        # $pid was added to class $cls and will receive SIGUSR1 when
        # it's time for it to proceed
        return 0;
    } elsif ($act eq "keepalive") {
        # nothing to do although we could verify pid is valid and
        # still in %Throttle::pid and send a SIGUSR2 if not, but
        # really keepalive should just be ignored.
        return 0;
    }
    print STDERR "discarding unknown pipe message \"$_[0]\"\n";
    return 0;
}
856 ## -------
857 ## OStream
858 ## -------
861 package OStream;
863 # Set to 1 for only syslog output (if enabled by mode)
864 # Set to 2 for only stderr output (if enabled by mode)
865 our $only = 0; # This is a hack
867 use Carp 'croak';
868 use Sys::Syslog qw(:DEFAULT :macros);
# Write all of $data to file descriptor number $fd using POSIX::write,
# continuing after short writes and retrying when interrupted (EINTR).
# Croaks on any other write error or if a write reports 0 bytes.
sub writeall {
    use POSIX qw();
    use Errno;
    my ($fd, $data) = @_;
    my $total = length($data);
    my $done = 0;
    while ($done < $total) {
        my $bytes = POSIX::write($fd, substr($data, $done), $total - $done);
        next if !defined($bytes) && $!{EINTR};
        croak "POSIX::write failed: $!" unless defined $bytes;
        croak "POSIX::write wrote 0 bytes" unless $bytes;
        $done += $bytes;
    }
}
# Emit one line to the destinations enabled for this object.
# The line goes to the STDERR file descriptor as-is when 'stderr' is
# enabled (unless $only is 1) and to syslog at LOG_NOTICE, with one
# trailing newline removed, when 'syslog' is enabled (unless $only is 2).
# A line that is empty once the newline is removed is not sent to syslog.
sub dumpline {
    use POSIX qw(STDERR_FILENO);
    my ($self, $line) = @_;
    $only = 0 unless defined($only);
    if ($self->{'stderr'} && $only != 1) {
        writeall(STDERR_FILENO, $line);
    }
    if (substr($line, -1, 1) eq "\n") {
        substr($line, -1, 1) = '';
    }
    return unless length($line);
    if ($self->{'syslog'} && $only != 2) {
        syslog(LOG_NOTICE, "%s", $line);
    }
}
# Constructor used by tie.
# [0] => class name (defaults to 'OStream')
# [1] => mode: <= 0 stderr only, 1 syslog only, > 1 both
# [2] => identity string passed to openlog
# [3] => syslog facility (defaults to LOG_USER)
# Croaks if syslog output is requested and openlog fails.
sub TIEHANDLE {
    my ($class, $mode, $syslogname, $syslogfacility) = @_;
    $class ||= 'OStream';
    $syslogfacility = LOG_USER unless defined($syslogfacility);
    my $self = {};
    $self->{'syslog'} = $mode > 0;
    $self->{'stderr'} = $mode <= 0 || $mode > 1;
    $self->{'lastline'} = ''; # holds output not yet ended by a newline
    if ($self->{'syslog'}) {
        # Some Sys::Syslog have a stupid default setlogsock order
        eval {Sys::Syslog::setlogsock("native"); 1;} or
        eval {Sys::Syslog::setlogsock("unix");};
        openlog($syslogname, "ndelay,pid", $syslogfacility)
            or croak "Sys::Syslog::openlog failed: $!";
    }
    return bless $self, $class;
}
# Trivial tied-handle methods; none of them touches the object.

# binmode on the tied handle always reports success
sub BINMODE {
    return 1;
}

# there is no underlying file descriptor
sub FILENO {
    return undef;
}

# the handle never reports end of file
sub EOF {
    return 0;
}

# closing the tied handle always reports success
sub CLOSE {
    return 1;
}
# printf on the tied handle: format the arguments with sprintf and pass
# the result to PRINT, returning whatever PRINT returns.
sub PRINTF {
    my ($self, $template, @args) = @_;
    my $formatted = sprintf($template, @args);
    return $self->PRINT($formatted);
}
# print on the tied handle: append the arguments to any partial line kept
# from earlier calls, pass every complete (newline terminated) line to
# dumpline in order, and keep the unterminated remainder in 'lastline'
# for the next call.  Always returns 1.
sub PRINT {
    my $self = shift;
    my $data = join('', $self->{'lastline'}, @_);
    my $pos = 0;
    # $data is left untouched while scanning so that $pos stays valid;
    # shortening the string inside the loop would make later lines in the
    # same call be skipped and held back.
    while ((my $idx = index($data, "\n", $pos)) >= 0) {
        ++$idx; # include the newline itself in the line
        $self->dumpline(substr($data, $pos, $idx - $pos));
        $pos = $idx;
    }
    $self->{'lastline'} = substr($data, $pos);
    return 1;
}
# Destructor: flush any unterminated partial line that is still buffered
# in 'lastline' through dumpline, then call closelog.
sub DESTROY {
    my ($self) = @_;
    if (length($self->{'lastline'})) {
        $self->dumpline($self->{'lastline'});
    }
    closelog;
}
# syswrite/write on the tied handle.
# [0] => data (undef is treated as '')
# [1] => number of characters to write (defaults to the whole string and
#        is reduced to what is available after the offset)
# [2] => offset into the data (defaults to 0; negative counts from the end)
# The selected portion is handed to PRINT.
# Returns the number of characters actually passed on.
# Croaks on a negative length or an offset outside of the string.
sub WRITE {
    my $self = shift;
    my ($scalar, $length, $offset) = @_;
    $scalar = '' if !defined($scalar);
    my $avail = length($scalar);
    $length = $avail if !defined($length);
    croak "OStream::WRITE invalid length $length"
        if $length < 0;
    $offset = 0 if !defined($offset);
    $offset += $avail if $offset < 0;
    # the offset is bounded by the size of the data, not by the requested
    # length: writing 2 characters from offset 4 of "abcdef" is valid
    croak "OStream::WRITE invalid write offset"
        if $offset < 0 || $offset > $avail;
    my $max = $avail - $offset;
    $length = $max if $length > $max;
    $self->PRINT(substr($scalar, $offset, $length));
    return $length;
}
970 ## ----
971 ## main
972 ## ----
975 package main;
# Schedule a delayed jobd.pl restart from a detached (double-forked) process.
#
# The detached grandchild sleeps so that its total delay is about five
# seconds, then writes "restart\n" into the jobd lock file unless the
# caller has told it (through a pipe) not to.
#
#   $newpg - when true the grandchild first tries (up to four times) to
#            move itself into a new process group with setpgid(0, 0)
#
# returns pid of process that will schedule jobd.pl restart on success
# returns 0 if fork or other system call failed with error in $!
# returns undef if jobd.pl does not currently appear to be running (no lockfile)
sub schedule_jobd_restart {
    use POSIX qw(_exit setpgid);
    my $newpg = shift;
    my $jdlf = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
    return undef unless -f $jdlf;
    # Remember the caller's SIGCHLD disposition; when none was installed the
    # default disposition is restored afterwards (same convention as the
    # SIGPIPE handling further down) rather than leaving a no-op handler.
    my $oldsigchld = defined($SIG{'CHLD'}) ? $SIG{'CHLD'} : 'DEFAULT';
    # $read/$write:   parent -> grandchild ("1" means do not restart)
    # $read2/$write2: first child -> parent (grandchild pid or ":errno")
    my ($read, $write, $read2, $write2);
    pipe($read, $write) or return 0;
    select((select($write),$|=1)[0]);
    if (!pipe($read2, $write2)) {
        local $!; # keep pipe's errno for the caller
        close $write;
        close $read;
        return 0;
    }
    select((select($write2),$|=1)[0]);
    $SIG{'CHLD'} = sub {};
    my $retries = 3;
    my $child;
    while (!defined($child) && $retries--) {
        $child = fork;
        sleep 1 unless defined($child) || !$retries;
    }
    if (!defined($child)) {
        local $!; # keep fork's errno for the caller
        close $write2;
        close $read2;
        close $write;
        close $read;
        $SIG{'CHLD'} = $oldsigchld;
        return 0;
    }
    # double fork the child
    if (!$child) {
        close $read2;
        my $retries2 = 3;
        my $child2;
        while (!defined($child2) && $retries2--) {
            $child2 = fork;
            sleep 1 unless defined($child2) || !$retries2;
        }
        if (!defined($child2)) {
            # report the fork failure as ":errno" and give up
            my $ec = 0 + $!;
            $ec = 255 unless $ec;
            print $write2 ":$ec";
            close $write2;
            _exit 127;
        }
        if ($child2) {
            # pass new child pid up to parent and exit
            print $write2 $child2;
            close $write2;
            _exit 0;
        } else {
            # this is the grandchild
            close $write2;
        }
    } else {
        close $write2;
        my $result = <$read2>;
        close $read2;
        chomp $result if defined($result);
        if (!defined($result) || $result !~ /^:?\d+$/) {
            # something's wrong with the child -- kill it
            kill(9, $child) && waitpid($child, 0);
            my $oldsigpipe = $SIG{'PIPE'};
            # make sure the grandchild, if any,
            # doesn't run the success proc
            $SIG{'PIPE'} = sub {};
            print $write 1;
            close $write;
            close $read;
            $SIG{'PIPE'} = defined($oldsigpipe) ?
                $oldsigpipe : 'DEFAULT';
            $! = 255;
            $SIG{'CHLD'} = $oldsigchld;
            return 0;
        }
        if ($result =~ /^:(\d+)$/) {
            # fork failed in child, there is no grandchild
            my $ec = $1;
            waitpid($child, 0);
            close $write;
            close $read;
            $! = $ec;
            $SIG{'CHLD'} = $oldsigchld;
            return 0;
        }
        # reap the child and set $child to grandchild's pid
        waitpid($child, 0);
        $child = $result;
    }
    if (!$child) {
        # grandchild that actually initiates the jobd.pl restart
        close $write;
        my $wait = 5;
        open STDIN, '<', '/dev/null';
        open STDOUT, '>', '/dev/null';
        open STDERR, '>', '/dev/null';
        chdir "/";
        if ($newpg) {
            # Each failed setpgid attempt costs one second, which is taken
            # off the final sleep so the total delay stays the same.
            my $makepg = sub {
                my $result = setpgid(0, 0);
                if (!defined($result)) {
                    --$wait;
                    sleep 1;
                }
                $result;
            };
            for (1 .. 4) {
                last if defined($makepg->());
            }
        }
        sleep $wait;
        # EOF (undef) or "0" means go ahead; anything else cancels
        my $result = <$read>;
        close $read;
        chomp $result if defined($result);
        if (!defined($result) || $result eq 0) {
            open JDLF, '+<', $jdlf or _exit(1);
            select((select(JDLF),$|=1)[0]);
            print JDLF "restart\n";
            truncate JDLF, tell(JDLF);
            close JDLF;
        }
        _exit(0);
    }
    close $write;
    close $read;
    $SIG{'CHLD'} = $oldsigchld;
    return $child;
}
# Kill a restarter process previously started by schedule_jobd_restart.
#   $restarter - pid of the restarter (undef or 0 means nothing to cancel)
# Returns nothing when there is nothing to cancel, -1 when the process can
# no longer be signalled, otherwise the pid that was killed.
# Dies if the process exists but cannot be killed.
sub cancel_jobd_restart {
    my ($restarter) = @_;
    return if !defined($restarter) || $restarter == 0;
    kill(0, $restarter) or return -1;
    unless (kill(9, $restarter)) {
        die "failed to kill jobd restarter process (pid $restarter): $!\n";
    }
    # we must not waitpid because $restarter was doubly forked and will
    # NOT send us a SIGCHLD when it terminates
    return $restarter;
}
# Create the object used later for the graceful re-exec before the working
# directory is changed (NOTE(review): presumably ExecUtil records the startup
# environment here -- confirm against Girocco::ExecUtil).
my $reexec = Girocco::ExecUtil->new;
chdir "/";
fileno(DATA) and close(DATA);
my $sfac;
Getopt::Long::Configure('bundling');
my ($status_minutes, $idle_minutes);
my $parse_res = GetOptions(
    'help|?|h' => sub {pod2usage(-verbose => 2, -exitval => 0)},
    'quiet|q' => \$quiet,
    'no-quiet' => sub {$quiet = 0},
    'progress|P' => \$progress,
    'inetd|i' => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
    'idle-timeout|t=i' => \$idle_timeout,
    'syslog|s:s' => \$sfac,
    'no-syslog' => sub {$syslog = 0; $sfac = undef;},
    'stderr' => \$stderr,
    'abbrev=i' => \$abbrev,
    'show-fast-forward-info' => \$showff,
    'no-show-fast-forward-info' => sub {$showff = 0},
    'same-pid' => \$same_pid,
    'status-interval=i' => \$status_minutes,
    'idle-status-interval=i' => \$idle_minutes,
) || pod2usage(2);

# Derived settings: an explicit facility turns syslog on, progress is on
# unless quiet, and a non-positive abbreviation length means 128.
defined($sfac) and $syslog = 1;
$quiet or $progress = 1;
$abbrev > 0 or $abbrev = 128;

if (defined($idle_timeout)) {
    $idle_timeout =~ /^\d+$/
        or die "--idle-timeout must be a whole number";
    $inetd
        or die "--idle-timeout may not be used without --inetd";
}
# Both intervals are given in minutes on the command line and stored in
# seconds.
if (defined($status_minutes)) {
    $status_minutes =~ /^\d+$/
        or die "--status-interval must be a whole number";
    $statusintv = $status_minutes * 60;
}
if (defined($idle_minutes)) {
    $idle_minutes =~ /^\d+$/
        or die "--idle-status-interval must be a whole number";
    $idleintv = $idle_minutes * 60;
}
# Route output.  In inetd mode STDOUT starts out as a copy of STDERR; with
# syslog enabled STDERR is tied to OStream (syslog only, or syslog plus the
# real stderr when --stderr was given).
open STDOUT, '>&STDERR' if $inetd;
if ($syslog) {
    use Sys::Syslog qw();
    my $mode = 1;
    ++$mode if $stderr;
    $sfac = "user" unless defined($sfac) && $sfac ne "";
    my $ofac = $sfac;
    $sfac = uc($sfac);
    $sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
    # The name is interpolated into a string eval below, so insist that it
    # looks like a plain constant name before evaluating anything.
    die "invalid syslog facility: $ofac"
        unless $sfac =~ /^LOG_[A-Z][A-Z0-9_]*$/;
    my $facility;
    # LOG_* names exported by Sys::Syslog that are not facilities
    my %badfac = map({("LOG_$_" => 1)}
        (qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
    eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac";
    die "invalid syslog facility: $ofac"
        if !defined($facility) || $facility !~ /^\d+$/ ||
            ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
    tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
}
if ($quiet) {
    open STDOUT, '>', '/dev/null';
} elsif ($inetd) {
    # non-error output follows STDERR (and therefore any tie on it)
    *STDOUT = *STDERR;
}
# Path of the listening socket; set from the inherited socket in inetd mode
# or to the standard location otherwise.
my $NAME;

my $restart_file = $Girocco::Config::chroot.'/etc/taskd.restart';
my $restart_active = 1;
if ($inetd) {
    # Adopt file descriptor 0 as the listening socket and verify that it
    # is an unconnected AF_UNIX stream socket.
    open Server, '<&=0' or die "open: $!";
    my $sockname = getsockname(Server);
    $sockname or die "getsockname: $!";
    getpeername(Server) and die "socket already connected! must be 'wait' socket";
    $!{ENOTCONN} or die "getpeername: $!";
    my $st = getsockopt(Server, SOL_SOCKET, SO_TYPE);
    $st or die "getsockopt(SOL_SOCKET, SO_TYPE): $!";
    my $socktype = unpack('i', $st);
    (defined($socktype) && $socktype == SOCK_STREAM)
        or die "stream socket required";
    sockaddr_family($sockname) == AF_UNIX
        or die "AF_UNIX socket required";
    $NAME = unpack_sockaddr_un($sockname);
    my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
    if ($NAME ne $expected) {
        $restart_active = 0;
        warn "listening on \"$NAME\" but expected \"$expected\", restart file disabled";
    }
    # make sure user and group can read and write the socket
    my $mode = (stat($NAME))[2];
    $mode or die "stat: $!";
    $mode &= 07777;
    if (($mode & 0660) != 0660) {
        chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
    }
} else {
    # Create the listening socket ourselves, replacing any stale one.
    $NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
    my $uaddr = sockaddr_un($NAME);

    socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
    unlink($NAME);
    bind(Server, $uaddr) or die "bind failed: $!";
    listen(Server, SOMAXCONN) or die "listen failed: $!";
    chmod(0666, $NAME) or die "chmod failed: $!";
}
# Hand every named throttle class to Throttle::GetClassInfo, configured
# classes first and the built-in defaults after them.
for my $cfg (@Girocco::Config::throttle_classes, @throttle_defaults) {
    my $name = $cfg->{"name"} or next;
    Throttle::GetClassInfo($name, $cfg);
}
# Return the smaller of two numbers (the first one when they are equal).
sub _min {
    my ($first, $second) = @_;
    return $first if $first <= $second;
    return $second;
}
# Control pipe: the read end is non-blocking, the write end unbuffered.
pipe($piperead, $pipewrite) or die "pipe failed: $!";
setnonblock($piperead);
{
    my $prevfh = select($pipewrite);
    $| = 1;
    select($prevfh);
}
my $pipebuff = '';

# select() bit sets: control pipe only, and control pipe plus listen socket
my $fdset_pipe = '';
vec($fdset_pipe, fileno($piperead), 1) = 1;
my $fdset_both = $fdset_pipe;
vec($fdset_both, fileno(Server), 1) = 1;

# Timing state for the service loop
my $penalty = 0;
my $t = time;
my $penaltytime = $t;
my $nextwakeup = $t + 60;
my $nextstatus = $statusintv ? $t + $statusintv : undef;

# Remove any stale restart request; if that is impossible the restart file
# mechanism is switched off.
if ($restart_active && !unlink($restart_file) && !$!{ENOENT}) {
    $restart_active = 0;
    statmsg "restart file disabled could not unlink \"$restart_file\": $!";
}
statmsg "listening on $NAME";
# Main service loop.  Each pass waits (inner do/while) until the control
# pipe or the listen socket needs attention, handling periodic status
# output, graceful restart requests and the idle timeout while waiting; it
# then drains the control pipe and accepts at most one new connection.
while (1) {
    my ($rout, $eout, $nfound);
    do {
        my $wait;
        my $now = time;
        # Each accepted connection adds one to $penalty and each elapsed
        # second removes one (never going below zero).
        my $adjustpenalty = sub {
            if ($penaltytime < $now) {
                my $credit = $now - $penaltytime;
                $penalty = $penalty > $credit ? $penalty - $credit : 0;
                $penaltytime = $now;
            }
        };
        if (defined($nextstatus) && $now >= $nextstatus) {
            # Skip a repeated idle status unless the idle status interval
            # has elapsed since the last one.
            unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
                my $status = "STATUS: $children active";
                my @running = ();
                if ($children) {
                    my @stats = ();
                    my $cnt = 0;
                    foreach my $cls (sort(Throttle::GetClassList())) {
                        my $inf = Throttle::GetClassInfo($cls);
                        if ($inf->{'total'}) {
                            $cnt += $inf->{'total'};
                            push(@stats, substr(lc($cls),0,1)."=".
                                $inf->{'total'}.'/'.$inf->{'active'});
                        }
                    }
                    # children not accounted for by any throttle class
                    push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
                    $status .= " (".join(" ",@stats).")" if @stats;
                    foreach (Throttle::GetRunningPids()) {
                        my ($cls, $ts, $desc) = Throttle::GetPidInfo($_);
                        next unless $ts;
                        push(@running, "[${cls}::$desc] ".duration($now-$ts));
                    }
                }
                my $idlesecs;
                $status .= ", idle " . duration($idlesecs)
                    if !$children && ($idlesecs = $now - $idlestart) >= 2;
                statmsg $status;
                statmsg "STATUS: currently running: ".join(", ", @running)
                    if @running;
                $idlestatus = $now if !$children;
            }
            $nextstatus += $statusintv while $nextstatus <= $now;
        }
        $nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
        $wait = _min($wait, (Throttle::ServiceQueue()||60));
        $adjustpenalty->(); # this prevents ignoring accept when we shouldn't
        my $fdset;
        if ($penalty <= $maxspawn) {
            $fdset = $fdset_both;
        } else {
            # too many recent connections: stop watching the listen
            # socket until the penalty has decayed to $maxspawn
            $fdset = $fdset_pipe;
            $wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
        }
        $nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
        logmsg("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR} || $!{EAGAIN};
        my $reaped;
        Throttle::RemoveSupplicant($reaped) while ($reaped = shift(@reapedpids));
        $now = time;
        $adjustpenalty->(); # this prevents banking credits for elapsed time
        if (!$children && !$nfound && $restart_active && -e $restart_file) {
            # Graceful restart: only when completely idle
            $SIG{CHLD} = sub {};
            my $restarter = schedule_jobd_restart($inetd);
            if (defined($restarter) && !$restarter) {
                statmsg "RESTART: restart requested; retrying failed scheduling of jobd restart: $!";
                sleep 2; # *cough*
                # retry with the same process group request as the
                # first attempt
                $restarter = schedule_jobd_restart($inetd);
                if (!defined($restarter)) {
                    statmsg "RESTART: restart requested; reschedule skipped jobd no longer running";
                } elsif (!$restarter) {
                    statmsg "RESTART: restart requested; retry of jobd restart scheduling failed, skipping jobd restart: $!";
                    $restarter = undef;
                }
            }
            if ($inetd) {
                statmsg "RESTART: restart requested; now exiting for inetd restart";
                statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
                sleep 2; # *cough*
                exit 0;
            } else {
                statmsg "RESTART: restart requested; now restarting";
                statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
                $reexec->reexec($same_pid);
                # only reached when the re-exec did not happen
                statmsg "RESTART: continuing after failed restart: $!";
                chdir "/";
                cancel_jobd_restart($restarter) if $restarter;
                statmsg "RESTART: scheduled jobd restart has been cancelled" if $restarter;
                $SIG{CHLD} = \&REAPER;
            }
        }
        if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
            statmsg "idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
            exit 0;
        }
    } while $nfound < 1;
    my $reout = $rout | $eout;
    # The doubled braces give "last"/"redo" a bare block to act on: "redo"
    # goes back for more pipe data, "last" leaves the pipe handling.
    if (vec($reout, fileno($piperead), 1)) {{
        my $nloff = -1;
        {
            # read until the buffer holds at least one complete line
            my $bytes;
            do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
                while (!defined($bytes) && $!{EINTR});
            last if !defined($bytes) && $!{EAGAIN};
            die "sysread failed: $!" unless defined $bytes;
            # since we always keep a copy of $pipewrite open EOF is fatal
            die "sysread returned EOF on pipe read" unless $bytes;
            $nloff = index($pipebuff, "\n", 0);
            if ($nloff < 0 && length($pipebuff) >= 512) {
                $pipebuff = '';
                print STDERR "discarding 512 bytes of control pipe data with no \\n found\n";
            }
            redo unless $nloff >= 0;
        }
        last unless $nloff >= 0;
        # process every complete line currently in the buffer
        do {
            my $msg = substr($pipebuff, 0, $nloff);
            substr($pipebuff, 0, $nloff + 1) = '';
            $nloff = index($pipebuff, "\n", 0);
            process_pipe_msg($msg) if length($msg);
        } while $nloff >= 0;
        redo;
    }}
    next unless vec($reout, fileno(Server), 1);
    unless (accept(Client, Server)) {
        logmsg "accept failed: $!" unless $!{EINTR};
        next;
    }
    logmsg "connection on $NAME";
    ++$penalty;
    spawn sub {
        # Read one request line (skipping a single leading empty line)
        # and dispatch on its first word.
        my $inp = <STDIN>;
        $inp = <STDIN> if defined($inp) && $inp eq "\n";
        chomp $inp if defined($inp);
        # ignore empty and "nop" connects
        defined($inp) && $inp ne "" && $inp ne "nop" or exit 0;
        my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
        # a line that does not start with a command word leaves $cmd
        # undefined; treat it as an (empty) unknown command
        defined($cmd) or $cmd = '';
        defined($arg) or $arg = '';
        if ($cmd eq 'ref-changes') {
            ref_changes($arg);
        } elsif ($cmd eq 'clone') {
            clone($arg);
        } elsif ($cmd eq 'ref-change') {
            ref_change($arg);
        } elsif ($cmd eq 'throttle') {
            throttle($arg);
        } else {
            statmsg "ignoring unknown command: $cmd";
            exit 3;
        }
    };
    close Client;
}
1410 ## -------------
1411 ## Documentation
1412 ## -------------
1415 __END__
1417 =head1 NAME
1419 taskd.pl - Perform Girocco service tasks
1421 =head1 SYNOPSIS
1423 taskd.pl [options]
1425 Options:
1426 -h | --help detailed instructions
1427 -q | --quiet run quietly
1428 --no-quiet do not run quietly
1429 -P | --progress show occasional status updates
1430 -i | --inetd run as inetd unix stream wait service
1431 implies --quiet --syslog
1432 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1433 requires --inetd
1434 -s | --syslog[=facility] send messages to syslog instead of
1435 stderr but see --stderr
1436 enabled by --inetd
1437 --no-syslog do not send messages to syslog
1438 --stderr always send messages to stderr too
1439 --abbrev=n abbreviate hashes to n (default is 8)
1440 --show-fast-forward-info show fast-forward info (default is on)
1441 --no-show-fast-forward-info disable showing fast-forward info
1442 --same-pid keep same pid during graceful restart
1443 --status-interval=MINUTES status update interval (default 1)
1444 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1446 =head1 OPTIONS
1448 =over 8
1450 =item B<--help>
1452 Print the full description of taskd.pl's options.
1454 =item B<--quiet>
1456 Suppress non-error messages, e.g. for use when running this task as an inetd
1457 service. Enabled by default by --inetd.
1459 =item B<--no-quiet>
1461 Enable non-error messages. When running in --inetd mode these messages are
1462 sent to STDERR instead of STDOUT.
1464 =item B<--progress>
1466 Show information about the current status of the task operation occasionally.
1467 This is automatically enabled if --quiet is not given.
1469 =item B<--inetd>
1471 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1472 stream socket ready to have accept called on it. To be useful, the unix socket
1473 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1474 will be issued if the socket is not in the expected location. Socket file
1475 permissions will be adjusted if necessary and, if they cannot be, taskd.pl will
1476 die. The --inetd option also enables the --quiet and --syslog options but
1477 --no-quiet and --no-syslog may be used to alter that.
1479 The correct specification for the inetd socket is a "unix" protocol "stream"
1480 socket in "wait" mode with user and group writable permissions (0660). An
1481 attempt will be made to alter the socket's file mode if needed and if that
1482 cannot be accomplished, taskd.pl will die.
1484 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1485 in wait mode and will die if the passed in socket is already connected.
1487 Note that while *BSD's inetd happily supports unix sockets (and so does
1488 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1489 However, systemd does seem to.
1491 =item B<--idle-timeout=SECONDS>
1493 Only permitted when running in --inetd mode. After SECONDS of inactivity
1494 (i.e. all outstanding tasks have completed and no new requests have come in)
1495 exit normally. The default is no timeout at all (a SECONDS value of 0).
1496 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1498 =item B<--syslog[=facility]>
1500 Normally error output is sent to STDERR. With this option it's sent to
1501 syslog instead. Note that when running in --inetd mode non-error output is
1502 also affected by this option as it's sent to STDERR in that case. If
1503 not specified, the default for facility is LOG_USER. Facility names are
1504 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1505 with the LOG_NOTICE priority.
1507 =item B<--no-syslog>
1509 Send error message output to STDERR but not syslog.
1511 =item B<--stderr>
1513 Always send error message output to STDERR. If --syslog is in effect then
1514 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1515 messages as well.
1517 =item B<--abbrev=n>
1519 Abbreviate displayed hash values to only the first n hexadecimal characters.
1520 The default is 8 characters. Set to 0 for no abbreviation at all.
1522 =item B<--show-fast-forward-info>
1524 Instead of showing ' -> ' in ref-change/ref-notify update messages, show either
1525 '..' for a fast-forward, creation or deletion or '...' for non-fast-forward.
1526 This requires running an extra git command for each ref update that is not a
1527 creation or deletion in order to determine whether or not it's a fast forward.
1529 =item B<--no-show-fast-forward-info>
1531 Disable showing of fast-forward information for ref-change/ref-notify update
1532 messages. Instead just show a ' -> ' indicator.
1534 =item B<--same-pid>
1536 When performing a graceful restart, keep the same pid rather than switching to
1537 a new one.
1539 =item B<--status-interval=MINUTES>
1541 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1542 status updates are shown at each MINUTES interval. Setting the interval to 0
1543 disables them entirely even with --progress.
1545 =item B<--idle-status-interval=IDLEMINUTES>
1547 Two consecutive "idle" status updates with no intervening activity will not be
1548 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1549 Setting the interval to 0 prevents any consecutive idle updates (with no
1550 activity between them) from appearing at all.
1552 =back
1554 =head1 DESCRIPTION
1556 taskd.pl is Girocco's service request servant; it listens for service requests
1557 such as new clone requests and ref update notifications and spawns a task to
1558 perform the requested action.
1560 =cut