3 # taskd - Clone repositories on request
5 # taskd is Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
39 use POSIX
":sys_wait_h";
47 use Girocco
::Util
qw(noFatalsToBrowser);
48 BEGIN {noFatalsToBrowser
}
50 # Throttle Classes Defaults
51 # Note that any same-named classes in @Girocco::Config::throttle_classes
52 # will override (completely replacing the entire hash) these ones.
53 my @throttle_defaults = (
68 #maxproc => max(5, cpucount + maxjobs), # this is the default
69 #maxjobs => max(1, int(cpucount / 4)) , # this is the default
88 my $progname = basename
($0);
94 use Girocco
::Util
"online_cpus";
95 our $online_cpus_result;
96 $online_cpus_result = online_cpus
unless $online_cpus_result;
97 return $online_cpus_result;
101 my $hdr = "[@{[scalar localtime]}] $progname $$: ";
103 $OStream::only
= 2; # STDERR only
105 $OStream::only
= 1; # syslog only
107 $OStream::only
= 0; # back to default
114 return unless $progress;
115 my $hdr = "[@{[scalar localtime]}] $progname $$: ";
117 $OStream::only
= 2; # STDERR only
118 print STDERR
"$hdr@_\n";
119 $OStream::only
= 1; # syslog only
121 $OStream::only
= 0; # back to default
123 print STDERR
"$hdr@_\n";
129 return $secs unless defined($secs) && $secs >= 0;
131 my $ans = ($secs % 60) . 's';
132 return $ans if $secs < 60;
133 $secs = int($secs / 60);
134 $ans = ($secs % 60) . 'm' . $ans;
135 return $ans if $secs < 60;
136 $secs = int($secs / 60);
137 $ans = ($secs % 24) . 'h' . $ans;
138 return $ans if $secs < 24;
139 $secs = int($secs / 24);
140 return $secs . 'd' . $ans;
145 my $flags = fcntl($fd, F_GETFL
, 0);
146 defined($flags) or die "fcntl failed: $!";
147 fcntl($fd, F_SETFL
, $flags | O_NONBLOCK
) or die "fcntl failed: $!";
152 my $flags = fcntl($fd, F_GETFL
, 0);
153 defined($flags) or die "fcntl failed: $!";
154 fcntl($fd, F_SETFL
, $flags & ~O_NONBLOCK
) or die "fcntl failed: $!";
162 ## 1) Process needing throttle services acquire a control file descriptor
163 ## a) Either as a result of a fork + exec (the write end of a pipe)
164 ## b) Or by connecting to the taskd socket (not yet implemented)
166 ## 2) The process requesting throttle services will be referred to
167 ## as the supplicant or just "supp" for short.
169 ## 3) The supp first completes any needed setup which may include
170 ## gathering data it needs to perform the action -- if that fails
171 ## then there's no need for any throttling.
173 ## 4) The supp writes a throttle request to the control descriptor in
175 ## throttle <pid> <class>\n
176 ## for example if the supp's pid was 1234 and it was requesting throttle
177 ## control as a member of the mail class it would write this message:
178 ## throttle 1234 mail\n
179 ## Note that if the control descriptor happens to be a pipe rather than a
180 ## socket, the message should be preceded by another "\n" just be be safe.
181 ## If the control descriptor is a socket, not a pipe, the message may be
182 ## preceded by a "\n" but that's not recommended.
184 ## 5) For supplicants with a control descriptor that is a pipe
185 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
186 ## If the control descriptor is a socket (getsockname succeeds) then
187 ## protocol (5b) should be used.
189 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
190 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
191 ## to write a "keepalive\n" message to the control descriptor. If that
192 ## fails, the controller has gone away and it may make its own decision
193 ## whether or not to proceed at that point. If, on the other hand, it
194 ## receives a SIGTERM, the process limit for its class has been reached
195 ## and it should abort without performing its action. If it receives
196 ## SIGUSR1, it may proceed without writing anything more to the control
197 ## descriptor, any MAY even close the control descriptor. Finally, a
198 ## SIGUSR2 indicates rejection of the throttle request for some other reason
199 ## such as unrecognized class name or invalid pid in which case the supp may
200 ## make its own decision how to proceed.
202 ## 5b) The supp now enters a read wait on the socket -- it need accomodate no
203 ## more than 512 bytes and if a '\n' does not appear within that number of
204 ## bytes the read should be considered failed. Otherwise the read should
205 ## be retried until either a full line has been read or the socket is
206 ## closed from the other end. If the lone read is "proceed\n" then it may
207 ## proceed without reading or writing anything more to the control
208 ## descriptor, but MUST keep the control descriptor open and not call
209 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
210 ## should be retried) constitutes failure. If a full line starting with at
211 ## least one alpha character was read but it was not "proceed" then it
212 ## should abort without performing its action. For any other failure it
213 ## may make its own decision whether or not to proceed as the controller has
216 ## 6) The supp now performs its throttled action.
218 ## 7) The supp now closes its control descriptor (if it hasn't already in the
219 ## case of (5a)) and exits -- in the case of a socket, the other end receives
220 ## notification that the socket has been closed (read EOF). In the case of
221 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
222 ## of the other end of the pipe, so it will not reaach EOF by the supp's
223 ## exit in that case).
226 # keys are class names, values are hash refs with these fields:
227 # 'maxproc' => integer; maximum number of allowed supplicants (the sum of how
228 # many may be queued waiting plus how many may be
229 # concurrently active) with 0 meaning no limit.
230 # 'maxjobs' => integer; how many supplicants may proceed simultaneously a value
231 # of 0 is unlimited but the number of concurrent
232 # supplicants will always be limited to no more than
233 # the 'maxproc' value (if > 0) no matter what the
234 # 'maxjobs' value is.
235 # 'total' -> integer; the total number of pids belonging to this clase that
236 # can currently be found in %pid.
237 # 'active' -> integer; the number of currently active supplicants which should
238 # be the same as (the number of elements of %pid with a
239 # matching class name) - (number of my class in @queue).
240 # 'interval' -> integer; minimum number of seconds between 'proceed' responses
241 # or SIGUSR1 signals to members of this class.
242 # 'lastqueue' -> time; last time a supplicant was successfully queued.
243 # 'lastproceed' => time; last time a supplicant was allowed to proceed.
244 # 'lastthrottle' => time; last time a supplicant was throttled
245 # 'lastdied' => time; last time a supplicant in this class died/exited/etc.
248 # keys are pid numbers, values are array refs with these elements:
249 # [0] => name of class (key to classes hash)
250 # [1] => supplicant state (0 => queued, non-zero => time it started running)
251 # [2] => descriptive text (e.g. project name)
254 # minimum number of seconds between any two proceed responses no matter what
255 # class. this takes priority in that it can effectively increase the
256 # class's 'interval' value by delaying proceed notifications if the minimum
257 # interval has not yet elapsed.
260 # fifo of pids awaiting notification as soon as the next $interval elapses
261 # provided interval and maxjobs requirements are satisfied
262 # for the class of the pid that will next be triggered.
265 # time of most recent successful call to AddSupplicant
268 # time of most recent proceed notification
271 # time of most recent throttle
272 my $lastthrottle = 0;
274 # time of most recent removal
277 # lifetime count of how many have been queued
280 # lifetime count of how many have been allowed to proceed
281 my $totalproceed = 0;
283 # lifetime count of how many have been throttled
284 my $totalthrottle = 0;
286 # lifetime count of how many have died
287 # It should always be true that $totalqueued - $totaldied == $curentlyactive
290 # Returns an unordered list of currently registered class names
292 return keys(%classes);
296 return $_[0] if $_[0] >= $_[1];
301 my ($min, $val, $default) = @_;
303 if (defined($val) && $val =~ /^[+-]?\d+$/) {
308 return _max
($min, $ans);
311 # [0] => name of class to find
312 # [1] => if true, create class if it doesn't exist, if a hashref then
313 # it contains initial values for maxproc, maxjobs and interval.
314 # Otherwise maxjobs defaults to max(cpu cores/4, 1), maxprocs
315 # defaults to the max(5, number of cpu cores + maxjobs) and interval
317 # Returns a hash ref with info about the class on success
319 my ($classname, $init) = @_;
320 defined($classname) && $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/
322 $classname = lc($classname);
324 if ($classes{$classname}) {
325 %info = %{$classes{$classname}};
330 ref($init) eq 'HASH' or $init = {};
331 $newclass{'maxjobs'} = _getnum
(0, $init->{'maxjobs'}, sub{_max
(1, int(::cpucount
() / 4))});
332 $newclass{'maxproc'} = _getnum
(0, $init->{'maxproc'}, sub{_max
(5, ::cpucount
() + $newclass{'maxjobs'})});
333 $newclass{'interval'} = _getnum
(0, $init->{'interval'}, sub{1});
334 $newclass{'total'} = 0;
335 $newclass{'active'} = 0;
336 $newclass{'lastqueue'} = 0;
337 $newclass{'lastproceed'} = 0;
338 $newclass{'lastthrottle'} = 0;
339 $newclass{'lastdied'} = 0;
340 $classes{$classname} = \
%newclass;
345 # [0] => pid to look up
346 # Returns () if not found otherwise ($classname, $timestarted, $description)
347 # Where $timestarted will be 0 if it's still queued otherwise a time() value
350 return () unless exists $pid{$pid};
351 return @
{$pid{$pid}};
354 # Returns array of pid numbers that are currently running sorted
355 # by time started (oldest to newest). Can return an empty array.
357 return sort({ ${$pid{$a}}[1] <=> ${$pid{$b}}[1] }
358 grep({ ${$pid{$_}}[1] } keys(%pid)));
361 # Returns a hash with various about the current state
362 # 'interval' => global minimum interval between proceeds
363 # 'active' => how many pids are currently queued + how many are running
364 # 'queue' => how many pids are currently queued
365 # 'lastqueue' => time (epoch seconds) of last queue
366 # 'lastproceed' => time (epoch seconds) of last proceed
367 # 'lastthrottle' => time (epoch seconds) of last throttle
368 # 'lastdied' => time (epoch seconds) of last removal
369 # 'totalqueue' => lifetime total number of processes queued
370 # 'totalproceed' => lifetime total number of processes proceeded
371 # 'totalthrottle' => lifetime total number of processes throttled
372 # 'totaldied' => lifetime total number of removed processes
375 interval
=> $interval,
376 active
=> scalar(keys(%pid)) - scalar(@queue),
377 queue
=> scalar(@queue),
378 lastqueue
=> $lastqueue,
379 lastproceed
=> $lastproceed,
380 lastthrottle
=> $lastthrottle,
381 lastdied
=> $lastdied,
382 totalqueue
=> $totalqueue,
383 totalproceed
=> $totalproceed,
384 totalthrottle
=> $totalthrottle,
385 totaldied
=> $totaldied
389 # with no args get the global interval
390 # with one arg set it, returns previous value if set
393 $interval = 0 + $_[0] if defined($_[0]) && $_[0] =~ /^\d+$/;
397 sub RemoveSupplicant
;
399 # Perform queue service (i.e. send SIGUSR1 to any eligible queued process)
400 # Returns minimum interval until next proceed is possible
401 # Returns undef if there's nothing waiting to proceed or
402 # the 'maxjobs' limits have been reached for all queued items (in which
403 # case it won't be possible to proceed until one of them exits, hence undef)
404 # This is called automatially by AddSupplicant and RemoveSupplicant
407 return undef unless @queue; # if there's nothing queued, nothing to do
409 my $min = _max
(0, $interval - ($now - $lastproceed));
410 my $classmin = undef;
411 my $classchecked = 0;
413 my $classcount = scalar(keys(%classes));
414 for (my $i=0; $i <= $#queue && $classchecked < $classcount; ++$i) {
415 my $pid = $queue[$i];
416 my $procinfo = $pid{$pid};
418 RemoveSupplicant
($pid, 1);
421 my $classinfo = $classes{$$procinfo[0]};
423 RemoveSupplicant
($pid, 1);
426 if (!$seenclass{$$procinfo[0]}) {
427 $seenclass{$$procinfo[0]} = 1;
429 if (!$classinfo->{'maxjobs'} || $classinfo->{'active'} < $classinfo->{'maxjobs'}) {
430 my $cmin = _max
(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
431 if (!$cmin && !$min) {
433 $$procinfo[1] = $now;
434 splice(@queue, $i, 1);
437 $classinfo->{'lastproceed'} = $now;
438 ++$classinfo->{'active'};
439 kill("USR1", $pid) or RemoveSupplicant
($pid, 1);
442 $classmin = $cmin unless defined($classmin) && $classmin < $cmin;
446 return defined($classmin) ? _max
($min, $classmin) : undef;
449 # $1 => pid to add (must not already be in %pids)
450 # $2 => class name (must exist)
451 # Returns -1 if no such class or pid already present or invalid
452 # Returns 0 if added successfully (and possibly already SIGUSR1'd)
453 # Return 1 if throttled and cannot be added
455 my ($pid, $classname, $text, $noservice) = @_;
456 return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
458 kill(0, $pid) or return -1;
459 my $classinfo = $classes{$classname};
460 return -1 unless $classinfo;
461 return -1 if $pid{$pid};
462 $text = '' unless defined($text);
464 if ($classinfo->{'maxproc'} && $classinfo->{'total'} >= $classinfo->{'maxproc'}) {
466 $lastthrottle = $now;
467 $classinfo->{'lastthrottle'} = $now;
472 $pid{$pid} = [$classname, 0, $text];
473 ++$classinfo->{'total'};
474 $classinfo->{'lastqueue'} = $now;
476 ServiceQueue
unless $noservice;
480 # $1 => pid to remove (died, killed, exited normally, doesn't matter)
481 # Returns 0 if removed
482 # Returns -1 if unknown pid or other error during removal
483 sub RemoveSupplicant
{
484 my ($pid, $noservice) = @_;
485 return -1 unless defined($pid) && $pid =~ /^\d+$/;
487 my $pidinfo = $pid{$pid};
488 $pidinfo or return -1;
494 for (my $i=0; $i<=$#queue; ++$i) {
495 if ($queue[$i] == $pid) {
496 splice(@queue, $i, 1);
501 my $classinfo = $classes{$$pidinfo[0]};
502 ServiceQueue
, return -1 unless $classinfo;
503 --$classinfo->{'active'} if $$pidinfo[1];
504 --$classinfo->{'total'};
505 $classinfo->{'lastdied'} = $now;
506 ServiceQueue
unless $noservice;
522 # http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
535 while (($waitedpid = waitpid(-1, WNOHANG
)) > 0) {
536 my $code = $?
& 0xffff;
537 $idlestart = time if !--$children;
539 if (!($code & 0xff)) {
540 $codemsg = " with exit code ".($code >> 8) if $code;
541 } elsif ($code & 0x7f) {
542 my $signum = ($code & 0x7f);
543 $codemsg = " with signal ".
544 ($signame{$signum}?
$signame{$signum}:$signum);
546 logmsg
"reaped $waitedpid$codemsg";
547 push(@reapedpids, $waitedpid);
549 $SIG{CHLD
} = \
&REAPER
; # loathe sysV
552 $SIG{CHLD
} = \
&REAPER
; # Apollo 440
554 my ($piperead, $pipewrite);
559 if (not defined $pid) {
560 logmsg
"cannot fork: $!";
563 $idlestart = time if !++$children;
566 return; # I'm the parent
569 close(Server
) unless fileno(Server
) == 0;
571 $SIG{CHLD
} = 'DEFAULT';
573 open STDIN
, "+<&Client" or die "can't dup client to stdin";
582 sub request_throttle
{
584 my $classname = shift;
587 Throttle
::GetClassInfo
($classname)
588 or return -1; # no such throttle class
594 $SIG{'TERM'} = sub {$throttled = 1};
595 $SIG{'USR1'} = sub {$proceed = 1};
596 $SIG{'USR2'} = sub {$error = 1};
597 $SIG{'PIPE'} = sub {$controldead = 1};
598 $SIG{'ALRM'} = sub {};
600 # After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
601 print $pipewrite "\nthrottle $$ $classname $text\n";
602 until ($controldead || $throttled || $proceed || $error) {
606 print $pipewrite "\nkeepalive $$\n";
608 $SIG{'TERM'} = "DEFAULT";
609 $SIG{'USR1'} = "DEFAULT";
610 $SIG{'USR2'} = "DEFAULT";
611 $SIG{'ALRM'} = "DEFAULT";
612 $SIG{'PIPE'} = "DEFAULT";
625 Girocco
::Project
::does_exist
($name, 1) or die "no such project: $name";
626 my $proj = Girocco
::Project
->load($name);
627 $proj or die "failed to load project $name";
628 $proj->{clone_in_progress
} or die "project $name is not marked for cloning";
629 $proj->{clone_logged
} and die "project $name is already being cloned";
630 request_throttle
("clone", $name) <= 0 or die "cloning $name aborted (throttled)";
631 statmsg
"cloning $name";
632 open STDOUT
, '>', "$Girocco::Config::reporoot/$name.git/.clonelog" or die "cannot open clonelog: $!";
633 open STDERR
, ">&STDOUT";
634 open STDIN
, '<', '/dev/null';
635 exec "$Girocco::Config::basedir/taskd/clone.sh", "$name.git" or die "exec failed: $!";
640 my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
641 $username && $name && $oldrev && $newrev && $ref or return 0;
642 $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or return 0;
643 $newrev ne $oldrev or return 0;
645 Girocco
::Project
::does_exist
($name, 1) or die "no such project: $name";
646 my $proj = Girocco
::Project
->load($name);
647 $proj or die "failed to load project $name";
650 if ($username && $username !~ /^%.*%$/) {
651 Girocco
::User
::does_exist
($username, 1) or die "no such user: $username";
652 $user = Girocco
::User
->load($username);
653 $user or die "failed to load user $username";
654 } elsif ($username eq "%$name%") {
658 request_throttle
("ref-change", $name) <= 0 or die "ref-change $name aborted (throttled)";
659 statmsg
"ref-change $username $name ($ref: @{[substr($oldrev,0,$abbrev)]} -> @{[substr($newrev,0,$abbrev)]})";
660 open STDIN
, '<', '/dev/null';
661 Girocco
::Notify
::ref_change
($proj, $user, $ref, $oldrev, $newrev);
667 my ($username, $name) = split(/\s+/, $arg);
668 $username && $name or return 0;
670 Girocco
::Project
::does_exist
($name, 1) or die "no such project: $name";
671 my $proj = Girocco
::Project
->load($name);
672 $proj or die "failed to load project $name";
675 if ($username && $username !~ /^%.*%$/) {
676 Girocco
::User
::does_exist
($username, 1) or die "no such user: $username";
677 $user = Girocco
::User
->load($username);
678 $user or die "failed to load user $username";
679 } elsif ($username eq "%$name%") {
684 while (my $change = <STDIN
>) {
685 my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
686 $oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or return 0;
687 $newrev ne $oldrev or return 0;
688 push(@changes, [$oldrev, $newrev, $ref]);
690 return 0 unless @changes;
691 open STDIN
, '<', '/dev/null';
692 request_throttle
("ref-change", $name) <= 0 or die "ref-changes $name aborted (throttled)";
693 foreach my $change (@changes) {
694 my ($oldrev, $newrev, $ref) = @
$change;
695 statmsg
"ref-change $username $name ($ref: @{[substr($oldrev,0,$abbrev)]} -> @{[substr($newrev,0,$abbrev)]})";
696 Girocco
::Notify
::ref_change
($proj, $user, $ref, $oldrev, $newrev);
704 my ($pid, $classname, $text) = split(/\s+/, $arg);
705 $pid =~ /^\d+/ or return 0; # invalid pid
707 $pid > 0 or return 0; # invalid pid
708 kill(0, $pid) || $!{EPERM
} or return 0; # no such process
709 Throttle
::GetClassInfo
($classname) or return 0; # no such throttle class
710 defined($text) && $text ne '' or return 0; # no text no service
717 $SIG{'TERM'} = sub {$throttled = 1};
718 $SIG{'USR1'} = sub {$proceed = 1};
719 $SIG{'USR2'} = sub {$error = 1};
720 $SIG{'PIPE'} = sub {$controldead = 1};
721 select((select(STDIN
),$|=1)[0]);
723 logmsg
"throttle $pid $classname $text request";
724 # After writing we can expect a SIGTERM or SIGUSR1
725 print $pipewrite "\nthrottle $$ $classname $text\n";
727 # NOTE: the only way to detect the socket close is to read all the
728 # data until EOF is reached -- recv can be used to peek.
730 vec($v, fileno(STDIN
), 1) = 1;
731 setnonblock
(\
*STDIN
);
732 until ($controldead || $throttled || $proceed || $error || $suppdead) {
734 select($r=$v, undef, $e=$v, 30);
735 my ($bytes, $discard);
736 do {$bytes = sysread(STDIN
, $discard, 4096)} while (defined($bytes) && $bytes > 0);
737 $suppdead = 1 unless !defined($bytes) && $!{EAGAIN
};
738 print $pipewrite "\nkeepalive $$\n";
742 if ($throttled && !$suppdead) {
743 print STDIN
"throttled\n";
744 logmsg
"throttle $pid $classname $text throttled";
745 } elsif ($proceed && !$suppdead) {
746 print STDIN
"proceed\n";
747 logmsg
"throttle $pid $classname $text proceed";
748 $SIG{'TERM'} = 'DEFAULT';
749 # Stay alive until the child dies which we detect by EOF on STDIN
750 setnonblock
(\
*STDIN
);
751 until ($controldead || $suppdead) {
753 select($r=$v, undef, $e=$v, 30);
754 my ($bytes, $discard);
755 do {$bytes = sysread(STDIN
, $discard, 512)} while (defined($bytes) && $bytes > 0);
756 $suppdead = 1 unless !defined($bytes) && $!{EAGAIN
};
757 print $pipewrite "\nkeepalive $$\n";
762 $prefix = "control" if $controldead && !$suppdead;
763 logmsg
"throttle $pid $classname $text ${prefix}died";
768 sub process_pipe_msg
{
769 my ($act, $pid, $cls, $text) = split(/\s+/, $_[0]);
770 if ($act eq "throttle") {
771 $pid =~ /^\d+$/ or return 0;
773 $pid > 0 or return 0; # invalid pid
774 kill(0, $pid) or return 0; # invalid pid
775 defined($cls) && $cls ne "" or kill('USR2', $pid), return 0;
776 defined($text) && $text ne "" or kill('USR2', $pid), return 0;
777 Throttle
::GetClassInfo
($cls) or kill('USR2', $pid), return 0;
778 # the AddSupplicant call could send SIGUSR1 before it returns
779 my $result = Throttle
::AddSupplicant
($pid, $cls, $text);
780 kill('USR2', $pid), return 0 if $result < 0;
781 kill('TERM', $pid), return 0 if $result > 0;
782 # $pid was added to class $cls and will receive SIGUSR1 when
783 # it's time for it to proceed
785 } elsif ($act eq "keepalive") {
786 # nothing to do although we could verify pid is valid and
787 # still in %Throttle::pids and send a SIGUSR2 if not, but
788 # really keepalive should just be ignored.
791 print STDERR
"discarding unknown pipe message \"$_[0]\"\n";
803 # Set to 1 for only syslog output (if enabled by mode)
804 # Set to 2 for only stderr output (if enabled by mode)
805 our $only = 0; # This is a hack
808 use Sys
::Syslog
qw(:DEFAULT :macros);
813 my ($fd, $data) = @_;
815 my $remaining = length($data);
817 my $bytes = POSIX
::write(
819 substr($data, $offset, $remaining),
821 next if !defined($bytes) && $!{EINTR
};
822 croak
"POSIX::write failed: $!" unless defined $bytes;
823 croak
"POSIX::write wrote 0 bytes" unless $bytes;
824 $remaining -= $bytes;
830 use POSIX
qw(STDERR_FILENO);
831 my ($self, $line) = @_;
832 $only = 0 unless defined($only);
833 writeall
(STDERR_FILENO
, $line) if $self->{'stderr'} && $only != 1;
834 substr($line, -1, 1) = '' if substr($line, -1, 1) eq "\n";
835 return unless length($line);
836 syslog
(LOG_NOTICE
, "%s", $line) if $self->{'syslog'} && $only != 2;
840 my $class = shift || 'OStream';
842 my $syslogname = shift;
843 my $syslogfacility = shift;
844 defined($syslogfacility) or $syslogfacility = LOG_USER
;
846 $self->{'syslog'} = $mode > 0;
847 $self->{'stderr'} = $mode <= 0 || $mode > 1;
848 $self->{'lastline'} = '';
849 if ($self->{'syslog'}) {
850 # Some Sys::Syslog have a stupid default setlogsock order
851 eval {Sys
::Syslog
::setlogsock
("native"); 1;} or
852 eval {Sys
::Syslog
::setlogsock
("unix");};
853 openlog
($syslogname, "ndelay,pid", $syslogfacility)
854 or croak
"Sys::Syslog::openlog failed: $!";
856 return bless $self, $class;
859 sub BINMODE
{return 1}
860 sub FILENO
{return undef}
866 my $template = shift;
867 return $self->PRINT(sprintf $template, @_);
872 my $data = join('', $self->{'lastline'}, @_);
874 while ((my $idx = index($data, "\n", $pos)) >= 0) {
876 my $line = substr($data, $pos, $idx - $pos);
877 substr($data, $pos, $idx - $pos) = '';
879 $self->dumpline($line);
881 $self->{'lastline'} = $data;
887 $self->dumpline($self->{'lastline'})
888 if length($self->{'lastline'});
894 my ($scalar, $length, $offset) = @_;
895 $scalar = '' if !defined($scalar);
896 $length = length($scalar) if !defined($length);
897 croak
"OStream::WRITE invalid length $length"
899 $offset = 0 if !defined($offset);
900 $offset += length($scalar) if $offset < 0;
901 croak
"OStream::WRITE invalid write offset"
902 if $offset < 0 || $offset > $length;
903 my $max = length($scalar) - $offset;
904 $length = $max if $length > $max;
905 $self->PRINT(substr($scalar, $offset, $length));
917 close(DATA
) if fileno(DATA
);
919 Getopt
::Long
::Configure
('bundling');
921 my $parse_res = GetOptions
(
922 'help|?|h' => sub {pod2usage
(-verbose
=> 2, -exitval
=> 0)},
923 'quiet|q' => \
$quiet,
924 'no-quiet' => sub {$quiet = 0},
925 'progress|P' => \
$progress,
926 'inetd|i' => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
927 'idle-timeout|t=i' => \
$idle_timeout,
928 'syslog|s:s' => \
$sfac,
929 'no-syslog' => sub {$syslog = 0; $sfac = undef;},
930 'stderr' => \
$stderr,
931 'abbrev=i' => \
$abbrev,
932 'status-interval=i' => \
$stiv,
933 'idle-status-interval=i' => \
$idiv,
935 $syslog = 1 if defined($sfac);
936 $progress = 1 unless $quiet;
937 $abbrev = 128 unless $abbrev > 0;
938 if (defined($idle_timeout)) {
939 die "--idle-timeout must be a whole number" unless $idle_timeout =~ /^\d+$/;
940 die "--idle-timeout may not be used without --inetd" unless $inetd;
942 if (defined($stiv)) {
943 die "--status-interval must be a whole number" unless $stiv =~ /^\d+$/;
944 $statusintv = $stiv * 60;
946 if (defined($idiv)) {
947 die "--idle-status-interval must be a whole number" unless $idiv =~ /^\d+$/;
948 $idleintv = $idiv * 60;
951 open STDOUT
, '>&STDERR' if $inetd;
953 use Sys
::Syslog
qw();
956 $sfac = "user" unless defined($sfac) && $sfac ne "";
959 $sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
961 my %badfac = map({("LOG_$_" => 1)}
962 (qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
963 eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac";
964 die "invalid syslog facility: $ofac"
965 if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
966 tie
*STDERR
, 'OStream', $mode, $progname, $facility or die "tie failed";
969 open STDOUT
, '>', '/dev/null';
977 open Server
, '<&=0' or die "open: $!";
978 my $sockname = getsockname Server
;
979 die "getsockname: $!" unless $sockname;
980 die "socket already connected! must be 'wait' socket" if getpeername Server
;
981 die "getpeername: $!" unless $!{ENOTCONN
};
982 my $st = getsockopt Server
, SOL_SOCKET
, SO_TYPE
;
983 die "getsockopt(SOL_SOCKET, SO_TYPE): $!" unless $st;
984 my $socktype = unpack('i', $st);
985 die "stream socket required" unless defined $socktype && $socktype == SOCK_STREAM
;
986 die "AF_UNIX socket required" unless sockaddr_family
($sockname) == AF_UNIX
;
987 $NAME = unpack_sockaddr_un
$sockname;
988 my $expected = $Girocco::Config
::chroot.'/etc/taskd.socket';
989 warn "listening on \"$NAME\" but expected \"$expected\"" unless $NAME eq $expected;
990 my $mode = (stat($NAME))[2];
991 die "stat: $!" unless $mode;
993 if (($mode & 0660) != 0660) {
994 chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
997 $NAME = $Girocco::Config
::chroot.'/etc/taskd.socket';
998 my $uaddr = sockaddr_un
($NAME);
1000 socket(Server
, PF_UNIX
, SOCK_STREAM
, 0) or die "socket failed: $!";
1002 bind(Server
, $uaddr) or die "bind failed: $!";
1003 listen(Server
, SOMAXCONN
) or die "listen failed: $!";
1004 chmod 0666, $NAME or die "chmod failed: $!";
1007 foreach my $throttle (@Girocco::Config
::throttle_classes
, @throttle_defaults) {
1008 my $classname = $throttle->{"name"};
1010 Throttle
::GetClassInfo
($classname, $throttle);
1014 return $_[0] <= $_[1] ?
$_[0] : $_[1];
1017 pipe($piperead, $pipewrite) or die "pipe failed: $!";
1018 setnonblock
($piperead);
1019 select((select($pipewrite), $|=1)[0]);
1021 my $fdset_both = '';
1022 vec($fdset_both, fileno($piperead), 1) = 1;
1023 my $fdset_pipe = $fdset_both;
1024 vec($fdset_both, fileno(Server
), 1) = 1;
1027 my $penaltytime = $t;
1028 my $nextwakeup = $t + 60;
1029 my $nextstatus = undef;
1030 $nextstatus = $t + $statusintv if $statusintv;
1031 statmsg
"listening on $NAME";
1033 my ($rout, $eout, $nfound);
1037 my $adjustpenalty = sub {
1038 if ($penaltytime < $now) {
1039 my $credit = $now - $penaltytime;
1040 $penalty = $penalty > $credit ?
$penalty - $credit : 0;
1041 $penaltytime = $now;
1044 if (defined($nextstatus) && $now >= $nextstatus) {
1045 unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
1046 my $statmsg = "STATUS: $children active";
1051 foreach my $cls (sort(Throttle
::GetClassList
())) {
1052 my $inf = Throttle
::GetClassInfo
($cls);
1053 if ($inf->{'total'}) {
1054 $cnt += $inf->{'total'};
1055 push(@stats, substr(lc($cls),0,1)."=".
1056 $inf->{'total'}.'/'.$inf->{'active'});
1059 push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
1060 $statmsg .= " (".join(" ",@stats).")" if @stats;
1061 foreach (Throttle
::GetRunningPids
()) {
1062 my ($cls, $ts, $desc) = Throttle
::GetPidInfo
($_);
1064 push(@running, "[${cls}::$desc] ".duration
($now-$ts));
1068 $statmsg .= ", idle " . duration
($idlesecs)
1069 if !$children && ($idlesecs = $now - $idlestart) >= 2;
1071 statmsg
"STATUS: currently running: ".join(", ", @running)
1073 $idlestatus = $now if !$children;
1075 $nextstatus += $statusintv while $nextstatus <= $now;
1077 $nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
1078 $wait = _min
($wait, (Throttle
::ServiceQueue
()||60));
1079 &$adjustpenalty; # this prevents ignoring accept when we shouldn't
1081 if ($penalty <= $maxspawn) {
1082 $fdset = $fdset_both;
1084 $fdset = $fdset_pipe;
1085 $wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
1087 $nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
1088 logmsg
("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR
} || $!{EAGAIN
};
1090 Throttle
::RemoveSupplicant
($reaped) while ($reaped = shift(@reapedpids));
1092 &$adjustpenalty; # this prevents banking credits for elapsed time
1093 if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
1094 statmsg
"idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
1097 } while $nfound < 1;
1098 my $reout = $rout | $eout;
1099 if (vec($reout, fileno($piperead), 1)) {{
1103 do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
1104 while (!defined($bytes) && $!{EINTR
});
1105 last if !defined($bytes) && $!{EAGAIN
};
1106 die "sysread failed: $!" unless defined $bytes;
1107 # since we always keep a copy of $pipewrite open EOF is fatal
1108 die "sysread returned EOF on pipe read" unless $bytes;
1109 $nloff = index($pipebuff, "\n", 0);
1110 if ($nloff < 0 && length($pipebuff) >= 512) {
1112 print STDERR
"discarding 512 bytes of control pipe data with no \\n found\n";
1114 redo unless $nloff >= 0;
1116 last unless $nloff >= 0;
1118 my $msg = substr($pipebuff, 0, $nloff);
1119 substr($pipebuff, 0, $nloff + 1) = '';
1120 $nloff = index($pipebuff, "\n", 0);
1121 process_pipe_msg
($msg) if length($msg);
1122 } while $nloff >= 0;
1125 next unless vec($reout, fileno(Server
), 1);
1126 unless (accept(Client
, Server
)) {
1127 logmsg
"accept failed: $!" unless $!{EINTR
};
1130 logmsg
"connection on $NAME";
1134 $inp = <STDIN
> if defined($inp) && $inp eq "\n";
1135 chomp $inp if defined($inp);
1136 $inp or exit 0; # ignore empty connects
1137 my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
1138 defined($arg) or $arg = '';
1139 if ($cmd eq 'ref-changes') {
1141 } elsif ($cmd eq 'clone') {
1143 } elsif ($cmd eq 'ref-change') {
1145 } elsif ($cmd eq 'throttle') {
1148 die "ignoring unknown command: $cmd\n";
1164 taskd.pl - Perform Girocco service tasks
1171 -h | --help detailed instructions
1172 -q | --quiet run quietly
1173 --no-quiet do not run quietly
1174 -P | --progress show occasional status updates
1175 -i | --inetd run as inetd unix stream wait service
1176 implies --quiet --syslog
1177 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1179 -s | --syslog[=facility] send messages to syslog instead of
1180 stderr but see --stderr
1182 --no-syslog do not send message to syslog
1183 --stderr always send messages to stderr too
1184 --abbrev=n abbreviate hashes to n (default is 8)
1185 --status-interval=MINUTES status update interval (default 1)
1186 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1194 Print the full description of taskd.pl's options.
1198 Suppress non-error messages, e.g. for use when running this task as an inetd
1199 service. Enabled by default by --inetd.
1203 Enable non-error messages. When running in --inetd mode these messages are
1204 sent to STDERR instead of STDOUT.
1208 Show information about the current status of the task operation occasionally.
1209 This is automatically enabled if --quiet is not given.
1213 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1214 stream socket ready to have accept called on it. To be useful, the unix socket
1215 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1216 will be issued if the socket is not in the expected location. Socket file
1217 permissions will be adjusted if necessary and if they cannot be taskd.pl will
1218 die. The --inetd option also enables the --quiet and --syslog options but
1219 --no-quiet and --no-syslog may be used to alter that.
1221 The correct specification for the inetd socket is a "unix" protocol "stream"
1222 socket in "wait" mode with user and group writable permissions (0660). An
1223 attempt will be made to alter the socket's file mode if needed and if that
1224 cannot be accomplished taskd.pl will die.
1226 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1227 in wait mode and will die if the passed in socket is already connected.
1229 Note that while *BSD's inetd happily supports unix sockets (and so does
1230 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1231 However, systemd does seem to.
1233 =item B<--idle-timeout=SECONDS>
1235 Only permitted when running in --inetd mode. After SECONDS of inactivity
1236 (i.e. all outstanding tasks have completed and no new requests have come in)
1237 exit normally. The default is no timeout at all (a SECONDS value of 0).
1238 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1240 =item B<--syslog[=facility]>
1242 Normally error output is sent to STDERR. With this option it's sent to
1243 syslog instead. Note that when running in --inetd mode non-error output is
1244 also affected by this option as it's sent to STDERR in that case. If
1245 not specified, the default for facility is LOG_USER. Facility names are
1246 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1247 with the LOG_NOTICE priority.
1249 =item B<--no-syslog>
1251 Send error message output to STDERR but not syslog.
1255 Always send error message output to STDERR. If --syslog is in effect then
1256 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1261 Abbreviate displayed hash values to only the first n hexadecimal characters.
1262 The default is 8 characters. Set to 0 for no abbreviation at all.
1264 =item B<--status-interval=MINUTES>
1266 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1267 status updates are shown at each MINUTES interval. Setting the interval to 0
1268 disables them entirely even with --progress.
1270 =item B<--idle-status-interval=IDLEMINUTES>
1272 Two consecutive "idle" status updates with no intervening activity will not be
1273 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1274 Setting the interval to 0 prevents any consecutive idle updates (with no
1275 activity between them) from appearing at all.
1281 taskd.pl is Girocco's service request servant; it listens for service requests
1282 such as new clone requests and ref update notifications and spawns a task to
1283 perform the requested action.