taskd.pl: add throttling support
[girocco/readme.git] / taskd / taskd.pl
blob7cb6908a884d44e51b5c01e8f19758bc1d209b77
1 #!/usr/bin/perl
3 # taskd - Clone repositories on request
# taskd is Girocco's mirroring servant; it processes requests for clones
# of given URLs received over its socket.
#
# When a request is received, a new process is spawned that sets up
# the repository and reports further progress
# to .clonelog within the repository. In case the clone fails,
# .clone_failed is touched and .clone_in_progress is removed.
13 # Clone protocol:
14 # Alice sets up repository and touches .cloning
15 # Alice opens connection to Bob
16 # Alice sends project name through the connection
17 # Bob opens the repository and sends error code if there is a problem
18 # Bob closes connection
19 # Alice polls .clonelog in case of success.
20 # If Alice reads "@OVER@" from .clonelog, it stops polling.
22 # Ref-change protocol:
23 # Alice opens connection to Bob
24 # Alice sends ref-change command for each changed ref
25 # Alice closes connection
26 # Bob sends out notifications
28 # Initially based on perlipc example.
30 use 5.008; # we need safe signals
31 use strict;
32 use warnings;
34 use Getopt::Long;
35 use Pod::Usage;
36 use Socket;
37 use Errno;
38 use Fcntl;
39 use POSIX ":sys_wait_h";
40 use File::Basename;
42 use lib dirname($0);
43 use Girocco::Config;
44 use Girocco::Notify;
45 use Girocco::Project;
46 use Girocco::User;
47 use Girocco::Util qw(noFatalsToBrowser);
48 BEGIN {noFatalsToBrowser}
# Throttle Classes Defaults
# A class of the same name in @Girocco::Config::throttle_classes takes
# precedence over any of these: its hash is used instead of the default one
# in its entirety (fields are never merged).
my @throttle_defaults = (
	{ name => "ref-change", maxproc => 0, maxjobs => 1, interval => 1 },
	{ name => "clone",      maxproc => 0, maxjobs => 2, interval => 5 },
	# "snapshot" leaves maxproc and maxjobs unset so that the computed
	# defaults apply:  maxjobs = max(1, int(cpucount / 4))
	#                  maxproc = max(5, cpucount + maxjobs)
	{ name => "snapshot", interval => 5 },
);
# Options (set from the command line further below)
my ($quiet, $progress, $syslog, $stderr, $inetd, $idle_timeout);
my $abbrev = 8;        # how many leading hex digits of a revision to show
my $statusintv = 60;   # seconds between STATUS messages (0 disables them)

$| = 1;                # autoflush the currently selected output handle

my $progname = basename($0);
my $children = 0;      # number of forked children not yet reaped
my $idlestart = time;  # start of the current period with no children
# Number of online CPUs as reported by Girocco::Util::online_cpus.
# The answer is cached in the package variable $main::online_cpus_result;
# a false answer is not cached, so it is asked for again on the next call.
sub cpucount {
	use Girocco::Util "online_cpus";
	our $online_cpus_result;
	$online_cpus_result ||= online_cpus;
	return $online_cpus_result;
}
# Print a log line on STDOUT as "[<localtime>] <progname> <pid>: <message>".
# When STDOUT is tied (to OStream) the prefixed text is sent to the stderr
# side only and the bare message to the syslog side only.
sub logmsg {
	my $hdr = '[' . scalar(localtime) . "] $progname $$: ";
	if (!tied(*STDOUT)) {
		print "$hdr@_\n";
	} else {
		$OStream::only = 2; # STDERR only
		print "$hdr@_\n";
		$OStream::only = 1; # syslog only
		print "@_\n";
		$OStream::only = 0; # back to default
	}
}
# Like logmsg but writes to STDERR and only when $progress is enabled.
# A tied STDERR (OStream) gets the "[<localtime>] <progname> <pid>: " prefixed
# text on its stderr side and the bare message on its syslog side.
sub statmsg {
	return unless $progress;
	my $hdr = '[' . scalar(localtime) . "] $progname $$: ";
	if (!tied(*STDERR)) {
		print STDERR "$hdr@_\n";
	} else {
		$OStream::only = 2; # STDERR only
		print STDERR "$hdr@_\n";
		$OStream::only = 1; # syslog only
		print STDERR "@_\n";
		$OStream::only = 0; # back to default
	}
}
# Format a number of seconds compactly, e.g. 90061 => "1d1h1m1s".
# Fractions are truncated; leading zero units are omitted but inner ones are
# kept ("1h0m0s").  undef or negative input is returned unchanged.
sub duration {
	my $secs = shift;
	return $secs if !defined($secs) || !($secs >= 0);
	my $rest = int($secs);
	my $text = '';
	for my $unit ([60, 's'], [60, 'm'], [24, 'h']) {
		my ($radix, $suffix) = @$unit;
		$text = ($rest % $radix) . $suffix . $text;
		return $text if $rest < $radix;
		$rest = int($rest / $radix);
	}
	return $rest . 'd' . $text;
}
# Replace the file status flags of $fd with $edit->(current flags); dies with
# "fcntl failed: ..." if either fcntl call fails.
sub _edit_fl_flags {
	my ($fd, $edit) = @_;
	my $flags = fcntl($fd, F_GETFL, 0);
	die "fcntl failed: $!" unless defined($flags);
	fcntl($fd, F_SETFL, $edit->($flags)) or die "fcntl failed: $!";
}

# Switch $fd to non-blocking I/O (turn O_NONBLOCK on).
sub setnonblock {
	my $fd = shift;
	_edit_fl_flags($fd, sub { $_[0] | O_NONBLOCK });
}

# Switch $fd back to blocking I/O (turn O_NONBLOCK off).
sub setblock {
	my $fd = shift;
	_edit_fl_flags($fd, sub { $_[0] & ~O_NONBLOCK });
}
package Throttle;

#
## Throttle protocol
##
## 1) A process needing throttle services acquires a control file descriptor
##    a) Either as a result of a fork + exec (the write end of a pipe)
##    b) Or by connecting to the taskd socket (not yet implemented)
##
## 2) The process requesting throttle services will be referred to
##    as the supplicant or just "supp" for short.
##
## 3) The supp first completes any needed setup which may include
##    gathering data it needs to perform the action -- if that fails
##    then there's no need for any throttling.
##
## 4) The supp writes a throttle request to the control descriptor in
##    this format:
##      throttle <pid> <class>\n
##    for example if the supp's pid was 1234 and it was requesting throttle
##    control as a member of the mail class it would write this message:
##      throttle 1234 mail\n
##    Note that if the control descriptor happens to be a pipe rather than a
##    socket, the message should be preceded by another "\n" just to be safe.
##    If the control descriptor is a socket, not a pipe, the message may be
##    preceded by a "\n" but that's not recommended.
##
## 5) For supplicants with a control descriptor that is a pipe
##    (getsockopt(SO_TYPE) returns ENOTSOCK) the (5a) protocol should be used.
##    If the control descriptor is a socket (getsockname succeeds) then
##    protocol (5b) should be used.
##
## 5a) The supp now enters a "pause" loop awaiting either a SIGUSR1, SIGUSR2 or
##     SIGTERM. It should wake up periodically (SIGALRM works well) and attempt
##     to write a "keepalive\n" message to the control descriptor. If that
##     fails, the controller has gone away and it may make its own decision
##     whether or not to proceed at that point. If, on the other hand, it
##     receives a SIGTERM, the process limit for its class has been reached
##     and it should abort without performing its action. If it receives
##     SIGUSR1, it may proceed without writing anything more to the control
##     descriptor, and MAY even close the control descriptor. Finally, a
##     SIGUSR2 indicates rejection of the throttle request for some other reason
##     such as unrecognized class name or invalid pid in which case the supp may
##     make its own decision how to proceed.
##
## 5b) The supp now enters a read wait on the socket -- it need accommodate no
##     more than 512 bytes and if a '\n' does not appear within that number of
##     bytes the read should be considered failed. Otherwise the read should
##     be retried until either a full line has been read or the socket is
##     closed from the other end. If the line read is "proceed\n" then it may
##     proceed without reading or writing anything more to the control
##     descriptor, but MUST keep the control descriptor open and not call
##     shutdown on it either. Any other result (except EINTR or EAGAIN which
##     should be retried) constitutes failure. If a full line starting with at
##     least one alpha character was read but it was not "proceed" then it
##     should abort without performing its action. For any other failure it
##     may make its own decision whether or not to proceed as the controller has
##     gone away.
##
## 6) The supp now performs its throttled action.
##
## 7) The supp now closes its control descriptor (if it hasn't already in the
##    case of (5a)) and exits -- in the case of a socket, the other end receives
##    notification that the socket has been closed (read EOF). In the case of
##    a pipe the other end receives a SIGCHLD (multiple processes have a hold
##    of the other end of the pipe, so it will not reach EOF by the supp's
##    exit in that case).
##

