ref-changes: implement "done" functionality
[girocco/readme.git] / taskd / taskd.pl
bloba31874e1ec3c4c385910ce31bfbf24c9bf271f96
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
5 # taskd is Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
31 use strict;
32 use warnings;
34 use Getopt::Long;
35 use Pod::Usage;
36 use Socket;
37 use Errno;
38 use Fcntl;
39 use POSIX ":sys_wait_h";
40 use File::Basename;
41 use File::Spec ();
42 use Cwd qw(realpath);
44 use lib dirname($0);
45 use Girocco::Config;
46 use Girocco::Notify;
47 use Girocco::Project;
48 use Girocco::User;
49 use Girocco::Util qw(noFatalsToBrowser get_git);
50 BEGIN {noFatalsToBrowser}
51 use Girocco::ExecUtil;
53 use constant SOCKFDENV => "GIROCCO_TASKD_SOCKET_FD";
# Throttle Classes Defaults
# A class of the same name listed in @Girocco::Config::throttle_classes
# overrides the entry here by replacing the whole hash, not by merging keys.
my @throttle_defaults = (
    {
        name => "ref-change",
        maxproc => 0,
        maxjobs => 1,
        interval => 2
    },
    {
        name => "clone",
        maxproc => 0,
        maxjobs => 2,
        interval => 5
    },
    {
        name => "snapshot",
        #maxproc => max(5, cpucount + maxjobs), # this is the default
        #maxjobs => max(1, int(cpucount / 4)) , # this is the default
        interval => 5
    }
);
# Options (the ones without an initial value start out undefined)
my ($quiet, $progress, $syslog, $stderr, $inetd);
my ($idle_timeout, $daemon, $max_lifetime);
my $abbrev = 8;         # number of leading hash characters shown in status lines
my $showff = 1;         # when true ref_indicator may run git to pick '..' vs '...'
my $same_pid;
my $statusintv = 60;
my $idleintv = 3600;
my $maxspawn = 8;

$| = 1;

