project-fsck-status.sh: --no-full mode can generate warnings
[girocco.git] / taskd / taskd.pl
blobd1e8bf32794c85118fbaa12b0c74fb3b437b7b99
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
5 # taskd is Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
31 use strict;
32 use warnings;
34 use Getopt::Long;
35 use Pod::Usage;
36 use Socket;
37 use Errno;
38 use Fcntl;
39 use POSIX qw(:sys_wait_h :fcntl_h);
40 use File::Basename;
41 use File::Spec ();
42 use Cwd qw(realpath);
44 use lib "__BASEDIR__";
45 use Girocco::Config;
46 use Girocco::Notify;
47 use Girocco::Project;
48 use Girocco::User;
49 use Girocco::Util qw(noFatalsToBrowser get_git);
50 BEGIN {noFatalsToBrowser}
51 use Girocco::ExecUtil;
53 use constant SOCKFDENV => "GIROCCO_TASKD_SOCKET_FD";
55 # Throttle Classes Defaults
56 # Note that any same-named classes in @Girocco::Config::throttle_classes
57 # will override (completely replacing the entire hash) these ones.
# One hash per built-in throttle class; keys omitted from a hash fall back to
# the computed defaults noted in the commented-out entries.
my @throttle_defaults = (
    {
        'name'     => "ref-change",
        'maxproc'  => 0,
        'maxjobs'  => 1,
        'interval' => 2,
    },
    {
        'name'     => "clone",
        'maxproc'  => 0,
        'maxjobs'  => 2,
        'interval' => 5,
    },
    {
        'name'     => "snapshot",
        #maxproc => max(5, cpucount + maxjobs), # this is the default
        #maxjobs => max(1, int(cpucount / 4)) , # this is the default
        'interval' => 5,
    },
);
79 # Options
# Run-time settings; presumably populated by option parsing outside this view (TODO confirm).
80 my $quiet;
# statmsg prints nothing unless this is true
81 my $progress;
82 my $syslog;
83 my $stderr;
84 my $inetd;
85 my $idle_timeout;
86 my $daemon;
87 my $max_lifetime;
# number of leading characters of each revision shown in ref-change status lines
88 my $abbrev = 8;
# when true, ref_indicator runs git rev-list to tell ".." updates from "..." updates
89 my $showff = 1;
90 my $same_pid;
91 my $statusintv = 60;
92 my $idleintv = 3600;
93 my $maxspawn = 8;
# disable output buffering on the currently selected handle
95 $| = 1;
# basename of the running script, used as the prefix of every log line
97 my $progname = basename($0);
# count of live children: incremented in spawn, decremented in REAPER
98 my $children = 0;
# REAPER resets this to the current time when the child count drops to zero
99 my $idlestart = time;
# spawn resets this to 0 whenever a child is started
100 my $idlestatus = 0;
# Returns the number of online CPUs as reported by Girocco::Util::online_cpus,
# caching the first true answer in a package variable.
sub cpucount {
    use Girocco::Util "online_cpus";
    our $online_cpus_result;
    $online_cpus_result ||= online_cpus;
    return $online_cpus_result;
}
# Print a log line built from @_ on STDOUT.
# When STDOUT is tied (to an OStream) the line is emitted twice: once with a
# "[timestamp] progname pid: " header for stderr only, and once without the
# header for syslog only.
sub logmsg {
    my $stamp = localtime;
    my $hdr = "[$stamp] $progname $$: ";
    if (!tied *STDOUT) {
        print "$hdr@_\n";
    } else {
        $OStream::only = 2; # STDERR only
        print "$hdr@_\n";
        $OStream::only = 1; # syslog only
        print "@_\n";
        $OStream::only = 0; # back to default
    }
}
# Print a status line built from @_ on STDERR, but only when $progress is set.
# When STDERR is tied (to an OStream) the headered form goes to stderr only
# and the bare form goes to syslog only.
sub statmsg {
    return unless $progress;
    my $stamp = localtime;
    my $hdr = "[$stamp] $progname $$: ";
    if (!tied *STDERR) {
        print STDERR "$hdr@_\n";
    } else {
        $OStream::only = 2; # STDERR only
        print STDERR "$hdr@_\n";
        $OStream::only = 1; # syslog only
        print STDERR "@_\n";
        $OStream::only = 0; # back to default
    }
}
# Format a non-negative number of seconds as a compact string such as
# "59s", "1m1s", "1h0m0s" or "1d1h1m1s".
# An undefined or negative argument is returned unchanged.
sub duration {
    my $secs = shift;
    return $secs unless defined($secs) && $secs >= 0;
    $secs = int($secs);
    my $text = '';
    # Peel off seconds, minutes and hours; whatever remains is days.
    for my $unit (['s', 60], ['m', 60], ['h', 24]) {
        my ($suffix, $modulus) = @$unit;
        $text = ($secs % $modulus) . $suffix . $text;
        return $text if $secs < $modulus;
        $secs = int($secs / $modulus);
    }
    return $secs . 'd' . $text;
}
# Returns true if the numeric file descriptor is open, false if it is not,
# and undef when the argument is undefined or negative.
# Openness is probed by dup()ing the descriptor and closing the duplicate.
sub isfdopen {
    my ($fd) = @_;
    return undef unless defined($fd) && $fd >= 0;
    my $dupfd = POSIX::dup($fd);
    my $isopen = defined($dupfd);
    POSIX::close($dupfd) if $isopen;
    return $isopen;
}
# Clear all descriptor flags (including close-on-exec) on the given handle;
# dies if fcntl fails.
sub setnoncloexec {
    my ($fh) = @_;
    my $ok = fcntl($fh, F_SETFD, 0);
    $ok or die "fcntl failed: $!";
    return $ok;
}
# Set the descriptor flags of the given handle to exactly FD_CLOEXEC;
# dies if fcntl fails.
sub setcloexec {
    my ($fh) = @_;
    my $ok = fcntl($fh, F_SETFD, FD_CLOEXEC);
    $ok or die "fcntl failed: $!";
    return $ok;
}
# Add O_NONBLOCK to the file status flags of the given handle, leaving the
# other status flags as they were; dies if either fcntl call fails.
sub setnonblock {
    my ($fh) = @_;
    my $mode = fcntl($fh, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($mode);
    my $ok = fcntl($fh, F_SETFL, $mode | O_NONBLOCK);
    $ok or die "fcntl failed: $!";
    return $ok;
}
# Remove O_NONBLOCK from the file status flags of the given handle, leaving
# the other status flags as they were; dies if either fcntl call fails.
sub setblock {
    my ($fh) = @_;
    my $mode = fcntl($fh, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($mode);
    my $ok = fcntl($fh, F_SETFL, $mode & ~O_NONBLOCK);
    $ok or die "fcntl failed: $!";
    return $ok;
}
184 package Throttle;
187 ## Throttle protocol
189 ## 1) Process needing throttle services acquire a control file descriptor
190 ## a) Either as a result of a fork + exec (the write end of a pipe)
191 ## b) Or by connecting to the taskd socket (not yet implemented)
193 ## 2) The process requesting throttle services will be referred to
194 ## as the supplicant or just "supp" for short.
196 ## 3) The supp first completes any needed setup which may include
197 ## gathering data it needs to perform the action -- if that fails
198 ## then there's no need for any throttling.
200 ## 4) The supp writes a throttle request to the control descriptor in
201 ## this format:
202 ## throttle <pid> <class>\n
203 ## for example if the supp's pid was 1234 and it was requesting throttle
204 ## control as a member of the mail class it would write this message:
205 ## throttle 1234 mail\n
206 ## Note that if the control descriptor happens to be a pipe rather than a
207 ## socket, the message should be preceded by another "\n" just to be safe.
208 ## If the control descriptor is a socket, not a pipe, the message may be
209 ## preceded by a "\n" but that's not recommended.
211 ## 5) For supplicants with a control descriptor that is a pipe
212 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
213 ## If the control descriptor is a socket (getsockname succeeds) then
214 ## protocol (5b) should be used.
216 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
217 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
218 ## to write a "keepalive\n" message to the control descriptor. If that
219 ## fails, the controller has gone away and it may make its own decision
220 ## whether or not to proceed at that point. If, on the other hand, it
221 ## receives a SIGTERM, the process limit for its class has been reached
222 ## and it should abort without performing its action. If it receives
223 ## SIGUSR1, it may proceed without writing anything more to the control
224 ## descriptor, and MAY even close the control descriptor. Finally, a
225 ## SIGUSR2 indicates rejection of the throttle request for some other reason
226 ## such as unrecognized class name or invalid pid in which case the supp may
227 ## make its own decision how to proceed.
229 ## 5b) The supp now enters a read wait on the socket -- it need accommodate no
230 ## more than 512 bytes and if a '\n' does not appear within that number of
231 ## bytes the read should be considered failed. Otherwise the read should
232 ## be retried until either a full line has been read or the socket is
233 ## closed from the other end. If the lone read is "proceed\n" then it may
234 ## proceed without reading or writing anything more to the control
235 ## descriptor, but MUST keep the control descriptor open and not call
236 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
237 ## should be retried) constitutes failure. If a full line starting with at
238 ## least one alpha character was read but it was not "proceed" then it
239 ## should abort without performing its action. For any other failure it
240 ## may make its own decision whether or not to proceed as the controller has
241 ## gone away.
243 ## 6) The supp now performs its throttled action.
245 ## 7) The supp now closes its control descriptor (if it hasn't already in the
246 ## case of (5a)) and exits -- in the case of a socket, the other end receives
247 ## notification that the socket has been closed (read EOF). In the case of
248 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
249 ## of the other end of the pipe, so it will not reach EOF by the supp's
250 ## exit in that case).
253 # keys are class names, values are hash refs with these fields:
254 # 'maxproc' => integer; maximum number of allowed supplicants (the sum of how
255 # many may be queued waiting plus how many may be
256 # concurrently active) with 0 meaning no limit.
257 # 'maxjobs' => integer; how many supplicants may proceed simultaneously a value
258 # of 0 is unlimited but the number of concurrent
259 # supplicants will always be limited to no more than
260 # the 'maxproc' value (if > 0) no matter what the
261 # 'maxjobs' value is.
262 # 'total' -> integer; the total number of pids belonging to this class that
263 # can currently be found in %pid.
264 # 'active' -> integer; the number of currently active supplicants which should
265 # be the same as (the number of elements of %pid with a
266 # matching class name) - (number of my class in @queue).
267 # 'interval' -> integer; minimum number of seconds between 'proceed' responses
268 # or SIGUSR1 signals to members of this class.
269 # 'lastqueue' -> time; last time a supplicant was successfully queued.
270 # 'lastproceed' => time; last time a supplicant was allowed to proceed.
271 # 'lastthrottle' => time; last time a supplicant was throttled
272 # 'lastdied' => time; last time a supplicant in this class died/exited/etc.
273 my %classes = ();
275 # keys are pid numbers, values are array refs with these elements:
276 # [0] => name of class (key to classes hash)
277 # [1] => supplicant state (0 => queued, non-zero => time it started running)
278 # [2] => descriptive text (e.g. project name)
279 my %pid = ();
281 # minimum number of seconds between any two proceed responses no matter what
282 # class. this takes priority in that it can effectively increase the
283 # class's 'interval' value by delaying proceed notifications if the minimum
284 # interval has not yet elapsed.
285 my $interval = 1;
287 # fifo of pids awaiting notification as soon as the next $interval elapses
288 # provided interval and maxjobs requirements are satisfied
289 # for the class of the pid that will next be triggered.
290 my @queue = ();
292 # time of most recent successful call to AddSupplicant
293 my $lastqueue = 0;
295 # time of most recent proceed notification
296 my $lastproceed = 0;
298 # time of most recent throttle
299 my $lastthrottle = 0;
301 # time of most recent removal
302 my $lastdied = 0;
304 # lifetime count of how many have been queued
305 my $totalqueue = 0;
307 # lifetime count of how many have been allowed to proceed
308 my $totalproceed = 0;
310 # lifetime count of how many have been throttled
311 my $totalthrottle = 0;
313 # lifetime count of how many have died
315 # It should always be true that $totalqueue - $totaldied == scalar(keys(%pid))
315 my $totaldied = 0;
317 # Returns an unordered list of currently registered class names
# List context: the registered class names in no particular order.
# Scalar context: the number of registered classes.
sub GetClassList {
    my @names = keys(%classes);
    return @names;
}
# Returns the numerically larger of its two arguments (the first on a tie).
sub _max {
    my ($left, $right) = @_;
    return $left >= $right ? $left : $right;
}
# [0] => minimum acceptable result
# [1] => candidate value; used only if it is an optionally signed integer
# [2] => code ref called to produce the value when [1] is not usable
# Returns the chosen value, raised to the minimum if it is smaller.
sub _getnum {
    my ($min, $val, $default) = @_;
    my $usable = defined($val) && $val =~ /^[+-]?\d+$/;
    my $ans = $usable ? 0 + $val : &$default;
    return _max($min, $ans);
}
338 # [0] => name of class to find
339 # [1] => if true, create class if it doesn't exist, if a hashref then
340 # it contains initial values for maxproc, maxjobs and interval.
341 # Otherwise maxjobs defaults to max(cpu cores/4, 1), maxprocs
342 # defaults to the max(5, number of cpu cores + maxjobs) and interval
343 # defaults to 1.
344 # Returns a hash ref with info about the class on success
# Look up (and optionally create) a throttle class.
# The name must start with a letter and contain only letters, digits and
# "._+-"; it is lower-cased before use.  The returned hash ref is always a
# copy, so callers cannot modify the registered class through it.
# Returns nothing for an invalid name or for an unknown class when the
# second argument is false.
sub GetClassInfo {
    my ($classname, $init) = @_;
    return unless defined($classname) && $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/;
    $classname = lc($classname);
    if (my $existing = $classes{$classname}) {
        return { %$existing };
    }
    return unless $init;
    $init = {} unless ref($init) eq 'HASH';
    my %newclass = ();
    # maxjobs must be settled first: the maxproc default is derived from it
    $newclass{'maxjobs'} = _getnum(0, $init->{'maxjobs'}, sub{_max(1, int(::cpucount() / 4))});
    $newclass{'maxproc'} = _getnum(0, $init->{'maxproc'}, sub{_max(5, ::cpucount() + $newclass{'maxjobs'})});
    $newclass{'interval'} = _getnum(0, $init->{'interval'}, sub{1});
    $newclass{$_} = 0 foreach qw(total active lastqueue lastproceed lastthrottle lastdied);
    $classes{$classname} = \%newclass;
    return { %newclass };
}
372 # [0] => pid to look up
373 # Returns () if not found otherwise ($classname, $timestarted, $description)
374 # Where $timestarted will be 0 if it's still queued otherwise a time() value
# Returns the elements recorded for the given pid
# ($classname, $timestarted, $description), or () if the pid is not registered.
sub GetPidInfo {
    my ($wanted) = @_;
    return exists($pid{$wanted}) ? @{$pid{$wanted}} : ();
}
381 # Returns array of pid numbers that are currently running sorted
382 # by time started (oldest to newest). Can return an empty array.
# Returns the pids whose start time is non-zero (i.e. no longer queued),
# ordered from earliest to latest start time.
sub GetRunningPids {
    my @running = grep { $pid{$_}->[1] } keys(%pid);
    return sort { $pid{$a}->[1] <=> $pid{$b}->[1] } @running;
}
388 # Returns a hash ref with various info about the current state
389 # 'interval' => global minimum interval between proceeds
390 # 'active' => how many pids are currently running (registered pids minus queued pids)
391 # 'queue' => how many pids are currently queued
392 # 'lastqueue' => time (epoch seconds) of last queue
393 # 'lastproceed' => time (epoch seconds) of last proceed
394 # 'lastthrottle' => time (epoch seconds) of last throttle
395 # 'lastdied' => time (epoch seconds) of last removal
396 # 'totalqueue' => lifetime total number of processes queued
397 # 'totalproceed' => lifetime total number of processes proceeded
398 # 'totalthrottle' => lifetime total number of processes throttled
399 # 'totaldied' => lifetime total number of removed processes
# Returns a reference to a freshly built hash describing the global
# throttle state; 'active' is the number of registered pids minus the
# number still waiting in the queue.
sub GetInfo {
    my $queued = scalar(@queue);
    my %info = (
        interval      => $interval,
        active        => scalar(keys(%pid)) - $queued,
        queue         => $queued,
        lastqueue     => $lastqueue,
        lastproceed   => $lastproceed,
        lastthrottle  => $lastthrottle,
        lastdied      => $lastdied,
        totalqueue    => $totalqueue,
        totalproceed  => $totalproceed,
        totalthrottle => $totalthrottle,
        totaldied     => $totaldied,
    );
    return \%info;
}
416 # with no args get the global interval
417 # with one arg set it, returns previous value if set
# Always returns the global interval as it was on entry.
# If the argument is a string of decimal digits it becomes the new interval;
# any other argument is silently ignored.
sub Interval {
    my ($new) = @_;
    my $previous = $interval;
    if (defined($new) && $new =~ /^\d+$/) {
        $interval = 0 + $new;
    }
    return $previous;
}
424 sub RemoveSupplicant;
426 # Perform queue service (i.e. send SIGUSR1 to any eligible queued process)
427 # Returns minimum interval until next proceed is possible
428 # Returns undef if there's nothing waiting to proceed or
429 # the 'maxjobs' limits have been reached for all queued items (in which
430 # case it won't be possible to proceed until one of them exits, hence undef)
431 # This is called automatically by AddSupplicant and RemoveSupplicant
# Walk the queue in FIFO order and send SIGUSR1 to each supplicant that may
# proceed now.  Only the first queued entry of each class is considered per
# pass, so ordering within a class is preserved.
# Returns the number of seconds until another proceed could happen, or undef
# when the queue is empty or every queued class is at its 'maxjobs' limit.
sub ServiceQueue {
    # Any change to @queue invalidates the scan, so each change restarts the
    # whole block with "redo RETRY" (fresh $now, $min and per-class state).
    RETRY: {
        return undef unless @queue; # if there's nothing queued, nothing to do
        my $now = time;
        # seconds still to wait before the global interval allows a proceed
        my $min = _max(0, $interval - ($now - $lastproceed));
        my $classmin = undef;
        my $classchecked = 0;
        my %seenclass = ();
        my $classcount = scalar(keys(%classes));
        for (my $i = 0; $i <= $#queue && $classchecked < $classcount; ++$i) {
            my $pid = $queue[$i];
            my $procinfo = $pid{$pid};
            if (!$procinfo) {
                # Stale queue entry with no %pid record.  RemoveSupplicant
                # returns early for unknown pids without touching @queue, so
                # the entry must be dropped here or the retry never ends.
                splice(@queue, $i, 1);
                redo RETRY;
            }
            my $classinfo = $classes{$$procinfo[0]};
            if (!$classinfo) {
                # pid registered under a class that no longer exists
                RemoveSupplicant($pid, 1);
                redo RETRY;
            }
            next if $seenclass{$$procinfo[0]};
            $seenclass{$$procinfo[0]} = 1;
            ++$classchecked;
            # skip classes already running their maximum number of jobs
            next unless !$classinfo->{'maxjobs'} || $classinfo->{'active'} < $classinfo->{'maxjobs'};
            my $cmin = _max(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
            if (!$cmin && !$min) {
                # both the class and the global interval have elapsed
                $now = time;
                $$procinfo[1] = $now;
                splice(@queue, $i, 1);
                ++$totalproceed;
                $lastproceed = $now;
                $classinfo->{'lastproceed'} = $now;
                ++$classinfo->{'active'};
                # a supplicant that cannot be signalled is treated as dead
                kill("USR1", $pid) or RemoveSupplicant($pid, 1);
                redo RETRY;
            }
            # remember the shortest wait among the classes that could proceed
            $classmin = $cmin unless defined($classmin) && $classmin < $cmin;
        }
        return defined($classmin) ? _max($min, $classmin) : undef;
    }
}
476 # $1 => pid to add (must not already be in %pid)
477 # $2 => class name (must exist)
478 # Returns -1 if no such class or pid already present or invalid
479 # Returns 0 if added successfully (and possibly already SIGUSR1'd)
480 # Return 1 if throttled and cannot be added
# Register a live process as a queued member of an existing class.
# $3 is free-form descriptive text stored with the pid (defaults to '');
# a true $4 suppresses the automatic ServiceQueue call.
sub AddSupplicant {
    my ($pid, $classname, $text, $noservice) = @_;
    return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
    $pid += 0;
    return -1 unless kill(0, $pid);
    my $classinfo = $classes{$classname};
    return -1 if !$classinfo || $pid{$pid};
    $text = '' unless defined($text);
    my $now = time;
    my $limit = $classinfo->{'maxproc'};
    if ($limit && $classinfo->{'total'} >= $limit) {
        # class is full: record the throttle and refuse the request
        ++$totalthrottle;
        $lastthrottle = $classinfo->{'lastthrottle'} = $now;
        return 1;
    }
    ++$totalqueue;
    ++$classinfo->{'total'};
    $lastqueue = $classinfo->{'lastqueue'} = $now;
    $pid{$pid} = [$classname, 0, $text];
    push(@queue, $pid);
    ServiceQueue() unless $noservice;
    return 0;
}
507 # $1 => pid to remove (died, killed, exited normally, doesn't matter)
508 # Returns 0 if removed
509 # Returns -1 if unknown pid or other error during removal
# Forget a supplicant and update the global and per-class counters.
# $1 => pid to remove (died, killed, exited normally, doesn't matter)
# $2 => if true, do not call ServiceQueue afterwards; ServiceQueue passes
#       this when it calls back in here so that it does not recurse
# Returns 0 if removed
# Returns -1 if unknown pid or other error during removal
sub RemoveSupplicant {
    my ($pid, $noservice) = @_;
    return -1 unless defined($pid) && $pid =~ /^\d+$/;
    $pid += 0;
    my $pidinfo = $pid{$pid};
    if (!$pidinfo) {
        # Unknown pid: still purge any stale queue entry so that a caller
        # retrying on the same queue slot cannot loop forever.
        @queue = grep { $_ != $pid } @queue;
        return -1;
    }
    my $now = time;
    $lastdied = $now;
    ++$totaldied;
    delete $pid{$pid};
    # a start time of 0 means the pid was still waiting in the queue
    @queue = grep { $_ != $pid } @queue unless $$pidinfo[1];
    my $classinfo = $classes{$$pidinfo[0]};
    if (!$classinfo) {
        # The class is gone so there are no counters to adjust; honor
        # $noservice here too instead of servicing unconditionally.
        ServiceQueue() unless $noservice;
        return -1;
    }
    --$classinfo->{'active'} if $$pidinfo[1];
    --$classinfo->{'total'};
    $classinfo->{'lastdied'} = $now;
    ServiceQueue() unless $noservice;
    return 0;
}
537 # Instance Methods
539 package main;
542 ## ---------
543 ## Functions
544 ## ---------
# pids collected by REAPER, in the order they were reaped
my @reapedpids = ();
# Names for signal numbers, used by REAPER when reporting how a child died.
# http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
my %signame = (
     1 => 'SIGHUP',
     2 => 'SIGINT',
     3 => 'SIGQUIT',
     6 => 'SIGABRT',
     9 => 'SIGKILL',
    14 => 'SIGALRM',
    15 => 'SIGTERM',
);
# SIGCHLD handler: reap every child that has exited without blocking, log how
# each one ended and append its pid to @reapedpids.  $! is localized so the
# interrupted code does not see it change.
sub REAPER {
    local $!;
    while ((my $waitedpid = waitpid(-1, WNOHANG)) > 0) {
        my $status = $? & 0xffff;
        --$children;
        $idlestart = time unless $children; # last child gone: idle period starts
        my $codemsg = '';
        my $signum = $status & 0x7f;
        if (!($status & 0xff)) {
            # normal exit; a zero exit code is not mentioned
            $codemsg = " with exit code ".($status >> 8) if $status;
        } elsif ($signum) {
            $codemsg = " with signal ".($signame{$signum} || $signum);
        }
        logmsg("reaped $waitedpid$codemsg");
        push(@reapedpids, $waitedpid);
    }
    $SIG{CHLD} = \&REAPER; # loathe sysV
}
# Install REAPER as the SIGCHLD handler.
sub set_sigchld_reaper() {
    return $SIG{'CHLD'} = \&REAPER; # Apollo 440
}
# Pipe handles shared with the children: spawn closes $piperead in the child,
# and children print their throttle/keepalive messages on $pipewrite.
583 my ($piperead, $pipewrite);
# Fork a child to run the given code ref.
# The parent logs the outcome and returns nothing, whether or not the fork
# succeeded.  The child never returns: it exits with the code ref's result.
584 sub spawn {
585 my $coderef = shift;
587 my $pid = fork;
588 if (not defined $pid) {
589 logmsg "cannot fork: $!";
590 return;
591 } elsif ($pid) {
592 $idlestart = time if !++$children;
593 $idlestatus = 0;
594 logmsg "begat $pid";
595 return; # I'm the parent
# --- child only from here on ---
# drop the listening socket unless it is file descriptor 0
598 close(Server) unless fileno(Server) == 0;
599 close($piperead);
# replace the inherited SIGCHLD handler with a do-nothing handler
600 $SIG{'CHLD'} = sub {};
# make the client connection the child's STDIN, opened read/write
602 open STDIN, "+<&Client" or die "can't dup client to stdin";
603 close(Client);
604 exit &$coderef();
607 # returns:
608 # < 0: error
609 # = 0: proceed
610 # > 0: throttled
# Ask the controller for permission to run as a member of throttle class
# $classname ($text is descriptive text sent along with the request), then
# block until a signal answers the request.
# Returns 1 on SIGTERM (throttled), 0 on SIGUSR1 (proceed) and -1 otherwise
# (unknown class, SIGUSR2, or SIGPIPE while writing to the control pipe).
611 sub request_throttle {
612 use POSIX qw(sigprocmask sigsuspend SIG_SETMASK);
613 my $classname = shift;
614 my $text = shift;
616 Throttle::GetClassInfo($classname)
617 or return -1; # no such throttle class
# flags set by the signal handlers installed below
619 my $throttled = 0;
620 my $proceed = 0;
621 my $error = 0;
622 my $controldead = 0;
623 my $setempty = POSIX::SigSet->new;
624 my $setfull = POSIX::SigSet->new;
625 $setempty->emptyset();
626 $setfull->fillset();
627 $SIG{'TERM'} = sub {$throttled = 1};
628 $SIG{'USR1'} = sub {$proceed = 1};
629 $SIG{'USR2'} = sub {$error = 1};
630 $SIG{'PIPE'} = sub {$controldead = 1};
# SIGALRM only needs to interrupt sigsuspend, so its handler is empty
631 $SIG{'ALRM'} = sub {};
633 # After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
634 print $pipewrite "\nthrottle $$ $classname $text\n";
635 my $old = POSIX::SigSet->new;
# block all signals so the flags can only change inside sigsuspend
636 sigprocmask(SIG_SETMASK, $setfull, $old);
637 until ($controldead || $throttled || $proceed || $error) {
# wake up after at most 30 seconds to send a keepalive
638 alarm(30);
639 sigsuspend($setempty);
640 alarm(0);
# signals are unblocked while writing so a SIGPIPE from the write is delivered
641 sigprocmask(SIG_SETMASK, $setempty, $old);
642 print $pipewrite "\nkeepalive $$\n";
643 sigprocmask(SIG_SETMASK, $setfull, $old);
645 sigprocmask(SIG_SETMASK, $setempty, $old);
# restore default signal handling before returning to the caller
646 $SIG{'TERM'} = "DEFAULT";
647 $SIG{'USR1'} = "DEFAULT";
648 $SIG{'USR2'} = "DEFAULT";
649 $SIG{'ALRM'} = "DEFAULT";
650 $SIG{'PIPE'} = "DEFAULT";
# $throttled takes precedence over $proceed if both were set
652 my $result = -1;
653 if ($throttled) {
654 $result = 1;
655 } elsif ($proceed) {
656 $result = 0;
658 return $result;
# Start the clone of project $name: validate the project, wait for a "clone"
# throttle slot, redirect the standard descriptors and exec clone.sh.
# Dies on any failure; does not return when the exec succeeds.
661 sub clone {
662 my ($name) = @_;
663 Girocco::Project::does_exist($name, 1) or die "no such project: $name";
664 my $proj;
# a failed load is handled below rather than propagated from here
665 eval {$proj = Girocco::Project->load($name)};
666 if (!$proj && Girocco::Project::does_exist($name, 1)) {
667 # If the .clone_in_progress file exists, but the .clonelog does not
668 # and neither does the .clone_failed, be helpful and touch the
669 # .clone_failed file so that the mirror can be restarted
670 my $projdir = $Girocco::Config::reporoot."/$name.git";
671 if (-d "$projdir" && -f "$projdir/.clone_in_progress" && ! -f "$projdir/.clonelog" && ! -f "$projdir/.clone_failed") {
672 open X, '>', "$projdir/.clone_failed" and close(X);
675 $proj or die "failed to load project $name";
676 $proj->{clone_in_progress} or die "project $name is not marked for cloning";
677 $proj->{clone_logged} and die "project $name is already being cloned";
# a negative (error) result from request_throttle does not abort the clone; only a positive (throttled) one does
678 request_throttle("clone", $name) <= 0 or die "cloning $name aborted (throttled)";
679 statmsg "cloning $name";
# stdin is replaced by the null device
680 my $devnullfd = POSIX::open(File::Spec->devnull, O_RDWR);
681 defined($devnullfd) && $devnullfd >= 0 or die "cannot open /dev/null: $!";
682 POSIX::dup2($devnullfd, 0) or
683 die "cannot dup2 STDIN_FILENO: $!";
684 POSIX::close($devnullfd);
# keep a copy of the current stderr so it can be put back before dying below
685 my $duperr;
686 open $duperr, '>&2' or
687 die "cannot dup STDERR_FILENO: $!";
# stdout and stderr are both pointed at the (truncated) .clonelog file
688 my $clonelogfd = POSIX::open("$Girocco::Config::reporoot/$name.git/.clonelog", O_WRONLY|O_TRUNC|O_CREAT, 0664);
689 defined($clonelogfd) && $clonelogfd >= 0 or die "cannot open clonelog for writing: $!";
690 POSIX::dup2($clonelogfd, 1) or
691 die "cannot dup2 STDOUT_FILENO: $!";
692 POSIX::dup2($clonelogfd, 2) or
693 POSIX::dup2(fileno($duperr), 2), die "cannot dup2 STDERR_FILENO: $!";
694 POSIX::close($clonelogfd);
# on exec failure the saved stderr is restored first so the message is not written to .clonelog
695 exec "$Girocco::Config::basedir/taskd/clone.sh", "$name.git" or
696 POSIX::dup2(fileno($duperr), 2), die "exec failed: $!";
# Choose the separator shown between the old and new revision in status lines.
# ($git_dir, $old, $new) => ' -> ' when $showff is off or no git dir is given,
# '..' when either revision is missing or all zeros or both are equal,
# otherwise '...' if "git rev-list -n 1 ^$new^0 $old^0" produces output and
# '..' if it does not.
# In list context, when git was actually run, a second element of 1 is returned.
sub ref_indicator {
    return ' -> ' unless $showff && defined($_[0]);
    my ($git_dir, $old, $new) = @_;
    return '..' if !defined($old) || !defined($new) ||
        $old =~ /^0+$/ || $new =~ /^0+$/ || $old eq $new;
    # In many cases `git merge-base` is slower than this even if using the
    # `--is-ancestor` option available since Git 1.8.0, but it's never faster
    my $unreachable = get_git("--git-dir=$git_dir", "rev-list", "-n", "1", "^$new^0", "$old^0", "--");
    my $ind = $unreachable ? '...' : '..';
    return $ind unless wantarray;
    return ($ind, 1);
}
# Handle a single ref-change request of the form
# "username project oldrev newrev ref".
# Malformed, no-op or (unless notify_single_level is set) single-level ref
# requests are ignored with a return of 0.  Dies if the project or user
# cannot be loaded or the request is throttled; otherwise logs the change,
# sends notifications if the project has any, and returns 0.
sub ref_change {
    my ($arg) = @_;
    my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
    return 0 unless $username && $name && $oldrev && $newrev && $ref;
    return 0 unless $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/};
    return 0 unless $newrev ne $oldrev;
    return 0 unless $Girocco::Config::notify_single_level || $ref =~ m(^refs/[^/]+/[^/]);

    die "no such project: $name" unless Girocco::Project::does_exist($name, 1);
    my $proj = Girocco::Project->load($name);
    die "failed to load project $name" unless $proj;
    my $has_notify = $proj->has_notify;
    my $type = $has_notify ? "notify" : "change";

    my $user;
    if ($username && $username !~ /^%.*%$/) {
        # an ordinary user name: it must exist and load
        die "no such user: $username" unless Girocco::User::does_exist($username, 1);
        $user = Girocco::User->load($username);
        die "failed to load user $username" unless $user;
    } elsif ($username eq "%$name%") {
        # the "%project%" pseudo user is shown as "-"
        $username = "-";
    }

    die "ref-change $name aborted (throttled)" unless request_throttle("ref-change", $name) <= 0;
    my $ind = ref_indicator($proj->{path}, $oldrev, $newrev);
    my $oldabbrev = substr($oldrev, 0, $abbrev);
    my $newabbrev = substr($newrev, 0, $abbrev);
    statmsg("ref-$type $username $name ($ref: $oldabbrev$ind$newabbrev)");
    open STDIN, '<', File::Spec->devnull;
    Girocco::Notify::ref_changes($proj, $user, [$oldrev, $newrev, $ref]) if $has_notify;
    return 0;
}
# Handle a batch ref-change request.  $arg is "username project"; the changes
# themselves are read from STDIN as "oldrev newrev ref" lines until a line
# whose first field is "done" (or EOF).
# Dies if the project or user cannot be loaded or the batch is throttled;
# returns 0 otherwise.
740 sub ref_changes {
741 my ($arg) = @_;
742 my ($username, $name) = split(/\s+/, $arg);
743 $username && $name or return 0;
745 Girocco::Project::does_exist($name, 1) or die "no such project: $name";
746 my $proj = Girocco::Project->load($name);
747 $proj or die "failed to load project $name";
748 my $has_notify = $proj->has_notify;
749 my $type = $has_notify ? "notify" : "change";
751 my $user;
752 if ($username && $username !~ /^%.*%$/) {
753 Girocco::User::does_exist($username, 1) or die "no such user: $username";
754 $user = Girocco::User->load($username);
755 $user or die "failed to load user $username";
# the "%project%" pseudo user is shown as "-"
756 } elsif ($username eq "%$name%") {
757 $username = "-";
# @changes: [oldrev, newrev, ref] triples that actually changed something
# %oldheads: refs/heads/* name => old revision, passed on to Girocco::Notify::ref_changes
# %deletedheads: refs/heads/* names seen with an all-zero old revision
760 my @changes = ();
761 my %oldheads = ();
762 my %deletedheads = ();
763 while (my $change = <STDIN>) {
764 my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
765 $oldrev ne "done" or last;
# lines that are malformed or name a filtered-out ref are skipped, not fatal
766 $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or next;
767 $Girocco::Config::notify_single_level || $ref =~ m(^refs/[^/]+/[^/]) or next;
768 if ($ref =~ m{^refs/heads/.}) {
769 if ($oldrev =~ /^0{40}$/) {
770 delete $oldheads{$ref};
771 $deletedheads{$ref} = 1;
# an unchanged head is recorded only if no earlier line mentioned it
772 } elsif ($newrev ne $oldrev || (!exists($oldheads{$ref}) && !$deletedheads{$ref})) {
773 $oldheads{$ref} = $oldrev;
776 $newrev ne $oldrev or next;
777 push(@changes, [$oldrev, $newrev, $ref]);
779 return 0 unless @changes;
780 open STDIN, '<', File::Spec->devnull;
781 request_throttle("ref-change", $name) <= 0 or die "ref-changes $name aborted (throttled)";
# Callback that logs one change and then pauses: 2 seconds when its fourth
# argument is true, else 1 second when ref_indicator had to run git.
782 my $statproc = sub {
783 my ($old, $new, $ref, $ran_mail_sh) = @_;
784 my ($ind, $ran_git) = ref_indicator($proj->{path}, $old, $new);
785 statmsg "ref-$type $username $name ($ref: @{[substr($old,0,$abbrev)]}$ind@{[substr($new,0,$abbrev)]})";
786 if ($ran_mail_sh) {
787 sleep 2;
788 } elsif ($ran_git) {
789 sleep 1;
# with notifications the callback is handed to Girocco::Notify::ref_changes;
# without them it is called here once per change
792 if ($has_notify) {
793 Girocco::Notify::ref_changes($proj, $user, $statproc, \%oldheads, @changes);
794 } else {
795 &$statproc(@$_) foreach @changes;
797 return 0;
# Serve a throttle request of the form "pid class text" that arrived on a
# socket connection (STDIN is the connected socket).
# Returns 0 without doing anything if the request is invalid.  Otherwise it
# registers this process with the controller through $pipewrite, waits for
# the controller's signal, answers "throttled\n" or "proceed\n" on the
# socket and, after a proceed, stays alive until the socket reaches EOF.
# A valid request ends in exit 0 rather than a return.
sub throttle {
    my ($arg) = @_;
    my ($pid, $classname, $text) = split(/\s+/, $arg);
    # The pid must consist of digits only; a missing field or one with
    # trailing garbage such as "12abc" is rejected, not truncated to 12.
    defined($pid) && $pid =~ /^\d+$/ or return 0; # invalid pid
    $pid += 0;
    $pid > 0 or return 0; # invalid pid
    kill(0, $pid) || $!{EPERM} or return 0; # no such process
    Throttle::GetClassInfo($classname) or return 0; # no such throttle class
    defined($text) && $text ne '' or return 0; # no text no service

    my $throttled = 0;
    my $proceed = 0;
    my $error = 0;
    my $controldead = 0;
    my $suppdead = 0;
    # Self-pipe: each signal handler writes a byte to it so that the
    # select calls below wake up as soon as a signal has been handled.
    my ($waker, $wakew);
    pipe($waker, $wakew) or die "pipe failed: $!";
    select((select($wakew),$|=1)[0]);
    setnonblock($wakew);
    $SIG{'TERM'} = sub {$throttled = 1; syswrite($wakew, '!')};
    $SIG{'USR1'} = sub {$proceed = 1; syswrite($wakew, '!')};
    $SIG{'USR2'} = sub {$error = 1; syswrite($wakew, '!')};
    $SIG{'PIPE'} = sub {$controldead = 1; syswrite($wakew, '!')};
    select((select(STDIN),$|=1)[0]);

    logmsg("throttle $pid $classname $text request");
    # After writing we can expect a SIGTERM or SIGUSR1
    print $pipewrite "\nthrottle $$ $classname $text\n";

    # NOTE: the only way to detect the socket close is to read all the
    # data until EOF is reached -- recv can be used to peek.
    my $v = '';
    vec($v, fileno(STDIN), 1) = 1;
    vec($v, fileno($waker), 1) = 1;
    setnonblock(\*STDIN);
    setnonblock($waker);
    until ($controldead || $throttled || $proceed || $error || $suppdead) {
        my ($r, $e);
        select($r=$v, undef, $e=$v, 30);
        my ($bytes, $discard);
        # drain the wake-up pipe, then drain (and discard) the socket
        do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
        do {$bytes = sysread(STDIN, $discard, 4096)} while (defined($bytes) && $bytes > 0);
        # anything other than "would block" on the socket means the peer is gone
        $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
        print $pipewrite "\nkeepalive $$\n";
    }
    setblock(\*STDIN);

    if ($throttled && !$suppdead) {
        print STDIN "throttled\n";
        logmsg("throttle $pid $classname $text throttled");
    } elsif ($proceed && !$suppdead) {
        print STDIN "proceed\n";
        logmsg("throttle $pid $classname $text proceed");
        $SIG{'TERM'} = 'DEFAULT';
        # Stay alive until the child dies which we detect by EOF on STDIN
        setnonblock(\*STDIN);
        until ($controldead || $suppdead) {
            my ($r, $e);
            select($r=$v, undef, $e=$v, 30);
            my ($bytes, $discard);
            do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
            do {$bytes = sysread(STDIN, $discard, 512)} while (defined($bytes) && $bytes > 0);
            $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
            print $pipewrite "\nkeepalive $$\n";
        }
        setblock(\*STDIN);
    } else {
        my $prefix = '';
        $prefix = "control" if $controldead && !$suppdead;
        logmsg("throttle $pid $classname $text ${prefix}died");
    }
    exit 0;
}
# Process one message line read from the control pipe.  Always returns 0.
# "throttle <pid> <class> <text>": the pid is added to the class via
#     Throttle::AddSupplicant; it is sent SIGUSR2 if the request is unusable
#     or SIGTERM if the class is full.
# "keepalive ...": ignored.
# Anything else, including an empty message, is reported on STDERR and discarded.
sub process_pipe_msg {
    my $msg = defined($_[0]) ? $_[0] : '';
    my ($act, $pid, $cls, $text) = split(/\s+/, $msg);
    # an empty message yields no fields at all
    $act = '' unless defined($act);
    if ($act eq "throttle") {
        defined($pid) && $pid =~ /^\d+$/ or return 0;
        $pid += 0;
        $pid > 0 or return 0; # invalid pid
        kill(0, $pid) or return 0; # invalid pid
        my $usable = defined($cls) && $cls ne ""
            && defined($text) && $text ne ""
            && Throttle::GetClassInfo($cls);
        if (!$usable) {
            kill('USR2', $pid);
            return 0;
        }
        # the AddSupplicant call could send SIGUSR1 before it returns
        my $result = Throttle::AddSupplicant($pid, $cls, $text);
        if ($result < 0) {
            kill('USR2', $pid);
        } elsif ($result > 0) {
            kill('TERM', $pid);
        }
        # on success $pid was added to class $cls and will receive SIGUSR1
        # when it's time for it to proceed
        return 0;
    } elsif ($act eq "keepalive") {
        # nothing to do although we could verify pid is valid and
        # still in %Throttle::pid and send a SIGUSR2 if not, but
        # really keepalive should just be ignored.
        return 0;
    }
    print STDERR "discarding unknown pipe message \"$msg\"\n";
    return 0;
}
902 ## -------
903 ## OStream
904 ## -------
907 package OStream;
909 # Set to 1 for only syslog output (if enabled by mode)
910 # Set to 2 for only stderr output (if enabled by mode)
911 our $only = 0; # This is a hack
913 use Carp 'croak';
914 use Sys::Syslog qw(:DEFAULT :macros);
# Write all of $data to the numeric file descriptor $fd, retrying after
# short writes and after EINTR.  Croaks on any other write error or if a
# write reports no progress.
sub writeall {
    my ($fd, $data) = @_;
    my $done = 0;
    my $remaining = length($data);
    while ($remaining) {
        my $chunk = substr($data, $done, $remaining);
        my $bytes = POSIX::write($fd, $chunk, $remaining);
        if (!defined($bytes)) {
            next if $!{EINTR};
            croak "POSIX::write failed: $!";
        }
        croak "POSIX::write wrote 0 bytes" unless $bytes;
        $done += $bytes;
        $remaining -= $bytes;
    }
}
# Emit one line to the enabled destinations.  The text goes to the real
# stderr descriptor as given; for syslog one trailing newline is removed
# and a line that is then empty is not logged.  $only restricts output to
# syslog (1) or stderr (2).
sub dumpline {
    use POSIX qw(STDERR_FILENO);
    my ($self, $line) = @_;
    $only = 0 unless defined($only);
    if ($self->{'stderr'} && $only != 1) {
        writeall(STDERR_FILENO, $line);
    }
    chop($line) if substr($line, -1, 1) eq "\n";
    return unless length($line);
    if ($self->{'syslog'} && $only != 2) {
        syslog(LOG_NOTICE, "%s", $line);
    }
}
# Constructor invoked by tie().
#
#   $class          - class to bless into (defaults to 'OStream')
#   $mode           - <= 0: stderr only, 1: syslog only, > 1: both
#   $syslogname     - identity string passed to openlog
#   $syslogfacility - syslog facility (defaults to LOG_USER)
#
# Returns the new object.  Croaks if syslog output was requested and
# openlog fails.
sub TIEHANDLE {
	my ($class, $mode, $syslogname, $syslogfacility) = @_;
	$class ||= 'OStream';
	$syslogfacility = LOG_USER unless defined($syslogfacility);
	my $want_syslog = $mode > 0;
	my $want_stderr = $mode <= 0 || $mode > 1;
	my $self = {
		'syslog' => $want_syslog,
		'stderr' => $want_stderr,
		'lastline' => '',	# holds text not yet terminated by a newline
	};
	if ($want_syslog) {
		# Some Sys::Syslog have a stupid default setlogsock order
		unless (eval {Sys::Syslog::setlogsock("native"); 1;}) {
			eval {Sys::Syslog::setlogsock("unix");};
		}
		openlog($syslogname, "ndelay,pid", $syslogfacility)
			or croak "Sys::Syslog::openlog failed: $!";
	}
	return bless $self, $class;
}
# Trivial tied-handle methods: there is no underlying Perl file handle so
# these just report fixed answers.

# binmode always "succeeds"
sub BINMODE {
	1;
}

# there is no file descriptor to report
sub FILENO {
	undef;
}

# an output-only stream is never at end-of-file
sub EOF {
	0;
}

# close always "succeeds"
sub CLOSE {
	1;
}
# printf on the tied handle: format the arguments with sprintf and hand
# the resulting string to PRINT.  Returns whatever PRINT returns.
sub PRINTF {
	my ($self, $template, @args) = @_;
	my $text = sprintf($template, @args);
	return $self->PRINT($text);
}
# print on the tied handle.
#
# All arguments are concatenated (together with any text left over from a
# previous call that was not yet terminated by a newline) and every
# complete line, including its newline, is passed to dumpline in order.
# Whatever follows the final newline is remembered in 'lastline' for the
# next call (or for DESTROY).  Always returns 1.
sub PRINT {
	my $self = shift;
	my $data = join('', $self->{'lastline'}, @_);
	my $pos = 0;
	# $data is deliberately left unmodified while scanning so that $pos
	# stays a valid offset into it; removing emitted lines from $data
	# while also advancing $pos caused later lines in the same call to
	# be skipped or split in the wrong place
	while ((my $idx = index($data, "\n", $pos)) >= 0) {
		++$idx;
		$self->dumpline(substr($data, $pos, $idx - $pos));
		$pos = $idx;
	}
	$self->{'lastline'} = substr($data, $pos);
	return 1;
}
# Destructor: flush any text still waiting for its terminating newline
# and then close the syslog connection.
sub DESTROY {
	my ($self) = @_;
	my $pending = $self->{'lastline'};
	if (length($pending)) {
		$self->dumpline($pending);
	}
	closelog;
}
# syswrite on the tied handle.
#
#   $scalar - buffer to write from (undef is treated as '')
#   $length - maximum number of characters to write (default: all)
#   $offset - where in $scalar to start; a negative value counts back
#             from the end of $scalar (default: 0)
#
# The selected portion of $scalar is passed to PRINT.  Returns the number
# of characters actually taken from $scalar.  Croaks on a negative length
# or on an offset that lies outside of $scalar.
sub WRITE {
	my $self = shift;
	my ($scalar, $length, $offset) = @_;
	$scalar = '' if !defined($scalar);
	my $avail = length($scalar);
	$length = $avail if !defined($length);
	croak "OStream::WRITE invalid length $length"
		if $length < 0;
	$offset = 0 if !defined($offset);
	$offset += $avail if $offset < 0;
	# the offset must be validated against the size of the buffer, not
	# against the requested length; comparing it to $length rejected
	# valid requests such as syswrite($fh, "abcdef", 2, 4)
	croak "OStream::WRITE invalid write offset"
		if $offset < 0 || $offset > $avail;
	my $max = $avail - $offset;
	$length = $max if $length > $max;
	$self->PRINT(substr($scalar, $offset, $length));
	return $length;
}
1014 ## ----
1015 ## main
1016 ## ----
1019 package main;
1021 # returns pid of process that will schedule jobd.pl restart on success
1022 # returns 0 if fork or other system call failed with error in $!
1023 # returns undef if jobd.pl does not currently appear to be running (no lockfile)
# Spawn a detached (double-forked) helper that, after a short delay, writes
# "restart\n" into the jobd lock file unless this process tells it not to.
#
#   $newpg (first argument) - if true the helper tries to move itself into a
#                             new process group with setpgid(0, 0)
#
# Two pipes are used:
#   $read/$write   - this process -> helper; anything other than EOF or "0"
#                    read by the helper makes it exit without touching the
#                    lock file
#   $read2/$write2 - intermediate child -> this process; carries either the
#                    helper's pid or ":<errno>" if the second fork failed
# The SIGCHLD handler is replaced with a no-op while this runs and is put
# back before every return that follows the replacement.
1024 sub schedule_jobd_restart {
1025 use POSIX qw(_exit setpgid dup2 :fcntl_h);
1026 my $devnull = File::Spec->devnull;
1027 my $newpg = shift;
1028 my $jdlf = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
# no lock file means there is nothing to restart
1029 return undef unless -f $jdlf;
1030 my $oldsigchld = $SIG{'CHLD'};
1031 defined($oldsigchld) or $oldsigchld = sub {};
1032 my ($read, $write, $read2, $write2);
1033 pipe($read, $write) or return 0;
# make the write end unbuffered
1034 select((select($write),$|=1)[0]);
1035 if (!pipe($read2, $write2)) {
# local $! keeps the pipe error visible to the caller despite the closes
1036 local $!;
1037 close $write;
1038 close $read;
1039 return 0;
1041 select((select($write2),$|=1)[0]);
1042 $SIG{'CHLD'} = sub {};
# try the first fork up to three times, pausing a second between attempts
1043 my $retries = 3;
1044 my $child;
1045 while (!defined($child) && $retries--) {
1046 $child = fork;
1047 sleep 1 unless defined($child) || !$retries;
1049 if (!defined($child)) {
# local $! keeps the fork error visible to the caller despite the closes
1050 local $!;
1051 close $write2;
1052 close $read2;
1053 close $write;
1054 close $read;
1055 $SIG{'CHLD'} = $oldsigchld;
1056 return 0;
1058 # double fork the child
1059 if (!$child) {
# intermediate child: fork the helper, report the outcome and exit
1060 close $read2;
1061 my $retries2 = 3;
1062 my $child2;
1063 while (!defined($child2) && $retries2--) {
1064 $child2 = fork;
1065 sleep 1 unless defined($child2) || !$retries2;
1067 if (!defined($child2)) {
# report the errno (255 if it is somehow zero) prefixed with a colon
1068 my $ec = 0 + $!;
1069 $ec = 255 unless $ec;
1070 print $write2 ":$ec";
1071 close $write2;
1072 _exit 127;
1074 if ($child2) {
1075 # pass new child pid up to parent and exit
1076 print $write2 $child2;
1077 close $write2;
1078 _exit 0;
1079 } else {
1080 # this is the grandchild
1081 close $write2;
1083 } else {
# original process: learn what happened from the intermediate child
1084 close $write2;
1085 my $result = <$read2>;
1086 close $read2;
1087 chomp $result if defined($result);
1088 if (!defined($result) || $result !~ /^:?\d+$/) {
1089 # something's wrong with the child -- kill it
1090 kill(9, $child) && waitpid($child, 0);
1091 my $oldsigpipe = $SIG{'PIPE'};
1092 # make sure the grandchild, if any,
1093 # doesn't run the success proc
1094 $SIG{'PIPE'} = sub {};
1095 print $write 1;
1096 close $write;
1097 close $read;
1098 $SIG{'PIPE'} = defined($oldsigpipe) ?
1099 $oldsigpipe : 'DEFAULT';
# there is no real errno for this failure so a placeholder value is used
1100 $! = 255;
1101 $SIG{'CHLD'} = $oldsigchld;
1102 return 0;
1104 if ($result =~ /^:(\d+)$/) {
1105 # fork failed in child, there is no grandchild
1106 my $ec = $1;
1107 waitpid($child, 0);
1108 close $write;
1109 close $read;
# hand the child's fork errno back to the caller via $!
1110 $! = $ec;
1111 $SIG{'CHLD'} = $oldsigchld;
1112 return 0;
1114 # reap the child and set $child to grandchild's pid
1115 waitpid($child, 0);
1116 $child = $result;
# $child is still 0 only in the grandchild (the intermediate child has
# exited and the original process now holds the grandchild's pid)
1118 if (!$child) {
1119 # grandchild that actually initiates the jobd.pl restart
1120 close $write;
1121 my $wait = 5;
# point stdin, stdout and stderr at the null device
1122 my $ufd = POSIX::open($devnull, O_RDWR);
1123 if (defined($ufd)) {
1124 dup2($ufd, 0) unless $ufd == 0;
1125 dup2($ufd, 1) unless $ufd == 1;
1126 dup2($ufd, 2) unless $ufd == 2;
1127 POSIX::close($ufd) unless $ufd == 0 || $ufd == 1 || $ufd == 2;
1129 chdir "/";
1130 if ($newpg) {
# each failed setpgid attempt sleeps one second and shortens the final
# wait by the same amount
1131 my $makepg = sub {
1132 my $result = setpgid(0, 0);
1133 if (!defined($result)) {
1134 --$wait;
1135 sleep 1;
1137 $result;
# up to four attempts in total
1139 my $result = &$makepg;
1140 defined($result) or $result = &$makepg;
1141 defined($result) or $result = &$makepg;
1142 defined($result) or $result = &$makepg;
1144 sleep $wait;
# EOF (or "0") on the pipe means go ahead; the failure path above writes
# "1" to veto the restart
1145 my $result = <$read>;
1146 close $read;
1147 chomp $result if defined($result);
1148 if (!defined($result) || $result eq 0) {
# overwrite the start of the lock file with the restart request and
# discard whatever followed it
1149 open JDLF, '+<', $jdlf or _exit(1);
1150 select((select(JDLF),$|=1)[0]);
1151 print JDLF "restart\n";
1152 truncate JDLF, tell(JDLF);
1153 close JDLF;
1155 _exit(0);
# original process, success: closing $write without writing anything is
# what lets the helper proceed
1157 close $write;
1158 close $read;
1159 $SIG{'CHLD'} = $oldsigchld;
1160 return $child;
# Cancel a pending jobd restart by killing the helper process that
# schedule_jobd_restart started.
#
#   $restarter - pid returned by schedule_jobd_restart
#
# Returns nothing if $restarter is undefined or zero, -1 if the process
# cannot be signalled (the kill(0, ...) probe fails), otherwise the pid
# that was killed.  Dies if the SIGKILL itself cannot be delivered.
sub cancel_jobd_restart {
	my ($restarter) = @_;
	if (!defined($restarter) || $restarter == 0) {
		return;
	}
	if (!kill(0, $restarter)) {
		return -1;
	}
	unless (kill(9, $restarter)) {
		die "failed to kill jobd restarter process (pid $restarter): $!\n";
	}
	# we must not waitpid because $restarter was doubly forked and will
	# NOT send us a SIGCHLD when it terminates
	return $restarter;
}
# ---- command line processing ----
# The Girocco::ExecUtil object is created first, before the chdir below;
# the main loop later uses it (setenv/reexec) for graceful restarts.
1173 my $reexec = Girocco::ExecUtil->new;
# resolve the script path before the chdir; pod2usage reads the POD from it
1174 my $realpath0 = realpath($0);
1175 chdir "/";
1176 close(DATA) if fileno(DATA);
# $sfac receives the optional value of --syslog (the facility name)
1177 my $sfac;
1178 Getopt::Long::Configure('bundling');
# raw --status-interval / --idle-status-interval values, in minutes
1179 my ($stiv, $idiv);
1180 my $parse_res = GetOptions(
1181 'help|?|h' => sub {
1182 pod2usage(-verbose => 2, -exitval => 0, -input => $realpath0)},
1183 'quiet|q' => \$quiet,
1184 'no-quiet' => sub {$quiet = 0},
1185 'progress|P' => \$progress,
# --inetd and --daemon both switch on syslog and quiet; a later
# --no-syslog or --no-quiet on the command line can switch them off again
1186 'inetd|i' => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
1187 'idle-timeout|t=i' => \$idle_timeout,
1188 'daemon' => sub {$daemon = 1; $syslog = 1; $quiet = 1;},
1189 'max-lifetime=i' => \$max_lifetime,
1190 'syslog|s:s' => \$sfac,
1191 'no-syslog' => sub {$syslog = 0; $sfac = undef;},
1192 'stderr' => \$stderr,
1193 'abbrev=i' => \$abbrev,
1194 'show-fast-forward-info' => \$showff,
1195 'no-show-fast-forward-info' => sub {$showff = 0},
1196 'same-pid' => \$same_pid,
1197 'no-same-pid' => sub {$same_pid = 0},
1198 'status-interval=i' => \$stiv,
1199 'idle-status-interval=i' => \$idiv,
1200 ) || pod2usage(-exitval => 2, -input => $realpath0);
# ---- derived defaults ----
# unless given explicitly, keep the same pid on restart only when not a daemon
1201 $same_pid = !$daemon unless defined($same_pid);
1202 $syslog = 1 if defined($sfac);
1203 $progress = 1 unless $quiet;
# a non-positive --abbrev means "do not abbreviate"
1204 $abbrev = 128 unless $abbrev > 0;
1205 pod2usage(-msg => "--inetd and --daemon are incompatible") if ($inetd && $daemon);
# ---- validation (the "=i" option type also accepts negative numbers) ----
1206 if (defined($idle_timeout)) {
1207 die "--idle-timeout must be a whole number\n" unless $idle_timeout =~ /^\d+$/;
1208 die "--idle-timeout may not be used without --inetd\n" unless $inetd;
1210 if (defined($max_lifetime)) {
1211 die "--max-lifetime must be a whole number\n" unless $max_lifetime =~ /^\d+$/;
1212 $max_lifetime += 0;
1214 defined($max_lifetime) or $max_lifetime = 604800; # 1 week
# both intervals are given in minutes but stored in seconds
1215 if (defined($stiv)) {
1216 die "--status-interval must be a whole number\n" unless $stiv =~ /^\d+$/;
1217 $statusintv = $stiv * 60;
1219 if (defined($idiv)) {
1220 die "--idle-status-interval must be a whole number\n" unless $idiv =~ /^\d+$/;
1221 $idleintv = $idiv * 60;
# ---- standard stream setup ----
# Outside of inetd mode stdin is not needed (in inetd mode fd 0 is the
# listen socket and must be left alone).
open STDIN, '<', File::Spec->devnull or die "could not redirect STDIN to /dev/null\n" unless $inetd;
open STDOUT, '>&STDERR' if $inetd;
if ($syslog) {
	use Sys::Syslog qw();
	# OStream mode: 1 = syslog only, 2 = syslog and stderr
	my $mode = 1;
	++$mode if $stderr;
	$sfac = "user" unless defined($sfac) && $sfac ne "";
	my $ofac = $sfac;
	$sfac = uc($sfac);
	$sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
	my $facility;
	# LOG_* names that exist in Sys::Syslog but are not facilities
	my %badfac = map({("LOG_$_" => 1)}
		(qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
	# the name comes straight from the command line and is about to be
	# interpolated into a string eval, so insist that it is nothing
	# more than a plain LOG_* identifier first
	$sfac =~ /\ALOG_[A-Z0-9_]+\z/ or die "invalid syslog facility: $ofac\n";
	eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac\n";
	# a facility value only has bits 3-7 set and is at most 23 << 3
	die "invalid syslog facility: $ofac\n"
		if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
	# from here on everything printed to STDERR goes through OStream
	tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
}
if ($quiet) {
	open STDOUT, '>', File::Spec->devnull;
} elsif ($inetd) {
	# share the (possibly tied) STDERR so non-error output is logged too
	*STDOUT = *STDERR;
}
# $NAME is the path of the listen socket and $INO its inode number
1248 my ($NAME, $INO);
1250 set_sigchld_reaper;
# creating this file requests a graceful restart (checked in the main loop)
1251 my $restart_file = $Girocco::Config::chroot.'/etc/taskd.restart';
1252 my $restart_active = 1;
# ---- listen socket handed over by a previous instance ----
# The main loop sets this variable to "<fd>:<inode>" before re-executing
# for a graceful restart.  It is removed from the environment right away
# so that it is not passed on to child processes.
1253 my $resumefd = $ENV{(SOCKFDENV)};
1254 delete $ENV{(SOCKFDENV)};
# the doubled braces provide a bare block so that "last" can be used to
# abandon the checks early
1255 if (defined($resumefd)) {{
1256 unless ($resumefd =~ /^(\d+)(?::(-?\d+))?$/) {
1257 warn "ignoring invalid ".SOCKFDENV." environment value (\"$resumefd\") -- bad format\n";
1258 $resumefd = undef;
1259 last;
1261 my $resumeino;
1262 ($resumefd, $resumeino) = ($1, $2);
1263 $resumefd += 0;
1264 unless (isfdopen($resumefd)) {
1265 warn "ignoring invalid ".SOCKFDENV." environment value -- fd \"$resumefd\" not open\n";
1266 $resumefd = undef;
1267 last;
# in inetd mode the fd is accepted as is; otherwise it must be verified
# to really be the socket at the expected location
1269 unless ($inetd) {
1270 unless (defined($resumeino)) {
1271 warn "ignoring invalid ".SOCKFDENV." environment value (\"$resumefd\") -- missing inode\n";
1272 POSIX::close($resumefd);
1273 $resumefd = undef;
1274 last;
1276 $resumeino += 0;
1277 my $sockloc = $Girocco::Config::chroot.'/etc/taskd.socket';
1278 my $slinode = (stat($sockloc))[1];
1279 unless (defined($slinode) && -S _) {
1280 warn "ignoring ".SOCKFDENV." environment value; socket file does not exist: $sockloc\n";
1281 POSIX::close($resumefd);
1282 $resumefd = undef;
1283 last;
# duplicate the fd onto a temporary handle just to ask for its bound name
1285 open Test, "<&$resumefd" or die "open: $!";
1286 my $sockname = getsockname Test;
1287 my $sockpath;
1288 $sockpath = unpack_sockaddr_un $sockname if $sockname && sockaddr_family($sockname) == AF_UNIX;
1289 close Test;
# both the inode recorded at hand-over and the bound path must match the
# socket file that currently exists
1290 if (!defined($resumeino) || !defined($sockpath) || $resumeino != $slinode || realpath($sockloc) ne realpath($sockpath)) {
1291 warn "ignoring ".SOCKFDENV." environment value; does not match socket file: $sockloc\n";
1292 POSIX::close($resumefd);
1293 $resumefd = undef;
1295 $INO = $resumeino;
# ---- obtain the listen socket ----
# Either adopt an already listening socket (fd 0 in inetd mode or the fd
# handed over by a previous instance) or create a fresh one.
1298 if ($inetd || defined($resumefd)) {
1299 my $fdopen = defined($resumefd) ? $resumefd : 0;
# "<&=" reuses the existing descriptor rather than duplicating it
1300 open Server, "<&=$fdopen" or die "open: $!";
1301 setcloexec(\*Server) if $fdopen > $^F;
1302 my $sockname = getsockname Server;
1303 die "getsockname: $!" unless $sockname;
# a listening socket has no peer; getpeername must fail with ENOTCONN
1304 die "socket already connected! must be 'wait' socket\n" if getpeername Server;
1305 die "getpeername: $!" unless $!{ENOTCONN};
1306 my $st = getsockopt Server, SOL_SOCKET, SO_TYPE;
1307 die "getsockopt(SOL_SOCKET, SO_TYPE): $!" unless $st;
1308 my $socktype = unpack('i', $st);
1309 die "stream socket required\n" unless defined $socktype && $socktype == SOCK_STREAM;
1310 die "AF_UNIX socket required\n" unless sockaddr_family($sockname) == AF_UNIX;
1311 $NAME = unpack_sockaddr_un $sockname;
1312 my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
# an unexpected location is tolerated but turns off restart file handling
1313 if (realpath($NAME) ne realpath($expected)) {
1314 $restart_active = 0;
1315 warn "listening on \"$NAME\" but expected \"$expected\", restart file disabled\n";
# make sure the socket file is readable and writable by user and group
1317 my $mode = (stat($NAME))[2];
1318 die "stat: $!" unless $mode;
1319 $mode &= 07777;
1320 if (($mode & 0660) != 0660) {
1321 chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
1323 } else {
1324 $NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
1325 my $uaddr = sockaddr_un($NAME);
1327 socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
1328 die "already exists but not a socket: $NAME\n" if -e $NAME && ! -S _;
1329 if (-e _) {
1330 # Do not unlink another instance's active listen socket!
# a successful connect (or an EPROTOTYPE failure) is taken to mean that
# something is still listening on the existing socket file
1331 socket(my $sfd, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
1332 connect($sfd, $uaddr) || $!{EPROTOTYPE} and
1333 die "Live socket '$NAME' exists. Please make sure no other instance of taskd is running.\n";
1334 close($sfd);
1335 unlink($NAME);
1337 bind(Server, $uaddr) or die "bind failed: $!";
1338 listen(Server, SOMAXCONN) or die "listen failed: $!";
1339 chmod 0666, $NAME or die "chmod failed: $!";
# remember the inode so it can be handed to a re-executed instance
1340 $INO = (stat($NAME))[1] or die "stat failed: $!";
# Register every named throttle class with the Throttle package.  The
# configured classes are listed ahead of the built-in defaults.
# NOTE(review): presumably GetClassInfo keeps the first registration it
# sees for a name, which is what would let a configured class replace a
# same-named default -- confirm against Throttle::GetClassInfo.
foreach my $classdef (@Girocco::Config::throttle_classes, @throttle_defaults) {
	my $name = $classdef->{"name"};
	if ($name) {
		Throttle::GetClassInfo($name, $classdef);
	}
}
# Return the numerically smaller of the two arguments (the first one
# when they compare equal).
sub _min {
	my ($first, $second) = @_;
	return $first if $first <= $second;
	return $second;
}
# ---- main loop state ----
# Control pipe: the main loop reads newline-terminated messages from the
# read end and passes them to process_pipe_msg.
1353 pipe($piperead, $pipewrite) or die "pipe failed: $!";
1354 setnonblock($piperead);
# make the write end unbuffered
1355 select((select($pipewrite), $|=1)[0]);
# bytes read from the control pipe that do not yet form a complete line
1356 my $pipebuff = '';
# select() bit sets: $fdset_pipe watches only the control pipe while
# $fdset_both watches the control pipe and the listen socket
1357 my $fdset_both = '';
1358 vec($fdset_both, fileno($piperead), 1) = 1;
1359 my $fdset_pipe = $fdset_both;
1360 vec($fdset_both, fileno(Server), 1) = 1;
# $penalty goes up by one for every accepted connection and is reduced
# by one for every second that passes (tracked via $penaltytime)
1361 my $penalty = 0;
1362 my $t = time;
1363 my $penaltytime = $t;
1364 my $nextwakeup = $t + 60;
# status messages are only scheduled when a non-zero interval is set
1365 my $nextstatus = undef;
1366 $nextstatus = $t + $statusintv if $statusintv;
# remove any stale restart request; if that is not possible (for a reason
# other than the file not existing) restart file handling is turned off
1367 if ($restart_active) {
1368 unless (unlink($restart_file) || $!{ENOENT}) {
1369 $restart_active = 0;
1370 statmsg "restart file disabled could not unlink \"$restart_file\": $!";
1373 daemon(1, 1) or die "failed to daemonize: $!\n" if $daemon;
1374 my $starttime = time;
# $endtime of 0 means there is no maximum lifetime
1375 my $endtime = $max_lifetime ? $starttime + $max_lifetime : 0;
1376 statmsg "listening on $NAME";
# ---- main service loop ----
# Each pass waits (in the inner do/while) until the control pipe or the
# listen socket becomes ready, emitting periodic status messages and
# handling graceful restart and idle timeout while waiting.  It then
# drains the control pipe and, if a connection is pending, accepts it and
# spawns a child to service the request.
while (1) {
	my ($rout, $eout, $nfound);
	do {
		my $wait;
		my $now = time;
		# give back one unit of penalty for every second elapsed
		# since the penalty was last adjusted
		my $adjustpenalty = sub {
			if ($penaltytime < $now) {
				my $credit = $now - $penaltytime;
				$penalty = $penalty > $credit ? $penalty - $credit : 0;
				$penaltytime = $now;
			}
		};
		if (defined($nextstatus) && $now >= $nextstatus) {
			# suppress back-to-back idle reports unless the idle
			# status interval has elapsed since the last one
			unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
				my $statmsg = "STATUS: $children active";
				my @running = ();
				if ($children) {
					my @stats = ();
					my $cnt = 0;
					foreach my $cls (sort(Throttle::GetClassList())) {
						my $inf = Throttle::GetClassInfo($cls);
						if ($inf->{'total'}) {
							$cnt += $inf->{'total'};
							push(@stats, substr(lc($cls),0,1)."=".
								$inf->{'total'}.'/'.$inf->{'active'});
						}
					}
					# children not accounted for by any class
					push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
					$statmsg .= " (".join(" ",@stats).")" if @stats;
					foreach (Throttle::GetRunningPids()) {
						my ($cls, $ts, $desc) = Throttle::GetPidInfo($_);
						next unless $ts;
						push(@running, "[${cls}::$desc] ".duration($now-$ts));
					}
				}
				my $idlesecs;
				$statmsg .= ", idle " . duration($idlesecs)
					if !$children && ($idlesecs = $now - $idlestart) >= 2;
				statmsg $statmsg;
				statmsg "STATUS: currently running: ".join(", ", @running)
					if @running;
				$idlestatus = $now if !$children;
			}
			$nextstatus += $statusintv while $nextstatus <= $now;
		}
		$nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
		$wait = _min($wait, (Throttle::ServiceQueue()||60));
		$adjustpenalty->(); # this prevents ignoring accept when we shouldn't
		my $fdset;
		if ($penalty <= $maxspawn) {
			$fdset = $fdset_both;
		} else {
			# too many recent connections: ignore the listen socket
			# until enough penalty has been worked off
			$fdset = $fdset_pipe;
			$wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
		}
		$nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
		logmsg("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR} || $!{EAGAIN};
		my $reaped;
		Throttle::RemoveSupplicant($reaped) while ($reaped = shift(@reapedpids));
		$now = time;
		$adjustpenalty->(); # this prevents banking credits for elapsed time
		if (!$children && !$nfound && $restart_active && (($endtime && $now >= $endtime) || -e $restart_file)) {
			statmsg "RESTART: restart requested; max lifetime ($max_lifetime) exceeded" if $endtime && $now >= $endtime;
			$SIG{CHLD} = sub {};
			my $restarter = schedule_jobd_restart($inetd);
			if (defined($restarter) && !$restarter) {
				statmsg "RESTART: restart requested; retrying failed scheduling of jobd restart: $!";
				sleep 2; # *cough*
				# the retry must request the same process group
				# handling as the first attempt did
				$restarter = schedule_jobd_restart($inetd);
				if (!defined($restarter)) {
					statmsg "RESTART: restart requested; reschedule skipped jobd no longer running";
				} elsif (defined($restarter) && !$restarter) {
					statmsg "RESTART: restart requested; retry of jobd restart scheduling failed, skipping jobd restart: $!";
					$restarter = undef;
				}
			}
			if ($inetd) {
				statmsg "RESTART: restart requested; now exiting for inetd restart";
				statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
				sleep 2; # *cough*
				exit 0;
			} else {
				statmsg "RESTART: restart requested; now restarting";
				statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
				# the listen socket must survive the exec and the
				# new instance is told where to find it
				setnoncloexec(\*Server);
				$reexec->setenv(SOCKFDENV, fileno(Server).":$INO");
				$reexec->reexec($same_pid);
				# only reached if the re-exec did not happen
				setcloexec(\*Server) if fileno(Server) > $^F;
				statmsg "RESTART: continuing after failed restart: $!";
				chdir "/";
				cancel_jobd_restart($restarter) if $restarter;
				statmsg "RESTART: scheduled jobd restart has been cancelled" if $restarter;
				set_sigchld_reaper;
			}
		}
		if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
			statmsg "idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
			exit 0;
		}
	} while $nfound < 1;
	my $reout = $rout | $eout;
	# the doubled braces provide a bare block so that "last" and "redo"
	# can be used to leave or repeat the pipe processing
	if (vec($reout, fileno($piperead), 1)) {{
		my $nloff = -1;
		{
			# read until at least one complete line is buffered or
			# the (non-blocking) pipe has nothing more to offer
			my $bytes;
			do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
				while (!defined($bytes) && $!{EINTR});
			last if !defined($bytes) && $!{EAGAIN};
			die "sysread failed: $!" unless defined $bytes;
			# since we always keep a copy of $pipewrite open EOF is fatal
			die "sysread returned EOF on pipe read" unless $bytes;
			$nloff = index($pipebuff, "\n", 0);
			if ($nloff < 0 && length($pipebuff) >= 512) {
				$pipebuff = '';
				print STDERR "discarding 512 bytes of control pipe data with no \\n found\n";
			}
			redo unless $nloff >= 0;
		}
		last unless $nloff >= 0;
		# hand every complete line to process_pipe_msg
		do {
			my $msg = substr($pipebuff, 0, $nloff);
			substr($pipebuff, 0, $nloff + 1) = '';
			$nloff = index($pipebuff, "\n", 0);
			process_pipe_msg($msg) if length($msg);
		} while $nloff >= 0;
		redo;
	}}
	next unless vec($reout, fileno(Server), 1);
	unless (accept(Client, Server)) {
		logmsg "accept failed: $!" unless $!{EINTR};
		next;
	}
	logmsg "connection on $NAME";
	++$penalty;
	spawn sub {
		my $inp = <STDIN>;
		$inp = <STDIN> if defined($inp) && $inp eq "\n";
		chomp $inp if defined($inp);
		# ignore empty and "nop" connects
		defined($inp) && $inp ne "" && $inp ne "nop" or exit 0;
		my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
		# a request line that does not parse leaves $cmd undefined;
		# treat it as an (unknown) empty command rather than
		# generating "uninitialized value" warnings below
		defined($cmd) or $cmd = '';
		defined($arg) or $arg = '';
		if ($cmd eq 'ref-changes') {
			ref_changes($arg);
		} elsif ($cmd eq 'clone') {
			clone($arg);
		} elsif ($cmd eq 'ref-change') {
			statmsg "processing obsolete ref-change message (please switch to ref-changes)";
			ref_change($arg);
		} elsif ($cmd eq 'throttle') {
			throttle($arg);
		} else {
			statmsg "ignoring unknown command: $cmd";
			exit 3;
		}
	};
	close Client;
}
1537 ## -------------
1538 ## Documentation
1539 ## -------------
1542 __END__
1544 =head1 NAME
1546 taskd.pl - Perform Girocco service tasks
1548 =head1 SYNOPSIS
1550 taskd.pl [options]
1552 Options:
1553 -h | --help detailed instructions
1554 -q | --quiet run quietly
1555 --no-quiet do not run quietly
1556 -P | --progress show occasional status updates
1557 -i | --inetd run as inetd unix stream wait service
1558 implies --quiet --syslog
1559 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1560 requires --inetd
1561 --daemon become a background daemon
1562 implies --quiet --syslog
1563 --max-lifetime=SECONDS how long before graceful restart
1564 default is 1 week, 0 disables
1565 -s | --syslog[=facility] send messages to syslog instead of
1566 stderr but see --stderr
1567 enabled by --inetd and --daemon
1568 --no-syslog do not send messages to syslog
1569 --stderr always send messages to stderr too
1570 --abbrev=n abbreviate hashes to n (default is 8)
1571 --show-fast-forward-info show fast-forward info (default is on)
1572 --no-show-fast-forward-info disable showing fast-forward info
1573 --same-pid keep same pid during graceful restart
1574 --no-same-pid do not keep same pid on graceful restart
1575 --status-interval=MINUTES status update interval (default 1)
1576 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1578 =head1 DESCRIPTION
1580 taskd.pl is Girocco's service request servant; it listens for service requests
1581 such as new clone requests and ref update notifications and spawns a task to
1582 perform the requested action.
1584 =head1 OPTIONS
1586 =over 8
1588 =item B<--help>
1590 Print the full description of taskd.pl's options.
1592 =item B<--quiet>
1594 Suppress non-error messages, e.g. for use when running this task as an inetd
1595 service. Enabled by default by --inetd and --daemon.
1597 =item B<--no-quiet>
1599 Enable non-error messages. When running in --inetd mode these messages are
1600 sent to STDERR instead of STDOUT.
1602 =item B<--progress>
1604 Show information about the current status of the task operation occasionally.
1605 This is automatically enabled if --quiet is not given.
1607 =item B<--inetd>
1609 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1610 stream socket ready to have accept called on it. To be useful, the unix socket
1611 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1612 will be issued if the socket is not in the expected location. Socket file
1613 permissions will be adjusted if necessary and, if that fails, taskd.pl will
1614 die. The --inetd option also enables the --quiet and --syslog options but
1615 --no-quiet and --no-syslog may be used to alter that.
1617 The correct specification for the inetd socket is a "unix" protocol "stream"
1618 socket in "wait" mode with user and group writable permissions (0660). An
1619 attempt will be made to alter the socket's file mode if needed and if that
1620 cannot be accomplished taskd.pl will die.
1622 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1623 in wait mode and will die if the passed in socket is already connected.
1625 Note that while *BSD's inetd happily supports unix sockets (and so does
1626 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1627 However, systemd does seem to.
1629 =item B<--idle-timeout=SECONDS>
1631 Only permitted when running in --inetd mode. After SECONDS of inactivity
1632 (i.e. all outstanding tasks have completed and no new requests have come in)
1633 exit normally. The default is no timeout at all (a SECONDS value of 0).
1634 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1636 =item B<--daemon>
1638 Fork and become a background daemon. Implies B<--syslog> and B<--quiet> (which
1639 can be altered by subsequent B<--no-syslog> and/or B<--no-quiet> options).
1640 Also implies B<--no-same-pid>, but since graceful restarts work by re-exec'ing
1641 taskd.pl with all of its original arguments, using B<--same-pid> won't really
1642 be effective with B<--daemon> since although it will cause the graceful restart
1643 exec to happen from the same pid, when the B<--daemon> option is subsequently
1644 processed it will end up in a new pid anyway.
1646 =item B<--max-lifetime=SECONDS>
1648 After taskd has been running for SECONDS of realtime, it will behave as though
1649 a graceful restart has been requested. A graceful restart takes place the
1650 next time taskd becomes idle (which may require up to 60 seconds to notice).
1651 If jobd is running when a graceful restart occurs, then jobd will also receive
1652 a graceful restart request at that time. The default value is 1 week (604800),
1653 set to 0 to disable.
1655 =item B<--syslog[=facility]>
1657 Normally error output is sent to STDERR. With this option it's sent to
1658 syslog instead. Note that when running in --inetd mode non-error output is
1659 also affected by this option as it's sent to STDERR in that case. If
1660 not specified, the default for facility is LOG_USER. Facility names are
1661 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1662 with the LOG_NOTICE priority.
1664 =item B<--no-syslog>
1666 Send error message output to STDERR but not syslog.
1668 =item B<--stderr>
1670 Always send error message output to STDERR. If --syslog is in effect then
1671 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1672 messages as well.
1674 =item B<--abbrev=n>
1676 Abbreviate displayed hash values to only the first n hexadecimal characters.
1677 The default is 8 characters. Set to 0 for no abbreviation at all.
1679 =item B<--show-fast-forward-info>
1681 Instead of showing ' -> ' in ref-change/ref-notify update messages, show either
1682 '..' for a fast-forward, creation or deletion or '...' for non-fast-forward.
1683 This requires running an extra git command for each ref update that is not a
1684 creation or deletion in order to determine whether or not it's a fast forward.
1686 =item B<--no-show-fast-forward-info>
1688 Disable showing of fast-forward information for ref-change/ref-notify update
1689 messages. Instead just show a ' -> ' indicator.
1691 =item B<--same-pid>
1693 When performing a graceful restart, perform the graceful restart exec from
1694 the same pid rather than switching to a new one. This is implied when
1695 I<--daemon> is I<NOT> used.
1697 =item B<--no-same-pid>
1699 When performing a graceful restart, perform the graceful restart exec after
1700 switching to a new pid. This is implied when I<--daemon> I<IS> used.
1702 =item B<--status-interval=MINUTES>
1704 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1705 status updates are shown at each MINUTES interval. Setting the interval to 0
1706 disables them entirely even with --progress.
1708 =item B<--idle-status-interval=IDLEMINUTES>
1710 Two consecutive "idle" status updates with no intervening activity will not be
1711 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1712 Setting the interval to 0 prevents any consecutive idle updates (with no
1713 activity between them) from appearing at all.
1715 =back
1717 =cut