# keys are (lower-case) class names, values are hash refs with these fields:
# 'maxproc'      => integer; maximum number of allowed supplicants (the sum of
#                   how many may be queued waiting plus how many may be
#                   concurrently active) with 0 meaning no limit.
# 'maxjobs'      => integer; how many supplicants may proceed simultaneously;
#                   a value of 0 is unlimited but the number of concurrent
#                   supplicants will always be limited to no more than
#                   the 'maxproc' value (if > 0) no matter what the
#                   'maxjobs' value is.
# 'total'        => integer; the total number of pids belonging to this class
#                   that can currently be found in %pid.
# 'active'       => integer; the number of currently active supplicants which
#                   should be the same as (the number of elements of %pid with
#                   a matching class name) - (number of my class in @queue).
# 'interval'     => integer; minimum number of seconds between 'proceed'
#                   responses or SIGUSR1 signals to members of this class.
# 'lastqueue'    => time; last time a supplicant was successfully queued.
# 'lastproceed'  => time; last time a supplicant was allowed to proceed.
# 'lastthrottle' => time; last time a supplicant was throttled
# 'lastdied'     => time; last time a supplicant in this class died/exited/etc.
my %classes = ();

# keys are pid numbers, values are array refs with these elements:
# [0] => name of class (key to classes hash)
# [1] => supplicant state (0 => queued, 1 => running)
my %pid = ();

# minimum number of seconds between any two proceed responses no matter what
# class. this takes priority in that it can effectively increase the
# class's 'interval' value by delaying proceed notifications if the minimum
# interval has not yet elapsed.
my $interval = 1;

# fifo of pids awaiting notification as soon as the next $interval elapses
# provided interval and maxjobs requirements are satisfied
# for the class of the pid that will next be triggered.
my @queue = ();

# time of most recent successful call to AddSupplicant
my $lastqueue = 0;

# time of most recent proceed notification
my $lastproceed = 0;

# time of most recent throttle
my $lastthrottle = 0;

# time of most recent removal
my $lastdied = 0;

# lifetime count of how many have been queued
my $totalqueue = 0;

# lifetime count of how many have been allowed to proceed
my $totalproceed = 0;

# lifetime count of how many have been throttled
my $totalthrottle = 0;

# lifetime count of how many have died
# It should always be true that $totalqueue - $totaldied equals the number of
# entries in %pid (queued plus running supplicants).
my $totaldied = 0;
# Names of all registered throttle classes, in no particular order
# (the count of them in scalar context).
sub GetClassList {
	my @names = keys %classes;
	return @names;
}
# Numerically larger of two values (the first one when they compare equal).
sub _max {
	my ($x, $y) = @_;
	return $x >= $y ? $x : $y;
}
# Return $val as a number if it is an (optionally signed) integer string,
# otherwise the value produced by calling the code ref $default with no
# arguments; either way the result is raised to at least $min.
sub _getnum {
	my ($min, $val, $default) = @_;
	# $default->() rather than &$default: the latter silently passes this
	# sub's own @_ on to the callback.
	my $ans = (defined($val) && $val =~ /^[+-]?\d+$/) ? 0 + $val : $default->();
	# equivalent to _max($min, $ans)
	return $min >= $ans ? $min : $ans;
}
# Look up a throttle class by name.  Names must start with a letter followed
# by letters, digits, '.', '_', '+' or '-', and are matched case-insensitively.
#   [0] => class name
#   [1] => if true and the class does not exist yet, create it.  If it is a
#          hash ref, its 'maxproc', 'maxjobs' and 'interval' entries give
#          the initial values; missing or non-integer entries default to
#          maxjobs = max(1, int(cpucount / 4)),
#          maxproc = max(5, cpucount + maxjobs) and interval = 1, and
#          every value is raised to at least 0.
# Returns a new hash ref holding a copy of the class's info on success, or
# nothing for an invalid name or for an unknown class when [1] is false.
sub GetClassInfo {
	my ($classname, $init) = @_;
	return unless defined($classname)
		&& $classname =~ /^[a-zA-Z][a-zA-Z0-9._+-]*$/;
	my $key = lc($classname);
	if (my $existing = $classes{$key}) {
		return +{ %$existing };
	}
	return if !$init;
	my $opts = ref($init) eq 'HASH' ? $init : {};
	my %fresh;
	$fresh{'maxjobs'} = _getnum(0, $opts->{'maxjobs'},
		sub { _max(1, int(::cpucount() / 4)) });
	# maxproc's default depends on the maxjobs value just chosen
	$fresh{'maxproc'} = _getnum(0, $opts->{'maxproc'},
		sub { _max(5, ::cpucount() + $fresh{'maxjobs'}) });
	$fresh{'interval'} = _getnum(0, $opts->{'interval'}, sub { 1 });
	$fresh{$_} = 0
		foreach qw(total active lastqueue lastproceed lastthrottle lastdied);
	$classes{$key} = \%fresh;
	return +{ %fresh };
}
# Returns a hash ref with various statistics about the current state:
#   'interval'      => global minimum interval between proceeds
#   'active'        => how many supplicants are currently running (they were
#                      allowed to proceed and have not been removed yet);
#                      queued supplicants are NOT included
#   'queue'         => how many pids are currently queued
#   'lastqueue'     => time (epoch seconds) of last queue
#   'lastproceed'   => time (epoch seconds) of last proceed
#   'lastthrottle'  => time (epoch seconds) of last throttle
#   'lastdied'      => time (epoch seconds) of last removal
#   'totalqueue'    => lifetime total number of processes queued
#   'totalproceed'  => lifetime total number of processes proceeded
#   'totalthrottle' => lifetime total number of processes throttled
#   'totaldied'     => lifetime total number of removed processes
sub GetInfo {
	return {
		interval => $interval,
		# %pid holds both queued and running supplicants
		active => scalar(keys(%pid)) - scalar(@queue),
		queue => scalar(@queue),
		lastqueue => $lastqueue,
		lastproceed => $lastproceed,
		lastthrottle => $lastthrottle,
		lastdied => $lastdied,
		totalqueue => $totalqueue,
		totalproceed => $totalproceed,
		totalthrottle => $totalthrottle,
		totaldied => $totaldied
	};
}
# Read, and optionally change, the global minimum interval between proceeds.
# A non-negative integer argument becomes the new interval; anything else is
# ignored.  The value in effect before the call is always returned.
sub Interval {
	my ($new) = @_;
	my $previous = $interval;
	if (defined($new) && $new =~ /^\d+$/) {
		$interval = 0 + $new;
	}
	return $previous;
}
sub RemoveSupplicant;

