taskd.pl: eliminate main loop sleep 1 delay
[girocco.git] / taskd / taskd.pl
blob39b0b1c093966512077546bd6ee9fc0791fd49e6
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
5 # taskd is Girocco mirroring servant; it processes requests for clones
6 # of given URLs received over its socket.
8 # When a request is received, new process is spawned that sets up
9 # the repository and reports further progress
10 # to .clonelog within the repository. In case the clone fails,
11 # .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
31 use strict;
32 use warnings;
34 use Getopt::Long;
35 use Pod::Usage;
36 use Socket;
37 use Errno;
38 use Fcntl;
39 use POSIX ":sys_wait_h";
40 use File::Basename;
42 use lib dirname($0);
43 use Girocco::Config;
44 use Girocco::Notify;
45 use Girocco::Project;
46 use Girocco::User;
47 use Girocco::Util qw(noFatalsToBrowser);
48 BEGIN {noFatalsToBrowser}
# Built-in throttle class definitions.
# The classes from @Girocco::Config::throttle_classes are registered ahead of
# these and registering an already existing class name changes nothing, so a
# same-named configured class replaces the whole default entry (the hashes
# are never merged key by key).
my @throttle_defaults = (
    { name => "ref-change", maxproc => 0, maxjobs => 1, interval => 1 },
    { name => "clone",      maxproc => 0, maxjobs => 2, interval => 5 },
    {
        # maxproc and maxjobs are left out on purpose so that the computed
        # defaults are used:
        #   maxproc => max(5, cpucount + maxjobs)
        #   maxjobs => max(1, int(cpucount / 4))
        name     => "snapshot",
        interval => 5,
    },
);
# Command line option state; the flags stay undefined until option parsing
# assigns them.
my ($quiet, $progress, $syslog, $stderr, $inetd, $idle_timeout);
my $abbrev     = 8;     # leading characters of each revision shown in status messages
my $statusintv = 60;    # seconds (--status-interval is given in minutes)
my $idleintv   = 3600;  # seconds (--idle-status-interval is given in minutes)
my $maxspawn   = 8;     # NOTE(review): not referenced in the visible part of the file

# Unbuffered output on the currently selected handle
$| = 1;

