3 # taskd - Clone repositories on request
5 # taskd is the Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, a new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
39 use POSIX
":sys_wait_h";
47 use Girocco
::Util
qw(noFatalsToBrowser get_git);
48 BEGIN {noFatalsToBrowser
}
50 # Throttle Classes Defaults
51 # Note that any same-named classes in @Girocco::Config::throttle_classes
52 # will override (completely replacing the entire hash) these ones.
53 my @throttle_defaults = (
68 #maxproc => max(5, cpucount + maxjobs), # this is the default
69 #maxjobs => max(1, int(cpucount / 4)) , # this is the default
89 my $progname = basename
($0);
95 use Girocco
::Util
"online_cpus";
96 our $online_cpus_result;
97 $online_cpus_result = online_cpus
unless $online_cpus_result;
98 return $online_cpus_result;
102 my $hdr = "[@{[scalar localtime]}] $progname $$: ";
104 $OStream::only
= 2; # STDERR only
106 $OStream::only
= 1; # syslog only
108 $OStream::only
= 0; # back to default
115 return unless $progress;
116 my $hdr = "[@{[scalar localtime]}] $progname $$: ";
118 $OStream::only
= 2; # STDERR only
119 print STDERR
"$hdr@_\n";
120 $OStream::only
= 1; # syslog only
122 $OStream::only
= 0; # back to default
124 print STDERR
"$hdr@_\n";
130 return $secs unless defined($secs) && $secs >= 0;
132 my $ans = ($secs % 60) . 's';
133 return $ans if $secs < 60;
134 $secs = int($secs / 60);
135 $ans = ($secs % 60) . 'm' . $ans;
136 return $ans if $secs < 60;
137 $secs = int($secs / 60);
138 $ans = ($secs % 24) . 'h' . $ans;
139 return $ans if $secs < 24;
140 $secs = int($secs / 24);
141 return $secs . 'd' . $ans;
146 my $flags = fcntl($fd, F_GETFL
, 0);
147 defined($flags) or die "fcntl failed: $!";
148 fcntl($fd, F_SETFL
, $flags | O_NONBLOCK
) or die "fcntl failed: $!";
153 my $flags = fcntl($fd, F_GETFL
, 0);
154 defined($flags) or die "fcntl failed: $!";
155 fcntl($fd, F_SETFL
, $flags & ~O_NONBLOCK
) or die "fcntl failed: $!";
163 ## 1) A process needing throttle services acquires a control file descriptor
164 ## a) Either as a result of a fork + exec (the write end of a pipe)
165 ## b) Or by connecting to the taskd socket (not yet implemented)
167 ## 2) The process requesting throttle services will be referred to
168 ## as the supplicant or just "supp" for short.
170 ## 3) The supp first completes any needed setup which may include
171 ## gathering data it needs to perform the action -- if that fails
172 ## then there's no need for any throttling.
174 ## 4) The supp writes a throttle request to the control descriptor in
176 ## throttle <pid> <class>\n
177 ## for example if the supp's pid was 1234 and it was requesting throttle
178 ## control as a member of the mail class it would write this message:
179 ## throttle 1234 mail\n
180 ## Note that if the control descriptor happens to be a pipe rather than a
181 ## socket, the message should be preceded by another "\n" just to be safe.
182 ## If the control descriptor is a socket, not a pipe, the message may be
183 ## preceded by a "\n" but that's not recommended.
185 ## 5) For supplicants with a control descriptor that is a pipe
186 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
187 ## If the control descriptor is a socket (getsockname succeeds) then
188 ## protocol (5b) should be used.
190 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
191 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
192 ## to write a "keepalive\n" message to the control descriptor. If that
193 ## fails, the controller has gone away and it may make its own decision
194 ## whether or not to proceed at that point. If, on the other hand, it
195 ## receives a SIGTERM, the process limit for its class has been reached
196 ## and it should abort without performing its action. If it receives
197 ## SIGUSR1, it may proceed without writing anything more to the control
198 ## descriptor, and MAY even close the control descriptor. Finally, a
199 ## SIGUSR2 indicates rejection of the throttle request for some other reason
200 ## such as unrecognized class name or invalid pid in which case the supp may
201 ## make its own decision how to proceed.
203 ## 5b) The supp now enters a read wait on the socket -- it need accommodate no
204 ## more than 512 bytes and if a '\n' does not appear within that number of
205 ## bytes the read should be considered failed. Otherwise the read should
206 ## be retried until either a full line has been read or the socket is
207 ## closed from the other end. If the lone read is "proceed\n" then it may
208 ## proceed without reading or writing anything more to the control
209 ## descriptor, but MUST keep the control descriptor open and not call
210 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
211 ## should be retried) constitutes failure. If a full line starting with at
212 ## least one alpha character was read but it was not "proceed" then it
213 ## should abort without performing its action. For any other failure it
214 ## may make its own decision whether or not to proceed as the controller has
217 ## 6) The supp now performs its throttled action.
219 ## 7) The supp now closes its control descriptor (if it hasn't already in the
220 ## case of (5a)) and exits -- in the case of a socket, the other end receives
221 ## notification that the socket has been closed (read EOF). In the case of
222 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
223 ## of the other end of the pipe, so it will not reach EOF by the supp's
224 ## exit in that case).
227 # keys are class names, values are hash refs with these fields:
228 # 'maxproc' => integer; maximum number of allowed supplicants (the sum of how
229 # many may be queued waiting plus how many may be
230 # concurrently active) with 0 meaning no limit.
231 # 'maxjobs' => integer; how many supplicants may proceed simultaneously; a value
232 # of 0 is unlimited but the number of concurrent
233 # supplicants will always be limited to no more than
234 # the 'maxproc' value (if > 0) no matter what the
235 # 'maxjobs' value is.
236 # 'total' -> integer; the total number of pids belonging to this class that
237 # can currently be found in %pid.
238 # 'active' -> integer; the number of currently active supplicants which should
239 # be the same as (the number of elements of %pid with a
240 # matching class name) - (number of my class in @queue).
241 # 'interval' -> integer; minimum number of seconds between 'proceed' responses
242 # or SIGUSR1 signals to members of this class.
243 # 'lastqueue' -> time; last time a supplicant was successfully queued.
244 # 'lastproceed' => time; last time a supplicant was allowed to proceed.
245 # 'lastthrottle' => time; last time a supplicant was throttled
246 # 'lastdied' => time; last time a supplicant in this class died/exited/etc.
249 # keys are pid numbers, values are array refs with these elements:
250 # [0] => name of class (key to classes hash)
251 # [1] => supplicant state (0 => queued, non-zero => time it started running)
252 # [2] => descriptive text (e.g. project name)
255 # minimum number of seconds between any two proceed responses no matter what
256 # class. this takes priority in that it can effectively increase the
257 # class's 'interval' value by delaying proceed notifications if the minimum
258 # interval has not yet elapsed.
261 # fifo of pids awaiting notification as soon as the next $interval elapses
262 # provided interval and maxjobs requirements are satisfied
263 # for the class of the pid that will next be triggered.
266 # time of most recent successful call to AddSupplicant
269 # time of most recent proceed notification
272 # time of most recent throttle
273 my $lastthrottle = 0;
275 # time of most recent removal
278 # lifetime count of how many have been queued
281 # lifetime count of how many have been allowed to proceed
282 my $totalproceed = 0;
284 # lifetime count of how many have been throttled
285 my $totalthrottle = 0;
287 # lifetime count of how many have died
288 # It should always be true that $totalqueue - $totaldied == $currentlyactive
291 # Returns an unordered list of currently registered class names
293 return keys(%classes);
297 return $_[0] if $_[0] >= $_[1];
302 my ($min, $val, $default) = @_;
304 if (defined($val) && $val =~ /^[+-]?\d+$/) {
309 return _max
($min, $ans);
312 # [0] => name of class to find
313 # [1] => if true, create class if it doesn't exist, if a hashref then
314 # it contains initial values for maxproc, maxjobs and interval.
315 # Otherwise maxjobs defaults to max(cpu cores/4, 1), maxproc
316 # defaults to the max(5, number of cpu cores + maxjobs) and interval
318 # Returns a hash ref with info about the class on success
320 my ($classname, $init) = @_;
321 defined($classname) && $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/
323 $classname = lc($classname);
325 if ($classes{$classname}) {
326 %info = %{$classes{$classname}};
331 ref($init) eq 'HASH' or $init = {};
332 $newclass{'maxjobs'} = _getnum
(0, $init->{'maxjobs'}, sub{_max
(1, int(::cpucount
() / 4))});
333 $newclass{'maxproc'} = _getnum
(0, $init->{'maxproc'}, sub{_max
(5, ::cpucount
() + $newclass{'maxjobs'})});
334 $newclass{'interval'} = _getnum
(0, $init->{'interval'}, sub{1});
335 $newclass{'total'} = 0;
336 $newclass{'active'} = 0;
337 $newclass{'lastqueue'} = 0;
338 $newclass{'lastproceed'} = 0;
339 $newclass{'lastthrottle'} = 0;
340 $newclass{'lastdied'} = 0;
341 $classes{$classname} = \
%newclass;
346 # [0] => pid to look up
347 # Returns () if not found otherwise ($classname, $timestarted, $description)
348 # Where $timestarted will be 0 if it's still queued otherwise a time() value
351 return () unless exists $pid{$pid};
352 return @
{$pid{$pid}};
355 # Returns array of pid numbers that are currently running sorted
356 # by time started (oldest to newest). Can return an empty array.
358 return sort({ ${$pid{$a}}[1] <=> ${$pid{$b}}[1] }
359 grep({ ${$pid{$_}}[1] } keys(%pid)));
362 # Returns a hash with various information about the current state
363 # 'interval' => global minimum interval between proceeds
364 # 'active' => how many pids are currently running (i.e. not still queued)
365 # 'queue' => how many pids are currently queued
366 # 'lastqueue' => time (epoch seconds) of last queue
367 # 'lastproceed' => time (epoch seconds) of last proceed
368 # 'lastthrottle' => time (epoch seconds) of last throttle
369 # 'lastdied' => time (epoch seconds) of last removal
370 # 'totalqueue' => lifetime total number of processes queued
371 # 'totalproceed' => lifetime total number of processes proceeded
372 # 'totalthrottle' => lifetime total number of processes throttled
373 # 'totaldied' => lifetime total number of removed processes
376 interval
=> $interval,
377 active
=> scalar(keys(%pid)) - scalar(@queue),
378 queue
=> scalar(@queue),
379 lastqueue
=> $lastqueue,
380 lastproceed
=> $lastproceed,
381 lastthrottle
=> $lastthrottle,
382 lastdied
=> $lastdied,
383 totalqueue
=> $totalqueue,
384 totalproceed
=> $totalproceed,
385 totalthrottle
=> $totalthrottle,
386 totaldied
=> $totaldied
390 # with no args get the global interval
391 # with one arg set it, returns previous value if set
394 $interval = 0 + $_[0] if defined($_[0]) && $_[0] =~ /^\d+$/;
398 sub RemoveSupplicant
;
400 # Perform queue service (i.e. send SIGUSR1 to any eligible queued process)
401 # Returns minimum interval until next proceed is possible
402 # Returns undef if there's nothing waiting to proceed or
403 # the 'maxjobs' limits have been reached for all queued items (in which
404 # case it won't be possible to proceed until one of them exits, hence undef)
405 # This is called automatically by AddSupplicant and RemoveSupplicant
408 return undef unless @queue; # if there's nothing queued, nothing to do
410 my $min = _max
(0, $interval - ($now - $lastproceed));
411 my $classmin = undef;
412 my $classchecked = 0;
414 my $classcount = scalar(keys(%classes));
415 for (my $i=0; $i <= $#queue && $classchecked < $classcount; ++$i) {
416 my $pid = $queue[$i];
417 my $procinfo = $pid{$pid};
419 RemoveSupplicant
($pid, 1);
422 my $classinfo = $classes{$$procinfo[0]};
424 RemoveSupplicant
($pid, 1);
427 if (!$seenclass{$$procinfo[0]}) {
428 $seenclass{$$procinfo[0]} = 1;
430 if (!$classinfo->{'maxjobs'} || $classinfo->{'active'} < $classinfo->{'maxjobs'}) {
431 my $cmin = _max
(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
432 if (!$cmin && !$min) {
434 $$procinfo[1] = $now;
435 splice(@queue, $i, 1);
438 $classinfo->{'lastproceed'} = $now;
439 ++$classinfo->{'active'};
440 kill("USR1", $pid) or RemoveSupplicant
($pid, 1);
443 $classmin = $cmin unless defined($classmin) && $classmin < $cmin;
447 return defined($classmin) ? _max
($min, $classmin) : undef;
450 # $1 => pid to add (must not already be in %pid)
451 # $2 => class name (must exist)
452 # Returns -1 if no such class or pid already present or invalid
453 # Returns 0 if added successfully (and possibly already SIGUSR1'd)
454 # Returns 1 if throttled and cannot be added
456 my ($pid, $classname, $text, $noservice) = @_;
457 return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
459 kill(0, $pid) or return -1;
460 my $classinfo = $classes{$classname};
461 return -1 unless $classinfo;
462 return -1 if $pid{$pid};
463 $text = '' unless defined($text);
465 if ($classinfo->{'maxproc'} && $classinfo->{'total'} >= $classinfo->{'maxproc'}) {
467 $lastthrottle = $now;
468 $classinfo->{'lastthrottle'} = $now;
473 $pid{$pid} = [$classname, 0, $text];
474 ++$classinfo->{'total'};
475 $classinfo->{'lastqueue'} = $now;
477 ServiceQueue
unless $noservice;
481 # $1 => pid to remove (died, killed, exited normally, doesn't matter)
482 # Returns 0 if removed
483 # Returns -1 if unknown pid or other error during removal
484 sub RemoveSupplicant
{
485 my ($pid, $noservice) = @_;
486 return -1 unless defined($pid) && $pid =~ /^\d+$/;
488 my $pidinfo = $pid{$pid};
489 $pidinfo or return -1;
495 for (my $i=0; $i<=$#queue; ++$i) {
496 if ($queue[$i] == $pid) {
497 splice(@queue, $i, 1);
502 my $classinfo = $classes{$$pidinfo[0]};
503 ServiceQueue
, return -1 unless $classinfo;
504 --$classinfo->{'active'} if $$pidinfo[1];
505 --$classinfo->{'total'};
506 $classinfo->{'lastdied'} = $now;
507 ServiceQueue
unless $noservice;
523 # http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
536 while (($waitedpid = waitpid(-1, WNOHANG
)) > 0) {
537 my $code = $?
& 0xffff;
538 $idlestart = time if !--$children;
540 if (!($code & 0xff)) {
541 $codemsg = " with exit code ".($code >> 8) if $code;
542 } elsif ($code & 0x7f) {
543 my $signum = ($code & 0x7f);
544 $codemsg = " with signal ".
545 ($signame{$signum}?
$signame{$signum}:$signum);
547 logmsg
"reaped $waitedpid$codemsg";
548 push(@reapedpids, $waitedpid);
550 $SIG{CHLD
} = \
&REAPER
; # loathe sysV
553 $SIG{CHLD
} = \
&REAPER
; # Apollo 440
555 my ($piperead, $pipewrite);
560 if (not defined $pid) {
561 logmsg
"cannot fork: $!";
564 $idlestart = time if !++$children;
567 return; # I'm the parent
570 close(Server
) unless fileno(Server
) == 0;
572 $SIG{'CHLD'} = sub {};
574 open STDIN
, "+<&Client" or die "can't dup client to stdin";
583 sub request_throttle
{
584 use POSIX
qw(sigprocmask sigsuspend SIG_SETMASK);
585 my $classname = shift;
588 Throttle
::GetClassInfo
($classname)
589 or return -1; # no such throttle class
595 my $setempty = POSIX
::SigSet
->new;
596 my $setfull = POSIX
::SigSet
->new;
597 $setempty->emptyset();
599 $SIG{'TERM'} = sub {$throttled = 1};
600 $SIG{'USR1'} = sub {$proceed = 1};
601 $SIG{'USR2'} = sub {$error = 1};
602 $SIG{'PIPE'} = sub {$controldead = 1};
603 $SIG{'ALRM'} = sub {};
605 # After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
606 print $pipewrite "\nthrottle $$ $classname $text\n";
607 my $old = POSIX
::SigSet
->new;
608 sigprocmask
(SIG_SETMASK
, $setfull, $old);
609 until ($controldead || $throttled || $proceed || $error) {
611 sigsuspend
($setempty);
613 sigprocmask
(SIG_SETMASK
, $setempty, $old);
614 print $pipewrite "\nkeepalive $$\n";
615 sigprocmask
(SIG_SETMASK
, $setfull, $old);
617 sigprocmask
(SIG_SETMASK
, $setempty, $old);
618 $SIG{'TERM'} = "DEFAULT";
619 $SIG{'USR1'} = "DEFAULT";
620 $SIG{'USR2'} = "DEFAULT";
621 $SIG{'ALRM'} = "DEFAULT";
622 $SIG{'PIPE'} = "DEFAULT";
635 Girocco
::Project
::does_exist
($name, 1) or die "no such project: $name";
636 my $proj = Girocco
::Project
->load($name);
637 $proj or die "failed to load project $name";
638 $proj->{clone_in_progress
} or die "project $name is not marked for cloning";
639 $proj->{clone_logged
} and die "project $name is already being cloned";
640 request_throttle
("clone", $name) <= 0 or die "cloning $name aborted (throttled)";
641 statmsg
"cloning $name";
642 open STDOUT
, '>', "$Girocco::Config::reporoot/$name.git/.clonelog" or die "cannot open clonelog: $!";
643 open STDERR
, ">&STDOUT";
644 open STDIN
, '<', '/dev/null';
645 exec "$Girocco::Config::basedir/taskd/clone.sh", "$name.git" or die "exec failed: $!";
649 return ' -> ' unless $showff && defined($_[0]);
650 my ($git_dir, $old, $new) = @_;
651 return '..' unless defined($old) && defined($new) && $old !~ /^0+$/ && $new !~ /^0+$/ && $old ne $new;
652 # In many cases `git merge-base` is slower than this even if using the
653 # `--is-ancestor` option available since Git 1.8.0, but it's never faster
654 my $ans = get_git
("--git-dir=$git_dir", "rev-list", "-n", "1", "^$new^0", "$old^0", "--") ?
'...' : '..';
655 return wantarray ?
($ans, 1) : $ans;
660 my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
661 $username && $name && $oldrev && $newrev && $ref or return 0;
662 $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or return 0;
663 $newrev ne $oldrev or return 0;
665 Girocco
::Project
::does_exist
($name, 1) or die "no such project: $name";
666 my $proj = Girocco
::Project
->load($name);
667 $proj or die "failed to load project $name";
668 my $has_notify = $proj->has_notify;
669 my $type = $has_notify ?
"notify" : "change";
672 if ($username && $username !~ /^%.*%$/) {
673 Girocco
::User
::does_exist
($username, 1) or die "no such user: $username";
674 $user = Girocco
::User
->load($username);
675 $user or die "failed to load user $username";
676 } elsif ($username eq "%$name%") {
680 request_throttle
("ref-change", $name) <= 0 or die "ref-change $name aborted (throttled)";
681 my $ind = ref_indicator
($proj->{path
}, $oldrev, $newrev);
682 statmsg
"ref-$type $username $name ($ref: @{[substr($oldrev,0,$abbrev)]}$ind@{[substr($newrev,0,$abbrev)]})";
683 open STDIN
, '<', '/dev/null';
684 Girocco
::Notify
::ref_change
($proj, $user, $ref, $oldrev, $newrev) if $has_notify;
690 my ($username, $name) = split(/\s+/, $arg);
691 $username && $name or return 0;
693 Girocco
::Project
::does_exist
($name, 1) or die "no such project: $name";
694 my $proj = Girocco
::Project
->load($name);
695 $proj or die "failed to load project $name";
696 my $has_notify = $proj->has_notify;
697 my $type = $has_notify ?
"notify" : "change";
700 if ($username && $username !~ /^%.*%$/) {
701 Girocco
::User
::does_exist
($username, 1) or die "no such user: $username";
702 $user = Girocco
::User
->load($username);
703 $user or die "failed to load user $username";
704 } elsif ($username eq "%$name%") {
710 my %deletedheads = ();
711 while (my $change = <STDIN
>) {
712 my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
713 $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or next;
714 if ($ref =~ m{^refs/heads/.}) {
715 if ($oldrev =~ /^0{40}$/) {
716 delete $oldheads{$ref};
717 $deletedheads{$ref} = 1;
718 } elsif ($newrev ne $oldrev || (!exists($oldheads{$ref}) && !$deletedheads{$ref})) {
719 $oldheads{$ref} = $oldrev;
722 $newrev ne $oldrev or next;
723 push(@changes, [$oldrev, $newrev, $ref]);
725 return 0 unless @changes;
726 open STDIN
, '<', '/dev/null';
727 request_throttle
("ref-change", $name) <= 0 or die "ref-changes $name aborted (throttled)";
729 my ($old, $new, $ref, $ran_mail_sh) = @_;
730 my ($ind, $ran_git) = ref_indicator
($proj->{path
}, $old, $new);
731 statmsg
"ref-$type $username $name ($ref: @{[substr($old,0,$abbrev)]}$ind@{[substr($new,0,$abbrev)]})";
732 sleep 1 if $ran_mail_sh || $ran_git;
735 Girocco
::Notify
::ref_changes
($proj, $user, $statproc, \
%oldheads, @changes);
737 &$statproc(@
$_) foreach @changes;
744 my ($pid, $classname, $text) = split(/\s+/, $arg);
745 $pid =~ /^\d+/ or return 0; # invalid pid
747 $pid > 0 or return 0; # invalid pid
748 kill(0, $pid) || $!{EPERM
} or return 0; # no such process
749 Throttle
::GetClassInfo
($classname) or return 0; # no such throttle class
750 defined($text) && $text ne '' or return 0; # no text no service
758 pipe($waker, $wakew) or die "pipe failed: $!";
759 select((select($wakew),$|=1)[0]);
761 $SIG{'TERM'} = sub {$throttled = 1; syswrite($wakew, '!')};
762 $SIG{'USR1'} = sub {$proceed = 1; syswrite($wakew, '!')};
763 $SIG{'USR2'} = sub {$error = 1; syswrite($wakew, '!')};
764 $SIG{'PIPE'} = sub {$controldead = 1; syswrite($wakew, '!')};
765 select((select(STDIN
),$|=1)[0]);
767 logmsg
"throttle $pid $classname $text request";
768 # After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
769 print $pipewrite "\nthrottle $$ $classname $text\n";
771 # NOTE: the only way to detect the socket close is to read all the
772 # data until EOF is reached -- recv can be used to peek.
774 vec($v, fileno(STDIN
), 1) = 1;
775 vec($v, fileno($waker), 1) = 1;
776 setnonblock
(\
*STDIN
);
778 until ($controldead || $throttled || $proceed || $error || $suppdead) {
780 select($r=$v, undef, $e=$v, 30);
781 my ($bytes, $discard);
782 do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
783 do {$bytes = sysread(STDIN
, $discard, 4096)} while (defined($bytes) && $bytes > 0);
784 $suppdead = 1 unless !defined($bytes) && $!{EAGAIN
};
785 print $pipewrite "\nkeepalive $$\n";
789 if ($throttled && !$suppdead) {
790 print STDIN
"throttled\n";
791 logmsg
"throttle $pid $classname $text throttled";
792 } elsif ($proceed && !$suppdead) {
793 print STDIN
"proceed\n";
794 logmsg
"throttle $pid $classname $text proceed";
795 $SIG{'TERM'} = 'DEFAULT';
796 # Stay alive until the child dies which we detect by EOF on STDIN
797 setnonblock
(\
*STDIN
);
798 until ($controldead || $suppdead) {
800 select($r=$v, undef, $e=$v, 30);
801 my ($bytes, $discard);
802 do {$bytes = sysread($waker, $discard, 512)} while (defined($bytes) && $bytes > 0);
803 do {$bytes = sysread(STDIN
, $discard, 512)} while (defined($bytes) && $bytes > 0);
804 $suppdead = 1 unless !defined($bytes) && $!{EAGAIN
};
805 print $pipewrite "\nkeepalive $$\n";
810 $prefix = "control" if $controldead && !$suppdead;
811 logmsg
"throttle $pid $classname $text ${prefix}died";
816 sub process_pipe_msg
{
817 my ($act, $pid, $cls, $text) = split(/\s+/, $_[0]);
818 if ($act eq "throttle") {
819 $pid =~ /^\d+$/ or return 0;
821 $pid > 0 or return 0; # invalid pid
822 kill(0, $pid) or return 0; # invalid pid
823 defined($cls) && $cls ne "" or kill('USR2', $pid), return 0;
824 defined($text) && $text ne "" or kill('USR2', $pid), return 0;
825 Throttle
::GetClassInfo
($cls) or kill('USR2', $pid), return 0;
826 # the AddSupplicant call could send SIGUSR1 before it returns
827 my $result = Throttle
::AddSupplicant
($pid, $cls, $text);
828 kill('USR2', $pid), return 0 if $result < 0;
829 kill('TERM', $pid), return 0 if $result > 0;
830 # $pid was added to class $cls and will receive SIGUSR1 when
831 # it's time for it to proceed
833 } elsif ($act eq "keepalive") {
834 # nothing to do although we could verify pid is valid and
835 # still in %Throttle::pids and send a SIGUSR2 if not, but
836 # really keepalive should just be ignored.
839 print STDERR
"discarding unknown pipe message \"$_[0]\"\n";
851 # Set to 1 for only syslog output (if enabled by mode)
852 # Set to 2 for only stderr output (if enabled by mode)
853 our $only = 0; # This is a hack
856 use Sys
::Syslog
qw(:DEFAULT :macros);
861 my ($fd, $data) = @_;
863 my $remaining = length($data);
865 my $bytes = POSIX
::write(
867 substr($data, $offset, $remaining),
869 next if !defined($bytes) && $!{EINTR
};
870 croak
"POSIX::write failed: $!" unless defined $bytes;
871 croak
"POSIX::write wrote 0 bytes" unless $bytes;
872 $remaining -= $bytes;
878 use POSIX
qw(STDERR_FILENO);
879 my ($self, $line) = @_;
880 $only = 0 unless defined($only);
881 writeall
(STDERR_FILENO
, $line) if $self->{'stderr'} && $only != 1;
882 substr($line, -1, 1) = '' if substr($line, -1, 1) eq "\n";
883 return unless length($line);
884 syslog
(LOG_NOTICE
, "%s", $line) if $self->{'syslog'} && $only != 2;
888 my $class = shift || 'OStream';
890 my $syslogname = shift;
891 my $syslogfacility = shift;
892 defined($syslogfacility) or $syslogfacility = LOG_USER
;
894 $self->{'syslog'} = $mode > 0;
895 $self->{'stderr'} = $mode <= 0 || $mode > 1;
896 $self->{'lastline'} = '';
897 if ($self->{'syslog'}) {
898 # Some Sys::Syslog have a stupid default setlogsock order
899 eval {Sys
::Syslog
::setlogsock
("native"); 1;} or
900 eval {Sys
::Syslog
::setlogsock
("unix");};
901 openlog
($syslogname, "ndelay,pid", $syslogfacility)
902 or croak
"Sys::Syslog::openlog failed: $!";
904 return bless $self, $class;
# Takes no action and always returns 1 (a true value).
# NOTE(review): presumably the BINMODE hook of the tied-handle class that
# STDERR is tied to elsewhere in this file -- confirm against the package
# declaration, which is not visible here.
907 sub BINMODE
{return 1}
# Always returns undef; no file descriptor number is ever reported.
# NOTE(review): presumably the FILENO hook of the tied-handle class that
# STDERR is tied to elsewhere in this file -- confirm against the package
# declaration, which is not visible here.
908 sub FILENO
{return undef}
914 my $template = shift;
915 return $self->PRINT(sprintf $template, @_);
920 my $data = join('', $self->{'lastline'}, @_);
922 while ((my $idx = index($data, "\n", $pos)) >= 0) {
924 my $line = substr($data, $pos, $idx - $pos);
925 substr($data, $pos, $idx - $pos) = '';
927 $self->dumpline($line);
929 $self->{'lastline'} = $data;
935 $self->dumpline($self->{'lastline'})
936 if length($self->{'lastline'});
942 my ($scalar, $length, $offset) = @_;
943 $scalar = '' if !defined($scalar);
944 $length = length($scalar) if !defined($length);
945 croak
"OStream::WRITE invalid length $length"
947 $offset = 0 if !defined($offset);
948 $offset += length($scalar) if $offset < 0;
949 croak
"OStream::WRITE invalid write offset"
950 if $offset < 0 || $offset > $length;
951 my $max = length($scalar) - $offset;
952 $length = $max if $length > $max;
953 $self->PRINT(substr($scalar, $offset, $length));
965 close(DATA
) if fileno(DATA
);
967 Getopt
::Long
::Configure
('bundling');
969 my $parse_res = GetOptions
(
970 'help|?|h' => sub {pod2usage
(-verbose
=> 2, -exitval
=> 0)},
971 'quiet|q' => \
$quiet,
972 'no-quiet' => sub {$quiet = 0},
973 'progress|P' => \
$progress,
974 'inetd|i' => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
975 'idle-timeout|t=i' => \
$idle_timeout,
976 'syslog|s:s' => \
$sfac,
977 'no-syslog' => sub {$syslog = 0; $sfac = undef;},
978 'stderr' => \
$stderr,
979 'abbrev=i' => \
$abbrev,
980 'show-fast-forward-info' => \
$showff,
981 'no-show-fast-forward-info' => sub {$showff = 0},
982 'status-interval=i' => \
$stiv,
983 'idle-status-interval=i' => \
$idiv,
985 $syslog = 1 if defined($sfac);
986 $progress = 1 unless $quiet;
987 $abbrev = 128 unless $abbrev > 0;
988 if (defined($idle_timeout)) {
989 die "--idle-timeout must be a whole number" unless $idle_timeout =~ /^\d+$/;
990 die "--idle-timeout may not be used without --inetd" unless $inetd;
992 if (defined($stiv)) {
993 die "--status-interval must be a whole number" unless $stiv =~ /^\d+$/;
994 $statusintv = $stiv * 60;
996 if (defined($idiv)) {
997 die "--idle-status-interval must be a whole number" unless $idiv =~ /^\d+$/;
998 $idleintv = $idiv * 60;
1001 open STDOUT
, '>&STDERR' if $inetd;
1003 use Sys
::Syslog
qw();
1006 $sfac = "user" unless defined($sfac) && $sfac ne "";
1009 $sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
1011 my %badfac = map({("LOG_$_" => 1)}
1012 (qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
1013 eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac";
1014 die "invalid syslog facility: $ofac"
1015 if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
1016 tie
*STDERR
, 'OStream', $mode, $progname, $facility or die "tie failed";
1019 open STDOUT
, '>', '/dev/null';
1027 open Server
, '<&=0' or die "open: $!";
1028 my $sockname = getsockname Server
;
1029 die "getsockname: $!" unless $sockname;
1030 die "socket already connected! must be 'wait' socket" if getpeername Server
;
1031 die "getpeername: $!" unless $!{ENOTCONN
};
1032 my $st = getsockopt Server
, SOL_SOCKET
, SO_TYPE
;
1033 die "getsockopt(SOL_SOCKET, SO_TYPE): $!" unless $st;
1034 my $socktype = unpack('i', $st);
1035 die "stream socket required" unless defined $socktype && $socktype == SOCK_STREAM
;
1036 die "AF_UNIX socket required" unless sockaddr_family
($sockname) == AF_UNIX
;
1037 $NAME = unpack_sockaddr_un
$sockname;
1038 my $expected = $Girocco::Config
::chroot.'/etc/taskd.socket';
1039 warn "listening on \"$NAME\" but expected \"$expected\"" unless $NAME eq $expected;
1040 my $mode = (stat($NAME))[2];
1041 die "stat: $!" unless $mode;
1043 if (($mode & 0660) != 0660) {
1044 chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
1047 $NAME = $Girocco::Config
::chroot.'/etc/taskd.socket';
1048 my $uaddr = sockaddr_un
($NAME);
1050 socket(Server
, PF_UNIX
, SOCK_STREAM
, 0) or die "socket failed: $!";
1052 bind(Server
, $uaddr) or die "bind failed: $!";
1053 listen(Server
, SOMAXCONN
) or die "listen failed: $!";
1054 chmod 0666, $NAME or die "chmod failed: $!";
1057 foreach my $throttle (@Girocco::Config
::throttle_classes
, @throttle_defaults) {
1058 my $classname = $throttle->{"name"};
1060 Throttle
::GetClassInfo
($classname, $throttle);
1064 return $_[0] <= $_[1] ?
$_[0] : $_[1];
1067 pipe($piperead, $pipewrite) or die "pipe failed: $!";
1068 setnonblock
($piperead);
1069 select((select($pipewrite), $|=1)[0]);
1071 my $fdset_both = '';
1072 vec($fdset_both, fileno($piperead), 1) = 1;
1073 my $fdset_pipe = $fdset_both;
1074 vec($fdset_both, fileno(Server
), 1) = 1;
1077 my $penaltytime = $t;
1078 my $nextwakeup = $t + 60;
1079 my $nextstatus = undef;
1080 $nextstatus = $t + $statusintv if $statusintv;
1081 statmsg
"listening on $NAME";
1083 my ($rout, $eout, $nfound);
1087 my $adjustpenalty = sub {
1088 if ($penaltytime < $now) {
1089 my $credit = $now - $penaltytime;
1090 $penalty = $penalty > $credit ?
$penalty - $credit : 0;
1091 $penaltytime = $now;
# Periodic status report, due once $now has reached $nextstatus.
# NOTE(review): several lines of this section (declarations, closing braces)
# are not visible in this listing.
1094 if (defined($nextstatus) && $now >= $nextstatus) {
# Suppress the report when no children are running, an idle report was already
# issued ($idlestatus is set), and either $idleintv is 0 or fewer than
# $idleintv seconds have passed since that idle report.
1095 unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
1096 my $statmsg = "STATUS: $children active";
# Per-class summary: for each class with a non-zero total, the first letter of
# the lower-cased class name followed by "=total/active".
1101 foreach my $cls (sort(Throttle
::GetClassList
())) {
1102 my $inf = Throttle
::GetClassInfo
($cls);
1103 if ($inf->{'total'}) {
1104 $cnt += $inf->{'total'};
1105 push(@stats, substr(lc($cls),0,1)."=".
1106 $inf->{'total'}.'/'.$inf->{'active'});
# Children not accounted for by any class total are reported under "?".
1109 push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
1110 $statmsg .= " (".join(" ",@stats).")" if @stats;
# One "[class::description] duration" entry per running pid.
1111 foreach (Throttle
::GetRunningPids
()) {
1112 my ($cls, $ts, $desc) = Throttle
::GetPidInfo
($_);
1114 push(@running, "[${cls}::$desc] ".duration
($now-$ts));
# The idle time is appended only when there are no children and at least
# 2 seconds have passed since $idlestart.
1118 $statmsg .= ", idle " . duration
($idlesecs)
1119 if !$children && ($idlesecs = $now - $idlestart) >= 2;
1121 statmsg
"STATUS: currently running: ".join(", ", @running)
# Remember when a report was issued while idle (used by the check above).
1123 $idlestatus = $now if !$children;
# Step $nextstatus forward until it is later than $now.
1125 $nextstatus += $statusintv while $nextstatus <= $now;
# Step $nextwakeup forward 60 seconds at a time (refreshing $now on each step)
# until it lies in the future; $wait ends up as the seconds left until then.
1127 $nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
# Cap the wait at the value returned by Throttle::ServiceQueue, or at 60 when
# that value is false.
1128 $wait = _min
($wait, (Throttle
::ServiceQueue
()||60));
1129 &$adjustpenalty; # this prevents ignoring accept when we shouldn't
# While $penalty does not exceed $maxspawn, watch both the control pipe and the
# Server socket.  Otherwise watch only the pipe and shorten the wait to
# $penalty - $maxspawn seconds when that is less than the current wait.
1131 if ($penalty <= $maxspawn) {
1132 $fdset = $fdset_both;
1134 $fdset = $fdset_pipe;
1135 $wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
# The same mask is used for the read set and the exception set.
1137 $nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
# A negative result is fatal unless errno is EINTR or EAGAIN.
1138 logmsg
("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR
} || $!{EAGAIN
};
# Hand every pid queued in @reapedpids to Throttle::RemoveSupplicant.
1140 Throttle
::RemoveSupplicant
($reaped) while ($reaped = shift(@reapedpids));
1142 &$adjustpenalty; # this prevents banking credits for elapsed time
# Idle timeout: applies only when $idle_timeout is set, no children are
# running, select reported nothing ready, and $idle_timeout seconds have
# passed since $idlestart.
# NOTE(review): the exit itself is on a line not visible in this listing.
1143 if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
1144 statmsg
"idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
# Repeat until select reports at least one ready handle.
1147 } while $nfound < 1;
# Combine the readable and exceptional result masks.
1148 my $reout = $rout | $eout;
# The control pipe is ready.  The doubled opening brace creates a bare block
# inside the if, so that last and redo below have a block to act on.
1149 if (vec($reout, fileno($piperead), 1)) {{
# Append up to 512 bytes to the end of $pipebuff, retrying on EINTR.
1153 do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
1154 while (!defined($bytes) && $!{EINTR
});
# EAGAIN just leaves the block; any other read error is fatal.
1155 last if !defined($bytes) && $!{EAGAIN
};
1156 die "sysread failed: $!" unless defined $bytes;
1157 # since we always keep a copy of $pipewrite open EOF is fatal
1158 die "sysread returned EOF on pipe read" unless $bytes;
# Offset of the first newline in the buffer, or -1 when there is none.
1159 $nloff = index($pipebuff, "\n", 0);
# At least 512 buffered bytes without a newline: report a discard.
# NOTE(review): the line that actually discards the data is not visible in this listing.
1160 if ($nloff < 0 && length($pipebuff) >= 512) {
1162 print STDERR
"discarding 512 bytes of control pipe data with no \\n found\n";
1164 redo unless $nloff >= 0;
1166 last unless $nloff >= 0;
# Remove one newline-terminated message at a time from the front of the buffer
# and pass each non-empty one to process_pipe_msg, until no newline remains.
1168 my $msg = substr($pipebuff, 0, $nloff);
1169 substr($pipebuff, 0, $nloff + 1) = '';
1170 $nloff = index($pipebuff, "\n", 0);
1171 process_pipe_msg
($msg) if length($msg);
1172 } while $nloff >= 0;
# Move on to the next iteration unless the Server socket was reported ready.
1175 next unless vec($reout, fileno(Server
), 1);
# A failed accept is logged unless it was merely interrupted (EINTR).
1176 unless (accept(Client
, Server
)) {
1177 logmsg
"accept failed: $!" unless $!{EINTR
};
1180 logmsg
"connection on $NAME";
# NOTE(review): lines are missing from this listing before this point;
# presumably $inp already holds the first line read from STDIN -- confirm.
# A first line consisting of only a newline is skipped by reading one more line.
1184 $inp = <STDIN
> if defined($inp) && $inp eq "\n";
1185 chomp $inp if defined($inp);
1186 $inp or exit 0; # ignore empty connects
# Split the request into a command word (a letter followed by letters, digits,
# '.', '_', '+' or '-') and an optional argument string after whitespace.
1187 my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
1188 defined($arg) or $arg = '';
# Dispatch on the command word.
# NOTE(review): the body of every branch is missing from this listing.
1189 if ($cmd eq 'ref-changes') {
1191 } elsif ($cmd eq 'clone') {
1193 } elsif ($cmd eq 'ref-change') {
1195 } elsif ($cmd eq 'throttle') {
1198 die "ignoring unknown command: $cmd\n";
1214 taskd.pl - Perform Girocco service tasks
1221 -h | --help detailed instructions
1222 -q | --quiet run quietly
1223 --no-quiet do not run quietly
1224 -P | --progress show occasional status updates
1225 -i | --inetd run as inetd unix stream wait service
1226 implies --quiet --syslog
1227 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1229 -s | --syslog[=facility] send messages to syslog instead of
1230 stderr but see --stderr
1232 --no-syslog do not send messages to syslog
1233 --stderr always send messages to stderr too
1234 --abbrev=n abbreviate hashes to n (default is 8)
1235 --show-fast-forward-info show fast-forward info (default is on)
1236 --no-show-fast-forward-info disable showing fast-forward info
1237 --status-interval=MINUTES status update interval (default 1)
1238 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1246 Print the full description of taskd.pl's options.
1250 Suppress non-error messages, e.g. for use when running this task as an inetd
1251 service. Enabled by default by --inetd.
1255 Enable non-error messages. When running in --inetd mode these messages are
1256 sent to STDERR instead of STDOUT.
1260 Show information about the current status of the task operation occasionally.
1261 This is automatically enabled if --quiet is not given.
1265 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1266 stream socket ready to have accept called on it. To be useful, the unix socket
1267 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1268 will be issued if the socket is not in the expected location. Socket file
1269 permissions will be adjusted if necessary and, if they cannot be, taskd.pl will
1270 die. The --inetd option also enables the --quiet and --syslog options but
1271 --no-quiet and --no-syslog may be used to alter that.
1273 The correct specification for the inetd socket is a "unix" protocol "stream"
1274 socket in "wait" mode with user and group writable permissions (0660). An
1275 attempt will be made to alter the socket's file mode if needed and, if that
1276 cannot be accomplished, taskd.pl will die.
1278 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1279 in wait mode and will die if the passed in socket is already connected.
1281 Note that while *BSD's inetd happily supports unix sockets (and so does
1282 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1283 However, systemd does seem to.
1285 =item B<--idle-timeout=SECONDS>
1287 Only permitted when running in --inetd mode. After SECONDS of inactivity
1288 (i.e. all outstanding tasks have completed and no new requests have come in)
1289 exit normally. The default is no timeout at all (a SECONDS value of 0).
1290 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1292 =item B<--syslog[=facility]>
1294 Normally error output is sent to STDERR. With this option it's sent to
1295 syslog instead. Note that when running in --inetd mode non-error output is
1296 also affected by this option as it's sent to STDERR in that case. If
1297 not specified, the default for facility is LOG_USER. Facility names are
1298 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1299 with the LOG_NOTICE priority.
1301 =item B<--no-syslog>
1303 Send error message output to STDERR but not syslog.
1307 Always send error message output to STDERR. If --syslog is in effect then
1308 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1313 Abbreviate displayed hash values to only the first n hexadecimal characters.
1314 The default is 8 characters. Set to 0 for no abbreviation at all.
1316 =item B<--show-fast-forward-info>
1318 Instead of showing ' -> ' in ref-change/ref-notify update messages, show either
1319 '..' for a fast-forward, creation or deletion or '...' for non-fast-forward.
1320 This requires running an extra git command for each ref update that is not a
1321 creation or deletion in order to determine whether or not it's a fast forward.
1323 =item B<--no-show-fast-forward-info>
1325 Disable showing of fast-forward information for ref-change/ref-notify update
1326 messages. Instead just show a ' -> ' indicator.
1328 =item B<--status-interval=MINUTES>
1330 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1331 status updates are shown at each MINUTES interval. Setting the interval to 0
1332 disables them entirely even with --progress.
1334 =item B<--idle-status-interval=IDLEMINUTES>
1336 Two consecutive "idle" status updates with no intervening activity will not be
1337 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1338 Setting the interval to 0 prevents any consecutive idle updates (with no
1339 activity between them) from appearing at all.
1345 taskd.pl is Girocco's service request servant; it listens for service requests
1346 such as new clone requests and ref update notifications and spawns a task to
1347 perform the requested action.