# Perform queue service (i.e. send SIGUSR1 to any eligible queued process).
# Only the oldest queued member of each class is considered, and only while
# its class is below 'maxjobs' (0 = unlimited); it proceeds once both the
# global $interval and the class 'interval' have elapsed since the previous
# proceed.
# Returns the minimum number of seconds until the next proceed is possible.
# Returns undef if there's nothing waiting to proceed or the 'maxjobs' limits
# have been reached for all queued items (in which case it won't be possible
# to proceed until one of them exits, hence undef).
# This is called automatically by AddSupplicant and RemoveSupplicant.
sub ServiceQueue {
	RETRY: while (@queue) {
		my $now = time;
		# seconds still to wait before the global interval permits a proceed
		my $min = _max(0, $interval - ($now - $lastproceed));
		my $classmin = undef;
		my $classchecked = 0;
		my %seenclass = ();
		my $classcount = scalar(keys(%classes));
		for (my $i = 0; $i <= $#queue && $classchecked < $classcount; ++$i) {
			my $pid = $queue[$i];
			my $procinfo = $pid{$pid};
			if (!$procinfo) {
				# Orphaned queue entry.  RemoveSupplicant would reject the
				# unknown pid and leave it queued, so retrying after calling
				# it would loop forever; drop the entry here instead.
				splice(@queue, $i, 1);
				next RETRY;
			}
			my $classinfo = $classes{$$procinfo[0]};
			if (!$classinfo) {
				RemoveSupplicant($pid, 1);
				next RETRY;
			}
			next if $seenclass{$$procinfo[0]};
			$seenclass{$$procinfo[0]} = 1;
			++$classchecked;
			if (!$classinfo->{'maxjobs'} || $classinfo->{'active'} < $classinfo->{'maxjobs'}) {
				my $cmin = _max(0, $classinfo->{'interval'} - ($now - $classinfo->{'lastproceed'}));
				if (!$cmin && !$min) {
					$now = time;
					$$procinfo[1] = 1;
					splice(@queue, $i, 1);
					++$totalproceed;
					$lastproceed = $now;
					$classinfo->{'lastproceed'} = $now;
					++$classinfo->{'active'};
					kill("USR1", $pid) or RemoveSupplicant($pid, 1);
					next RETRY;
				}
				$classmin = $cmin unless defined($classmin) && $classmin < $cmin;
			}
		}
		return defined($classmin) ? _max($min, $classmin) : undef;
	}
	return undef; # nothing queued
}
# Queue a supplicant process for throttled proceeding.
#   [0] => pid to add (must be a live process not already in %pid)
#   [1] => class name (must exist; matched case-insensitively, the same way
#          GetClassInfo matches it)
#   [2] => if true, do not call ServiceQueue after queuing
# Returns -1 if no such class or pid already present or invalid
# Returns 0 if added successfully (and possibly already SIGUSR1'd)
# Return 1 if throttled (class at its 'maxproc' limit) and cannot be added
sub AddSupplicant {
	my ($pid, $classname, $noservice) = @_;
	return -1 unless $pid && $pid =~ /^[1-9][0-9]*$/;
	$pid += 0;
	kill(0, $pid) or return -1;
	return -1 unless defined($classname);
	# GetClassInfo creates and finds classes under their lc() names; look the
	# class up the same way so a name it accepted is not rejected here.
	$classname = lc($classname);
	my $classinfo = $classes{$classname};
	return -1 unless $classinfo;
	return -1 if $pid{$pid};
	my $now = time;
	if ($classinfo->{'maxproc'} && $classinfo->{'total'} >= $classinfo->{'maxproc'}) {
		++$totalthrottle;
		$lastthrottle = $now;
		$classinfo->{'lastthrottle'} = $now;
		return 1;
	}
	++$totalqueue;
	$lastqueue = $now;
	$pid{$pid} = [$classname, 0]; # state 0 => queued
	++$classinfo->{'total'};
	$classinfo->{'lastqueue'} = $now;
	push(@queue, $pid);
	ServiceQueue() unless $noservice;
	return 0;
}
# Forget a supplicant (died, killed, exited normally, doesn't matter) and
# update its class counters and the global statistics.
#   [0] => pid to remove
#   [1] => if true, do not call ServiceQueue afterwards (used when called
#          from ServiceQueue itself)
# Returns 0 if removed
# Returns -1 if unknown or invalid pid, or if its class no longer exists
#         (the pid is still forgotten in that case)
sub RemoveSupplicant {
	my ($pid, $noservice) = @_;
	return -1 unless defined($pid) && $pid =~ /^\d+$/;
	$pid += 0;
	my $pidinfo = $pid{$pid};
	$pidinfo or return -1;
	my $now = time;
	$lastdied = $now;
	++$totaldied;
	delete $pid{$pid};
	if (!$$pidinfo[1]) {
		# still queued: drop every occurrence from the queue
		@queue = grep { $_ != $pid } @queue;
	}
	my $classinfo = $classes{$$pidinfo[0]};
	if (!$classinfo) {
		# Honour $noservice here too; ServiceQueue calls us in exactly this
		# situation and must not be re-entered.
		ServiceQueue() unless $noservice;
		return -1;
	}
	--$classinfo->{'active'} if $$pidinfo[1];
	--$classinfo->{'total'};
	$classinfo->{'lastdied'} = $now;
	ServiceQueue() unless $noservice;
	return 0;
}
# Instance Methods

package main;

##
## ---------
## Functions
## ---------
##

# pids reaped by REAPER; the main loop hands each to Throttle::RemoveSupplicant
my @reapedpids = ();
my %signame = (
	# http://pubs.opengroup.org/onlinepubs/000095399/utilities/trap.html
	1 => 'SIGHUP',
	2 => 'SIGINT',
	3 => 'SIGQUIT',
	6 => 'SIGABRT',
	9 => 'SIGKILL',
	14 => 'SIGALRM',
	15 => 'SIGTERM',
);

# SIGCHLD handler: reap every exited child without blocking, log how it
# ended, update the child count (restarting the idle clock when it reaches
# zero) and queue the pid on @reapedpids.
sub REAPER {
	# waitpid() overwrites both $! and $?; preserve the values seen by
	# whatever code this signal interrupted.
	local ($!, $?);
	my $waitedpid;
	while (($waitedpid = waitpid(-1, WNOHANG)) > 0) {
		my $code = $? & 0xffff;
		$idlestart = time if !--$children;
		my $codemsg = '';
		if (!($code & 0xff)) {
			# normal exit: only a non-zero exit code is worth mentioning
			$codemsg = " with exit code ".($code >> 8) if $code;
		} elsif ($code & 0x7f) {
			# terminated by a signal: show its name when we know it
			my $signum = ($code & 0x7f);
			$codemsg = " with signal ".
				($signame{$signum}?$signame{$signum}:$signum);
		}
		logmsg "reaped $waitedpid$codemsg";
		push(@reapedpids, $waitedpid);
	}
	$SIG{CHLD} = \&REAPER; # loathe sysV
}