# Run-time bookkeeping
my $progname   = basename($0);  # used as prefix in log lines
my $idlestatus = 0;             # reset to 0 each time a child is spawned
my $idlestart  = time;          # refreshed when the child count reaches 0
my $children   = 0;             # forked children that have not been reaped yet
# Returns the value of Girocco::Util::online_cpus.
# A true result is remembered in the package variable $online_cpus_result,
# so the underlying lookup is repeated only while it keeps returning a
# false value.
sub cpucount {
    use Girocco::Util "online_cpus";
    our $online_cpus_result;
    $online_cpus_result ||= online_cpus();
    return $online_cpus_result;
}
# Print one log line made of the arguments (joined with spaces) on the
# default output handle.
# When STDOUT is tied the line is emitted twice and $OStream::only selects
# the destination of each copy: the stderr copy carries the
# "[time] program pid: " header, the syslog copy does not.
sub logmsg {
    my $stamp = scalar localtime;
    my $hdr = "[$stamp] $progname $$: ";
    return print("$hdr@_\n") unless tied *STDOUT;
    # [ $OStream::only value, text ]: 2 => STDERR only, 1 => syslog only
    for my $pass ([2, "$hdr@_\n"], [1, "@_\n"]) {
        $OStream::only = $pass->[0];
        print $pass->[1];
    }
    $OStream::only = 0; # back to default
}
# Print one status line made of the arguments (joined with spaces) on
# STDERR, but only while $progress is true.
# When STDERR is tied the line is emitted twice and $OStream::only selects
# the destination of each copy: the stderr copy carries the
# "[time] program pid: " header, the syslog copy does not.
sub statmsg {
    return unless $progress;
    my $stamp = scalar localtime;
    my $hdr = "[$stamp] $progname $$: ";
    return print(STDERR "$hdr@_\n") unless tied *STDERR;
    # [ $OStream::only value, text ]: 2 => STDERR only, 1 => syslog only
    for my $pass ([2, "$hdr@_\n"], [1, "@_\n"]) {
        $OStream::only = $pass->[0];
        print STDERR $pass->[1];
    }
    $OStream::only = 0; # back to default
}
# Format a number of seconds as a compact duration such as "59s", "1m1s",
# "1h0m0s" or "2d3h4m5s".  Fractions are truncated.  Only the units that are
# needed appear, but every unit below the largest one is always shown.
# An undefined or negative argument is returned unchanged.
sub duration {
    my $secs = shift;
    return $secs unless defined($secs) && $secs >= 0;
    my $left = int($secs);
    my $text = '';
    # [ suffix, how many of this unit make up the next larger unit ]
    for my $unit (['s', 60], ['m', 60], ['h', 24]) {
        my ($suffix, $radix) = @$unit;
        $text = ($left % $radix) . $suffix . $text;
        return $text if $left < $radix;
        $left = int($left / $radix);
    }
    return $left . 'd' . $text;
}
# Put the given file handle into non-blocking mode by adding O_NONBLOCK to
# its current file status flags.  Dies if either fcntl call fails.
sub setnonblock {
    my ($handle) = @_;
    my $current = fcntl($handle, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($current);
    fcntl($handle, F_SETFL, $current | O_NONBLOCK)
        or die "fcntl failed: $!";
}
# Put the given file handle back into blocking mode by clearing O_NONBLOCK
# in its current file status flags.  Dies if either fcntl call fails.
sub setblock {
    my ($handle) = @_;
    my $current = fcntl($handle, F_GETFL, 0);
    die "fcntl failed: $!" unless defined($current);
    fcntl($handle, F_SETFL, $current & ~O_NONBLOCK)
        or die "fcntl failed: $!";
}
157 package Throttle;
160 ## Throttle protocol
162 ## 1) Process needing throttle services acquire a control file descriptor
163 ## a) Either as a result of a fork + exec (the write end of a pipe)
164 ## b) Or by connecting to the taskd socket (not yet implemented)
166 ## 2) The process requesting throttle services will be referred to
167 ## as the supplicant or just "supp" for short.
169 ## 3) The supp first completes any needed setup which may include
170 ## gathering data it needs to perform the action -- if that fails
171 ## then there's no need for any throttling.
173 ## 4) The supp writes a throttle request to the control descriptor in
174 ## this format:
175 ## throttle <pid> <class>\n
176 ## for example if the supp's pid was 1234 and it was requesting throttle
177 ## control as a member of the mail class it would write this message:
178 ## throttle 1234 mail\n
179 ## Note that if the control descriptor happens to be a pipe rather than a
180 ## socket, the message should be preceded by another "\n" just to be safe.
181 ## If the control descriptor is a socket, not a pipe, the message may be
182 ## preceded by a "\n" but that's not recommended.
184 ## 5) For supplicants with a control descriptor that is a pipe
185 ## (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
186 ## If the control descriptor is a socket (getsockname succeeds) then
187 ## protocol (5b) should be used.
189 ## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
190 ## SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
191 ## to write a "keepalive\n" message to the control descriptor. If that
192 ## fails, the controller has gone away and it may make its own decision
193 ## whether or not to proceed at that point. If, on the other hand, it
194 ## receives a SIGTERM, the process limit for its class has been reached
195 ## and it should abort without performing its action. If it receives
196 ## SIGUSR1, it may proceed without writing anything more to the control
197 ## descriptor, and MAY even close the control descriptor. Finally, a
198 ## SIGUSR2 indicates rejection of the throttle request for some other reason
199 ## such as unrecognized class name or invalid pid in which case the supp may
200 ## make its own decision how to proceed.
202 ## 5b) The supp now enters a read wait on the socket -- it need accommodate no
203 ## more than 512 bytes and if a '\n' does not appear within that number of
204 ## bytes the read should be considered failed. Otherwise the read should
205 ## be retried until either a full line has been read or the socket is
206 ## closed from the other end. If the lone read is "proceed\n" then it may
207 ## proceed without reading or writing anything more to the control
208 ## descriptor, but MUST keep the control descriptor open and not call
209 ## shutdown on it either. Any other result (except EINTR or EAGAIN which
210 ## should be retried) constitutes failure. If a full line starting with at
211 ## least one alpha character was read but it was not "proceed" then it
212 ## should abort without performing its action. For any other failure it
213 ## may make its own decision whether or not to proceed as the controller has
214 ## gone away.
216 ## 6) The supp now performs its throttled action.
218 ## 7) The supp now closes its control descriptor (if it hasn't already in the
219 ## case of (5a)) and exits -- in the case of a socket, the other end receives
220 ## notification that the socket has been closed (read EOF). In the case of
221 ## a pipe the other end receives a SIGCHLD (multiple processes have a hold
222 ## of the other end of the pipe, so it will not reach EOF by the supp's
223 ## exit in that case).
# Registered throttle classes.
# Keys are class names (stored lower case), values are hash refs with these
# fields:
#   'maxproc'      => integer; maximum number of allowed supplicants (the sum
#                     of how many may be queued waiting plus how many may be
#                     concurrently active) with 0 meaning no limit.
#   'maxjobs'      => integer; how many supplicants may proceed
#                     simultaneously.  A value of 0 is unlimited, but the
#                     number of concurrent supplicants will always be limited
#                     to no more than the 'maxproc' value (if > 0) no matter
#                     what the 'maxjobs' value is.
#   'total'        => integer; the total number of pids belonging to this
#                     class that can currently be found in %pid.
#   'active'       => integer; the number of currently active supplicants,
#                     which should be the same as (the number of elements of
#                     %pid with a matching class name) - (number of entries
#                     of this class in @queue).
#   'interval'     => integer; minimum number of seconds between 'proceed'
#                     responses or SIGUSR1 signals to members of this class.
#   'lastqueue'    => time; last time a supplicant was successfully queued.
#   'lastproceed'  => time; last time a supplicant was allowed to proceed.
#   'lastthrottle' => time; last time a supplicant was throttled.
#   'lastdied'     => time; last time a supplicant in this class
#                     died/exited/etc.
my %classes;

# Known supplicants.
# Keys are pid numbers, values are array refs with these elements:
#   [0] => name of class (key to the %classes hash)
#   [1] => supplicant state (0 => queued, non-zero => time it started running)
#   [2] => descriptive text (e.g. project name)
my %pid;

# Minimum number of seconds between any two proceed responses no matter what
# class.  This takes priority in that it can effectively increase a class's
# 'interval' value by delaying proceed notifications if the minimum interval
# has not yet elapsed.
my $interval = 1;

# Fifo of pids awaiting notification as soon as the next $interval elapses,
# provided the interval and maxjobs requirements are satisfied for the class
# of the pid that will next be triggered.
my @queue;

# Times of the most recent ...
my $lastqueue    = 0;   # ... successful call to AddSupplicant
my $lastproceed  = 0;   # ... proceed notification
my $lastthrottle = 0;   # ... throttle
my $lastdied     = 0;   # ... removal

# Lifetime counts of how many supplicants have been ...
# It should always be true that $totalqueue - $totaldied equals the number
# of pids currently registered in %pid.
my $totalqueue    = 0;  # ... queued
my $totalproceed  = 0;  # ... allowed to proceed
my $totalthrottle = 0;  # ... throttled
my $totaldied     = 0;  # ... removed (died)
# Returns the names of all currently registered classes in no particular
# order (in scalar context: how many classes are registered).
sub GetClassList {
    my @names = keys(%classes);
    return @names;
}
# Returns the numerically larger of its two arguments (the first one when
# they compare equal).
sub _max {
    my ($left, $right) = @_;
    return $left >= $right ? $left : $right;
}

# Pick a numeric setting.
# [0] => minimum allowed result
# [1] => candidate value; used only if it is defined and looks like an
#        optionally signed integer
# [2] => code ref called (without arguments) to produce the value whenever
#        the candidate is not usable
# Returns the chosen value, raised to the minimum if it is below it.
sub _getnum {
    my ($min, $val, $default) = @_;
    my $ans;
    if (defined($val) && $val =~ /^[+-]?\d+$/) {
        $ans = 0 + $val;
    } else {
        # Explicit empty argument list: a bare "&$default;" would silently
        # hand our own @_ over to the callback.
        $ans = $default->();
    }
    return _max($min, $ans);
}
# Look up a throttle class and optionally create it.
# [0] => name of class to find; it must start with a letter and may only
#        contain letters, digits and the characters . _ + -.  Names are
#        looked up and stored in lower case.
# [1] => if true, create the class if it doesn't exist; if it is a hash ref
#        it supplies the initial maxproc, maxjobs and interval values.
#        Otherwise maxjobs defaults to max(1, int(cpu count / 4)), maxproc
#        defaults to max(5, cpu count + maxjobs) and interval defaults to 1.
#        None of the three can end up below 0.
# Returns a hash ref holding a copy of the class info on success and
# nothing for an invalid name or a missing class that is not to be created.
sub GetClassInfo {
    my ($classname, $init) = @_;
    return unless defined($classname)
        && $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/;
    my $key = lc($classname);
    if (my $known = $classes{$key}) {
        # hand out a copy so callers cannot alter the registered class
        return { %$known };
    }
    return unless $init;
    my $seed = ref($init) eq 'HASH' ? $init : {};
    # maxjobs has to be settled first, the maxproc default depends on it
    my $maxjobs = _getnum(0, $seed->{'maxjobs'}, sub{_max(1, int(::cpucount() / 4))});
    my $maxproc = _getnum(0, $seed->{'maxproc'}, sub{_max(5, ::cpucount() + $maxjobs)});
    my $every = _getnum(0, $seed->{'interval'}, sub{1});
    my %fresh = (
        'maxjobs'      => $maxjobs,
        'maxproc'      => $maxproc,
        'interval'     => $every,
        'total'        => 0,
        'active'       => 0,
        'lastqueue'    => 0,
        'lastproceed'  => 0,
        'lastthrottle' => 0,
        'lastdied'     => 0,
    );
    $classes{$key} = \%fresh;
    return { %fresh };
}
# [0] => pid to look up
# Returns () if the pid is not registered, otherwise
# ($classname, $timestarted, $description) where $timestarted is 0 while the
# pid is still queued and a time() value once it has been started.
sub GetPidInfo {
    my ($wanted) = @_;
    if (exists $pid{$wanted}) {
        return @{$pid{$wanted}};
    }
    return ();
}
# Returns the pids that have been started (their start time is non-zero)
# ordered from the oldest to the newest start time.  The list may be empty.
sub GetRunningPids {
    my @started = grep { $pid{$_}->[1] } keys(%pid);
    return sort { $pid{$a}->[1] <=> $pid{$b}->[1] } @started;
}
# Returns a hash ref with a snapshot of the current state:
#   'interval'      => global minimum interval between proceeds
#   'active'        => number of registered pids minus the number of queued
#                      pids (i.e. the ones that are not waiting in the queue)
#   'queue'         => how many pids are currently queued
#   'lastqueue'     => time (epoch seconds) of last queue
#   'lastproceed'   => time (epoch seconds) of last proceed
#   'lastthrottle'  => time (epoch seconds) of last throttle
#   'lastdied'      => time (epoch seconds) of last removal
#   'totalqueue'    => lifetime total number of processes queued
#   'totalproceed'  => lifetime total number of processes proceeded
#   'totalthrottle' => lifetime total number of processes throttled
#   'totaldied'     => lifetime total number of removed processes
sub GetInfo {
    my $queued = scalar(@queue);
    my %info = (
        interval      => $interval,
        active        => scalar(keys(%pid)) - $queued,
        queue         => $queued,
        lastqueue     => $lastqueue,
        lastproceed   => $lastproceed,
        lastthrottle  => $lastthrottle,
        lastdied      => $lastdied,
        totalqueue    => $totalqueue,
        totalproceed  => $totalproceed,
        totalthrottle => $totalthrottle,
        totaldied     => $totaldied,
    );
    return \%info;
}
# Get or set the global minimum interval between proceeds.
# Without an argument the current interval is returned.  With an argument
# made up of digits only the interval is changed to it; the value that was
# in effect before the call is returned in either case.
sub Interval {
    my ($wanted) = @_;
    my $previous = $interval;
    if (defined($wanted) && $wanted =~ /^\d+$/) {
        $interval = 0 + $wanted;
    }
    return $previous;
}
397 sub RemoveSupplicant;
# Perform queue service (i.e. send SIGUSR1 to any eligible queued process).
# Returns the minimum interval in seconds until the next proceed is possible.
# Returns undef if there's nothing waiting to proceed or the 'maxjobs'
# limits have been reached for all queued items (in which case it won't be
# possible to proceed until one of them exits, hence undef).
# This is called automatically by AddSupplicant and RemoveSupplicant.
sub ServiceQueue {
    # Every change to the queue restarts the scan from the beginning with
    # fresh timing values.
    ATTEMPT: while (@queue) {
        my $now = time;
        # seconds until the global interval permits another proceed
        my $min = _max(0, $interval - ($now - $lastproceed));
        my $classmin;
        my %seenclass;
        my $classchecked = 0;
        my $classcount = scalar(keys(%classes));
        # Only the first queued pid of each class is a candidate, so the scan
        # can stop once every registered class has been looked at.
        for (my $i = 0; $i <= $#queue && $classchecked < $classcount; ++$i) {
            my $pid = $queue[$i];
            my $procinfo = $pid{$pid};
            if (!$procinfo) {
                # Stale queue entry without a %pid record.  RemoveSupplicant
                # bails out early for pids it does not know and would leave
                # the entry in place (making this scan repeat forever), so
                # drop the entry right here.
                splice(@queue, $i, 1);
                next ATTEMPT;
            }
            my $classname = $$procinfo[0];
            my $classinfo = $classes{$classname};
            if (!$classinfo) {
                # the class is gone, get rid of the supplicant
                RemoveSupplicant($pid, 1);
                next ATTEMPT;
            }
            next if $seenclass{$classname};
            $seenclass{$classname} = 1;
            ++$classchecked;
            # class already has as many running jobs as it may have
            next if $classinfo->{'maxjobs'}
                && $classinfo->{'active'} >= $classinfo->{'maxjobs'};
            my $cmin = _max(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
            if (!$cmin && !$min) {
                # both the class and the global interval have elapsed
                $now = time;
                $$procinfo[1] = $now;
                splice(@queue, $i, 1);
                ++$totalproceed;
                $lastproceed = $now;
                $classinfo->{'lastproceed'} = $now;
                ++$classinfo->{'active'};
                kill("USR1", $pid) or RemoveSupplicant($pid, 1);
                next ATTEMPT;
            }
            # remember the shortest wait among the classes that could proceed
            $classmin = $cmin unless defined($classmin) && $classmin < $cmin;
        }
        return defined($classmin) ? _max($min, $classmin) : undef;
    }
    return undef; # if there's nothing queued, nothing to do
}
# Register a new supplicant and put it into the queue.
# $1 => pid to add (must be a live process that is not already in %pid)
# $2 => class name (must exist; matched case-insensitively)
# $3 => descriptive text (optional, defaults to the empty string)
# $4 => if true, do not run ServiceQueue after queueing
# Returns -1 if no such class or pid already present or invalid
# Returns 0 if added successfully (and possibly already SIGUSR1'd)
# Returns 1 if throttled and cannot be added
sub AddSupplicant {
    my ($pid, $classname, $text, $noservice) = @_;
    return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
    $pid += 0;
    kill(0, $pid) or return -1;
    return -1 unless defined($classname);
    # GetClassInfo validates and stores class names in lower case, so the
    # lookup has to be normalized the same way or a name such as "Clone"
    # would pass validation and still never be found here.
    $classname = lc($classname);
    my $classinfo = $classes{$classname};
    return -1 unless $classinfo;
    return -1 if $pid{$pid};
    $text = '' unless defined($text);
    my $now = time;
    if ($classinfo->{'maxproc'} && $classinfo->{'total'} >= $classinfo->{'maxproc'}) {
        # the class is full
        ++$totalthrottle;
        $lastthrottle = $now;
        $classinfo->{'lastthrottle'} = $now;
        return 1;
    }
    ++$totalqueue;
    $lastqueue = $now;
    $pid{$pid} = [$classname, 0, $text];
    ++$classinfo->{'total'};
    $classinfo->{'lastqueue'} = $now;
    push(@queue, $pid);
    ServiceQueue() unless $noservice;
    return 0;
}
# Forget a supplicant.
# $1 => pid to remove (died, killed, exited normally, doesn't matter)
# $2 => if true, do not run ServiceQueue after a normal removal
# Returns 0 if removed
# Returns -1 if unknown pid or other error during removal
sub RemoveSupplicant {
    my ($pid, $noservice) = @_;
    return -1 unless defined($pid) && $pid =~ /^\d+$/;
    $pid += 0;
    my $pidinfo = $pid{$pid} or return -1;
    my ($classname, $started) = @$pidinfo;
    my $now = time;
    $lastdied = $now;
    ++$totaldied;
    delete $pid{$pid};
    # a supplicant that never started is still waiting in the queue
    @queue = grep { $_ != $pid } @queue unless $started;
    my $classinfo = $classes{$classname};
    unless ($classinfo) {
        # NOTE: the queue is serviced on this path even if $2 is true
        ServiceQueue();
        return -1;
    }
    --$classinfo->{'active'} if $started;
    --$classinfo->{'total'};
    $classinfo->{'lastdied'} = $now;
    ServiceQueue() unless $noservice;
    return 0;
}
510 # Instance Methods
512 package main;
515 ## ---------
516 ## Functions
517 ## ---------
# pids collected by REAPER (it only ever appends to this list)
my @reapedpids;

# Names for the signal numbers listed at
# http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
my %signame = (
    1  => 'SIGHUP',
    2  => 'SIGINT',
    3  => 'SIGQUIT',
    6  => 'SIGABRT',
    9  => 'SIGKILL',
    14 => 'SIGALRM',
    15 => 'SIGTERM',
);
# SIGCHLD handler: collect every child that has terminated without blocking.
# For each one the child counter is decremented (restarting the idle clock
# when it reaches zero), a "reaped" message including the exit code or the
# terminating signal is logged and the pid is appended to @reapedpids.
sub REAPER {
    # waitpid overwrites both $! and $?; keep the values of whatever code
    # this handler happened to interrupt
    local ($!, $?);
    while ((my $waitedpid = waitpid(-1, WNOHANG)) > 0) {
        my $code = $? & 0xffff;
        $idlestart = time if !--$children;
        my $codemsg = '';
        if (!($code & 0xff)) {
            # normal exit; only a non-zero exit code is worth mentioning
            $codemsg = " with exit code ".($code >> 8) if $code;
        } elsif ($code & 0x7f) {
            # killed by a signal
            my $signum = ($code & 0x7f);
            $codemsg = " with signal ".
                ($signame{$signum}?$signame{$signum}:$signum);
        }
        logmsg("reaped $waitedpid$codemsg");
        push(@reapedpids, $waitedpid);
    }
    $SIG{CHLD} = \&REAPER; # loathe sysV
}

$SIG{CHLD} = \&REAPER; # Apollo 440
# The two ends of the control pipe.  Children write their throttle and
# keepalive messages to $pipewrite; the read end is closed in each child.
my ($piperead, $pipewrite);

# Fork a child that runs the given code ref with the Client connection as
# its STDIN (opened read/write) and exits with the code ref's return value.
# The parent only does its bookkeeping and returns nothing, which is also
# what happens (after logging) if the fork fails.
sub spawn {
    my ($coderef) = @_;

    my $pid = fork;
    unless (defined $pid) {
        logmsg("cannot fork: $!");
        return;
    }
    if ($pid) {
        # I'm the parent
        $idlestart = time if !++$children;
        $idlestatus = 0;
        logmsg("begat $pid");
        return;
    }

    # I'm the child: let go of what belongs to the parent
    close(Server) unless fileno(Server) == 0;
    close($piperead);
    $SIG{CHLD} = 'DEFAULT';

    open(STDIN, "+<&Client") or die "can't dup client to stdin";
    close(Client);
    exit $coderef->();
}
# Ask the controller (through the control pipe) for permission to run as a
# member of the given throttle class and wait for its answer.
# [0] => throttle class name
# [1] => descriptive text sent along with the request
# returns:
#   < 0: error (unknown class, SIGUSR2 received or control pipe gone)
#   = 0: proceed (SIGUSR1 received)
#   > 0: throttled (SIGTERM received)
sub request_throttle {
    use POSIX "pause";
    my ($classname, $text) = @_;

    return -1 # no such throttle class
        unless Throttle::GetClassInfo($classname);

    my ($throttled, $proceed, $error, $controldead) = (0, 0, 0, 0);
    $SIG{'TERM'} = sub {$throttled = 1};
    $SIG{'USR1'} = sub {$proceed = 1};
    $SIG{'USR2'} = sub {$error = 1};
    $SIG{'PIPE'} = sub {$controldead = 1};
    $SIG{'ALRM'} = sub {}; # merely has to interrupt pause

    # After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
    print $pipewrite "\nthrottle $$ $classname $text\n";
    while (!($controldead || $throttled || $proceed || $error)) {
        # wake up after 30 seconds at the latest to send a keepalive
        alarm(30);
        pause();
        alarm(0);
        print $pipewrite "\nkeepalive $$\n";
    }
    $SIG{$_} = "DEFAULT" for qw(TERM USR1 USR2 ALRM PIPE);

    return 1 if $throttled;
    return 0 if $proceed;
    return -1;
}
# Handle a clone request for the named project.
# Dies unless the project exists, can be loaded, is marked for cloning and
# is not being cloned already, or if the throttle request is answered with
# "throttled".  Otherwise STDOUT and STDERR are pointed at the project's
# .clonelog file, STDIN at /dev/null and the process is replaced by
# taskd/clone.sh, so this only ever returns by dying.
sub clone {
    my ($name) = @_;
    die "no such project: $name" unless Girocco::Project::does_exist($name, 1);
    my $proj = Girocco::Project->load($name)
        or die "failed to load project $name";
    die "project $name is not marked for cloning" unless $proj->{clone_in_progress};
    die "project $name is already being cloned" if $proj->{clone_logged};
    # only a positive result (throttled) aborts; an error result goes ahead
    die "cloning $name aborted (throttled)" if request_throttle("clone", $name) > 0;
    statmsg("cloning $name");
    my $logfile = "$Girocco::Config::reporoot/$name.git/.clonelog";
    open(STDOUT, '>', $logfile) or die "cannot open clonelog: $!";
    open(STDERR, ">&STDOUT");
    open(STDIN, '<', '/dev/null');
    exec("$Girocco::Config::basedir/taskd/clone.sh", "$name.git")
        or die "exec failed: $!";
}
# Handle a single ref-change request.
# [0] => "username project oldrev newrev ref" separated by whitespace
# Quietly returns 0 if a field is missing, if a revision is not made up of
# 40 lower case hex digits, if the ref does not start with "refs/" or if
# both revisions are the same.  Dies if the project or user cannot be found
# or loaded, or if the throttle request is answered with "throttled".
# Otherwise the notification is sent out and 0 is returned.
sub ref_change {
    my ($arg) = @_;
    my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
    return 0 unless $username && $name && $oldrev && $newrev && $ref;
    return 0 unless $oldrev =~ /^[0-9a-f]{40}$/
        && $newrev =~ /^[0-9a-f]{40}$/
        && $ref =~ m{^refs/};
    return 0 if $newrev eq $oldrev;

    die "no such project: $name" unless Girocco::Project::does_exist($name, 1);
    my $proj = Girocco::Project->load($name)
        or die "failed to load project $name";

    # A name of the form %...% does not refer to a real user; the one that
    # matches the project name is shown as "-" in the status message.
    my $user;
    if ($username && $username !~ /^%.*%$/) {
        die "no such user: $username" unless Girocco::User::does_exist($username, 1);
        $user = Girocco::User->load($username)
            or die "failed to load user $username";
    } elsif ($username eq "%$name%") {
        $username = "-";
    }

    # only a positive result (throttled) aborts; an error result goes ahead
    die "ref-change $name aborted (throttled)"
        if request_throttle("ref-change", $name) > 0;
    my ($oldabbr, $newabbr) = map { substr($_, 0, $abbrev) } ($oldrev, $newrev);
    statmsg("ref-change $username $name ($ref: $oldabbr -> $newabbr)");
    open(STDIN, '<', '/dev/null');
    Girocco::Notify::ref_change($proj, $user, $ref, $oldrev, $newrev);
    return 0;
}
# Handle a batch of ref changes for one project.
# [0] => "username project" separated by whitespace
# The changes themselves are read from STDIN, one "oldrev newrev ref" line
# each, until EOF.  Quietly returns 0 if a name is missing, if any line is
# malformed or has identical revisions, or if there are no changes at all.
# Dies if the project or user cannot be found or loaded, or if the throttle
# request is answered with "throttled".  Otherwise one notification per
# change is sent out (pausing a second after each) and 0 is returned.
sub ref_changes {
    my ($arg) = @_;
    my ($username, $name) = split(/\s+/, $arg);
    return 0 unless $username && $name;

    die "no such project: $name" unless Girocco::Project::does_exist($name, 1);
    my $proj = Girocco::Project->load($name)
        or die "failed to load project $name";

    # A name of the form %...% does not refer to a real user; the one that
    # matches the project name is shown as "-" in the status messages.
    my $user;
    if ($username && $username !~ /^%.*%$/) {
        die "no such user: $username" unless Girocco::User::does_exist($username, 1);
        $user = Girocco::User->load($username)
            or die "failed to load user $username";
    } elsif ($username eq "%$name%") {
        $username = "-";
    }

    # collect and check everything before asking for a throttle slot
    my @changes;
    while (my $change = <STDIN>) {
        my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
        return 0 unless $oldrev =~ /^[0-9a-f]{40}$/
            && $newrev =~ /^[0-9a-f]{40}$/
            && $ref =~ m{^refs/};
        return 0 if $newrev eq $oldrev;
        push(@changes, [$oldrev, $newrev, $ref]);
    }
    return 0 unless @changes;
    open(STDIN, '<', '/dev/null');
    # only a positive result (throttled) aborts; an error result goes ahead
    die "ref-changes $name aborted (throttled)"
        if request_throttle("ref-change", $name) > 0;
    for my $entry (@changes) {
        my ($oldrev, $newrev, $ref) = @$entry;
        my ($oldabbr, $newabbr) = map { substr($_, 0, $abbrev) } ($oldrev, $newrev);
        statmsg("ref-change $username $name ($ref: $oldabbr -> $newabbr)");
        Girocco::Notify::ref_change($proj, $user, $ref, $oldrev, $newrev);
        sleep 1;
    }
    return 0;
}
# Act as the throttle go-between for a supplicant connected via the socket
# (the connection is this process's STDIN, opened read/write).
# [0] => "pid class text" separated by whitespace
# Returns 0 right away if the request is malformed (pid not a positive
# integer or not an existing process, unknown class, missing text).
# Otherwise a throttle request is placed with the controller under this
# process's own pid and, depending on the signal received, "throttled" or
# "proceed" is written back to the supplicant.  After "proceed" the process
# stays around until the supplicant closes the connection or the control
# pipe goes away.  A valid request always ends with exit 0.
sub throttle {
    my ($arg) = @_;
    defined($arg) or return 0; # nothing to parse
    my ($pid, $classname, $text) = split(/\s+/, $arg);
    # the whole field has to be digits ("123abc" is not a pid)
    defined($pid) && $pid =~ /^\d+$/ or return 0; # invalid pid
    $pid += 0;
    $pid > 0 or return 0; # invalid pid
    kill(0, $pid) || $!{EPERM} or return 0; # no such process
    Throttle::GetClassInfo($classname) or return 0; # no such throttle class
    defined($text) && $text ne '' or return 0; # no text no service

    my $throttled = 0;
    my $proceed = 0;
    my $error = 0;
    my $controldead = 0;
    my $suppdead = 0;
    $SIG{'TERM'} = sub {$throttled = 1};
    $SIG{'USR1'} = sub {$proceed = 1};
    $SIG{'USR2'} = sub {$error = 1};
    $SIG{'PIPE'} = sub {$controldead = 1};
    select((select(STDIN),$|=1)[0]); # unbuffered replies to the supplicant

    logmsg("throttle $pid $classname $text request");
    # After writing we can expect a SIGTERM or SIGUSR1
    print $pipewrite "\nthrottle $$ $classname $text\n";

    # NOTE: the only way to detect the socket close is to read all the
    # data until EOF is reached -- recv can be used to peek.
    my $v = '';
    vec($v, fileno(STDIN), 1) = 1;
    setnonblock(\*STDIN);
    until ($controldead || $throttled || $proceed || $error || $suppdead) {
        my ($r, $e);
        select($r=$v, undef, $e=$v, 30);
        my ($bytes, $discard);
        do {$bytes = sysread(STDIN, $discard, 4096)} while (defined($bytes) && $bytes > 0);
        # anything but "no data right now" means the supplicant is gone
        $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
        print $pipewrite "\nkeepalive $$\n";
    }
    setblock(\*STDIN);

    if ($throttled && !$suppdead) {
        print STDIN "throttled\n";
        logmsg("throttle $pid $classname $text throttled");
    } elsif ($proceed && !$suppdead) {
        print STDIN "proceed\n";
        logmsg("throttle $pid $classname $text proceed");
        $SIG{'TERM'} = 'DEFAULT';
        # Stay alive until the child dies which we detect by EOF on STDIN
        setnonblock(\*STDIN);
        until ($controldead || $suppdead) {
            my ($r, $e);
            select($r=$v, undef, $e=$v, 30);
            my ($bytes, $discard);
            do {$bytes = sysread(STDIN, $discard, 512)} while (defined($bytes) && $bytes > 0);
            $suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
            print $pipewrite "\nkeepalive $$\n";
        }
        setblock(\*STDIN);
    } else {
        my $prefix = '';
        $prefix = "control" if $controldead && !$suppdead;
        logmsg("throttle $pid $classname $text ${prefix}died");
    }
    exit 0;
}
# Process one message received over the control pipe.
# [0] => the message, either "throttle <pid> <class> <text>" or
#        "keepalive ..."
# A throttle message with a valid, live pid is answered with a signal:
# SIGUSR2 if the class or text is missing, the class is unknown or the
# supplicant cannot be added, SIGTERM if it is throttled.  Once queued the
# supplicant receives SIGUSR1 when it may proceed.
# Keepalive messages are ignored, anything else is reported on STDERR.
# Always returns 0.
sub process_pipe_msg {
    my $msg = defined($_[0]) ? $_[0] : '';
    my ($act, $pid, $cls, $text) = split(/\s+/, $msg);
    # an empty message yields no fields at all
    $act = '' unless defined($act);
    if ($act eq "throttle") {
        defined($pid) && $pid =~ /^\d+$/ or return 0; # invalid pid
        $pid += 0;
        $pid > 0 or return 0; # invalid pid
        kill(0, $pid) or return 0; # invalid pid
        defined($cls) && $cls ne "" or kill('USR2', $pid), return 0;
        defined($text) && $text ne "" or kill('USR2', $pid), return 0;
        Throttle::GetClassInfo($cls) or kill('USR2', $pid), return 0;
        # the AddSupplicant call could send SIGUSR1 before it returns
        my $result = Throttle::AddSupplicant($pid, $cls, $text);
        kill('USR2', $pid), return 0 if $result < 0;
        kill('TERM', $pid), return 0 if $result > 0;
        # $pid was added to class $cls and will receive SIGUSR1 when
        # it's time for it to proceed
        return 0;
    } elsif ($act eq "keepalive") {
        # nothing to do although we could verify pid is valid and
        # still registered with Throttle and send a SIGUSR2 if not, but
        # really keepalive should just be ignored.
        return 0;
    }
    print STDERR "discarding unknown pipe message \"$msg\"\n";
    return 0;
}
796 ## -------
797 ## OStream
798 ## -------
801 package OStream;
803 # Set to 1 for only syslog output (if enabled by mode)
804 # Set to 2 for only stderr output (if enabled by mode)
805 our $only = 0; # This is a hack
807 use Carp 'croak';
808 use Sys::Syslog qw(:DEFAULT :macros);
# Write all of the data to a file descriptor using POSIX::write.
# [0] => file descriptor number (not a Perl file handle)
# [1] => data to write
# Short writes are continued and writes interrupted by a signal (EINTR) are
# retried until nothing is left.  Croaks on any other write error and if a
# write makes no progress.
sub writeall {
    use POSIX qw();
    use Errno;
    my ($fd, $data) = @_;
    my $offset = 0;
    my $remaining = length($data);
    while ($remaining) {
        my $bytes = POSIX::write(
            $fd,
            substr($data, $offset, $remaining),
            $remaining);
        next if !defined($bytes) && $!{EINTR};
        croak "POSIX::write failed: $!" unless defined $bytes;
        # Compare numerically: a zero result reported as the true string
        # "0 but true" would slip past a plain truth test and this loop
        # would then never end.
        croak "POSIX::write wrote 0 bytes" unless $bytes > 0;
        $remaining -= $bytes;
        $offset += $bytes;
    }
}
# Send one line to the enabled destinations.
# [0] => the OStream object
# [1] => the line (normally ending in "\n")
# The line goes to the STDERR file descriptor as is if the object has stderr
# output enabled and $only is not 1.  It goes to syslog, with one trailing
# newline removed and only if something is left after that, if the object
# has syslog output enabled and $only is not 2.
sub dumpline {
    use POSIX qw(STDERR_FILENO);
    my ($self, $line) = @_;
    $only = 0 unless defined($only);
    if ($self->{'stderr'} && $only != 1) {
        writeall(STDERR_FILENO, $line);
    }
    chop($line) if substr($line, -1, 1) eq "\n";
    return unless length($line);
    syslog(LOG_NOTICE, "%s", $line) if $self->{'syslog'} && $only != 2;
}
# Constructor used by tie.
# [0] => class name (defaults to 'OStream')
# [1] => mode: <= 0 => stderr only, 1 => syslog only, > 1 => both
# [2] => identification string handed to openlog
# [3] => syslog facility (defaults to LOG_USER)
# Opens the syslog connection when syslog output is enabled and croaks if
# that fails.  Returns the new object.
sub TIEHANDLE {
    my $class = shift || 'OStream';
    my ($mode, $syslogname, $syslogfacility) = @_;
    $syslogfacility = LOG_USER unless defined($syslogfacility);
    my $self = {
        'syslog'   => $mode > 0,
        'stderr'   => $mode <= 0 || $mode > 1,
        'lastline' => '',   # text printed so far that does not end a line yet
    };
    if ($self->{'syslog'}) {
        # Some Sys::Syslog have a stupid default setlogsock order
        eval {Sys::Syslog::setlogsock("native"); 1;}
            or eval {Sys::Syslog::setlogsock("unix");};
        openlog($syslogname, "ndelay,pid", $syslogfacility)
            or croak "Sys::Syslog::openlog failed: $!";
    }
    return bless $self, $class;
}
# Tied handle methods that have nothing to do for this kind of handle.

# binmode always succeeds
sub BINMODE {
    return 1;
}

# there is no underlying file descriptor
sub FILENO {
    return undef;
}

# never at end of file
sub EOF {
    return 0;
}

# close always succeeds
sub CLOSE {
    return 1;
}
# printf on the tied handle: format the arguments with sprintf and pass the
# resulting text on to PRINT, whose result is returned.
sub PRINTF {
    my ($self, $template, @args) = @_;
    my $text = sprintf($template, @args);
    return $self->PRINT($text);
}
# print on the tied handle.
# The arguments are appended to whatever incomplete line is left over from
# earlier calls, every complete ("\n" terminated) line of the result is
# passed to dumpline and the rest is kept for the next call.
# Always returns 1.
sub PRINT {
    my $self = shift;
    my $data = join('', $self->{'lastline'}, @_);
    my $pos = 0;
    # $data is left untouched while scanning so that $pos stays a valid
    # offset into it; every complete line is emitted right away, including
    # the second and later lines of one print.
    while ((my $idx = index($data, "\n", $pos)) >= 0) {
        ++$idx;
        $self->dumpline(substr($data, $pos, $idx - $pos));
        $pos = $idx;
    }
    $self->{'lastline'} = substr($data, $pos);
    return 1;
}
# Destructor: emit a left over incomplete line (if any) and close the
# syslog connection.
sub DESTROY {
    my ($self) = @_;
    if (length($self->{'lastline'})) {
        $self->dumpline($self->{'lastline'});
    }
    closelog();
}
# syswrite on the tied handle.
# [0] => the OStream object
# [1] => the buffer (undef is treated as the empty string)
# [2] => number of characters to write (defaults to the whole buffer)
# [3] => offset into the buffer to start at (defaults to 0); a negative
#        offset counts backwards from the end of the buffer
# Croaks if the length is negative or the offset lies outside the buffer.
# The selected part (clipped to the end of the buffer) is handed to PRINT.
# Returns the number of characters that were passed on.
sub WRITE {
    my $self = shift;
    my ($scalar, $length, $offset) = @_;
    $scalar = '' if !defined($scalar);
    my $size = length($scalar);
    $length = $size if !defined($length);
    croak "OStream::WRITE invalid length $length"
        if $length < 0;
    $offset = 0 if !defined($offset);
    $offset += $size if $offset < 0;
    # The offset is limited by the size of the buffer, not by the number of
    # characters requested: writing 3 characters at offset 6 of an 11
    # character buffer is perfectly valid.
    croak "OStream::WRITE invalid write offset"
        if $offset < 0 || $offset > $size;
    my $max = $size - $offset;
    $length = $max if $length > $max;
    $self->PRINT(substr($scalar, $offset, $length));
    return $length;
}
910 ## ----
911 ## main
912 ## ----
915 package main;
# The DATA handle is not needed, close it if it is open
close(DATA) if fileno(DATA);

my $sfac;           # facility name given with --syslog (may be empty)
my ($stiv, $idiv);  # --status-interval / --idle-status-interval in minutes
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|?|h'               => sub {pod2usage(-verbose => 2, -exitval => 0)},
    'quiet|q'                => \$quiet,
    'no-quiet'               => sub {$quiet = 0},
    'progress|P'             => \$progress,
    # inetd mode implies syslog output and quiet operation
    'inetd|i'                => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
    'idle-timeout|t=i'       => \$idle_timeout,
    'syslog|s:s'             => \$sfac,
    'no-syslog'              => sub {$syslog = 0; $sfac = undef;},
    'stderr'                 => \$stderr,
    'abbrev=i'               => \$abbrev,
    'status-interval=i'      => \$stiv,
    'idle-status-interval=i' => \$idiv,
) || pod2usage(2);

# Settle the values that depend on more than one option
$syslog = 1 if defined($sfac);
$progress = 1 unless $quiet;
$abbrev = 128 if $abbrev <= 0;
if (defined($idle_timeout)) {
    $idle_timeout =~ /^\d+$/
        or die "--idle-timeout must be a whole number";
    $inetd
        or die "--idle-timeout may not be used without --inetd";
}
# both intervals are given in minutes and kept in seconds
if (defined($stiv)) {
    $stiv =~ /^\d+$/
        or die "--status-interval must be a whole number";
    $statusintv = $stiv * 60;
}
if (defined($idiv)) {
    $idiv =~ /^\d+$/
        or die "--idle-status-interval must be a whole number";
    $idleintv = $idiv * 60;
}
# Set up the output handles.
# With --inetd STDOUT starts out as a duplicate of STDERR.  When syslog
# output is wanted STDERR gets tied to an OStream that writes to syslog
# only (mode 1) or to syslog and the real stderr (mode 2, --stderr given).
# Finally STDOUT is either silenced (quiet) or, in inetd mode, made an
# alias of the possibly tied STDERR.
open STDOUT, '>&STDERR' if $inetd;
if ($syslog) {
    use Sys::Syslog qw();
    my $mode = 1;
    ++$mode if $stderr;
    $sfac = "user" unless defined($sfac) && $sfac ne "";
    my $ofac = $sfac;
    $sfac = uc($sfac);
    $sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
    # The name ends up inside a string eval below, so it must not be
    # anything but a plain LOG_xxx constant name.
    die "invalid syslog facility: $ofac" unless $sfac =~ /\ALOG_[A-Z0-9_]+\z/;
    my $facility;
    # LOG_ names known not to be facilities
    my %badfac = map({("LOG_$_" => 1)}
        (qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
    eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac";
    # a facility code is a number from 0 to 23 shifted left by 3 bits
    die "invalid syslog facility: $ofac"
        if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
    tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
}
if ($quiet) {
    open STDOUT, '>', '/dev/null';
} elsif ($inetd) {
    *STDOUT = *STDERR;
}
# Path of the unix socket the Server handle is listening on
my $NAME;

if ($inetd) {
    # In inetd mode the listening socket is inherited as file descriptor 0.
    # Make sure it really is an unconnected AF_UNIX stream socket.
    open(Server, '<&=0') or die "open: $!";
    my $sockname = getsockname(Server)
        or die "getsockname: $!";
    getpeername(Server)
        and die "socket already connected! must be 'wait' socket";
    # the failed getpeername above must have failed with ENOTCONN
    $!{ENOTCONN} or die "getpeername: $!";
    my $st = getsockopt(Server, SOL_SOCKET, SO_TYPE)
        or die "getsockopt(SOL_SOCKET, SO_TYPE): $!";
    my $socktype = unpack('i', $st);
    defined($socktype) && $socktype == SOCK_STREAM
        or die "stream socket required";
    sockaddr_family($sockname) == AF_UNIX
        or die "AF_UNIX socket required";
    $NAME = unpack_sockaddr_un($sockname);
    my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
    $NAME eq $expected
        or warn "listening on \"$NAME\" but expected \"$expected\"";
    # the socket has to be readable and writable for its user and group
    my $mode = (stat($NAME))[2]
        or die "stat: $!";
    $mode &= 07777;
    if (($mode & 0660) != 0660) {
        chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\" failed: $!";
    }
} else {
    # Stand-alone mode: create the socket (replacing any old one) and make
    # it accessible to everyone.
    $NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
    my $uaddr = sockaddr_un($NAME);

    socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket failed: $!";
    unlink($NAME);
    bind(Server, $uaddr) or die "bind failed: $!";
    listen(Server, SOMAXCONN) or die "listen failed: $!";
    chmod(0666, $NAME) or die "chmod failed: $!";
}
# Pass every named throttle class definition to Throttle::GetClassInfo, the
# site-configured ones first and then the built-in defaults.  Definitions
# without a (true) name are skipped.
# NOTE(review): presumably the first definition seen for a name wins, which
# is how configured classes override the defaults -- confirm in Throttle.
for my $classdef (@Girocco::Config::throttle_classes, @throttle_defaults) {
    my $label = $classdef->{'name'};
    next unless $label;
    Throttle::GetClassInfo($label, $classdef);
}
# Return the numerically smaller of the two arguments (the first on a tie).
sub _min {
    my ($first, $second) = @_;
    return $first if $first <= $second;
    return $second;
}
# Control pipe: the read end is made non-blocking and the write end unbuffered.
pipe($piperead, $pipewrite) or die "pipe failed: $!";
setnonblock($piperead);
{
    # switch on autoflush for $pipewrite, then restore the selected handle
    my $prevfh = select($pipewrite);
    $| = 1;
    select($prevfh);
}
my $pipebuff = '';      # data read from the control pipe, not yet processed

# select() bit masks: the control pipe alone, and the pipe plus the listener.
my $fdset_pipe = '';
vec($fdset_pipe, fileno($piperead), 1) = 1;
my $fdset_both = $fdset_pipe;
vec($fdset_both, fileno(Server), 1) = 1;

# Timing state for the main loop, all relative to the same start time.
my $t = time;
my $penalty = 0;
my $penaltytime = $t;
my $nextwakeup = $t + 60;
my $nextstatus = $statusintv ? $t + $statusintv : undef;
statmsg "listening on $NAME";
# Main service loop.  Each pass waits until the control pipe and/or the
# listening socket is ready, drains complete control pipe messages, and then
# accepts at most one client connection and spawns a handler for it.  The
# loop never ends on its own; the process leaves through exit (fatal select
# error or idle timeout) or die.
while (1) {
    my ($rout, $eout, $nfound);
    # Wait phase: repeat until select reports at least one ready handle.
    do {
        my $wait;
        my $now = time;
        # Reduce $penalty by one for every second elapsed since $penaltytime.
        # The closure shares $now, so it sees each later update of it.
        my $adjustpenalty = sub {
            if ($penaltytime < $now) {
                my $credit = $now - $penaltytime;
                $penalty = $penalty > $credit ? $penalty - $credit : 0;
                $penaltytime = $now;
            }
        };
        if (defined($nextstatus) && $now >= $nextstatus) {
            # A status report is due.  It is suppressed when nothing is
            # running, an idle report was already made, and (if $idleintv is
            # non-zero) fewer than $idleintv seconds have passed since then.
            unless ($idlestatus && !$children && (!$idleintv || $now - $idlestatus < $idleintv)) {
                my $statmsg = "STATUS: $children active";
                my @running = ();
                if ($children) {
                    # Per-class summary: first letter of class=total/active.
                    my @stats = ();
                    my $cnt = 0;
                    foreach my $cls (sort(Throttle::GetClassList())) {
                        my $inf = Throttle::GetClassInfo($cls);
                        if ($inf->{'total'}) {
                            $cnt += $inf->{'total'};
                            push(@stats, substr(lc($cls),0,1)."=".
                                $inf->{'total'}.'/'.$inf->{'active'});
                        }
                    }
                    # children not accounted for by any class show up as "?"
                    push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
                    $statmsg .= " (".join(" ",@stats).")" if @stats;
                    foreach (Throttle::GetRunningPids()) {
                        my ($cls, $ts, $desc) = Throttle::GetPidInfo($_);
                        next unless $ts;
                        push(@running, "[${cls}::$desc] ".duration($now-$ts));
                    }
                }
                my $idlesecs;
                $statmsg .= ", idle " . duration($idlesecs)
                    if !$children && ($idlesecs = $now - $idlestart) >= 2;
                statmsg $statmsg;
                statmsg "STATUS: currently running: ".join(", ", @running)
                    if @running;
                $idlestatus = $now if !$children;
            }
            # schedule the next report strictly in the future
            $nextstatus += $statusintv while $nextstatus <= $now;
        }
        # $wait is the time left until the next periodic wakeup, which is
        # pushed forward in 60 second steps until it lies in the future.
        $nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
        # NOTE(review): Throttle::ServiceQueue presumably returns the number
        # of seconds until the queue next needs attention -- confirm there.
        $wait = _min($wait, (Throttle::ServiceQueue()||60));
        $adjustpenalty->(); # this prevents ignoring accept when we shouldn't
        my $fdset;
        if ($penalty <= $maxspawn) {
            $fdset = $fdset_both;
        } else {
            # Too many recent connections: watch only the control pipe and
            # wake up no later than when the penalty has decayed to $maxspawn.
            $fdset = $fdset_pipe;
            $wait = $penalty - $maxspawn if $wait > $penalty - $maxspawn;
        }
        $nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
        logmsg("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR} || $!{EAGAIN};
        # NOTE(review): @reapedpids is presumably filled in by the SIGCHLD
        # handler defined elsewhere in this file -- confirm.
        my $reaped;
        Throttle::RemoveSupplicant($reaped) while ($reaped = shift(@reapedpids));
        $now = time;
        $adjustpenalty->(); # this prevents banking credits for elapsed time
        if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
            statmsg "idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
            exit 0;
        }
    } while $nfound < 1;
    my $reout = $rout | $eout;
    # Control pipe phase.  The doubled braces create a bare block so that
    # "last" and "redo" below act on this section rather than the while loop.
    if (vec($reout, fileno($piperead), 1)) {{
        my $nloff = -1;
        {
            # Read up to 512 more bytes, retrying reads interrupted by a
            # signal and re-reading until the buffer holds a newline or the
            # non-blocking pipe has no more data (EAGAIN).
            my $bytes;
            do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
                while (!defined($bytes) && $!{EINTR});
            last if !defined($bytes) && $!{EAGAIN};
            die "sysread failed: $!" unless defined $bytes;
            # since we always keep a copy of $pipewrite open EOF is fatal
            die "sysread returned EOF on pipe read" unless $bytes;
            $nloff = index($pipebuff, "\n", 0);
            if ($nloff < 0 && length($pipebuff) >= 512) {
                $pipebuff = '';
                print STDERR "discarding 512 bytes of control pipe data with no \\n found\n";
            }
            redo unless $nloff >= 0;
        }
        last unless $nloff >= 0;
        # Hand over every complete (newline terminated) non-empty message.
        do {
            my $msg = substr($pipebuff, 0, $nloff);
            substr($pipebuff, 0, $nloff + 1) = '';
            $nloff = index($pipebuff, "\n", 0);
            process_pipe_msg($msg) if length($msg);
        } while $nloff >= 0;
        redo;   # go back and look for more pipe data
    }}
    # Connection phase: only when the listening socket was reported ready.
    next unless vec($reout, fileno(Server), 1);
    unless (accept(Client, Server)) {
        logmsg "accept failed: $!" unless $!{EINTR};
        next;
    }
    logmsg "connection on $NAME";
    # Every accepted connection costs one penalty unit (see the wait phase).
    ++$penalty;
    # NOTE(review): spawn is defined elsewhere; it presumably runs this code
    # in a child process with the client connection on STDIN -- confirm.
    spawn sub {
        # Request line format: COMMAND [ARGUMENT]; one leading empty line is
        # tolerated.
        my $inp = <STDIN>;
        $inp = <STDIN> if defined($inp) && $inp eq "\n";
        chomp $inp if defined($inp);
        $inp or exit 0; # ignore empty connects
        my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
        # A line that is not a well-formed command leaves $cmd undefined;
        # reject it here rather than comparing and printing an undef value.
        defined($cmd) or die "ignoring malformed request\n";
        defined($arg) or $arg = '';
        if ($cmd eq 'ref-changes') {
            ref_changes($arg);
        } elsif ($cmd eq 'clone') {
            clone($arg);
        } elsif ($cmd eq 'ref-change') {
            ref_change($arg);
        } elsif ($cmd eq 'throttle') {
            throttle($arg);
        } else {
            die "ignoring unknown command: $cmd\n";
        }
    };
    # this process no longer needs its copy of the client connection
    close Client;
}
1155 ## -------------
1156 ## Documentation
1157 ## -------------
1160 __END__
1162 =head1 NAME
1164 taskd.pl - Perform Girocco service tasks
1166 =head1 SYNOPSIS
1168 taskd.pl [options]
1170 Options:
1171 -h | --help detailed instructions
1172 -q | --quiet run quietly
1173 --no-quiet do not run quietly
1174 -P | --progress show occasional status updates
1175 -i | --inetd run as inetd unix stream wait service
1176 implies --quiet --syslog
1177 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1178 requires --inetd
1179 -s | --syslog[=facility] send messages to syslog instead of
1180 stderr but see --stderr
1181 enabled by --inetd
1182 --no-syslog do not send messages to syslog
1183 --stderr always send messages to stderr too
1184 --abbrev=n abbreviate hashes to n (default is 8)
1185 --status-interval=MINUTES status update interval (default 1)
1186 --idle-status-interval=IDLEMINUTES idle status interval (default 60)
1188 =head1 OPTIONS
1190 =over 8
1192 =item B<--help>
1194 Print the full description of taskd.pl's options.
1196 =item B<--quiet>
1198 Suppress non-error messages, e.g. for use when running this task as an inetd
1199 service. Enabled by default by --inetd.
1201 =item B<--no-quiet>
1203 Enable non-error messages. When running in --inetd mode these messages are
1204 sent to STDERR instead of STDOUT.
1206 =item B<--progress>
1208 Show information about the current status of the task operation occasionally.
1209 This is automatically enabled if --quiet is not given.
1211 =item B<--inetd>
1213 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1214 stream socket ready to have accept called on it. To be useful, the unix socket
1215 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1216 will be issued if the socket is not in the expected location. Socket file
1217 permissions will be adjusted if necessary and, if they cannot be, taskd.pl will
1218 die. The --inetd option also enables the --quiet and --syslog options but
1219 --no-quiet and --no-syslog may be used to alter that.
1221 The correct specification for the inetd socket is a "unix" protocol "stream"
1222 socket in "wait" mode with user and group writable permissions (0660). An
1223 attempt will be made to alter the socket's file mode if needed and if that
1224 cannot be accomplished taskd.pl will die.
1226 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1227 in wait mode and will die if the passed in socket is already connected.
1229 Note that while *BSD's inetd happily supports unix sockets (and so does
1230 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1231 However, systemd does seem to.
1233 =item B<--idle-timeout=SECONDS>
1235 Only permitted when running in --inetd mode. After SECONDS of inactivity
1236 (i.e. all outstanding tasks have completed and no new requests have come in)
1237 exit normally. The default is no timeout at all (a SECONDS value of 0).
1238 Note that it may actually take up to SECONDS+60 for the idle exit to occur.
1240 =item B<--syslog[=facility]>
1242 Normally error output is sent to STDERR. With this option it's sent to
1243 syslog instead. Note that when running in --inetd mode non-error output is
1244 also affected by this option as it's sent to STDERR in that case. If
1245 not specified, the default for facility is LOG_USER. Facility names are
1246 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1247 with the LOG_NOTICE priority.
1249 =item B<--no-syslog>
1251 Send error message output to STDERR but not syslog.
1253 =item B<--stderr>
1255 Always send error message output to STDERR. If --syslog is in effect then
1256 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1257 messages as well.
1259 =item B<--abbrev=n>
1261 Abbreviate displayed hash values to only the first n hexadecimal characters.
1262 The default is 8 characters. Set to 0 for no abbreviation at all.
1264 =item B<--status-interval=MINUTES>
1266 If progress is enabled (with --progress or by default if no --inetd or --quiet)
1267 status updates are shown at each MINUTES interval. Setting the interval to 0
1268 disables them entirely even with --progress.
1270 =item B<--idle-status-interval=IDLEMINUTES>
1272 Two consecutive "idle" status updates with no intervening activity will not be
1273 shown unless IDLEMINUTES have elapsed between them. The default is 60 minutes.
1274 Setting the interval to 0 prevents any consecutive idle updates (with no
1275 activity between them) from appearing at all.
1277 =back
1279 =head1 DESCRIPTION
1281 taskd.pl is Girocco's service request servant; it listens for service requests
1282 such as new clone requests and ref update notifications and spawns a task to
1283 perform the requested action.
1285 =cut