my $progname = basename($0);    # used as the prefix of log/status lines
my $children = 0;               # incremented by spawn, decremented by REAPER
my $idlestart = time;
my $idlestatus = 0;
# Return the online CPU count reported by Girocco::Util::online_cpus.
# The answer is kept in a package variable and reused once it is true.
sub cpucount {
    use Girocco::Util "online_cpus";
    our $online_cpus_result;
    $online_cpus_result ||= online_cpus;
    return $online_cpus_result;
}
# Print a log line built from the arguments (joined as "@_") to STDOUT.
# When STDOUT is tied the line is printed twice: once with a timestamp/
# program/pid header while $OStream::only selects stderr output, and once
# without the header while $OStream::only selects syslog output.
sub logmsg {
    my $stamp = scalar localtime;
    my $hdr = "[$stamp] $progname $$: ";
    my $text = "@_";
    unless (tied *STDOUT) {
        return print "$hdr$text\n";
    }
    $OStream::only = 2; # STDERR only
    print "$hdr$text\n";
    $OStream::only = 1; # syslog only
    print "$text\n";
    $OStream::only = 0; # back to default
}
# Print a status line to STDERR, but only when $progress is true.
# Mirrors logmsg: with a tied STDERR the headed line goes out while
# $OStream::only selects stderr and the bare line while it selects syslog.
sub statmsg {
    return unless $progress;
    my $stamp = scalar localtime;
    my $hdr = "[$stamp] $progname $$: ";
    my $text = "@_";
    unless (tied *STDERR) {
        return print STDERR "$hdr$text\n";
    }
    $OStream::only = 2; # STDERR only
    print STDERR "$hdr$text\n";
    $OStream::only = 1; # syslog only
    print STDERR "$text\n";
    $OStream::only = 0; # back to default
}
# Format a non-negative number of seconds as a compact string such as
# "59s", "1m0s", "1h0m0s" or "1d0h0m0s" (fractions are truncated).
# An undefined or negative argument is returned unchanged.
sub duration {
    my $secs = shift;
    return $secs unless defined($secs) && $secs >= 0;
    my $left = int($secs);
    my $text = '';
    # peel off seconds, minutes, hours; stop as soon as nothing larger remains
    foreach my $unit (['s', 60], ['m', 60], ['h', 24]) {
        my ($suffix, $base) = @$unit;
        $text = ($left % $base) . $suffix . $text;
        return $text if $left < $base;
        $left = int($left / $base);
    }
    return $left . 'd' . $text;
}
# Test whether a numeric file descriptor is currently open by trying to
# dup it (the duplicate is closed again right away).
# Returns undef for an undefined or negative descriptor, otherwise a
# true/false value.
sub isfdopen {
    my ($fd) = @_;
    unless (defined($fd) && $fd >= 0) {
        return undef;
    }
    my $copy = POSIX::dup($fd);
    my $isopen = defined($copy);
    POSIX::close($copy) if $isopen;
    return $isopen;
}
# Set the descriptor flags of the given handle to 0 (which clears
# FD_CLOEXEC); dies if fcntl fails.
sub setnoncloexec {
    my ($fh) = @_;
    unless (fcntl($fh, F_SETFD, 0)) {
        die "fcntl failed: $!";
    }
}
# Set the descriptor flags of the given handle to FD_CLOEXEC; dies if
# fcntl fails.
sub setcloexec {
    my ($fh) = @_;
    unless (fcntl($fh, F_SETFD, FD_CLOEXEC)) {
        die "fcntl failed: $!";
    }
}
# Add O_NONBLOCK to the status flags of the given handle, keeping the
# other flags; dies if either fcntl call fails.
sub setnonblock {
    my ($fh) = @_;
    my $current = fcntl($fh, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($current);
    unless (fcntl($fh, F_SETFL, $current | O_NONBLOCK)) {
        die "fcntl failed: $!";
    }
}
# Remove O_NONBLOCK from the status flags of the given handle, keeping
# the other flags; dies if either fcntl call fails.
sub setblock {
    my ($fh) = @_;
    my $current = fcntl($fh, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($current);
    unless (fcntl($fh, F_SETFL, $current & ~O_NONBLOCK)) {
        die "fcntl failed: $!";
    }
}
184 package Throttle;
187 ## Throttle protocol
189 ## 1) Process needing throttle services acquire a control file descriptor
190 ## a) Either as a result of a fork + exec (the write end of a pipe)
191 ## b) Or by connecting to the taskd socket (not yet implemented)
193 ## 2) The process requesting throttle services will be referred to
194 ## as the supplicant or just "supp" for short.
196 ## 3) The supp first completes any needed setup which may include
197 ## gathering data it needs to perform the action -- if that fails
198 ## then there's no need for any throttling.
200 ## 4) The supp writes a throttle request to the control descriptor in
201 ## this format:
202 ## throttle <pid> <class>\n
203 ## for example if the supp's pid was 1234 and it was requesting throttle
204 ## control as a member of the mail class it would write this message:
205 ## throttle 1234 mail\n
206 ## Note that if the control descriptor happens to be a pipe rather than a
207 ## socket, the message should be preceded by another "\n" just to be safe.
208 ## If the control descriptor is a socket, not a pipe, the message may be
209 ## preceded by a "\n" but that's not recommended.
211 ## 5) For supplicants with a control descriptor that is a pipe
212 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
213 ## If the control descriptor is a socket (getsockname succeeds) then
214 ## protocol (5b) should be used.
216 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
217 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
218 ## to write a "keepalive\n" message to the control descriptor. If that
219 ## fails, the controller has gone away and it may make its own decision
220 ## whether or not to proceed at that point. If, on the other hand, it
221 ## receives a SIGTERM, the process limit for its class has been reached
222 ## and it should abort without performing its action. If it receives
223 ## SIGUSR1, it may proceed without writing anything more to the control
224 ## descriptor, and MAY even close the control descriptor. Finally, a
225 ## SIGUSR2 indicates rejection of the throttle request for some other reason
226 ## such as unrecognized class name or invalid pid in which case the supp may
227 ## make its own decision how to proceed.
229 ## 5b) The supp now enters a read wait on the socket -- it need accommodate no
230 ## more than 512 bytes and if a '\n' does not appear within that number of
231 ## bytes the read should be considered failed. Otherwise the read should
232 ## be retried until either a full line has been read or the socket is
233 ## closed from the other end. If the lone read is "proceed\n" then it may
234 ## proceed without reading or writing anything more to the control
235 ## descriptor, but MUST keep the control descriptor open and not call
236 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
237 ## should be retried) constitutes failure. If a full line starting with at
238 ## least one alpha character was read but it was not "proceed" then it
239 ## should abort without performing its action. For any other failure it
240 ## may make its own decision whether or not to proceed as the controller has
241 ## gone away.
243 ## 6) The supp now performs its throttled action.
245 ## 7) The supp now closes its control descriptor (if it hasn't already in the
246 ## case of (5a)) and exits -- in the case of a socket, the other end receives
247 ## notification that the socket has been closed (read EOF). In the case of
248 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
249 ## of the other end of the pipe, so it will not reach EOF by the supp's
250 ## exit in that case).
# keys are class names, values are hash refs with these fields:
#   'maxproc' => integer; maximum number of allowed supplicants (the sum of
#                how many may be queued waiting plus how many may be
#                concurrently active) with 0 meaning no limit.
#   'maxjobs' => integer; how many supplicants may proceed simultaneously;
#                a value of 0 is unlimited but the number of concurrent
#                supplicants will always be limited to no more than the
#                'maxproc' value (if > 0) no matter what the 'maxjobs'
#                value is.
#   'total' => integer; the total number of pids belonging to this class
#                that can currently be found in %pid.
#   'active' => integer; the number of currently active supplicants which
#                should be the same as (the number of elements of %pid with
#                a matching class name) - (number of my class in @queue).
#   'interval' => integer; minimum number of seconds between 'proceed'
#                responses or SIGUSR1 signals to members of this class.
#   'lastqueue' => time; last time a supplicant was successfully queued.
#   'lastproceed' => time; last time a supplicant was allowed to proceed.
#   'lastthrottle' => time; last time a supplicant was throttled.
#   'lastdied' => time; last time a supplicant in this class died/exited/etc.
my %classes;

# keys are pid numbers, values are array refs with these elements:
#   [0] => name of class (key to classes hash)
#   [1] => supplicant state (0 => queued, non-zero => time it started running)
#   [2] => descriptive text (e.g. project name)
my %pid;

# minimum number of seconds between any two proceed responses no matter what
# class.  this takes priority in that it can effectively increase the
# class's 'interval' value by delaying proceed notifications if the minimum
# interval has not yet elapsed.
my $interval = 1;

# fifo of pids awaiting notification as soon as the next $interval elapses
# provided interval and maxjobs requirements are satisfied
# for the class of the pid that will next be triggered.
my @queue;

# time of most recent successful call to AddSupplicant
my $lastqueue = 0;

# time of most recent proceed notification
my $lastproceed = 0;

# time of most recent throttle
my $lastthrottle = 0;

# time of most recent removal
my $lastdied = 0;

# lifetime count of how many have been queued
my $totalqueue = 0;

# lifetime count of how many have been allowed to proceed
my $totalproceed = 0;

# lifetime count of how many have been throttled
my $totalthrottle = 0;

# lifetime count of how many have died
# It should always be true that $totalqueue - $totaldied equals the number
# of pids currently registered in %pid
my $totaldied = 0;
# Returns the names of all currently registered classes in no
# particular order.
sub GetClassList {
    my @names = keys(%classes);
    return @names;
}
# Return the numerically larger of the two arguments (the first one
# when they compare equal).
sub _max {
    my ($first, $second) = @_;
    return $first >= $second ? $first : $second;
}
# [0] => minimum acceptable result
# [1] => candidate value; used only if it is an optionally signed integer
# [2] => code ref called (with this sub's own arguments) to produce the
#        value when the candidate is missing or not an integer
# Returns the chosen value, raised to the minimum if it is smaller.
sub _getnum {
    my ($min, $val, $default) = @_;
    my $ans = (defined($val) && $val =~ /^[+-]?\d+$/)
        ? 0 + $val
        : $default->(@_);
    return $min >= $ans ? $min : $ans;
}
# [0] => name of class to find (matched case-insensitively; it must start
#        with a letter and contain only letters, digits and . _ + -)
# [1] => if true, create the class when it doesn't exist; if a hashref it
#        supplies initial values for maxproc, maxjobs and interval.
#        Otherwise maxjobs defaults to max(cpu cores/4, 1), maxproc
#        defaults to max(5, number of cpu cores + maxjobs) and interval
#        defaults to 1.
# Returns a hash ref holding a copy of the class's info on success,
# nothing if the name is invalid or the class is missing and not created.
sub GetClassInfo {
    my ($classname, $init) = @_;
    return unless defined($classname) &&
        $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/;
    my $key = lc($classname);
    if (my $existing = $classes{$key}) {
        return { %$existing };
    }
    return unless $init;
    my $opts = ref($init) eq 'HASH' ? $init : {};
    # maxjobs must be settled first, the maxproc default depends on it
    my $maxjobs = _getnum(0, $opts->{'maxjobs'}, sub{_max(1, int(::cpucount() / 4))});
    my $maxproc = _getnum(0, $opts->{'maxproc'}, sub{_max(5, ::cpucount() + $maxjobs)});
    my $gap = _getnum(0, $opts->{'interval'}, sub{1});
    my %newclass = (
        'maxjobs' => $maxjobs,
        'maxproc' => $maxproc,
        'interval' => $gap,
        'total' => 0,
        'active' => 0,
        'lastqueue' => 0,
        'lastproceed' => 0,
        'lastthrottle' => 0,
        'lastdied' => 0,
    );
    $classes{$key} = \%newclass;
    return { %newclass };
}
# [0] => pid to look up
# Returns () if the pid is not registered, otherwise the list
# ($classname, $timestarted, $description) where $timestarted is 0 while
# the pid is still queued and a time() value once it has started.
sub GetPidInfo {
    my ($wanted) = @_;
    if (exists $pid{$wanted}) {
        return @{$pid{$wanted}};
    }
    return ();
}
# Returns the pids whose start time is non-zero (i.e. running rather than
# queued) ordered from oldest start to newest.  May return an empty list.
sub GetRunningPids {
    my @running = grep { $pid{$_}->[1] } keys(%pid);
    return sort { $pid{$a}->[1] <=> $pid{$b}->[1] } @running;
}
# Returns a hash ref describing the current overall state:
# 'interval' => global minimum interval between proceeds
# 'active' => number of registered pids that are not in the queue
# 'queue' => how many pids are currently queued
# 'lastqueue' => time (epoch seconds) of last queue
# 'lastproceed' => time (epoch seconds) of last proceed
# 'lastthrottle' => time (epoch seconds) of last throttle
# 'lastdied' => time (epoch seconds) of last removal
# 'totalqueue' => lifetime total number of processes queued
# 'totalproceed' => lifetime total number of processes proceeded
# 'totalthrottle' => lifetime total number of processes throttled
# 'totaldied' => lifetime total number of removed processes
sub GetInfo {
    my $queued = scalar(@queue);
    my %info = (
        interval => $interval,
        active => scalar(keys(%pid)) - $queued,
        queue => $queued,
        lastqueue => $lastqueue,
        lastproceed => $lastproceed,
        lastthrottle => $lastthrottle,
        lastdied => $lastdied,
        totalqueue => $totalqueue,
        totalproceed => $totalproceed,
        totalthrottle => $totalthrottle,
        totaldied => $totaldied,
    );
    return \%info;
}
# With no argument returns the global interval.
# With an argument made only of digits sets the global interval to it and
# returns the value that was in effect before; any other argument leaves
# the interval unchanged.
sub Interval {
    my $previous = $interval;
    my $new = $_[0];
    if (defined($new) && $new =~ /^\d+$/) {
        $interval = 0 + $new;
    }
    return $previous;
}
sub RemoveSupplicant;

# Perform queue service (i.e. send SIGUSR1 to any eligible queued process).
# Only the first queued pid of each class is considered on a scan; whenever
# a pid is released or a bad entry is dropped the scan starts over.
# Returns the minimum interval until the next proceed is possible.
# Returns undef if there's nothing waiting to proceed or the 'maxjobs'
# limits have been reached for all queued items (in which case it won't be
# possible to proceed until one of them exits, hence undef).
# This is called automatically by AddSupplicant and RemoveSupplicant.
sub ServiceQueue {
    RESCAN: while (@queue) {
        my $now = time;
        my $min = _max(0, $interval - ($now - $lastproceed));
        my $classmin;
        my $classchecked = 0;
        my %seenclass;
        my $classcount = scalar(keys(%classes));
        for (my $i = 0; $i <= $#queue && $classchecked < $classcount; ++$i) {
            my $pid = $queue[$i];
            my $procinfo = $pid{$pid};
            if (!$procinfo) {
                # Stale queue entry with no %pid record.  RemoveSupplicant
                # ignores pids it does not know and would leave the entry
                # in place (rescanning forever), so drop it here directly.
                splice(@queue, $i, 1);
                next RESCAN;
            }
            my $classname = $$procinfo[0];
            my $classinfo = $classes{$classname};
            if (!$classinfo) {
                RemoveSupplicant($pid, 1);
                next RESCAN;
            }
            # only the oldest queued member of each class is a candidate
            next if $seenclass{$classname};
            $seenclass{$classname} = 1;
            ++$classchecked;
            # class already has 'maxjobs' members running
            next if $classinfo->{'maxjobs'} &&
                $classinfo->{'active'} >= $classinfo->{'maxjobs'};
            my $cmin = _max(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
            if (!$cmin && !$min) {
                # both the global and the class interval have elapsed
                $now = time;
                $$procinfo[1] = $now;
                splice(@queue, $i, 1);
                ++$totalproceed;
                $lastproceed = $now;
                $classinfo->{'lastproceed'} = $now;
                ++$classinfo->{'active'};
                kill("USR1", $pid) or RemoveSupplicant($pid, 1);
                next RESCAN;
            }
            $classmin = $cmin unless defined($classmin) && $classmin < $cmin;
        }
        return defined($classmin) ? _max($min, $classmin) : undef;
    }
    return undef; # if there's nothing queued, nothing to do
}
# [0] => pid to add (must be a live process not already in %pid)
# [1] => class name (must exist; matched case-insensitively the same way
#        GetClassInfo does since class keys are stored in lower case)
# [2] => descriptive text stored with the pid (defaults to '')
# [3] => if true, do not run ServiceQueue after queueing
# Returns -1 if no such class or pid already present or invalid
# Returns 0 if added successfully (and possibly already SIGUSR1'd)
# Returns 1 if throttled and cannot be added
sub AddSupplicant {
    my ($pid, $classname, $text, $noservice) = @_;
    return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
    $pid += 0;
    kill(0, $pid) or return -1;
    defined($classname) or return -1;
    $classname = lc($classname);
    my $classinfo = $classes{$classname};
    return -1 unless $classinfo;
    return -1 if $pid{$pid};
    $text = '' unless defined($text);
    my $now = time;
    if ($classinfo->{'maxproc'} && $classinfo->{'total'} >= $classinfo->{'maxproc'}) {
        # class is full: count the throttle but do not register the pid
        ++$totalthrottle;
        $lastthrottle = $now;
        $classinfo->{'lastthrottle'} = $now;
        return 1;
    }
    ++$totalqueue;
    $lastqueue = $now;
    $pid{$pid} = [$classname, 0, $text];
    ++$classinfo->{'total'};
    $classinfo->{'lastqueue'} = $now;
    push(@queue, $pid);
    ServiceQueue() unless $noservice;
    return 0;
}
# [0] => pid to remove (died, killed, exited normally, doesn't matter)
# [1] => if true, do not run ServiceQueue afterwards (this now also holds
#        on the missing-class error path)
# Returns 0 if removed
# Returns -1 if unknown pid or other error during removal
sub RemoveSupplicant {
    my ($pid, $noservice) = @_;
    return -1 unless defined($pid) && $pid =~ /^\d+$/;
    $pid += 0;
    my $pidinfo = $pid{$pid};
    $pidinfo or return -1;
    my $now = time;
    $lastdied = $now;
    ++$totaldied;
    delete $pid{$pid};
    # a start time of 0 means it never proceeded and is still in the fifo
    @queue = grep { $_ != $pid } @queue unless $$pidinfo[1];
    my $classinfo = $classes{$$pidinfo[0]};
    if (!$classinfo) {
        ServiceQueue() unless $noservice;
        return -1;
    }
    --$classinfo->{'active'} if $$pidinfo[1];
    --$classinfo->{'total'};
    $classinfo->{'lastdied'} = $now;
    ServiceQueue() unless $noservice;
    return 0;
}
537 # Instance Methods
539 package main;
542 ## ---------
543 ## Functions
544 ## ---------
# pids collected by REAPER
my @reapedpids;

# signal number => signal name for the numbers listed at
# http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
my %signame = (
    1 => 'SIGHUP',
    2 => 'SIGINT',
    3 => 'SIGQUIT',
    6 => 'SIGABRT',
    9 => 'SIGKILL',
    14 => 'SIGALRM',
    15 => 'SIGTERM',
);
# SIGCHLD handler: reap every child that has exited without blocking,
# log how each one ended, record its pid in @reapedpids and restart the
# idle clock when the child count reaches zero.
sub REAPER {
    local $!;
    my $waitedpid;
    while (($waitedpid = waitpid(-1, WNOHANG)) > 0) {
        my $code = $? & 0xffff;
        $idlestart = time if !--$children;
        my $codemsg = '';
        if (!($code & 0xff)) {
            # low byte clear: normal exit, status is in the high byte
            $codemsg = " with exit code ".($code >> 8) if $code;
        } elsif ($code & 0x7f) {
            my $signum = ($code & 0x7f);
            $codemsg = " with signal ".($signame{$signum} || $signum);
        }
        logmsg("reaped $waitedpid$codemsg");
        push(@reapedpids, $waitedpid);
    }
    $SIG{CHLD} = \&REAPER; # loathe sysV
}

$SIG{CHLD} = \&REAPER; # Apollo 440
my ($piperead, $pipewrite);

# Fork a child to run the given code ref.
# The parent logs the outcome and returns nothing.  The child closes the
# Server handle (unless it is fd 0) and $piperead, installs a no-op
# SIGCHLD handler, makes the Client handle its STDIN and exits with the
# code ref's return value.
sub spawn {
    my ($coderef) = @_;

    my $pid = fork;
    unless (defined $pid) {
        logmsg("cannot fork: $!");
        return;
    }
    if ($pid) {
        $idlestart = time if !++$children;
        $idlestatus = 0;
        logmsg("begat $pid");
        return; # I'm the parent
    }

    close(Server) unless fileno(Server) == 0;
    close($piperead);
    $SIG{'CHLD'} = sub {};

    open STDIN, "+<&Client" or die "can't dup client to stdin";
    close(Client);
    exit $coderef->();
}
# Ask the controller (via $pipewrite) for permission to proceed as a member
# of the given throttle class and wait for its answer, which arrives as a
# signal: SIGUSR1 => proceed, SIGTERM => throttled, SIGUSR2 => error.
# While waiting a keepalive is written after every wakeup (at least every
# 30 seconds); SIGPIPE marks the controller as gone.
# [0] => throttle class name
# [1] => descriptive text sent along with the request
# returns:
# < 0: error
# = 0: proceed
# > 0: throttled
sub request_throttle {
    use POSIX qw(sigprocmask sigsuspend SIG_SETMASK);
    my ($classname, $text) = @_;

    # no such throttle class
    return -1 unless Throttle::GetClassInfo($classname);

    my ($throttled, $proceed, $error, $controldead) = (0, 0, 0, 0);
    my $setempty = POSIX::SigSet->new;
    my $setfull = POSIX::SigSet->new;
    $setempty->emptyset();
    $setfull->fillset();
    $SIG{'TERM'} = sub {$throttled = 1};
    $SIG{'USR1'} = sub {$proceed = 1};
    $SIG{'USR2'} = sub {$error = 1};
    $SIG{'PIPE'} = sub {$controldead = 1};
    $SIG{'ALRM'} = sub {};

    # After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
    print $pipewrite "\nthrottle $$ $classname $text\n";
    my $old = POSIX::SigSet->new;
    # signals stay blocked except while suspended or writing the keepalive
    sigprocmask(SIG_SETMASK, $setfull, $old);
    until ($controldead || $throttled || $proceed || $error) {
        alarm(30);
        sigsuspend($setempty);
        alarm(0);
        sigprocmask(SIG_SETMASK, $setempty, $old);
        print $pipewrite "\nkeepalive $$\n";
        sigprocmask(SIG_SETMASK, $setfull, $old);
    }
    sigprocmask(SIG_SETMASK, $setempty, $old);
    $SIG{$_} = "DEFAULT" foreach qw(TERM USR1 USR2 ALRM PIPE);

    return 1 if $throttled;
    return 0 if $proceed;
    return -1;
}
# Start the clone of the named project.
# Dies unless the project exists, loads, is marked clone_in_progress and
# has no clone log yet, or if the throttle request is refused.  On success
# STDOUT/STDERR are pointed at the project's .clonelog, STDIN at /dev/null
# and the process is replaced by taskd/clone.sh.
sub clone {
    my ($name) = @_;
    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj;
    eval {$proj = Girocco::Project->load($name)};
    if (!$proj && Girocco::Project::does_exist($name, 1)) {
        # If the .clone_in_progress file exists, but the .clonelog does not
        # and neither does the .clone_failed, be helpful and touch the
        # .clone_failed file so that the mirror can be restarted
        my $projdir = $Girocco::Config::reporoot."/$name.git";
        my $stuck = -d "$projdir"
            && -f "$projdir/.clone_in_progress"
            && ! -f "$projdir/.clonelog"
            && ! -f "$projdir/.clone_failed";
        if ($stuck) {
            open my $marker, '>', "$projdir/.clone_failed" and close($marker);
        }
    }
    die "failed to load project $name" unless $proj;
    die "project $name is not marked for cloning" unless $proj->{clone_in_progress};
    die "project $name is already being cloned" if $proj->{clone_logged};
    die "cloning $name aborted (throttled)" unless request_throttle("clone", $name) <= 0;
    statmsg("cloning $name");
    open STDOUT, '>', "$Girocco::Config::reporoot/$name.git/.clonelog" or die "cannot open clonelog: $!";
    open STDERR, ">&STDOUT";
    open STDIN, '<', '/dev/null';
    exec "$Girocco::Config::basedir/taskd/clone.sh", "$name.git" or die "exec failed: $!";
}
# Pick the separator shown between the old and new revision in status lines.
# [0] => git dir, [1] => old revision, [2] => new revision
# Returns ' -> ' when $showff is off or no git dir is given and '..' when
# either revision is missing, all zeros, or both are equal.  Otherwise git
# rev-list is asked whether $old has a commit not reachable from $new: if
# so the result is '...', else '..'; in list context that result is
# followed by 1 to signal that git was run.
sub ref_indicator {
    return ' -> ' unless $showff && defined($_[0]);
    my ($git_dir, $old, $new) = @_;
    return '..' if !defined($old) || !defined($new)
        || $old =~ /^0+$/ || $new =~ /^0+$/ || $old eq $new;
    # In many cases `git merge-base` is slower than this even if using the
    # `--is-ancestor` option available since Git 1.8.0, but it's never faster
    my $found = get_git("--git-dir=$git_dir", "rev-list", "-n", "1", "^$new^0", "$old^0", "--");
    my $ans = $found ? '...' : '..';
    return wantarray ? ($ans, 1) : $ans;
}
# Handle a single ref-change request.
# [0] => "username project oldrev newrev ref" separated by whitespace
# Returns 0 without doing anything when a field is missing or malformed or
# when old and new revisions are equal.  Dies if the project or user cannot
# be loaded or the throttle request is refused.  Otherwise emits a status
# message and, if the project has notifications, sends them.
sub ref_change {
    my ($arg) = @_;
    my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
    return 0 unless $username && $name && $oldrev && $newrev && $ref;
    return 0 unless $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/};
    return 0 if $newrev eq $oldrev;

    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj = Girocco::Project->load($name);
    die "failed to load project $name" unless $proj;
    my $has_notify = $proj->has_notify;
    my $type = $has_notify ? "notify" : "change";

    my $user;
    if ($username && $username !~ /^%.*%$/) {
        # an ordinary user name (not of the %...% form) must load
        Girocco::User::does_exist($username, 1) or die "no such user: $username";
        $user = Girocco::User->load($username);
        die "failed to load user $username" unless $user;
    } elsif ($username eq "%$name%") {
        $username = "-";
    }

    die "ref-change $name aborted (throttled)" unless request_throttle("ref-change", $name) <= 0;
    my $ind = ref_indicator($proj->{path}, $oldrev, $newrev);
    my $oldabbr = substr($oldrev, 0, $abbrev);
    my $newabbr = substr($newrev, 0, $abbrev);
    statmsg("ref-$type $username $name ($ref: $oldabbr$ind$newabbr)");
    open STDIN, '<', '/dev/null';
    Girocco::Notify::ref_change($proj, $user, $ref, $oldrev, $newrev) if $has_notify;
    return 0;
}
# Handle a batch of ref changes for one project.
# [0] => "username project" separated by whitespace
# The changes themselves are read from STDIN, one "oldrev newrev ref" per
# line, until EOF or a line whose first field is "done".  Blank, short or
# malformed lines are skipped silently.
# Returns 0 (also when a field of [0] is missing or no usable change was
# read).  Dies if the project or user cannot be loaded or the throttle
# request is refused.
sub ref_changes {
    my ($arg) = @_;
    my ($username, $name) = split(/\s+/, $arg);
    $username && $name or return 0;

    Girocco::Project::does_exist($name, 1) or die "no such project: $name";
    my $proj = Girocco::Project->load($name);
    $proj or die "failed to load project $name";
    my $has_notify = $proj->has_notify;
    my $type = $has_notify ? "notify" : "change";

    my $user;
    if ($username && $username !~ /^%.*%$/) {
        Girocco::User::does_exist($username, 1) or die "no such user: $username";
        $user = Girocco::User->load($username);
        $user or die "failed to load user $username";
    } elsif ($username eq "%$name%") {
        $username = "-";
    }

    my @changes = ();
    my %oldheads = ();
    my %deletedheads = ();
    while (my $change = <STDIN>) {
        my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
        defined($oldrev) or next; # blank line
        $oldrev ne "done" or last;
        defined($newrev) && defined($ref) or next; # too few fields
        $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or next;
        if ($ref =~ m{^refs/heads/.}) {
            if ($oldrev =~ /^0{40}$/) {
                # all-zero old revision: forget any recorded old value
                delete $oldheads{$ref};
                $deletedheads{$ref} = 1;
            } elsif ($newrev ne $oldrev || (!exists($oldheads{$ref}) && !$deletedheads{$ref})) {
                $oldheads{$ref} = $oldrev;
            }
        }
        $newrev ne $oldrev or next;
        push(@changes, [$oldrev, $newrev, $ref]);
    }
    return 0 unless @changes;
    open STDIN, '<', '/dev/null';
    request_throttle("ref-change", $name) <= 0 or die "ref-changes $name aborted (throttled)";
    # status callback: reports one change, then pauses 2s when told mail.sh
    # ran or 1s when ref_indicator had to run git
    my $statproc = sub {
        my ($old, $new, $ref, $ran_mail_sh) = @_;
        my ($ind, $ran_git) = ref_indicator($proj->{path}, $old, $new);
        statmsg("ref-$type $username $name ($ref: @{[substr($old,0,$abbrev)]}$ind@{[substr($new,0,$abbrev)]})");
        if ($ran_mail_sh) {
            sleep 2;
        } elsif ($ran_git) {
            sleep 1;
        }
    };
    if ($has_notify) {
        Girocco::Notify::ref_changes($proj, $user, $statproc, \%oldheads, @changes);
    } else {
        $statproc->(@$_) foreach @changes;
    }
    return 0;
}
# Serve a throttle request received from a supplicant over STDIN.
# [0] => "pid class text" separated by whitespace
# Returns 0 if the request is unusable (pid not a positive integer or no
# such process, unknown class, missing text).  Otherwise it registers
# itself with the controller through $pipewrite, waits for the controller's
# signal while watching STDIN for EOF, writes "throttled" or "proceed" back
# on STDIN, in the proceed case stays alive until STDIN reaches EOF or the
# controller pipe breaks, and finally exits 0.
sub throttle {
    my ($arg) = @_;
    my ($pid, $classname, $text) = split(/\s+/, $arg);
    # the whole field must be digits (same check process_pipe_msg uses)
    defined($pid) && $pid =~ /^\d+$/ or return 0; # invalid pid
    $pid += 0;
    $pid > 0 or return 0; # invalid pid
    kill(0, $pid) || $!{EPERM} or return 0; # no such process
    Throttle::GetClassInfo($classname) or return 0; # no such throttle class
    defined($text) && $text ne '' or return 0; # no text no service

    my $throttled = 0;
    my $proceed = 0;
    my $error = 0;
    my $controldead = 0;
    my $suppdead = 0;
    # the signal handlers write to this pipe so that select wakes up
    my ($waker, $wakew);
    pipe($waker, $wakew) or die "pipe failed: $!";
    select((select($wakew),$|=1)[0]);
    setnonblock($wakew);
    $SIG{'TERM'} = sub {$throttled = 1; syswrite($wakew, '!')};
    $SIG{'USR1'} = sub {$proceed = 1; syswrite($wakew, '!')};
    $SIG{'USR2'} = sub {$error = 1; syswrite($wakew, '!')};
    $SIG{'PIPE'} = sub {$controldead = 1; syswrite($wakew, '!')};
    select((select(STDIN),$|=1)[0]);

    logmsg("throttle $pid $classname $text request");
    # After writing we can expect a SIGTERM or SIGUSR1
    print $pipewrite "\nthrottle $$ $classname $text\n";

    # NOTE: the only way to detect the socket close is to read all the
    # data until EOF is reached -- recv can be used to peek.
    my $v = '';
    vec($v, fileno(STDIN), 1) = 1;
    vec($v, fileno($waker), 1) = 1;
    setnonblock(\*STDIN);
    setnonblock($waker);
    until ($controldead || $throttled || $proceed || $error || $suppdead) {
        my ($r, $e);
        select($r=$v, undef, $e=$v, 30);
        my ($bytes, $discard);
        do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
        do {$bytes = sysread(STDIN, $discard, 4096)} while (defined($bytes) && $bytes > 0);
        # anything other than "would block" on STDIN means the peer is gone
        $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
        print $pipewrite "\nkeepalive $$\n";
    }
    setblock(\*STDIN);

    if ($throttled && !$suppdead) {
        print STDIN "throttled\n";
        logmsg("throttle $pid $classname $text throttled");
    } elsif ($proceed && !$suppdead) {
        print STDIN "proceed\n";
        logmsg("throttle $pid $classname $text proceed");
        $SIG{'TERM'} = 'DEFAULT';
        # Stay alive until the child dies which we detect by EOF on STDIN
        setnonblock(\*STDIN);
        until ($controldead || $suppdead) {
            my ($r, $e);
            select($r=$v, undef, $e=$v, 30);
            my ($bytes, $discard);
            do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
            do {$bytes = sysread(STDIN, $discard, 512)} while (defined($bytes) && $bytes > 0);
            $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
            print $pipewrite "\nkeepalive $$\n";
        }
        setblock(\*STDIN);
    } else {
        my $prefix = '';
        $prefix = "control" if $controldead && !$suppdead;
        logmsg("throttle $pid $classname $text ${prefix}died");
    }
    exit 0;
}
# Act on one message read from the control pipe.
# [0] => "action pid class text" separated by whitespace
# "throttle": a pid that is not a positive integer or not a live process
# is ignored; a missing class/text, unknown class or AddSupplicant error
# gets SIGUSR2; a throttled request gets SIGTERM; otherwise the pid has
# been queued.  "keepalive" is ignored.  Anything else is reported on
# STDERR.  Always returns 0.
sub process_pipe_msg {
    my ($act, $pid, $cls, $text) = split(/\s+/, $_[0]);
    if ($act eq "throttle") {
        return 0 unless $pid =~ /^\d+$/;
        $pid += 0;
        return 0 unless $pid > 0; # invalid pid
        return 0 unless kill(0, $pid); # invalid pid
        unless (defined($cls) && $cls ne "" &&
                defined($text) && $text ne "" &&
                Throttle::GetClassInfo($cls)) {
            kill('USR2', $pid);
            return 0;
        }
        # the AddSupplicant call could send SIGUSR1 before it returns
        my $result = Throttle::AddSupplicant($pid, $cls, $text);
        if ($result < 0) {
            kill('USR2', $pid);
        } elsif ($result > 0) {
            kill('TERM', $pid);
        }
        # on success $pid was added to class $cls and will receive SIGUSR1
        # when it's time for it to proceed
        return 0;
    } elsif ($act eq "keepalive") {
        # nothing to do although we could verify pid is valid and
        # still in %Throttle::pids and send a SIGUSR2 if not, but
        # really keepalive should just be ignored.
        return 0;
    }
    print STDERR "discarding unknown pipe message \"$_[0]\"\n";
    return 0;
}
885 ## -------
886 ## OStream
887 ## -------
890 package OStream;
892 # Set to 1 for only syslog output (if enabled by mode)
893 # Set to 2 for only stderr output (if enabled by mode)
894 our $only = 0; # This is a hack
896 use Carp 'croak';
897 use Sys::Syslog qw(:DEFAULT :macros);
# Write all of $data to the numeric file descriptor $fd, retrying after
# EINTR and after short writes.
# [0] => file descriptor number
# [1] => data to write
# Croaks if POSIX::write fails for any reason other than EINTR or reports
# that zero bytes were written.
sub writeall {
    my ($fd, $data) = @_;
    my $offset = 0;
    my $remaining = length($data);
    while ($remaining) {
        my $bytes = POSIX::write(
            $fd,
            substr($data, $offset, $remaining),
            $remaining);
        if (!defined($bytes)) {
            next if $!{EINTR};
            croak "POSIX::write failed: $!";
        }
        # A zero-byte write comes back as the true string "0 but true", so
        # it has to be tested numerically or this loop would never end.
        croak "POSIX::write wrote 0 bytes" unless $bytes > 0;
        $remaining -= $bytes;
        $offset += $bytes;
    }
}
# Emit one line to the outputs enabled for this handle.
# The line goes to fd STDERR_FILENO when 'stderr' is enabled and $only is
# not 1, and (minus one trailing newline, if non-empty) to syslog at
# LOG_NOTICE when 'syslog' is enabled and $only is not 2.
sub dumpline {
    use POSIX qw(STDERR_FILENO);
    my ($self, $line) = @_;
    $only = 0 unless defined($only);
    if ($self->{'stderr'} && $only != 1) {
        writeall(STDERR_FILENO, $line);
    }
    $line =~ s/\n\z//;
    return unless length($line);
    syslog(LOG_NOTICE, "%s", $line) if $self->{'syslog'} && $only != 2;
}
# Constructor used by tie.
# [0] => class (defaults to 'OStream')
# [1] => mode: <= 0 stderr only, 1 syslog only, > 1 both
# [2] => identity passed to openlog
# [3] => syslog facility (defaults to LOG_USER)
# Croaks if syslog output is requested and openlog fails.
sub TIEHANDLE {
    my $class = shift || 'OStream';
    my ($mode, $syslogname, $syslogfacility) = @_;
    $syslogfacility = LOG_USER unless defined($syslogfacility);
    my $self = {
        'syslog' => ($mode > 0),
        'stderr' => ($mode <= 0 || $mode > 1),
        'lastline' => '',
    };
    if ($self->{'syslog'}) {
        # Some Sys::Syslog have a stupid default setlogsock order
        eval {Sys::Syslog::setlogsock("native"); 1;} or
            eval {Sys::Syslog::setlogsock("unix");};
        openlog($syslogname, "ndelay,pid", $syslogfacility)
            or croak "Sys::Syslog::openlog failed: $!";
    }
    return bless $self, $class;
}
# Tied-handle stubs: they ignore their arguments and return a constant.
sub BINMODE {
    return 1;
}

sub FILENO {
    return undef;
}

sub EOF {
    return 0;
}

sub CLOSE {
    return 1;
}
# Format the arguments with sprintf and hand the result to PRINT,
# returning whatever PRINT returns.
sub PRINTF {
    my ($self, $template, @args) = @_;
    my $text = sprintf($template, @args);
    return $self->PRINT($text);
}
# print support for the tied handle.
#
# All arguments are appended to any partial line left over from an earlier
# call.  Every complete (newline terminated) line is passed to dumpline in
# order, and the remaining text with no newline is kept in 'lastline' until
# more output arrives (or the object is destroyed).  Always returns 1.
sub PRINT {
	my $self = shift;
	my $data = join('', $self->{'lastline'}, @_);
	my $pos = 0;
	# $data is scanned in place and never shortened inside the loop.
	# Removing each line while also advancing $pos past it would make the
	# next search start too far along, delaying or corrupting later lines.
	while ((my $idx = index($data, "\n", $pos)) >= 0) {
		++$idx;
		$self->dumpline(substr($data, $pos, $idx - $pos));
		$pos = $idx;
	}
	# whatever follows the last newline is an incomplete line
	$self->{'lastline'} = substr($data, $pos);
	return 1;
}
# Destructor: emit any buffered partial line (text that never received its
# terminating newline) and then close the syslog connection.
sub DESTROY {
	my ($self) = @_;
	my $pending = $self->{'lastline'};
	if (length($pending)) {
		$self->dumpline($pending);
	}
	closelog();
}
# syswrite support for the tied handle.
#
#   $scalar - buffer to take the data from (undef is treated as '')
#   $length - maximum number of characters to write (default: whole buffer)
#   $offset - where in the buffer to start; negative counts back from the end
#
# Mirrors the built-in syswrite rules: the offset must lie within the buffer
# (it may equal the buffer length) and a length that reaches past the end of
# the buffer is trimmed to what is available.  The selected text is passed
# to PRINT.  Returns the number of characters accepted; croaks on a negative
# length or an offset outside the buffer.
sub WRITE {
	my $self = shift;
	my ($scalar, $length, $offset) = @_;
	$scalar = '' if !defined($scalar);
	my $avail = length($scalar);
	$length = $avail if !defined($length);
	croak "OStream::WRITE invalid length $length"
		if $length < 0;
	$offset = 0 if !defined($offset);
	$offset += $avail if $offset < 0;
	# the offset is bounded by the buffer size, not by the requested length
	croak "OStream::WRITE invalid write offset"
		if $offset < 0 || $offset > $avail;
	my $max = $avail - $offset;
	$length = $max if $length > $max;
	$self->PRINT(substr($scalar, $offset, $length));
	return $length;
}
997 ## ----
998 ## main
999 ## ----
1002 package main;
1004 # returns pid of process that will schedule jobd.pl restart on success
1005 # returns 0 if fork or other system call failed with error in $!
1006 # returns undef if jobd.pl does not currently appear to be running (no lockfile)
# Schedule a delayed, detached request for jobd.pl to restart itself.
#
# A double-forked grandchild waits roughly five seconds and then writes
# "restart\n" into jobd's lock file, unless the calling process vetoes that
# by sending a non-zero value down the $write pipe before the wait is over.
#
# Argument: $newpg -- when true the grandchild tries to put itself into its
# own process group with setpgid(0, 0).
#
# Returns undef when the jobd lock file is not present, 0 (with $! set) when
# creating a pipe or forking failed, otherwise the pid of the grandchild.
1007 sub schedule_jobd_restart {
1008 use POSIX qw(_exit setpgid dup2 :fcntl_h);
1009 my $devnull = File::Spec->devnull;
1010 my $newpg = shift;
1011 my $jdlf = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
# no lock file is taken to mean jobd is not running, so nothing to schedule
1012 return undef unless -f $jdlf;
1013 my $oldsigchld = $SIG{'CHLD'};
1014 defined($oldsigchld) or $oldsigchld = sub {};
# $read/$write: veto channel from this process to the grandchild
# $read2/$write2: carries the grandchild pid (or ":errno") back to us
1015 my ($read, $write, $read2, $write2);
1016 pipe($read, $write) or return 0;
# turn on autoflush for the write end
1017 select((select($write),$|=1)[0]);
1018 if (!pipe($read2, $write2)) {
# NOTE(review): "local $!" looks intended to keep the close calls from
# disturbing the errno the caller sees -- confirm
1019 local $!;
1020 close $write;
1021 close $read;
1022 return 0;
1024 select((select($write2),$|=1)[0]);
# use a do-nothing CHLD handler while the helper processes are managed here;
# the saved handler is put back before every return below
1025 $SIG{'CHLD'} = sub {};
# try the first fork up to three times, pausing a second between attempts
1026 my $retries = 3;
1027 my $child;
1028 while (!defined($child) && $retries--) {
1029 $child = fork;
1030 sleep 1 unless defined($child) || !$retries;
1032 if (!defined($child)) {
1033 local $!;
1034 close $write2;
1035 close $read2;
1036 close $write;
1037 close $read;
1038 $SIG{'CHLD'} = $oldsigchld;
1039 return 0;
1041 # double fork the child
1042 if (!$child) {
# intermediate child: fork the grandchild, report the outcome, then exit
1043 close $read2;
1044 my $retries2 = 3;
1045 my $child2;
1046 while (!defined($child2) && $retries2--) {
1047 $child2 = fork;
1048 sleep 1 unless defined($child2) || !$retries2;
1050 if (!defined($child2)) {
# second fork failed: report ":errno" (255 when errno is 0) to the parent
1051 my $ec = 0 + $!;
1052 $ec = 255 unless $ec;
1053 print $write2 ":$ec";
1054 close $write2;
1055 _exit 127;
1057 if ($child2) {
1058 # pass new child pid up to parent and exit
1059 print $write2 $child2;
1060 close $write2;
1061 _exit 0;
1062 } else {
1063 # this is the grandchild
1064 close $write2;
1066 } else {
# original process: read what the intermediate child reported
1067 close $write2;
1068 my $result = <$read2>;
1069 close $read2;
1070 chomp $result if defined($result);
1071 if (!defined($result) || $result !~ /^:?\d+$/) {
1072 # something's wrong with the child -- kill it
1073 kill(9, $child) && waitpid($child, 0);
1074 my $oldsigpipe = $SIG{'PIPE'};
1075 # make sure the grandchild, if any,
1076 # doesn't run the success proc
# SIGPIPE gets a do-nothing handler in case nobody holds the read end
1077 $SIG{'PIPE'} = sub {};
1078 print $write 1;
1079 close $write;
1080 close $read;
1081 $SIG{'PIPE'} = defined($oldsigpipe) ?
1082 $oldsigpipe : 'DEFAULT';
# no real errno is available for this failure so 255 is reported
1083 $! = 255;
1084 $SIG{'CHLD'} = $oldsigchld;
1085 return 0;
1087 if ($result =~ /^:(\d+)$/) {
1088 # fork failed in child, there is no grandchild
1089 my $ec = $1;
1090 waitpid($child, 0);
1091 close $write;
1092 close $read;
# hand the child's fork errno to our caller
1093 $! = $ec;
1094 $SIG{'CHLD'} = $oldsigchld;
1095 return 0;
1097 # reap the child and set $child to grandchild's pid
1098 waitpid($child, 0);
1099 $child = $result;
# $child is still 0 only in the grandchild (fork returned 0 and it was never
# reassigned); the original process holds the grandchild's pid by now
1101 if (!$child) {
1102 # grandchild that actually initiates the jobd.pl restart
1103 close $write;
# total number of seconds to wait before acting
1104 my $wait = 5;
# point descriptors 0, 1 and 2 at the null device
1105 my $ufd = POSIX::open($devnull, O_RDWR);
1106 if (defined($ufd)) {
1107 dup2($ufd, 0) unless $ufd == 0;
1108 dup2($ufd, 1) unless $ufd == 1;
1109 dup2($ufd, 2) unless $ufd == 2;
1110 POSIX::close($ufd) unless $ufd == 0 || $ufd == 1 || $ufd == 2;
1112 chdir "/";
1113 if ($newpg) {
# each failed setpgid attempt sleeps one second and takes that second off
# the remaining wait; at most four attempts are made
1114 my $makepg = sub {
1115 my $result = setpgid(0, 0);
1116 if (!defined($result)) {
1117 --$wait;
1118 sleep 1;
1120 $result;
1122 my $result = &$makepg;
1123 defined($result) or $result = &$makepg;
1124 defined($result) or $result = &$makepg;
1125 defined($result) or $result = &$makepg;
1127 sleep $wait;
# EOF (nothing written) or "0" means go ahead; anything else is a veto
1128 my $result = <$read>;
1129 close $read;
1130 chomp $result if defined($result);
1131 if (!defined($result) || $result eq 0) {
# replace the lock file contents with the single line "restart"
1132 open JDLF, '+<', $jdlf or _exit(1);
1133 select((select(JDLF),$|=1)[0]);
1134 print JDLF "restart\n";
1135 truncate JDLF, tell(JDLF);
1136 close JDLF;
1138 _exit(0);
# original process, success: closing $write without writing anything lets
# the grandchild see EOF and carry out the restart request
1140 close $write;
1141 close $read;
1142 $SIG{'CHLD'} = $oldsigchld;
1143 return $child;
# Cancel a jobd restart that schedule_jobd_restart set up earlier.
#
#   $restarter - pid returned by schedule_jobd_restart
#
# Returns nothing when $restarter is undef or 0, -1 when that process can no
# longer be signalled, otherwise the pid that was killed.  Dies if the
# process exists but the kill fails.
sub cancel_jobd_restart {
	my ($restarter) = @_;
	if (!defined($restarter) || $restarter == 0) {
		return;
	}
	# signal 0 only tests whether the process can be signalled
	if (!kill(0, $restarter)) {
		return -1;
	}
	if (!kill(9, $restarter)) {
		die "failed to kill jobd restarter process (pid $restarter): $!\n";
	}
	# we must not waitpid because $restarter was doubly forked and will
	# NOT send us a SIGCHLD when it terminates
	return $restarter;
}
# Command line processing: parse the options, apply the defaults that depend
# on other options and validate the numeric arguments.
# NOTE(review): $reexec is created before the chdir and option parsing,
# presumably so it can record the original invocation for the graceful
# restart re-exec later on -- confirm in Girocco::ExecUtil
1156 my $reexec = Girocco::ExecUtil->new;
# resolve the script path before changing directory; pod2usage is given this
# path as its -input below
1157 my $realpath0 = realpath($0);
1158 chdir "/";
1159 close(DATA) if fileno(DATA);
# $sfac receives the optional facility name given to --syslog
1160 my $sfac;
1161 Getopt::Long::Configure('bundling');
# raw --status-interval / --idle-status-interval values (in minutes)
1162 my ($stiv, $idiv);
1163 my $parse_res = GetOptions(
1164 'help|?|h' => sub {
1165 pod2usage(-verbose => 2, -exitval => 0, -input => $realpath0)},
1166 'quiet|q' => \$quiet,
1167 'no-quiet' => sub {$quiet = 0},
1168 'progress|P' => \$progress,
# --inetd and --daemon both switch on syslog and quiet; a later --no-syslog
# or --no-quiet on the command line can switch them off again
1169 'inetd|i' => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
1170 'idle-timeout|t=i' => \$idle_timeout,
1171 'daemon' => sub {$daemon = 1; $syslog = 1; $quiet = 1;},
1172 'max-lifetime=i' => \$max_lifetime,
# the facility argument of --syslog is optional
1173 'syslog|s:s' => \$sfac,
1174 'no-syslog' => sub {$syslog = 0; $sfac = undef;},
1175 'stderr' => \$stderr,
1176 'abbrev=i' => \$abbrev,
1177 'show-fast-forward-info' => \$showff,
1178 'no-show-fast-forward-info' => sub {$showff = 0},
1179 'same-pid' => \$same_pid,
1180 'no-same-pid' => sub {$same_pid = 0},
1181 'status-interval=i' => \$stiv,
1182 'idle-status-interval=i' => \$idiv,
1183 ) || pod2usage(-exitval => 2, -input => $realpath0);
# unless given explicitly, same-pid is on exactly when --daemon is not used
1184 $same_pid = !$daemon unless defined($same_pid);
1185 $syslog = 1 if defined($sfac);
1186 $progress = 1 unless $quiet;
# a non-positive --abbrev selects 128, i.e. effectively no abbreviation
1187 $abbrev = 128 unless $abbrev > 0;
1188 pod2usage(-msg => "--inetd and --daemon are incompatible") if ($inetd && $daemon);
1189 if (defined($idle_timeout)) {
1190 die "--idle-timeout must be a whole number\n" unless $idle_timeout =~ /^\d+$/;
1191 die "--idle-timeout may not be used without --inetd\n" unless $inetd;
1193 if (defined($max_lifetime)) {
1194 die "--max-lifetime must be a whole number\n" unless $max_lifetime =~ /^\d+$/;
1195 $max_lifetime += 0;
1197 defined($max_lifetime) or $max_lifetime = 604800; # 1 week
# both intervals are given in minutes and stored in seconds
1198 if (defined($stiv)) {
1199 die "--status-interval must be a whole number\n" unless $stiv =~ /^\d+$/;
1200 $statusintv = $stiv * 60;
1202 if (defined($idiv)) {
1203 die "--idle-status-interval must be a whole number\n" unless $idiv =~ /^\d+$/;
1204 $idleintv = $idiv * 60;
# Standard stream setup: detach STDIN (except in inetd mode, where
# descriptor 0 is later used as the listen socket), optionally route STDERR
# through the OStream tie for syslog output, and decide where STDOUT goes.
1207 open STDIN, '<'.File::Spec->devnull or die "could not redirect STDIN to /dev/null\n" unless $inetd;
1208 open STDOUT, '>&STDERR' if $inetd;
1209 if ($syslog) {
1210 use Sys::Syslog qw();
# OStream mode: 1 = syslog only, 2 = syslog and stderr
1211 my $mode = 1;
1212 ++$mode if $stderr;
1213 $sfac = "user" unless defined($sfac) && $sfac ne "";
# keep the name as the user typed it for error messages
1214 my $ofac = $sfac;
# normalize to the Sys::Syslog constant name: upper case with LOG_ prefix
1215 $sfac = uc($sfac);
1216 $sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
1217 my $facility;
# LOG_ names that exist in Sys::Syslog but are not facilities
1218 my %badfac = map({("LOG_$_" => 1)}
1219 (qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
# NOTE(review): this is a string eval built from a command line argument;
# the name is not restricted to identifier characters before it is evaluated
1220 eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac\n";
# a facility value may only use the bits in 0xf8 and its code (value >> 3)
# may not exceed 23
1221 die "invalid syslog facility: $ofac\n"
1222 if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
1223 tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
1225 if ($quiet) {
1226 open STDOUT, '>', '/dev/null';
1227 } elsif ($inetd) {
# not quiet in inetd mode: STDOUT becomes an alias for STDERR
1228 *STDOUT = *STDERR;
# $NAME is the path of the listen socket, $INO its inode number
1231 my ($NAME, $INO);
1233 my $restart_file = $Girocco::Config::chroot.'/etc/taskd.restart';
1234 my $restart_active = 1;
# Graceful restart support: a previous incarnation may have handed over its
# listen socket through the environment as "FD" or "FD:INODE".  The value is
# removed from the environment and then validated; on any problem it is
# ignored ($resumefd becomes undef) and a fresh socket is created later.
1235 my $resumefd = $ENV{(SOCKFDENV)};
1236 delete $ENV{(SOCKFDENV)};
# the doubled braces make an inner bare block so that "last" can leave the
# validation early
1237 if (defined($resumefd)) {{
1238 unless ($resumefd =~ /^(\d+)(?::(-?\d+))?$/) {
1239 warn "ignoring invalid ".SOCKFDENV." environment value (\"$resumefd\") -- bad format\n";
1240 $resumefd = undef;
1241 last;
1243 my $resumeino;
1244 ($resumefd, $resumeino) = ($1, $2);
1245 $resumefd += 0;
1246 unless (isfdopen($resumefd)) {
1247 warn "ignoring invalid ".SOCKFDENV." environment value -- fd \"$resumefd\" not open\n";
1248 $resumefd = undef;
1249 last;
# outside inetd mode the descriptor must also match the socket file that
# currently exists at the expected location
1251 unless ($inetd) {
1252 unless (defined($resumeino)) {
1253 warn "ignoring invalid ".SOCKFDENV." environment value (\"$resumefd\") -- missing inode\n";
1254 POSIX::close($resumefd);
1255 $resumefd = undef;
1256 last;
1258 $resumeino += 0;
1259 my $sockloc = $Girocco::Config::chroot.'/etc/taskd.socket';
1260 my $slinode = (stat($sockloc))[1];
1261 unless (defined($slinode) && -S _) {
1262 warn "ignoring ".SOCKFDENV." environment value; socket file does not exist: $sockloc\n";
1263 POSIX::close($resumefd);
1264 $resumefd = undef;
1265 last;
# duplicate the descriptor just long enough to ask for its bound address
1267 open Test, "<&$resumefd" or die "open: $!";
1268 my $sockname = getsockname Test;
1269 my $sockpath;
1270 $sockpath = unpack_sockaddr_un $sockname if $sockname && sockaddr_family($sockname) == AF_UNIX;
1271 close Test;
# both the inode number and the resolved path have to agree
1272 if (!defined($resumeino) || !defined($sockpath) || $resumeino != $slinode || realpath($sockloc) ne realpath($sockpath)) {
1273 warn "ignoring ".SOCKFDENV." environment value; does not match socket file: $sockloc\n";
1274 POSIX::close($resumefd);
1275 $resumefd = undef;
1277 $INO = $resumeino;
# Obtain the listening socket "Server": either adopt an already open
# descriptor (descriptor 0 in inetd mode, or the one inherited across a
# graceful restart) or create, bind and listen on a new unix socket.
1280 if ($inetd || defined($resumefd)) {
1281 my $fdopen = defined($resumefd) ? $resumefd : 0;
# "<&=" wraps the existing descriptor instead of duplicating it
1282 open Server, "<&=$fdopen" or die "open: $!";
1283 setcloexec(\*Server) if $fdopen > $^F;
1284 my $sockname = getsockname Server;
1285 die "getsockname: $!" unless $sockname;
# a listening socket has no peer: getpeername has to fail with ENOTCONN
1286 die "socket already connected! must be 'wait' socket\n" if getpeername Server;
1287 die "getpeername: $!" unless $!{ENOTCONN};
1288 my $st = getsockopt Server, SOL_SOCKET, SO_TYPE;
1289 die "getsockopt(SOL_SOCKET, SO_TYPE): $!" unless $st;
1290 my $socktype = unpack('i', $st);
1291 die "stream socket required\n" unless defined $socktype && $socktype == SOCK_STREAM;
1292 die "AF_UNIX socket required\n" unless sockaddr_family($sockname) == AF_UNIX;
1293 $NAME = unpack_sockaddr_un $sockname;
1294 my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
# listening somewhere unexpected is allowed, but then the restart file is
# not honored
1295 if (realpath($NAME) ne realpath($expected)) {
1296 $restart_active = 0;
1297 warn "listening on \"$NAME\" but expected \"$expected\", restart file disabled\n";
1299 my $mode = (stat($NAME))[2];
1300 die "stat: $!" unless $mode;
1301 $mode &= 07777;
# make sure owner and group can both read and write the socket file
1302 if (($mode & 0660) != 0660) {
1303 chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
1305 } else {
1306 $NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
1307 my $uaddr = sockaddr_un($NAME);
1309 socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
1310 die "already exists but not a socket: $NAME\n" if -e $NAME && ! -S _;
1311 if (-e _) {
1312 # Do not unlink another instance's active listen socket!
# probe with a throwaway socket: a successful connect, or a failure with
# EPROTOTYPE, is treated as proof that something is still listening there
1313 socket(my $sfd, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
1314 connect($sfd, $uaddr) || $!{EPROTOTYPE} and
1315 die "Live socket '$NAME' exists. Please make sure no other instance of taskd is running.\n";
1316 close($sfd);
# nobody answered, so the socket file is stale and can be removed
1317 unlink($NAME);
1319 bind(Server, $uaddr) or die "bind failed: $!";
1320 listen(Server, SOMAXCONN) or die "listen failed: $!";
1321 chmod 0666, $NAME or die "chmod failed: $!";
# the inode is recorded so it can be passed along on a graceful restart
1322 $INO = (stat($NAME))[1] or die "stat failed: $!";
# Hand every named throttle class definition to Throttle::GetClassInfo,
# configured classes first and the built-in defaults after them; entries
# without a name are skipped.
# NOTE(review): presumably the first definition seen for a name wins, which
# would let configured classes override same-named defaults -- confirm in
# Throttle::GetClassInfo
1325 foreach my $throttle (@Girocco::Config::throttle_classes, @throttle_defaults) {
1326 my $classname = $throttle->{"name"};
1327 $classname or next;
1328 Throttle::GetClassInfo($classname, $throttle);
# Return the numerically smaller of the two arguments (the first one when
# they compare equal).
sub _min {
	my ($first, $second) = @_;
	return $first if $first <= $second;
	return $second;
}
# State used by the main service loop below.
# The control pipe is read without blocking and written with autoflush on.
1335 pipe($piperead, $pipewrite) or die "pipe failed: $!";
1336 setnonblock($piperead);
1337 select((select($pipewrite), $|=1)[0]);
# bytes read from the control pipe that do not form a complete line yet
1338 my $pipebuff = '';
# select() bit sets: $fdset_pipe watches only the control pipe while
# $fdset_both watches the control pipe and the listen socket
1339 my $fdset_both = '';
1340 vec($fdset_both, fileno($piperead), 1) = 1;
1341 my $fdset_pipe = $fdset_both;
1342 vec($fdset_both, fileno(Server), 1) = 1;
# $penalty goes up by one per accepted connection and is reduced by one for
# every second that passes after $penaltytime
1343 my $penalty = 0;
1344 my $t = time;
1345 my $penaltytime = $t;
# the loop wakes up at least once every 60 seconds
1346 my $nextwakeup = $t + 60;
# time of the next STATUS message; stays undef when the interval is 0
1347 my $nextstatus = undef;
1348 $nextstatus = $t + $statusintv if $statusintv;
1349 if ($restart_active) {
# remove any leftover restart request; if the file cannot be removed the
# restart feature is switched off
1350 unless (unlink($restart_file) || $!{ENOENT}) {
1351 $restart_active = 0;
1352 statmsg "restart file disabled could not unlink \"$restart_file\": $!";
1355 daemon(1, 1) or die "failed to daemonize: $!\n" if $daemon;
1356 my $starttime = time;
# an $endtime of 0 means there is no maximum lifetime
1357 my $endtime = $max_lifetime ? $starttime + $max_lifetime : 0;
1358 statmsg "listening on $NAME";
# Main service loop.  Each pass waits in select() until the control pipe or
# the listen socket needs attention, handling periodic status output,
# graceful restart and idle timeout while waiting; it then processes any
# control pipe messages and accepts at most one new connection, whose
# request is carried out by a spawned task.
1359 while (1) {
1360 my ($rout, $eout, $nfound);
# wait loop: repeats until select reports at least one ready descriptor
1361 do {
1362 my $wait;
1363 my $now = time;
# take one unit off $penalty for every second since $penaltytime
1364 my $adjustpenalty = sub {
1365 if ($penaltytime < $now) {
1366 my $credit = $now - $penaltytime;
1367 $penalty = $penalty > $credit ? $penalty - $credit : 0;
1368 $penaltytime = $now;
1371 if (defined($nextstatus) && $now >= $nextstatus) {
# skip a repeated idle status unless the idle status interval has elapsed
1372 unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
1373 my $statmsg = "STATUS: $children active";
1374 my @running = ();
1375 if ($children) {
1376 my @stats = ();
1377 my $cnt = 0;
# one "<first letter of class>=<total>/<active>" entry per class in use
1378 foreach my $cls (sort(Throttle::GetClassList())) {
1379 my $inf = Throttle::GetClassInfo($cls);
1380 if ($inf->{'total'}) {
1381 $cnt += $inf->{'total'};
1382 push(@stats, substr(lc($cls),0,1)."=".
1383 $inf->{'total'}.'/'.$inf->{'active'});
# children not accounted for by any class are reported as "?"
1386 push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
1387 $statmsg .= " (".join(" ",@stats).")" if @stats;
1388 foreach (Throttle::GetRunningPids()) {
1389 my ($cls, $ts, $desc) = Throttle::GetPidInfo($_);
1390 next unless $ts;
1391 push(@running, "[${cls}::$desc] ".duration($now-$ts));
1394 my $idlesecs;
1395 $statmsg .= ", idle " . duration($idlesecs)
1396 if !$children && ($idlesecs = $now - $idlestart) >= 2;
1397 statmsg $statmsg;
1398 statmsg "STATUS: currently running: ".join(", ", @running)
1399 if @running;
1400 $idlestatus = $now if !$children;
1402 $nextstatus += $statusintv while $nextstatus <= $now;
# move the wakeup time forward until it lies in the future; $wait ends up
# as the number of seconds until then
1404 $nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
# NOTE(review): Throttle::ServiceQueue presumably returns how many seconds
# remain until the queue needs servicing again -- confirm
1405 $wait = _min($wait, (Throttle::ServiceQueue()||60));
1406 &$adjustpenalty; # this prevents ignoring accept when we shouldn't
1407 my $fdset;
1408 if ($penalty <= $maxspawn) {
1409 $fdset = $fdset_both;
1410 } else {
# too many recent connections: stop watching the listen socket and wake up
# again once the penalty has dropped back to $maxspawn
1411 $fdset = $fdset_pipe;
1412 $wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
1414 $nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
1415 logmsg("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR} || $!{EAGAIN};
# forget about the children collected in @reapedpids
1416 my $reaped;
1417 Throttle::RemoveSupplicant($reaped) while ($reaped = shift(@reapedpids));
1418 $now = time;
1419 &$adjustpenalty; # this prevents banking credits for elapsed time
# graceful restart: only when nothing is running, select timed out, and
# either the maximum lifetime has passed or the restart file exists
1420 if (!$children && !$nfound && $restart_active && (($endtime && $now >= $endtime) || -e $restart_file)) {
1421 statmsg "RESTART: restart requested; max lifetime ($max_lifetime) exceeded" if $endtime && $now >= $endtime;
1422 $SIG{CHLD} = sub {};
1423 my $restarter = schedule_jobd_restart($inetd);
# undef means jobd is not running; a defined but false value is a failure
1424 if (defined($restarter) && !$restarter) {
1425 statmsg "RESTART: restart requested; retrying failed scheduling of jobd restart: $!";
1426 sleep 2; # *cough*
# NOTE(review): unlike the first attempt this retry does not pass $inetd,
# so the helper is not asked for a new process group -- confirm intent
1427 $restarter = schedule_jobd_restart;
1428 if (!defined($restarter)) {
1429 statmsg "RESTART: restart requested; reschedule skipped jobd no longer running";
1430 } elsif (defined($restarter) && !$restarter) {
1431 statmsg "RESTART: restart requested; retry of jobd restart scheduling failed, skipping jobd restart: $!";
1432 $restarter = undef;
1435 if ($inetd) {
1436 statmsg "RESTART: restart requested; now exiting for inetd restart";
1437 statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
1438 sleep 2; # *cough*
1439 exit 0;
1440 } else {
1441 statmsg "RESTART: restart requested; now restarting";
1442 statmsg "RESTART: restart requested; jobd restart scheduled in 5 seconds" if $restarter;
# keep the listen socket open across the exec and tell the new process
# which descriptor and inode it is
1443 setnoncloexec(\*Server);
1444 $reexec->setenv(SOCKFDENV, fileno(Server).":$INO");
1445 $reexec->reexec($same_pid);
# getting here means the re-exec failed: undo the preparations and carry on
1446 setcloexec(\*Server) if fileno(Server) > $^F;
1447 statmsg "RESTART: continuing after failed restart: $!";
1448 chdir "/";
1449 cancel_jobd_restart($restarter) if $restarter;
1450 statmsg "RESTART: scheduled jobd restart has been cancelled" if $restarter;
1451 $SIG{CHLD} = \&REAPER;
1454 if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
1455 statmsg "idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
1456 exit 0;
1458 } while $nfound < 1;
# a descriptor counts as ready if select flagged it readable or exceptional
1459 my $reout = $rout | $eout;
# control pipe: collect data in $pipebuff and process each complete line
1460 if (vec($reout, fileno($piperead), 1)) {{
1461 my $nloff = -1;
1463 my $bytes;
1464 do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
1465 while (!defined($bytes) && $!{EINTR});
# EAGAIN: the non-blocking pipe has no more data for now
1466 last if !defined($bytes) && $!{EAGAIN};
1467 die "sysread failed: $!" unless defined $bytes;
1468 # since we always keep a copy of $pipewrite open EOF is fatal
1469 die "sysread returned EOF on pipe read" unless $bytes;
1470 $nloff = index($pipebuff, "\n", 0);
# a message may not exceed 512 bytes; longer runs without a newline are
# thrown away
1471 if ($nloff < 0 && length($pipebuff) >= 512) {
1472 $pipebuff = '';
1473 print STDERR "discarding 512 bytes of control pipe data with no \\n found\n";
1475 redo unless $nloff >= 0;
1477 last unless $nloff >= 0;
# hand every complete line (without its newline) to process_pipe_msg
1478 do {
1479 my $msg = substr($pipebuff, 0, $nloff);
1480 substr($pipebuff, 0, $nloff + 1) = '';
1481 $nloff = index($pipebuff, "\n", 0);
1482 process_pipe_msg($msg) if length($msg);
1483 } while $nloff >= 0;
1484 redo;
1486 next unless vec($reout, fileno(Server), 1);
1487 unless (accept(Client, Server)) {
1488 logmsg "accept failed: $!" unless $!{EINTR};
1489 next;
1491 logmsg "connection on $NAME";
# every accepted connection adds to the spawn penalty
1492 ++$penalty;
# NOTE(review): spawn presumably runs this sub in a child process with the
# client connection attached to STDIN -- confirm in spawn
1493 spawn sub {
# the request is the first line; one leading blank line is skipped
1494 my $inp = <STDIN>;
1495 $inp = <STDIN> if defined($inp) && $inp eq "\n";
1496 chomp $inp if defined($inp);
1497 # ignore empty and "nop" connects
1498 defined($inp) && $inp ne "" && $inp ne "nop" or exit 0;
# split into a command word and an optional argument string
1499 my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
1500 defined($arg) or $arg = '';
1501 if ($cmd eq 'ref-changes') {
1502 ref_changes($arg);
1503 } elsif ($cmd eq 'clone') {
1504 clone($arg);
1505 } elsif ($cmd eq 'ref-change') {
1506 ref_change($arg);
1507 } elsif ($cmd eq 'throttle') {
1508 throttle($arg);
1509 } else {
1510 statmsg "ignoring unknown command: $cmd";
1511 exit 3;
# this process is finished with the client connection
1514 close Client;
1518 ## -------------
1519 ## Documentation
1520 ## -------------
1523 __END__
1525 =head1 NAME
1527 taskd.pl - Perform Girocco service tasks
1529 =head1 SYNOPSIS
1531 taskd.pl [options]
1533 Options:
1534 -h | --help detailed instructions
1535 -q | --quiet run quietly
1536 --no-quiet do not run quietly
1537 -P | --progress show occasional status updates
1538 -i | --inetd run as inetd unix stream wait service
1539 implies --quiet --syslog
1540 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1541 requires --inetd
1542 --daemon become a background daemon
1543 implies --quiet --syslog
1544 --max-lifetime=SECONDS how long before graceful restart
1545 default is 1 week, 0 disables
1546 -s | --syslog[=facility] send messages to syslog instead of
1547 stderr but see --stderr
1548 enabled by --inetd and --daemon
1549 --no-syslog do not send messages to syslog
1550 --stderr always send messages to stderr too
1551 --abbrev=n abbreviate hashes to n (default is 8)
1552 --show-fast-forward-info show fast-forward info (default is on)
1553 --no-show-fast-forward-info disable showing fast-forward info
1554 --same-pid keep same pid during graceful restart
1555 --no-same-pid do not keep same pid on graceful rstrt
1556 --status-interval=MINUTES status update interval (default 1)
1557 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1559 =head1 OPTIONS
1561 =over 8
1563 =item B<--help>
1565 Print the full description of taskd.pl's options.
1567 =item B<--quiet>
1569 Suppress non-error messages, e.g. for use when running this task as an inetd
1570 service. Enabled by default by --inetd.
1572 =item B<--no-quiet>
1574 Enable non-error messages. When running in --inetd mode these messages are
1575 sent to STDERR instead of STDOUT.
1577 =item B<--progress>
1579 Show information about the current status of the task operation occasionally.
1580 This is automatically enabled if --quiet is not given.
1582 =item B<--inetd>
1584 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1585 stream socket ready to have accept called on it. To be useful, the unix socket
1586 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1587 will be issued if the socket is not in the expected location. Socket file
1588 permissions will be adjusted if necessary and, if they cannot be, taskd.pl will
1589 die. The --inetd option also enables the --quiet and --syslog options but
1590 --no-quiet and --no-syslog may be used to alter that.
1592 The correct specification for the inetd socket is a "unix" protocol "stream"
1593 socket in "wait" mode with user and group writable permissions (0660). An
1594 attempt will be made to alter the socket's file mode if needed and if that
1595 cannot be accomplished taskd.pl will die.
1597 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1598 in wait mode and will die if the passed in socket is already connected.
1600 Note that while *BSD's inetd happily supports unix sockets (and so does
1601 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1602 However, systemd does seem to.
1604 =item B<--idle-timeout=SECONDS>
1606 Only permitted when running in --inetd mode. After SECONDS of inactivity
1607 (i.e. all outstanding tasks have completed and no new requests have come in)
1608 exit normally. The default is no timeout at all (a SECONDS value of 0).
1609 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1611 =item B<--daemon>
1613 Fork and become a background daemon. Implies B<--syslog> and B<--quiet> (which
1614 can be altered by subsequent B<--no-syslog> and/or B<--no-quiet> options).
1615 Also implies B<--no-same-pid>, but since graceful restarts work by re-exec'ing
1616 taskd.pl with all of its original arguments, using B<--same-pid> won't really
1617 be effective with B<--daemon> since although it will cause the graceful restart
1618 exec to happen from the same pid, when the B<--daemon> option is subsequently
1619 processed it will end up in a new pid anyway.
1621 =item B<--max-lifetime=SECONDS>
1623 After taskd has been running for SECONDS of realtime, it will behave as though
1624 a graceful restart has been requested. A graceful restart takes place the
1625 next time taskd becomes idle (which may require up to 60 seconds to notice).
1626 If jobd is running when a graceful restart occurs, then jobd will also receive
1627 a graceful restart request at that time. The default value is 1 week (604800),
1628 set to 0 to disable.
1630 =item B<--syslog[=facility]>
1632 Normally error output is sent to STDERR. With this option it's sent to
1633 syslog instead. Note that when running in --inetd mode non-error output is
1634 also affected by this option as it's sent to STDERR in that case. If
1635 not specified, the default for facility is LOG_USER. Facility names are
1636 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1637 with the LOG_NOTICE priority.
1639 =item B<--no-syslog>
1641 Send error message output to STDERR but not syslog.
1643 =item B<--stderr>
1645 Always send error message output to STDERR. If --syslog is in effect then
1646 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1647 messages as well.
1649 =item B<--abbrev=n>
1651 Abbreviate displayed hash values to only the first n hexadecimal characters.
1652 The default is 8 characters. Set to 0 for no abbreviation at all.
1654 =item B<--show-fast-forward-info>
1656 Instead of showing ' -> ' in ref-change/ref-notify update messages, show either
1657 '..' for a fast-forward, creation or deletion or '...' for non-fast-forward.
1658 This requires running an extra git command for each ref update that is not a
1659 creation or deletion in order to determine whether or not it's a fast forward.
1661 =item B<--no-show-fast-forward-info>
1663 Disable showing of fast-forward information for ref-change/ref-notify update
1664 messages. Instead just show a ' -> ' indicator.
1666 =item B<--same-pid>
1668 When performing a graceful restart, perform the graceful restart exec from
1669 the same pid rather than switching to a new one. This is implied when
1670 I<--daemon> is I<NOT> used.
1672 =item B<--no-same-pid>
1674 When performing a graceful restart, perform the graceful restart exec after
1675 switching to a new pid. This is implied when I<--daemon> I<IS> used.
1677 =item B<--status-interval=MINUTES>
1679 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1680 status updates are shown at each MINUTES interval. Setting the interval to 0
1681 disables them entirely even with --progress.
1683 =item B<--idle-status-interval=IDLEMINUTES>
1685 Two consecutive "idle" status updates with no intervening activity will not be
1686 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1687 Setting the interval to 0 prevents any consecutive idle updates (with no
1688 activity between them) from appearing at all.
1690 =back
1692 =head1 DESCRIPTION
1694 taskd.pl is Girocco's service request servant; it listens for service requests
1695 such as new clone requests and ref update notifications and spawns a task to
1696 perform the requested action.
1698 =cut