$SIG{CHLD} = \&REAPER; # Apollo 440
# The internal control pipe: forked children write throttle/keepalive
# messages to $pipewrite; the main loop selects on $piperead.
my ($piperead, $pipewrite);

# Fork a child to serve the connection on the Client handle.
# Parent: counts and logs the new child and returns nothing.
# Child: closes the listening socket (unless it is fd 0) and the pipe's read
# end, restores default SIGCHLD handling, makes Client its STDIN and exits
# with the value returned by $coderef.
# A failed fork is logged and otherwise ignored.
sub spawn {
	my $coderef = shift;

	my $pid = fork;
	if (!defined($pid)) {
		logmsg "cannot fork: $!";
		return;
	}
	if ($pid) {
		# Should the count come back to zero here (e.g. REAPER already
		# counted this child's exit) the idle period starts now.
		$idlestart = time if !++$children;
		logmsg "begat $pid";
		return; # I'm the parent
	}

	# I'm the child
	close(Server) if fileno(Server) != 0;
	close($piperead);
	$SIG{CHLD} = 'DEFAULT';

	open STDIN, "+<&Client" or die "can't dup client to stdin";
	close(Client);
	exit $coderef->();
}
# Supplicant side of the pipe variant (5a) of the Throttle protocol: announce
# this process ($$) as a member of throttle class $classname by writing to
# $pipewrite, then wait for the main taskd process to answer with a signal.
# returns:
# < 0: error (unknown class, SIGUSR2 rejection, or the control pipe died)
# = 0: proceed (SIGUSR1 received)
# > 0: throttled (SIGTERM received)
sub request_throttle {
	use POSIX "pause";
	my $classname = shift;

	Throttle::GetClassInfo($classname)
		or return -1; # no such throttle class

	my $throttled = 0;
	my $proceed = 0;
	my $error = 0;
	my $controldead = 0;
	# The handlers only record what happened; the loop below acts on it.
	$SIG{'TERM'} = sub {$throttled = 1};
	$SIG{'USR1'} = sub {$proceed = 1};
	$SIG{'USR2'} = sub {$error = 1};
	$SIG{'PIPE'} = sub {$controldead = 1};
	# An empty handler makes SIGALRM merely interrupt pause() instead of
	# terminating the process (SIGALRM's default action).
	$SIG{'ALRM'} = sub {};

	# After writing we can expect a SIGTERM, SIGUSR1 or SIGUSR2
	print $pipewrite "\nthrottle $$ $classname\n";
	do {
		# Wake up at least every 30 seconds to send a keepalive; if the
		# reader is gone the write raises SIGPIPE, setting $controldead.
		# NOTE(review): a signal arriving between the "until" test and
		# pause() is only noticed once this alarm fires.
		alarm(30);
		pause unless $controldead;
		alarm(0);
		print $pipewrite "\nkeepalive $$\n";
	} until ($controldead || $throttled || $proceed || $error);
	# Reset to the default actions (not to any previously installed handlers).
	$SIG{'TERM'} = "DEFAULT";
	$SIG{'USR1'} = "DEFAULT";
	$SIG{'USR2'} = "DEFAULT";
	$SIG{'ALRM'} = "DEFAULT";
	$SIG{'PIPE'} = "DEFAULT";

	# A throttle (SIGTERM) wins over a proceed if both arrived.
	my $result = -1;
	if ($throttled) {
		$result = 1;
	} elsif ($proceed) {
		$result = 0;
	}
	return $result;
}
# Child-side clone handler.
# Checks that project $name exists and is marked for cloning (and not already
# being cloned), then waits for the "clone" throttle.  Finally it execs
# taskd/clone.sh with stdin from /dev/null and stdout/stderr sent to the
# project's .clonelog.
# Never returns: it either execs or dies.
sub clone {
	my ($name) = @_;
	Girocco::Project::does_exist($name, 1) or die "no such project: $name";
	my $proj = Girocco::Project->load($name);
	$proj or die "failed to load project $name";
	$proj->{clone_in_progress} or die "project $name is not marked for cloning";
	$proj->{clone_logged} and die "project $name is already being cloned";
	request_throttle("clone") <= 0 or die "cloning $name aborted (throttled)";
	statmsg "cloning $name";
	# Redirect at the file-descriptor level instead of open()ing the Perl
	# handles.  With --syslog, STDERR (and STDOUT when aliased to it) is tied
	# to OStream.  open() on a tied handle calls its OPEN method, which
	# OStream does not provide.  Only fds 0-2 matter to the exec'd script.
	my $logfd = POSIX::open("$Girocco::Config::reporoot/$name.git/.clonelog",
		O_WRONLY | O_CREAT | O_TRUNC, 0666);
	defined($logfd) or die "cannot open clonelog: $!";
	defined(POSIX::dup2($logfd, 1)) or die "cannot redirect stdout to clonelog: $!";
	defined(POSIX::dup2($logfd, 2)) or die "cannot redirect stderr to clonelog: $!";
	POSIX::close($logfd) if $logfd > 2;
	my $nullfd = POSIX::open('/dev/null', O_RDONLY);
	defined($nullfd) or die "cannot open /dev/null: $!";
	defined(POSIX::dup2($nullfd, 0)) or die "cannot redirect stdin from /dev/null: $!";
	POSIX::close($nullfd) if $nullfd > 2;
	exec "$Girocco::Config::basedir/taskd/clone.sh", "$name.git" or die "exec failed: $!";
}
# Child-side handler for a single ref-change request.
# $arg is "<username> <project> <oldrev> <newrev> <ref>".  Malformed requests
# and ones where nothing changed are ignored (returns 0).  Otherwise it waits
# for the "ref-change" throttle and sends the notification through
# Girocco::Notify::ref_change, then returns 0.  It dies on unknown
# projects/users or when throttled.
sub ref_change {
	my ($arg) = @_;
	my ($username, $name, $oldrev, $newrev, $ref) = split(/\s+/, $arg);
	return 0 unless $username && $name && $oldrev && $newrev && $ref;
	return 0 unless $oldrev =~ /^[0-9a-f]{40}$/
		&& $newrev =~ /^[0-9a-f]{40}$/
		&& $ref =~ m{^refs/};
	return 0 if $newrev eq $oldrev;

	die "no such project: $name" unless Girocco::Project::does_exist($name, 1);
	my $proj = Girocco::Project->load($name) or die "failed to load project $name";

	# "%...%" usernames are placeholders and load no user; "%<project>%"
	# is displayed as "-".
	my $user;
	if ($username !~ /^%.*%$/) {
		die "no such user: $username" unless Girocco::User::does_exist($username, 1);
		$user = Girocco::User->load($username) or die "failed to load user $username";
	} elsif ($username eq "%$name%") {
		$username = "-";
	}

	die "ref-change $name aborted (throttled)" unless request_throttle("ref-change") <= 0;
	my ($oldabbr, $newabbr) = map { substr($_, 0, $abbrev) } ($oldrev, $newrev);
	statmsg "ref-change $username $name ($ref: $oldabbr -> $newabbr)";
	open STDIN, '<', '/dev/null';
	Girocco::Notify::ref_change($proj, $user, $ref, $oldrev, $newrev);
	return 0;
}
# Child-side handler for a batch ref-changes request.
# $arg is "<username> <project>".  Then one "<oldrev> <newrev> <ref>" line
# per changed ref is read from STDIN until EOF.  Any malformed or no-op line
# discards the whole batch (returns 0).  Otherwise it waits for the
# "ref-change" throttle and sends one notification per change, sleeping a
# second between them, then returns 0.
sub ref_changes {
	my ($arg) = @_;
	my ($username, $name) = split(/\s+/, $arg);
	$username && $name or return 0;

	Girocco::Project::does_exist($name, 1) or die "no such project: $name";
	my $proj = Girocco::Project->load($name);
	$proj or die "failed to load project $name";

	# "%...%" usernames are placeholders and load no user; "%<project>%"
	# is displayed as "-".
	my $user;
	if ($username && $username !~ /^%.*%$/) {
		Girocco::User::does_exist($username, 1) or die "no such user: $username";
		$user = Girocco::User->load($username);
		$user or die "failed to load user $username";
	} elsif ($username eq "%$name%") {
		$username = "-";
	}

	my @changes = ();
	while (my $change = <STDIN>) {
		my ($oldrev, $newrev, $ref) = split(/\s+/, $change);
		# A blank or short line leaves fields undefined.  Reject it like any
		# other malformed line, without "uninitialized value" warnings.
		defined($ref) or return 0;
		$oldrev =~ /^[0-9a-f]{40}$/ && $newrev =~ /^[0-9a-f]{40}$/ && $ref =~ m{^refs/} or return 0;
		$newrev ne $oldrev or return 0;
		push(@changes, [$oldrev, $newrev, $ref]);
	}
	return 0 unless @changes;
	open STDIN, '<', '/dev/null';
	request_throttle("ref-change") <= 0 or die "ref-changes $name aborted (throttled)";
	foreach my $change (@changes) {
		my ($oldrev, $newrev, $ref) = @$change;
		statmsg "ref-change $username $name ($ref: @{[substr($oldrev,0,$abbrev)]} -> @{[substr($newrev,0,$abbrev)]})";
		Girocco::Notify::ref_change($proj, $user, $ref, $oldrev, $newrev);
		sleep 1;
	}
	return 0;
}
# Socket-side throttle relay (protocol variant 5b).
# A client connected on STDIN asks to be throttled as "<pid> <class>".  This
# process registers itself ($$) with the main taskd process over the internal
# pipe.  It relays the outcome to the client as "throttled\n" or "proceed\n".
# After "proceed" it stays alive, occupying the throttle slot, until the
# client closes the connection.
# Returns 0 early for an invalid request; otherwise exits 0.
sub throttle {
	my ($arg) = @_;
	my ($pid, $classname) = split(/\s+/, $arg);
	# Anchored like process_pipe_msg's check; "12abc" or a missing pid is
	# rejected rather than silently coerced.
	defined($pid) && $pid =~ /^\d+$/ or return 0; # invalid pid
	$pid += 0;
	$pid > 0 or return 0; # invalid pid
	# EPERM still proves the process exists (it may belong to another user)
	kill(0, $pid) || $!{EPERM} or return 0; # no such process
	Throttle::GetClassInfo($classname) or return 0; # no such throttle class

	my $throttled = 0;
	my $proceed = 0;
	my $error = 0;
	my $controldead = 0;
	my $suppdead = 0;
	$SIG{'TERM'} = sub {$throttled = 1};
	$SIG{'USR1'} = sub {$proceed = 1};
	$SIG{'USR2'} = sub {$error = 1};
	$SIG{'PIPE'} = sub {$controldead = 1};
	select((select(STDIN),$|=1)[0]);

	logmsg "throttle $pid $classname request";
	# After writing we can expect a SIGTERM or SIGUSR1
	print $pipewrite "\nthrottle $$ $classname\n";

	# NOTE: the only way to detect the socket close is to read all the
	# data until EOF is reached -- recv can be used to peek.
	my $v = '';
	vec($v, fileno(STDIN), 1) = 1;
	setnonblock(\*STDIN);
	do {
		my ($r, $e);
		select($r=$v, undef, $e=$v, 30);
		my ($bytes, $discard);
		do {$bytes = sysread(STDIN, $discard, 4096)} while (defined($bytes) && $bytes > 0);
		# anything but "no more data right now" (EAGAIN) means EOF or error
		$suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
		print $pipewrite "\nkeepalive $$\n";
	} until ($controldead || $throttled || $proceed || $error || $suppdead);
	setblock(\*STDIN);

	if ($throttled && !$suppdead) {
		print STDIN "throttled\n";
		logmsg "throttle $pid $classname throttled";
	} elsif ($proceed && !$suppdead) {
		print STDIN "proceed\n";
		logmsg "throttle $pid $classname proceed";
		$SIG{'TERM'} = 'DEFAULT';
		# Stay alive until the child dies which we detect by EOF on STDIN
		setnonblock(\*STDIN);
		do {
			my ($r, $e);
			select($r=$v, undef, $e=$v, 30);
			my ($bytes, $discard);
			do {$bytes = sysread(STDIN, $discard, 512)} while (defined($bytes) && $bytes > 0);
			$suppdead = 1 unless !defined($bytes) && $!{EAGAIN};
			print $pipewrite "\nkeepalive $$\n";
		} until ($controldead || $suppdead);
		setblock(\*STDIN);
	} else {
		my $prefix = '';
		$prefix = "control" if $controldead && !$suppdead;
		logmsg "throttle $pid $classname ${prefix}died";
	}
	exit 0;
}
# Handle one message line read from the internal control pipe.
#   "throttle <pid> <class>": queue <pid> with Throttle::AddSupplicant and
#       tell it the outcome.  SIGUSR2 means rejected (missing/unknown class
#       or other error) and SIGTERM means throttled.  Otherwise it gets
#       SIGUSR1, possibly already sent, when it may proceed.
#   "keepalive <pid>": ignored.
# Anything else is reported on STDERR and discarded.  Always returns 0.
sub process_pipe_msg {
	my ($act, $pid, $arg) = split(/\s+/, $_[0]);
	# an empty line yields no fields at all
	$act = '' unless defined($act);
	if ($act eq "throttle") {
		defined($pid) && $pid =~ /^\d+$/ or return 0;
		$pid += 0;
		$pid > 0 or return 0; # invalid pid
		kill(0, $pid) or return 0; # invalid pid
		defined($arg) && $arg ne "" or kill('USR2', $pid), return 0;
		Throttle::GetClassInfo($arg) or kill('USR2', $pid), return 0;
		# GetClassInfo matches class names case-insensitively and stores them
		# lower-cased; pass the stored form on so the lookups agree.
		my $result = Throttle::AddSupplicant($pid, lc($arg)); # could send SIGUSR1
		kill('USR2', $pid), return 0 if $result < 0;
		kill('TERM', $pid), return 0 if $result > 0;
		# $pid was added to class $arg and will receive SIGUSR1 when
		# it's time for it to proceed
		return 0;
	} elsif ($act eq "keepalive") {
		# nothing to do although we could verify pid is valid and
		# still in %Throttle::pids and send a SIGUSR2 if not, but
		# really keepalive should just be ignored.
		return 0;
	}
	print STDERR "discarding unknown pipe message \"$_[0]\"\n";
	return 0;
}
##
## -------
## OStream
## -------
##

