jobd/taskd: support --same-pid graceful restart option
[girocco.git] / jobd / jobd.pl
bloba49913c509807619fd3d0ebc9ade6d71dfce4d82
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
13 use File::Basename;
15 use lib dirname($0);
16 use Girocco::Config;
17 use Girocco::Project;
18 use Girocco::User;
19 use Girocco::Util;
20 BEGIN {noFatalsToBrowser}
21 use Girocco::ExecUtil;
# Options
# Defaults for the command line options; see the POD at the end of the file.
my $quiet;                      # --quiet
my $progress;                   # --progress
my $cpus = online_cpus();       # false when the cpu count is unavailable
my $kill_after = 900;           # --kill-after (seconds)
my $max_par = 8;                # --max-parallel
$max_par = $cpus * 2 if $cpus;
my $max_par_intensive = 1;      # --max-intensive-parallel
my $load_triggers = "6,3";      # --load-triggers ("TRIG,UNTRIG")
$load_triggers = sprintf("%g,%g", $cpus * 1.5, $cpus * 0.75) if $cpus;
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";   # --lockfile
my $restart_delay = 300;        # --restart-delay (seconds)
my $all_once;                   # --all-once
my $same_pid;                   # --same-pid
my $one;                        # --one

# Parsed halves of $load_triggers; filled in by the main program
my ($load_trig, $load_untrig);
39 ######### Jobs {{{1
# Job command for 'update' jobs: decide whether the project's mirror needs
# refreshing and, if so, launch update.sh for it.  Whenever the update is
# skipped for a reason that still permits garbage collection, a gc job is
# queued instead.
sub update_project {
    my ($job) = @_;
    my $p = $job->{'project'};
    check_project_exists($job) or return;
    my $projpath = get_project_path($p);

    # Marker files that disable fetching altogether
    if (-e "$projpath.nofetch" || -e "$projpath.bypass" ||
        -e "$projpath.bypass_fetch") {
        job_skip($job);
        return setup_gc($job);
    }

    if (-e "$projpath.clone_in_progress" && ! -e "$projpath.clone_failed") {
        job_skip($job, "initial mirroring not complete yet");
        return;
    }

    if (-e "$projpath.clone_failed") {
        job_skip($job, "initial mirroring failed");
        # Still need to gc non top-level clones even if they've failed
        # otherwise the objects copied into them from the parent will
        # just accumulate without bound
        if ($p =~ m,/,) {
            setup_gc($job);
        }
        return;
    }

    my $ts = is_operation_uptodate($p, 'lastrefresh',
        rand_adjust($Girocco::Config::min_mirror_interval));
    if ($ts) {
        job_skip($job, "not needed right now, last run at $ts");
        setup_gc($job);
        return;
    }

    # git svn can be very, very slow at times
    $job->{'timeout_factor'} = 3 if is_svn_clone($p);

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}
# Job command for 'gc' jobs: launch gc.sh for the project unless marker
# files forbid it or the last gc ran recently enough.
sub gc_project {
    my ($job) = @_;
    my $p = $job->{'project'};
    check_project_exists($job) or return;
    my $projpath = get_project_path($p);

    # Marker files that unconditionally disable gc
    if (-e "$projpath.nogc" || -e "$projpath.bypass") {
        job_skip($job);
        return;
    }
    # A .delaygc marker holds gc back unless .allowgc or .needsgc is present
    if (-e "$projpath.delaygc" && ! -e "$projpath.allowgc" && ! -e "$projpath.needsgc") {
        job_skip($job);
        return;
    }

    # A .needsgc marker bypasses the "ran recently" check
    unless (-e "$projpath.needsgc") {
        my $ts = is_operation_uptodate($p, 'lastgc',
            rand_adjust($Girocco::Config::min_gc_interval));
        if ($ts) {
            job_skip($job, "not needed right now, last run at $ts");
            return;
        }
    }

    # allow garbage collection to run for longer than an update
    $job->{'timeout_factor'} = 2;
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}
# Queue an intensive 'gc' job for the project the given job belongs to.
sub setup_gc {
    my ($job) = @_;
    my @gc_job = (
        project   => $job->{'project'},
        type      => 'gc',
        command   => \&gc_project,
        intensive => 1,
    );
    return queue_job(@gc_job);
}
# Returns true if the job's project directory exists; otherwise marks the
# job as skipped and returns 0.
sub check_project_exists {
    my ($job) = @_;
    my $projpath = get_project_path($job->{'project'});
    unless (-d $projpath) {
        job_skip($job, "non-existent project");
        return 0;
    }
    # NOTE(review): the success return is not visible in this view of the
    # file; callers use "check_project_exists($job) || return", so a true
    # value is required here -- confirm against the original.
    return 1;
}
# Returns the repository directory (with trailing slash) for a project name:
# "<reporoot>/<project>.git/".
sub get_project_path {
    my ($project) = @_;
    return join('', $Girocco::Config::reporoot, '/', $project, '.git/');
}
# Single-entry cache of the most recently parsed project config file:
# its path, an identity string (dev:ino:size:mtime) and the parsed hash.
my ($_last_config_path, $_last_config_id, $_last_config);
BEGIN {
    ($_last_config_path, $_last_config_id) = ("", "");
    $_last_config = {};
}