# Tied output handle.  Each complete line printed to it is copied to stderr
# and/or syslog, depending on the mode given to tie (see TIEHANDLE).
package OStream;

# Set to 0 to send to every destination enabled by the mode (the default)
# Set to 1 for only syslog output (if enabled by mode)
# Set to 2 for only stderr output (if enabled by mode)
our $only = 0; # This is a hack

use Carp 'croak';
use Sys::Syslog qw(:DEFAULT :macros);
# Write every byte of $data to the raw file descriptor $fd with POSIX::write.
# Short writes are continued and EINTR is retried.  Any other error, or a
# write that makes no progress, croaks.
sub writeall {
	use POSIX qw();
	use Errno;
	my ($fd, $data) = @_;
	my $done = 0;
	my $left = length($data);
	while ($left) {
		my $wrote = POSIX::write($fd, substr($data, $done, $left), $left);
		if (!defined($wrote)) {
			next if $!{EINTR};
			croak "POSIX::write failed: $!";
		}
		croak "POSIX::write wrote 0 bytes" if !$wrote;
		$done += $wrote;
		$left -= $wrote;
	}
}
# Emit one line.
# stderr side: the line as given, written straight to the STDERR file
# descriptor, when the mode enables stderr and $only is not 1.
# syslog side: the line minus one trailing newline, at LOG_NOTICE, when the
# mode enables syslog, $only is not 2 and something is left to log.
sub dumpline {
	use POSIX qw(STDERR_FILENO);
	my ($self, $line) = @_;
	$only = 0 unless defined($only);
	if ($self->{'stderr'} && $only != 1) {
		writeall(STDERR_FILENO, $line);
	}
	chop($line) if substr($line, -1, 1) eq "\n";
	return if !length($line);
	if ($self->{'syslog'} && $only != 2) {
		syslog(LOG_NOTICE, "%s", $line);
	}
}
# tie *FH, 'OStream', $mode, $syslogname, $syslogfacility
#   $mode <= 0: stderr only;  $mode == 1: syslog only;  $mode > 1: both.
# When syslog output is enabled, openlog is called with $syslogname and
# $syslogfacility (LOG_USER when not given); its failure croaks.
sub TIEHANDLE {
	my ($class, $mode, $syslogname, $syslogfacility) = @_;
	$class ||= 'OStream';
	$syslogfacility = LOG_USER unless defined($syslogfacility);
	my $self = {
		'syslog'   => $mode > 0,
		'stderr'   => ($mode <= 0 || $mode > 1),
		'lastline' => '',
	};
	if ($self->{'syslog'}) {
		# Some Sys::Syslog have a stupid default setlogsock order
		eval { Sys::Syslog::setlogsock("native"); 1 }
			or eval { Sys::Syslog::setlogsock("unix"); };
		openlog($syslogname, "ndelay,pid", $syslogfacility)
			or croak "Sys::Syslog::openlog failed: $!";
	}
	return bless($self, $class);
}
# Handle operations with nothing to do for this pseudo-handle: binmode and
# close always succeed, eof is always false, and there is no file number.
sub BINMODE { 1 }
sub FILENO { undef }
sub EOF { 0 }
sub CLOSE { 1 }
# Tied printf: format the arguments, then hand the text to PRINT.
sub PRINTF {
	my ($self, $fmt, @args) = @_;
	my $text = sprintf($fmt, @args);
	return $self->PRINT($text);
}
# Tied print.  Appends the data to any buffered partial line and passes each
# complete ("\n"-terminated) line to dumpline, in order.  Whatever follows
# the last newline is kept in 'lastline' for the next call.  Always returns 1.
sub PRINT {
	my $self = shift;
	my $data = join('', $self->{'lastline'}, @_);
	my $pos = 0;
	# Scan the unmodified buffer, advancing $pos past each emitted line.
	# (Deleting the line from $data while also advancing $pos would skip
	# every other line.)
	while ((my $idx = index($data, "\n", $pos)) >= 0) {
		$self->dumpline(substr($data, $pos, $idx + 1 - $pos));
		$pos = $idx + 1;
	}
	$self->{'lastline'} = substr($data, $pos);
	return 1;
}
# On destruction, emit any buffered partial line, then close syslog.
sub DESTROY {
	my ($self) = @_;
	my $pending = $self->{'lastline'};
	$self->dumpline($pending) if length($pending);
	closelog;
}
# Tied syswrite.  Passes $length characters of $scalar, starting at $offset,
# to PRINT.  A negative $offset counts back from the end of the string.
# Follows syswrite's rules:
#   - $length defaults to the whole string and is clipped to what is
#     available from $offset;
#   - $offset must lie within the string.
# Returns the number of characters written.  Croaks on a negative length or
# an out-of-range offset.
sub WRITE {
	my $self = shift;
	my ($scalar, $length, $offset) = @_;
	$scalar = '' if !defined($scalar);
	$length = length($scalar) if !defined($length);
	croak "OStream::WRITE invalid length $length"
		if $length < 0;
	$offset = 0 if !defined($offset);
	$offset += length($scalar) if $offset < 0;
	# The offset is bounded by the string itself, not by the requested
	# length (syswrite($fh, "abcdef", 2, 4) is valid and writes "ef").
	croak "OStream::WRITE invalid write offset"
		if $offset < 0 || $offset > length($scalar);
	my $max = length($scalar) - $offset;
	$length = $max if $length > $max;
	$self->PRINT(substr($scalar, $offset, $length));
	return $length;
}
##
## ----
## main
## ----
##

package main;

# Raw --syslog facility and --status-interval (minutes) values as given on
# the command line; both are validated below.
my $sfac;
Getopt::Long::Configure('bundling');
my $stiv;
my $parse_res = GetOptions(
	'help|?|h' => sub {pod2usage(-verbose => 2, -exitval => 0)},
	'quiet|q' => \$quiet,
	'no-quiet' => sub {$quiet = 0},
	'progress|P' => \$progress,
	# --inetd implies --syslog and --quiet (later options may undo either)
	'inetd|i' => sub {$inetd = 1; $syslog = 1; $quiet = 1;},
	'idle-timeout|t=i' => \$idle_timeout,
	'syslog|s:s' => \$sfac,
	'no-syslog' => sub {$syslog = 0; $sfac = undef;},
	'stderr' => \$stderr,
	'abbrev=i' => \$abbrev,
	'status-interval=i' => \$stiv,
) || pod2usage(2);
$syslog = 1 if defined($sfac);
$progress = 1 unless $quiet;
# A non-positive --abbrev effectively disables abbreviation (128 exceeds the
# length of a full 40-digit revision).
$abbrev = 128 unless $abbrev > 0;
if (defined($idle_timeout)) {
	die "--idle-timeout must be a whole number" unless $idle_timeout =~ /^\d+$/;
	die "--idle-timeout may not be used without --inetd" unless $inetd;
}
if (defined($stiv)) {
	die "--status-interval must be a whole number" unless $stiv =~ /^\d+$/;
	# --status-interval is in minutes, $statusintv in seconds
	$statusintv = $stiv * 60;
}