# Returns the value of config key $name from "$projdir/config", or undef if
# the directory/config file is missing, the file is empty or has a zero
# mtime, or the key is not present.  The parsed file is cached and only
# re-read when the path or the file's dev/ino/size/mtime changes.
sub get_git_config {
    my ($projdir, $name) = @_;
    return undef unless defined($projdir) && -d $projdir && -f "$projdir/config";
    my $cf = "$projdir/config";
    my @stat = stat($cf);
    return undef unless @stat && $stat[7] && $stat[9];
    my $id = join(":", @stat[0, 1, 7, 9]); # dev,ino,size,mtime
    my $cache_valid = $_last_config_path eq $cf
        && $_last_config_id eq $id
        && ref($_last_config) eq 'HASH';
    unless ($cache_valid) {
        my $data = read_config_file_hash($cf);
        $data = {} unless defined($data);
        # Invalidate the identity before installing the new contents
        $_last_config_path = $_last_config_id = "";
        $_last_config = $data;
        $_last_config_id = $id;
        $_last_config_path = $cf;
    }
    return $_last_config->{$name};
}
# Checks the "gitweb.$which" timestamp stored in the project's config.
# Returns the timestamp string if it is no older than $threshold seconds,
# otherwise undef.  A missing or unparsable timestamp counts as epoch 0.
sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $timestamp = get_git_config(get_project_path($project), "gitweb.$which");
    $timestamp = '' unless defined($timestamp);
    my $unix_ts = parse_rfc2822_date($timestamp) || 0;
    my $age = time - $unix_ts;
    return $age <= $threshold ? $timestamp : undef;
}
# True when the project's gitweb.baseurl starts with "svn:" or "svn+"
# (case-insensitively) and svn-remote.svn.url is set to a true value.
sub is_svn_clone {
    my ($project) = @_;
    my $path = get_project_path($project);
    my ($baseurl, $svnurl) = map {
        my $value = get_git_config($path, $_);
        defined($value) ? $value : '';
    } ('gitweb.baseurl', 'svn-remote.svn.url');
    return $baseurl =~ /^svn[:+]/i && $svnurl;
}
# Queue an 'update' job for a single project.  A gc job is queued after the
# update finishes, whether it succeeded or failed.
sub queue_one {
    my ($project) = @_;
    my @update_job = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    return queue_job(@update_job);
}
# Queue an update job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
180 ######### Daemon operation {{{1
my (@queue, @running);                  # jobs waiting to run / currently tracked
my ($perpetual, $locked) = (1, 0);      # keep looping? / do we own the lockfile?
my ($jobs_executed, $jobs_skipped);     # counters, reset by run_queue
my @jobs_killed;                        # names of jobs killed due to timeout
# Kills and reaps the specified pid.  Returns exit status ($?) on success
# otherwise undef if process could not be killed or reaped
# First sends SIGINT and if process does not exit within 15 seconds then SIGKILL
# We used to send SIGTERM instead of SIGINT, but by using SIGINT we can take
# advantage of "tee -i" in our update scripts and really anything we're killing
# should respond the same to either SIGINT or SIGTERM and exit gracefully.
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
    my ($targ, $use_pg) = @_;
    # Note that the docs for Perl's kill state that a negative signal
    # number should be used to kill process groups and that while a
    # a negative process id (and positive signal number) may also do that
    # on some platforms, that's not portable.
    my $sign = $use_pg ? -1 : 1;
    # Each phase is [signal number, seconds to keep polling for the exit].
    # We should not need to wait to reap a SIGKILL, however, just in case
    # the system doesn't make a SIGKILL'd process immediately reapable
    # (perhaps under extremely heavy load) we accommodate a brief delay
    my @phases = (
        [2, 15],    # SIGINT is 2
        [9, 2],     # SIGKILL is 9
    );
    foreach my $phase (@phases) {
        my ($signo, $grace) = @$phase;
        my $deadline = time() + $grace;
        my $signalled = kill($signo * $sign, $targ);
        while (1) {
            my $reaped = waitpid($targ, WNOHANG);
            return undef if $reaped < 0;
            return $? if $reaped == $targ;
            # Only keep polling if the signal was delivered and time remains
            last unless $signalled && time() < $deadline;
            select(undef, undef, undef, 0.2);
        }
    }
    return undef;
}
# First SIGINT: stop queueing, let running jobs finish, and arm
# handle_exit for a second SIGINT.
sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ".
        "^C again to exit immediately");
    $perpetual = 0;
    @queue = ();
    $SIG{'INT'} = \&handle_exit;
}
# Hard exit: kill every running job's process group, remove the lockfile
# if we own it and exit with status 0.
sub handle_exit {
    error("Killing outstanding jobs, please be patient...");
    $SIG{'TERM'} = 'IGNORE';
    foreach my $job (@running) {
        kill_gently($job->{'pid'}, 1);
    }
    if ($locked) {
        unlink $lockfile;
    }
    exit(0);
}
# Append a job (given as key => value pairs) to the queue.  'intensive'
# defaults to 0; 'queued_at' and 'dont_run' are always (re)set here.
sub queue_job {
    my %job = (
        intensive => 0,     # default, overridden by caller-supplied value
        @_,
        queued_at => time,
        dont_run  => 0,
    );
    push @queue, \%job;
}
# Start a job: track it in @running and invoke its command.  If the command
# flagged the job as skipped (dont_run), untrack it and count it as skipped.
sub run_job {
    my ($job) = @_;

    push @running, $job;
    $job->{'command'}->($job);
    return unless $job->{'dont_run'};
    pop @running;
    $jobs_skipped++;
    return;
}
# Returns the display name of a job: "[<type>::<project>]".
sub _job_name {
    my ($job) = @_;
    return sprintf("[%s::%s]", $job->{'type'}, $job->{'project'});
}
# Only one of those per job!
# Forks and execs @$command for the job in a new process group.
#   $job      - job hash; 'finished' is reset to 0, 'pid' and 'started_at'
#               are recorded in the parent on success
#   $command  - array ref holding the program and its arguments
#   $err_only - when true, the child's STDOUT is redirected to /dev/null
# If fork fails, the job is marked finished and nothing is started.
# The child exits with 71 (EX_OSERR) if its setup or the exec fails.
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid;
    $job->{'finished'} = 0;
    delete $job->{'pid'};
    if (!defined($pid = fork)) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        # "Prevent" races
        select(undef, undef, undef, 0.1);

        # Low-precedence "or" is required here: with "||" the test binds to
        # the (always true) file name and the error branch can never run.
        open(STDIN, '<', '/dev/null') or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            exit 71; # EX_OSERR
        };
        if ($err_only) {
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 71; # EX_OSERR
            };
        }
        # New process group so we can keep track of all of its children
        if (!defined(POSIX::setpgid(0, 0))) {
            error(_job_name($job) ." Can't create process group: $!");
            exit 71; # EX_OSERR
        }

        exec @$command;
        # Stop perl from complaining
        exit 71; # EX_OSERR
    }
    $job->{'pid'} = $pid;
    $job->{'started_at'} = time;
}
# Mark a job as not to be run.  The optional reason is reported unless
# running quietly.
sub job_skip {
    my ($job, $msg) = @_;
    $job->{'dont_run'} = 1;
    return if $quiet || !$msg;
    error(_job_name($job) ." Skipping job: $msg");
}
# Kill every running job that has exceeded its time limit, i.e.
# $kill_after seconds multiplied by the job's 'timeout_factor' (default 1).
# Killed jobs are marked 'finished' and 'killed' and lose their 'pid';
# reap_finished_jobs completes their bookkeeping.
sub reap_hanging_jobs {
    for my $job (@running) {
        my $factor = $job->{'timeout_factor'} || 1;
        next unless defined($job->{'started_at'});
        next unless (time - $job->{'started_at'}) > ($kill_after * $factor);

        $job->{'finished'} = 1;
        my $exitcode = kill_gently($job->{'pid'}, 1);
        delete $job->{'pid'};
        $job->{'killed'} = 1;
        # kill_gently returns undef when the process could not be killed or
        # reaped, so guard the signal test against an undefined status
        my $sigkilled = defined($exitcode) && ($exitcode & 0x7f) == 9;
        error(_job_name($job) ." KILLED due to timeout" .
            ($sigkilled ? " with SIGKILL": ""));
        push @jobs_killed, _job_name($job);
    }
}
# Final bookkeeping for a job whose process is gone.  A job already marked
# 'finished' (killed or non-zero exit) gets its on_error callback; otherwise
# on_success runs, the job is marked finished and counted as executed.
sub reap_one_job {
    my ($job) = @_;
    if ($job->{'finished'}) {
        my $on_error = $job->{'on_error'};
        $on_error->($job) if defined($on_error);
        return;
    }
    my $on_success = $job->{'on_success'};
    $on_success->($job) if defined($on_success);
    $job->{'finished'} = 1;
    $jobs_executed++;
}
# Collect finished jobs: first those killed by reap_hanging_jobs, then any
# children that have exited on their own.  Finished jobs are dropped from
# @running.  Returns 1 if anything was reaped, 0 otherwise.
sub reap_finished_jobs {
    my $finished_any = 0;

    # Jobs killed due to timeout have no pid left but carry the 'killed' flag
    foreach my $job (@running) {
        next if $job->{'pid'} || !$job->{'killed'};
        delete $job->{'killed'};
        reap_one_job($job);
        $finished_any = 1;
    }

    # Reap every child that has already exited, without blocking
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        my $status = $?;
        $finished_any = 1;

        my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        next unless $job;
        # any non-zero exit status should trigger on_error
        $job->{'finished'} = 1 if $status;
        delete $job->{'pid'};
        reap_one_job($job);
    }

    @running = grep { $_->{'finished'} == 0 } @running;
    return $finished_any;
}
# Returns the running jobs flagged as intensive (their count in scalar
# context).
sub have_intensive_jobs {
    my @intensive = grep { $_->{'intensive'} == 1 } @running;
    return @intensive;
}
# Returns the current local time as a log prefix: "[<localtime>] ".
sub ts {
    my $now = localtime;
    return "[$now] ";
}
# Returns the three system load averages as a list, or undef if they cannot
# be determined.  On Linux they come from /proc/loadavg, elsewhere from the
# output of the uptime command.
sub get_load_info {
    if ($^O eq "linux") {
        # Read /proc/loadavg on Linux
        open(my $loadav, '<', '/proc/loadavg') or return undef;
        my $loadinfo = <$loadav>;
        close $loadav;
        # An empty read would otherwise trigger uninitialized-value warnings
        defined($loadinfo) or return undef;
        return (split(/\s/, $loadinfo, 4))[0..2];
    } else {
        # Read the output of uptime everywhere else (works on Linux too)
        open(my $loadav, '-|', 'uptime') or return undef;
        my $loadinfo = <$loadav>;
        close $loadav;
        defined($loadinfo) or return undef;
        $loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
        return ($1, $2, $3);
    }
}
# Process the job queue until both @queue and @running are empty.
# Each iteration kills timed-out jobs, reaps finished ones, re-checks the
# system load at most every 5 seconds (pausing above $load_trig and resuming
# below $load_untrig), prints a status line at most every 60 seconds when
# progress output is enabled, and starts the next job unless the parallelism
# limits are reached or the system is overloaded.
# Installs the INT (soft exit) and TERM (hard exit) signal handlers and
# resets the executed/skipped/killed counters.
sub run_queue {
    my $last_progress = time;
    my $last_checkload = time - 5;  # forces a load check on the first pass
    my $overloaded = 0;
    my $load_info = '';
    $jobs_executed = 0;
    $jobs_skipped = 0;
    @jobs_killed = ();
    if ($progress) {
        my $s = @queue == 1 ? '' : 's';
        ferror("--- Processing %d queued job$s", scalar(@queue));
    }
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;
    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();
        # Check current system load
        if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
            my $current_load = $loadinfo[0];
            if ($current_load > $load_trig && !$overloaded) {
                $overloaded = 1;
                error("PAUSE: system load is at $current_load > $load_trig") if $progress;
            } elsif ($current_load < $load_untrig && $overloaded) {
                $overloaded = 0;
                error("RESUME: system load is at $current_load < $load_untrig") if $progress;
            }
            if ($overloaded) {
                $load_info = ', paused (load '. $current_load .')';
            } else {
                $load_info = ', load '. $current_load;
            }
            $last_checkload = time;
        }
        # Status output
        if ($progress && (time - $last_progress) >= 60) {
            ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
            if (@running) {
                my @run_status;
                for my $job (@running) {
                    push @run_status, _job_name($job)." ". (time - $job->{'started_at'}) ."s";
                }
                error("STATUS: currently running: ". join(', ', @run_status));
            }
            $last_progress = time;
        }
        # Back off if we're too busy
        if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
            # Skip the pause when something was just reaped
            sleep 1 unless $proceed_immediately;
            next;
        }
        # Run next
        run_job(shift(@queue)) if @queue;
    }
    if ($progress) {
        my $s = $jobs_executed == 1 ? '' : 's';
        ferror("--- Queue processed. %d job$s executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
    }
}
# Daemon main loop: create the lockfile (containing our pid), then keep
# queueing and processing all projects, sleeping $restart_delay seconds
# between runs, until $perpetual is cleared or a restart is requested by
# replacing the lockfile's first line with "restart".
# Dies if the lockfile already exists, cannot be created, or can no longer
# be opened.  Removes the lockfile on return.
# Returns "restart" if a restart was requested, otherwise "".
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
    }
    # Low-precedence "or" is required: "||" would bind to $lockfile and the
    # die could never trigger.
    open(my $lock, '>', $lockfile) or die "Cannot create lockfile '$lockfile': $!";
    print $lock $$;
    close $lock;
    $locked = 1;

    my $result = "";
    while ($perpetual) {
        # touch ctime of lockfile to prevent it from being removed by /tmp cleaning
        chmod 0640, $lockfile;
        chmod 0644, $lockfile;
        # check for restart request
        open(my $check, '<', $lockfile) or die "Lock file '$lockfile' has disappeared!";
        my $request = <$check>;
        close $check;
        chomp $request if defined($request);
        if (defined($request) && $request eq "restart") {
            $result = $request;
            last;
        }
        queue_all();
        run_queue();
        sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
    }
    unlink $lockfile;
    $locked = 0;
    return $result;
}
493 ######### Helpers {{{1
# Print a timestamped message followed by a newline to STDERR.
sub error($) {
    my ($message) = @_;
    my $line = ts() . $message . "\n";
    print STDERR $line;
}
# Like error(), but takes a sprintf format string followed by its arguments.
sub ferror(@) {
    my ($format, @args) = @_;
    error(sprintf($format, @args));
}
# Report the message via error() and terminate with exit status 1.
sub fatal($) {
    my ($message) = @_;
    error($message);
    exit 1;
}
506 ######### Main {{{1
# Created before anything else is changed; used below for the graceful
# restart.  NOTE(review): presumably captures the original invocation --
# confirm against Girocco::ExecUtil.
my $reexec = Girocco::ExecUtil->new;
chdir "/";
close(DATA) if fileno(DATA);

# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|?|h'                   => sub { pod2usage(-verbose => 2, -exitval => 0); },
    'quiet|q'                    => \$quiet,
    'progress|P'                 => \$progress,
    'kill-after|k=i'             => \$kill_after,
    'max-parallel|p=i'           => \$max_par,
    'max-intensive-parallel|i=i' => \$max_par_intensive,
    'load-triggers=s'            => \$load_triggers,
    'restart-delay|d=i'          => \$restart_delay,
    'lockfile|l=s'               => \$lockfile,
    'same-pid'                   => \$same_pid,
    'all-once|a'                 => \$all_once,
    'one|o=s'                    => \$one,
) || pod2usage(2);
if ($all_once && $one) {
    fatal("Error: can only use one out of --all-once and --one");
}

# Progress output is implied whenever --quiet is not given
if (!$quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

# Disable load checks when the load average cannot be determined
defined((get_load_info())[0]) or $load_triggers = '0,0';
($load_trig, $load_untrig) = split(/,/, $load_triggers);

# One-shot modes (mutually exclusive, checked above): process the queue
# once and exit
if ($one || $all_once) {
    if ($one) {
        queue_one($one);
    } else {
        queue_all();
    }
    run_queue();
    exit;
}

# Perpetual mode; the bare block lets "redo" resume after a failed restart
{
    if (run_perpetually() eq "restart") {
        error("Restarting in response to restart request... ");
        $reexec->reexec($same_pid);
        # Only reached if the re-exec did not take place
        error("Continuing after failed restart: $!");
        chdir "/";
        redo;
    }
}
560 ########## Documentation {{{1
562 __END__
564 =head1 NAME
566 jobd.pl - Perform Girocco maintenance jobs
568 =head1 SYNOPSIS
570 jobd.pl [options]
572 Options:
573 -h | --help detailed instructions
574 -q | --quiet run quietly
575 -P | --progress show occasional status updates
576 -k SECONDS | --kill-after SECONDS how long to wait before killing jobs
577 -p NUM | --max-parallel NUM how many jobs to run at the same time
578 -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
579 at the same time
580 --load-triggers TRIG,UNTRIG stop queueing jobs at load above
581 TRIG and resume at load below UNTRIG
582 -d SECONDS | --restart-delay SECONDS wait for this many seconds between
583 queue runs
584 -l FILE | --lockfile FILE create a lockfile in the given
585 location
586 --same-pid keep same pid during graceful restart
587 -a | --all-once process the list only once
588 -o PRJNAME | --one PRJNAME process only one project
590 =head1 OPTIONS
592 =over 8
594 =item B<--help>
596 Print the full description of jobd.pl's options.
598 =item B<--quiet>
600 Suppress non-error messages, e.g. for use when running this task as a cronjob.
602 =item B<--progress>
604 Show information about the current status of the job queue occasionally. This
605 is automatically enabled if --quiet is not given.
607 =item B<--kill-after SECONDS>
609 Kill supervised jobs after a certain time to avoid hanging the daemon.
611 =item B<--max-parallel NUM>
613 Run no more than that many jobs at the same time. The default is the number
614 of cpus * 2. If the number of cpus cannot be determined, the default is 8.
616 =item B<--max-intensive-parallel NUM>
618 Run no more than that many resource-hungry jobs at the same time. Right now,
619 this refers to repacking jobs. The default is 1.
621 =item B<--load-triggers TRIG,UNTRIG>
623 If the first system load average (1 minute average) exceeds TRIG, don't queue
624 any more jobs until it goes below UNTRIG. This is currently only supported on
625 Linux and any other platforms that provide an uptime command with load average
626 output.
628 If both values are zero, load checks are disabled. The default is the number
629 of cpus * 1.5 for TRIG and half that for UNTRIG. If the number of cpus cannot
630 be determined, the default is 6,3.
632 =item B<--restart-delay SECONDS>
634 After processing the queue, wait this many seconds until the queue is
635 restarted. The default is 300 seconds.
637 =item B<--lockfile FILE>
639 For perpetual operation, specify the full path to a lock file to create and
640 then remove after finishing/aborting. The default is /tmp/jobd-$suffix.lock
641 where $suffix is a 6-character string uniquely determined by the name and
642 nickname of this Girocco instance. The pid of the running jobd instance will
643 be written to the lock file.
645 =item B<--same-pid>
647 When performing a graceful restart, keep the same pid rather than switching to
648 a new one.
650 =item B<--all-once>
652 Instead of perpetually processing all projects over and over again, process
653 them just once and then exit.
655 =item B<--one PRJNAME>
657 Process only the given project (given as just the project name without C<.git>
658 suffix) and then exit.
660 =back
662 =head1 DESCRIPTION
664 jobd.pl is Girocco's repositories maintenance servant; it periodically checks
665 all the repositories and updates mirrored repositories and repacks push-mode
666 repositories when needed.
668 =cut