open STDOUT, '>&STDERR' if $inetd;
if ($syslog) {
	use Sys::Syslog qw();
	# OStream mode: 1 => syslog only, 2 => syslog and stderr (--stderr)
	my $mode = 1;
	++$mode if $stderr;
	$sfac = "user" unless defined($sfac) && $sfac ne "";
	my $ofac = $sfac;
	# Accept "local0", "LOG_LOCAL0", etc.
	$sfac = uc($sfac);
	$sfac = 'LOG_'.$sfac unless $sfac =~ /^LOG_/;
	my $facility;
	# Sys::Syslog LOG_* constants that exist but are not facilities
	my %badfac = map({("LOG_$_" => 1)}
		(qw(PID CONS ODELAY NDELAY NOWAIT PERROR FACMASK NFACILITIES PRIMASK LFMT)));
	# NOTE(review): string eval of a command-line value (upper-cased and
	# LOG_-prefixed first); only whoever starts taskd can supply it.
	eval "\$facility = Sys::Syslog::$sfac; 1" or die "invalid syslog facility: $ofac";
	# A real facility is a multiple of 8 no larger than 23 << 3.
	die "invalid syslog facility: $ofac"
		if ($facility & ~0xf8) || ($facility >> 3) > 23 || $badfac{$sfac};
	tie *STDERR, 'OStream', $mode, $progname, $facility or die "tie failed";
}
# Quiet: discard plain STDOUT output.  Otherwise, in inetd mode, make STDOUT
# an alias of the (possibly tied) STDERR handle.
if ($quiet) {
	open STDOUT, '>', '/dev/null';
} elsif ($inetd) {
	*STDOUT = *STDERR;
}

my $NAME;

if ($inetd) {
	# fd 0 must be an unconnected ("wait" style) AF_UNIX stream socket
	# already listening; make sure its path is readable/writable by user
	# and group.
	open Server, '<&=0' or die "open: $!";
	my $sockname = getsockname Server;
	die "getsockname: $!" unless $sockname;
	die "socket already connected! must be 'wait' socket" if getpeername Server;
	die "getpeername: $!" unless $!{ENOTCONN};
	my $st = getsockopt Server, SOL_SOCKET, SO_TYPE;
	die "getsockopt(SOL_SOCKET, SO_TYPE): $!" unless $st;
	my $socktype = unpack('i', $st);
	die "stream socket required" unless defined $socktype && $socktype == SOCK_STREAM;
	die "AF_UNIX socket required" unless sockaddr_family($sockname) == AF_UNIX;
	$NAME = unpack_sockaddr_un $sockname;
	my $expected = $Girocco::Config::chroot.'/etc/taskd.socket';
	warn "listening on \"$NAME\" but expected \"$expected\"" unless $NAME eq $expected;
	my $mode = (stat($NAME))[2];
	die "stat: $!" unless $mode;
	$mode &= 07777;
	if (($mode & 0660) != 0660) {
		chmod(($mode|0660), $NAME) == 1 or die "chmod ug+rw \"$NAME\": $!";
	}
} else {
	# Standalone: create the listening socket ourselves (replacing any stale
	# socket file) and let anyone connect to it.
	$NAME = $Girocco::Config::chroot.'/etc/taskd.socket';
	my $uaddr = sockaddr_un($NAME);

	socket(Server, PF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
	unlink($NAME);
	bind(Server, $uaddr) or die "bind: $!";
	listen(Server, SOMAXCONN) or die "listen: $!";
	chmod 0666, $NAME or die "chmod: $!";
}

# Register the throttle classes.  Configured classes come first so that they
# win over same-named defaults: GetClassInfo only creates a class that does
# not exist yet.
foreach my $throttle (@Girocco::Config::throttle_classes, @throttle_defaults) {
	my $classname = $throttle->{"name"};
	$classname or next;
	Throttle::GetClassInfo($classname, $throttle);
}
# _min(A, B): numeric minimum of two values; A is returned when they tie.
sub _min {
	my ($lhs, $rhs) = @_;
	if ($lhs <= $rhs) {
		return $lhs;
	}
	return $rhs;
}
# Control pipe: lines written to $pipewrite are read back by the main loop
# and dispatched to process_pipe_msg.  The read end is non-blocking and
# the write end is unbuffered.
pipe($piperead, $pipewrite) or die "pipe failed: $!";
setnonblock($piperead);
{
	my $prevfh = select($pipewrite);
	$| = 1;
	select($prevfh);
}
my $pipebuff = '';

# Descriptor mask for select(): the listening socket and the pipe's read end.
my $fdset = '';
vec($fdset, $_, 1) = 1 foreach (fileno(Server), fileno($piperead));

# Timers: a wakeup at least every 60 seconds, plus the optional
# periodic status message.
my $t = time;
my $nextwakeup = $t + 60;
my $nextstatus = $statusintv ? $t + $statusintv : undef;
statmsg "listening on $NAME";
# Main event loop.  Each pass:
#  - waits in select() on the listening socket and the control pipe,
#    emitting periodic STATUS lines, passing pids queued in @reapedpids to
#    Throttle::RemoveSupplicant, and honouring --idle-timeout;
#  - drains complete "\n"-terminated messages from the control pipe and
#    hands each one to process_pipe_msg;
#  - accepts at most one new connection and spawns a handler for it.
997 while (1) {
998 my ($rout, $eout, $nfound);
# Repeat until select() reports at least one ready descriptor.
999 do {
1000 my $wait;
1001 my $now = time;
# Periodic status: number of children, plus per throttle class
# "<first letter of class>=<total>/<active>"; "?=" counts children not
# accounted for by any class total.
1002 if (defined($nextstatus) && $now >= $nextstatus) {
1003 my $statmsg = "STATUS: $children active";
1004 if ($children) {
1005 my @stats = ();
1006 my $cnt = 0;
1007 foreach my $cls (sort(Throttle::GetClassList())) {
1008 my $inf = Throttle::GetClassInfo($cls);
1009 if ($inf->{'total'}) {
1010 $cnt += $inf->{'total'};
1011 push(@stats, substr(lc($cls),0,1)."=".
1012 $inf->{'total'}.'/'.$inf->{'active'});
1015 push(@stats, "?=".($children-$cnt)) if @stats && $cnt < $children;
1016 $statmsg .= " (".join(" ",@stats).")" if @stats;
1018 my $idlesecs;
1019 $statmsg .= ", idle " . duration($idlesecs)
1020 if !$children && ($idlesecs = $now - $idlestart) >= 2;
1021 statmsg $statmsg;
1022 $nextstatus += $statusintv while $nextstatus <= $now;
# Wake at least every 60 seconds.  Wake sooner if Throttle::ServiceQueue
# returns a smaller true value (presumably seconds until the throttle
# queue next needs attention); a false return means 60.
1024 $nextwakeup += 60, $now = time while ($wait = $nextwakeup - $now) <= 0;
1025 $wait = _min($wait, (Throttle::ServiceQueue()||60));
1026 $nfound = select($rout=$fdset, undef, $eout=$fdset, $wait);
# EINTR/EAGAIN from select() are tolerated; any other failure exits.
1027 logmsg("select failed: $!"), exit(1) unless $nfound >= 0 || $!{EINTR} || $!{EAGAIN};
# Tell the throttle code about every pid queued in @reapedpids (filled
# elsewhere, presumably when children are reaped).
1028 my $reaped;
1029 Throttle::RemoveSupplicant($reaped) while ($reaped = shift(@reapedpids));
1030 $now = time;
# --idle-timeout: exit only when no children remain, select() timed out
# (returned 0), and we have been idle long enough.
1031 if ($idle_timeout && !$children && !$nfound && $now - $idlestart >= $idle_timeout) {
1032 statmsg "idle timeout (@{[duration($idle_timeout)]}) exceeded now exiting";
1033 exit 0;
1035 } while $nfound < 1;
1036 my $reout = $rout | $eout;
# Control pipe readable: read 512-byte chunks until EAGAIN and dispatch
# every complete line.  The doubled braces form a bare block, so the
# last/redo statements below apply to it rather than to the while loop.
# NOTE(review): after a partial line is left in $pipebuff, the buffer can
# exceed 512 bytes before being discarded, so the "discarding 512 bytes"
# message may understate the amount dropped.
1037 if (vec($reout, fileno($piperead), 1)) {{
1038 my $nloff = -1;
1040 my $bytes;
1041 do {$bytes = sysread($piperead, $pipebuff, 512, length($pipebuff))}
1042 while (!defined($bytes) && $!{EINTR});
1043 last if !defined($bytes) && $!{EAGAIN};
1044 die "sysread failed: $!" unless defined $bytes;
1045 # since we always keep a copy of $pipewrite open EOF is fatal
1046 die "sysread returned EOF on pipe read" unless $bytes;
1047 $nloff = index($pipebuff, "\n", 0);
1048 if ($nloff < 0 && length($pipebuff) >= 512) {
1049 $pipebuff = '';
1050 print STDERR "discarding 512 bytes of control pipe data with no \\n found\n";
1052 redo unless $nloff >= 0;
1054 last unless $nloff >= 0;
1055 do {
1056 my $msg = substr($pipebuff, 0, $nloff);
1057 substr($pipebuff, 0, $nloff + 1) = '';
1058 $nloff = index($pipebuff, "\n", 0);
1059 process_pipe_msg($msg) if length($msg);
1060 } while $nloff >= 0;
1061 redo;
# Otherwise handle a new connection on the listening socket, if one is ready.
1063 next unless vec($reout, fileno(Server), 1);
1064 unless (accept(Client, Server)) {
1065 logmsg "accept failed: $!" unless $!{EINTR};
1066 next;
1068 logmsg "connection on $NAME";
# Request handler run via spawn(): reads one line "COMMAND [ARGS]" from
# STDIN (presumably the accepted connection; see spawn), skipping a single
# leading empty line, and dispatches on COMMAND.
1069 spawn sub {
1070 my $inp = <STDIN>;
1071 $inp = <STDIN> if defined($inp) && $inp eq "\n";
1072 chomp $inp if defined($inp);
1073 $inp or exit 0; # ignore empty connects
# NOTE(review): if $inp does not match this pattern, $cmd stays undef, and
# the eq comparisons below emit "uninitialized" warnings (use warnings is
# on) before the unknown-command die is reached.
1074 my ($cmd, $arg) = $inp =~ /^([a-zA-Z][a-zA-Z0-9._+-]*)(?:\s+(.*))?$/;
1075 defined($arg) or $arg = '';
1076 if ($cmd eq 'ref-changes') {
1077 ref_changes($arg);
1078 } elsif ($cmd eq 'clone') {
1079 clone($arg);
1080 } elsif ($cmd eq 'ref-change') {
1081 ref_change($arg);
1082 } elsif ($cmd eq 'throttle') {
1083 throttle($arg);
1084 } else {
1085 die "ignoring unknown command: $cmd\n";
1088 close Client;
1089 sleep 1;
1093 ## -------------
1094 ## Documentation
1095 ## -------------
1098 __END__
1100 =head1 NAME
1102 taskd.pl - Perform Girocco service tasks
1104 =head1 SYNOPSIS
1106 taskd.pl [options]
1108 Options:
1109 -h | --help detailed instructions
1110 -q | --quiet run quietly
1111 --no-quiet do not run quietly
1112 -P | --progress show occasional status updates
1113 -i | --inetd run as inetd unix stream wait service
1114 implies --quiet --syslog
1115 -t SECONDS | --idle-timeout=SECONDS how long to wait idle before exiting
1116 requires --inetd
1117 -s | --syslog[=facility] send messages to syslog instead of
1118 stderr but see --stderr
1119 enabled by --inetd
1120 --no-syslog do not send messages to syslog
1121 --stderr always send messages to stderr too
1122 --abbrev=n abbreviate hashes to n (default is 8)
1123 --status-interval=MINUTES status update interval (default 1)
1125 =head1 OPTIONS
1127 =over 8
1129 =item B<--help>
1131 Print the full description of taskd.pl's options.
1133 =item B<--quiet>
1135 Suppress non-error messages, e.g. for use when running this task as an inetd
1136 service. Enabled by default by --inetd.
1138 =item B<--no-quiet>
1140 Enable non-error messages. When running in --inetd mode these messages are
1141 sent to STDERR instead of STDOUT.
1143 =item B<--progress>
1145 Show information about the current status of the task operation occasionally.
1146 This is automatically enabled if --quiet is not given.
1148 =item B<--inetd>
1150 Run as an inetd wait service. File descriptor 0 must be an unconnected unix
1151 stream socket ready to have accept called on it. To be useful, the unix socket
1152 should be located at "$Girocco::Config::chroot/etc/taskd.socket". A warning
1153 will be issued if the socket is not in the expected location. Socket file
1154 permissions will be adjusted if necessary; if they cannot be, taskd.pl will
1155 die. The --inetd option also enables the --quiet and --syslog options, but
1156 --no-quiet and --no-syslog may be used to override that.
1158 The correct specification for the inetd socket is a "unix" protocol "stream"
1159 socket in "wait" mode with user and group read/write permissions (0660). An
1160 attempt will be made to alter the socket's file mode if needed; if that
1161 cannot be accomplished, taskd.pl will die.
1163 Although most inetd stream services run in nowait mode, taskd.pl MUST be run
1164 in wait mode and will die if the passed in socket is already connected.
1166 Note that while *BSD's inetd happily supports unix sockets (and so does
1167 Darwin's launchd), neither xinetd nor GNU's inetd supports unix sockets.
1168 However, systemd does appear to support them.
1170 =item B<--idle-timeout=SECONDS>
1172 Only permitted when running in --inetd mode. After SECONDS of inactivity
1173 (i.e. all outstanding tasks have completed and no new requests have come in)
1174 exit normally. The default is no timeout at all (a SECONDS value of 0).
1175 Note that it may actually take up to SECONDS+60 seconds for the idle exit to occur.
1177 =item B<--syslog[=facility]>
1179 Normally error output is sent to STDERR. With this option it's sent to
1180 syslog instead. Note that when running in --inetd mode non-error output is
1181 also affected by this option as it's sent to STDERR in that case. If
1182 not specified, the default for facility is LOG_USER. Facility names are
1183 case-insensitive and the leading 'LOG_' is optional. Messages are logged
1184 with the LOG_NOTICE priority.
1186 =item B<--no-syslog>
1188 Send error message output to STDERR but not syslog.
1190 =item B<--stderr>
1192 Always send error message output to STDERR. If --syslog is in effect then
1193 a copy will also be sent to syslog. In --inetd mode this applies to non-error
1194 messages as well.
1196 =item B<--abbrev=n>
1198 Abbreviate displayed hash values to only the first n hexadecimal characters.
1199 The default is 8 characters. Set to 0 for no abbreviation at all.
1201 =item B<--status-interval=MINUTES>
1203 If progress is enabled (with --progress, or by default if neither --inetd nor --quiet is given)
1204 status updates are shown every MINUTES minutes. Setting the interval to 0
1205 disables them entirely even with --progress.
1207 =back
1209 =head1 DESCRIPTION
1211 taskd.pl is Girocco's service request servant; it listens for service requests
1212 such as new clone requests and ref update notifications and spawns a task to
1213 perform the requested action.
1215 =cut