#!/usr/bin/perl

# jobd - perform Girocco maintenance jobs

# Run with --help for details

use strict;
use warnings;

use Getopt::Long;
use Pod::Usage;
use POSIX ":sys_wait_h";
use File::Basename;

use lib dirname($0);
use Girocco::Config;
use Girocco::Project;
use Girocco::User;
use Girocco::Util;
BEGIN {noFatalsToBrowser}

# Options
my $quiet;
my $progress;
my $cpus = online_cpus;
my $kill_after = 900;
my $max_par = $cpus ? $cpus * 2 : 8;
my $max_par_intensive = 1;
my $load_triggers = $cpus ? sprintf("%g,%g", $cpus * 1.5, $cpus * 0.75) : "6,3";
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 300;
my $all_once;
my $one;

my ($load_trig, $load_untrig);

######### Jobs {{{1

sub update_project {
    my $job = shift;
    my $p = $job->{'project'};
    check_project_exists($job) || return;
    if (-e get_project_path($p).".nofetch" || -e get_project_path($p).".bypass" ||
        -e get_project_path($p).".bypass_fetch") {
        job_skip($job);
        return setup_gc($job);
    }
    if (-e get_project_path($p).".clone_in_progress" && ! -e get_project_path($p).".clone_failed") {
        job_skip($job, "initial mirroring not complete yet");
        return;
    }
    if (-e get_project_path($p).".clone_failed") {
        job_skip($job, "initial mirroring failed");
        # Still need to gc non top-level clones even if they've failed
        # otherwise the objects copied into them from the parent will
        # just accumulate without bound
        setup_gc($job) if $p =~ m,/,;
        return;
    }
    if (my $ts = is_operation_uptodate($p, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
        job_skip($job, "not needed right now, last run at $ts");
        setup_gc($job);
        return;
    }
    if (is_svn_clone($p)) {
        # git svn can be very, very slow at times
        $job->{'timeout_factor'} = 3;
    }
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}

sub gc_project {
    my $job = shift;
    my $p = $job->{'project'};
    check_project_exists($job) || return;
    my $projpath = get_project_path($p);
    if (-e "$projpath.nogc" || -e "$projpath.bypass" ||
        (-e "$projpath.delaygc" && ! -e "$projpath.allowgc")) {
        job_skip($job);
        return;
    }
    if (my $ts = is_operation_uptodate($p, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval))) {
        job_skip($job, "not needed right now, last run at $ts");
        return;
    }
    # allow garbage collection to run for longer than an update
    $job->{'timeout_factor'} = 2;
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}

sub setup_gc {
    my $job = shift;
    queue_job(
        project => $job->{'project'},
        type => 'gc',
        command => \&gc_project,
        intensive => 1,
    );
}

sub check_project_exists {
    my $job = shift;
    my $p = $job->{'project'};
    if (!-d get_project_path($p)) {
        job_skip($job, "non-existent project");
        return 0;
    }
    return 1;
}

sub get_project_path {
    "$Girocco::Config::reporoot/".shift().".git/";
}

sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $path = get_project_path($project);
    my $timestamp = get_git("--git-dir=$path", 'config', "gitweb.$which");
    defined($timestamp) or $timestamp = '';
    chomp $timestamp;
    my $unix_ts = parse_rfc2822_date($timestamp) || 0;
    (time - $unix_ts) <= $threshold ? $timestamp : undef;
}

sub is_svn_clone {
    my ($project) = @_;
    my $path = get_project_path($project);
    my $baseurl = get_git("--git-dir=$path", 'config', 'gitweb.baseurl');
    defined($baseurl) or $baseurl = '';
    chomp $baseurl;
    my $svnurl = get_git("--git-dir=$path", 'config', 'svn-remote.svn.url');
    defined($svnurl) or $svnurl = '';
    chomp $svnurl;
    return $baseurl =~ /^svn[:+]/i && $svnurl;
}

sub queue_one {
    my $project = shift;
    queue_job(
        project => $project,
        type => 'update',
        command => \&update_project,
        on_success => \&setup_gc,
        on_error => \&setup_gc,
    );
}

sub queue_all {
    queue_one($_) for (Girocco::Project->get_full_list());
}

######### Daemon operation {{{1

my @queue;
my @running;
my $perpetual = 1;
my $locked = 0;
my $jobs_executed;
my $jobs_skipped;
my @jobs_killed;

# Kills and reaps the specified pid. Returns exit status ($?) on success,
# otherwise undef if the process could not be killed or reaped.
# First sends SIGTERM and, if the process does not exit within 15 seconds, SIGKILL.
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
    my $targ = shift;
    my $use_pg = shift || 0;
    # Note that the docs for Perl's kill state that a negative signal
    # number should be used to kill process groups and that while a
    # negative process id (and positive signal number) may also do that
    # on some platforms, that's not portable.
    my $pg = $use_pg ? -1 : 1;
    my $harsh = time() + 15; # SIGKILL after this delay
    my $count = kill(15*$pg, $targ); # SIGTERM is 15
    my $reaped = waitpid($targ, WNOHANG);
    return undef if $reaped < 0;
    return $? if $reaped == $targ;
    while ($count && time() < $harsh) {
        select(undef, undef, undef, 0.2);
        $reaped = waitpid($targ, WNOHANG);
        return undef if $reaped < 0;
        return $? if $reaped == $targ;
    }
    $harsh = time() + 2;
    $count = kill(9*$pg, $targ); # SIGKILL is 9
    $reaped = waitpid($targ, WNOHANG);
    return undef if $reaped < 0;
    return $? if $reaped == $targ;
    # We should not need to wait to reap a SIGKILL, however, just in case
    # the system doesn't make a SIGKILL'd process immediately reapable
    # (perhaps under extremely heavy load) we accommodate a brief delay
    while ($count && time() < $harsh) {
        select(undef, undef, undef, 0.2);
        $reaped = waitpid($targ, WNOHANG);
        return undef if $reaped < 0;
        return $? if $reaped == $targ;
    }
    return undef;
}

sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ".
          "^C again to exit immediately");
    @queue = ();
    $perpetual = 0;
    $SIG{'INT'} = \&handle_exit;
}

sub handle_exit {
    error("Killing outstanding jobs, please be patient...");
    $SIG{'TERM'} = 'IGNORE';
    for (@running) {
        kill_gently($_->{'pid'}, 1);
    }
    unlink $lockfile if ($locked);
    exit(0);
}

sub queue_job {
    my %opts = @_;
    $opts{'queued_at'} = time;
    $opts{'dont_run'} = 0;
    $opts{'intensive'} = 0 unless exists $opts{'intensive'};
    push @queue, \%opts;
}

sub run_job {
    my $job = shift;

    push @running, $job;
    $job->{'command'}->($job);
    if ($job->{'dont_run'}) {
        pop @running;
        $jobs_skipped++;
        return;
    }
}

sub _job_name {
    my $job = shift;
    "[".$job->{'type'}."::".$job->{'project'}."]";
}

# Only one of these per job!
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid;
    if (!defined($pid = fork)) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        open STDIN, '<', '/dev/null' || do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            $job->{'finished'} = 1;
            return;
        };
        if ($err_only) {
            open STDOUT, '>', '/dev/null' || do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                $job->{'finished'} = 1;
                return;
            };
        }
        # New process group so we can keep track of all of its children
        if (!defined(POSIX::setpgid(0, 0))) {
            error(_job_name($job) ." Can't create process group: $!");
            $job->{'finished'} = 1;
            return;
        }
        # "Prevent" races
        select(undef, undef, undef, 0.1);
        exec @$command;
        # Stop perl from complaining
        exit $?;
    }
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}

sub job_skip {
    my ($job, $msg) = @_;
    $job->{'dont_run'} = 1;
    error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}

sub reap_hanging_jobs {
    for (@running) {
        my $factor = $_->{'timeout_factor'} || 1;
        if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
            $_->{'finished'} = 1;
            my $exitcode = kill_gently($_->{'pid'}, 1);
            delete $_->{'pid'};
            $_->{'killed'} = 1;
            error(_job_name($_) ." KILLED due to timeout" .
                (($exitcode & 0x7f) == 9 ? " with SIGKILL": ""));
            push @jobs_killed, _job_name($_);
        }
    }
}

sub reap_one_job {
    my $job = shift;
    if (!$job->{'finished'}) {
        $job->{'on_success'}->($job) if defined($job->{'on_success'});
        $job->{'finished'} = 1;
        $jobs_executed++;
    } else {
        $job->{'on_error'}->($job) if defined($job->{'on_error'});
    }
}

sub reap_finished_jobs {
    my $pid;
    my $finished_any = 0;
    foreach my $child (grep { !$_->{'pid'} && $_->{'killed'} } @running) {
        delete $child->{'killed'};
        reap_one_job($child);
        $finished_any = 1;
    }
    while (1) {
        $pid = waitpid(-1, WNOHANG);
        last if $pid <= 0;
        $finished_any = 1;

        my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        if ($?) {
            # XXX- we currently don't care
        }
        if (@child) {
            delete $child[0]->{'pid'};
            reap_one_job($child[0]);
        }
    }
    @running = grep { $_->{'finished'} == 0 } @running;
    $finished_any;
}

sub have_intensive_jobs {
    grep { $_->{'intensive'} == 1 } @running;
}

sub ts {
    "[". scalar(localtime) ."] ";
}

sub get_load_info {
    if ($^O eq "linux") {
        # Read /proc/loadavg on Linux
        open(LOADAV, '<', '/proc/loadavg') or return undef;
        my $loadinfo = <LOADAV>;
        close LOADAV;
        return (split(/\s/, $loadinfo, 4))[0..2];
    } else {
        # Read the output of uptime everywhere else (works on Linux too)
        open(LOADAV, '-|', 'uptime') or return undef;
        my $loadinfo = <LOADAV>;
        close LOADAV;
        $loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
        return ($1, $2, $3);
    }
}

sub run_queue {
    my $last_progress = time;
    my $last_checkload = time - 5;
    my $current_load = $load_trig;
    my $overloaded = 0;
    my $load_info = '';
    $jobs_executed = 0;
    $jobs_skipped = 0;
    @jobs_killed = ();
    if ($progress) {
        my $s = @queue == 1 ? '' : 's';
        ferror("--- Processing %d queued job$s", scalar(@queue));
    }
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;
    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();
        # Check current system load
        if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
            my $current_load = $loadinfo[0];
            if ($current_load > $load_trig && !$overloaded) {
                $overloaded = 1;
                error("PAUSE: system load is at $current_load > $load_trig") if $progress;
            } elsif ($current_load < $load_untrig && $overloaded) {
                $overloaded = 0;
                error("RESUME: system load is at $current_load < $load_untrig") if $progress;
            }
            if ($overloaded) {
                $load_info = ', paused (load '. $current_load .')';
            } else {
                $load_info = ', load '. $current_load;
            }
            $last_checkload = time;
        }
        # Status output
        if ($progress && (time - $last_progress) >= 60) {
            ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
            if (@running) {
                my @run_status;
                for (@running) {
                    push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
                }
                error("STATUS: currently running: ". join(', ', @run_status));
            }
            $last_progress = time;
        }
        # Back off if we're too busy
        if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
            sleep 1 unless $proceed_immediately;
            next;
        }
        # Run next
        run_job(shift(@queue)) if @queue;
    }
    if ($progress) {
        my $s = $jobs_executed == 1 ? '' : 's';
        ferror("--- Queue processed. %d job$s executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
    }
}

sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
    }
    open LOCK, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
    print LOCK $$;
    close LOCK;
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
        sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
    }
    unlink $lockfile;
}

######### Helpers {{{1

sub error($) {
    print STDERR ts().shift()."\n";
}
sub ferror(@) {
    error(sprintf($_[0], @_[1..$#_]));
}
sub fatal($) {
    error(shift);
    exit 1;
}

######### Main {{{1

close(DATA) if fileno(DATA);
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|?|h' => sub { pod2usage(-verbose => 2, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'max-intensive-parallel|i=i' => \$max_par_intensive,
    'load-triggers=s' => \$load_triggers,
    'restart-delay|d=i' => \$restart_delay,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

run_perpetually();

########## Documentation {{{1

__END__

=head1 NAME

jobd.pl - Perform Girocco maintenance jobs

=head1 SYNOPSIS

 jobd.pl [options]

 Options:
   -h | --help                            detailed instructions
   -q | --quiet                           run quietly
   -P | --progress                        show occasional status updates
   -k SECONDS | --kill-after SECONDS      how long to wait before killing jobs
   -p NUM | --max-parallel NUM            how many jobs to run at the same time
   -i NUM | --max-intensive-parallel NUM  how many resource-hungry jobs to run
                                          at the same time
   --load-triggers TRIG,UNTRIG            stop queueing jobs at load above
                                          TRIG and resume at load below UNTRIG
   -d SECONDS | --restart-delay SECONDS   wait for this many seconds between
                                          queue runs
   -l FILE | --lockfile FILE              create a lockfile in the given
                                          location
   -a | --all-once                        process the list only once
   -o PRJNAME | --one PRJNAME             process only one project
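
For example (illustrative invocations only; every switch used here is
described below):

   jobd.pl                     run perpetually with the default settings
   jobd.pl --all-once -P       process every project once, showing progress
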

=head1 OPTIONS

=over 8

=item B<--help>

Print the full description of jobd.pl's options.

=item B<--quiet>

Suppress non-error messages, e.g. for use when running this task as a cronjob.

=item B<--progress>

Show information about the current status of the job queue occasionally. This
is automatically enabled if --quiet is not given.

=item B<--kill-after SECONDS>

Kill supervised jobs after this many seconds to avoid hanging the daemon.
The default is 900 seconds.

=item B<--max-parallel NUM>

Run no more than that many jobs at the same time. The default is twice the
number of CPUs, or 8 if the number of CPUs cannot be determined.

=item B<--max-intensive-parallel NUM>

Run no more than that many resource-hungry jobs at the same time. Right now,
this refers to repacking jobs. The default is 1.

=item B<--load-triggers TRIG,UNTRIG>

If the first system load average (the 1-minute average) exceeds TRIG, don't
queue any more jobs until it drops below UNTRIG (see the example after this
list). This is currently supported on Linux and on any other platform that
provides an uptime command with load average output.

If both values are zero, load checks are disabled. The default is 1.5 times
the number of CPUs for TRIG and half that for UNTRIG. If the number of CPUs
cannot be determined, the default is 6,3.

=item B<--restart-delay SECONDS>

After processing the queue, wait this many seconds before the queue is
restarted. The default is 300 seconds.

=item B<--lockfile FILE>

For perpetual operation, specify the full path to a lock file to create and
then remove after finishing/aborting. The default is /tmp/jobd-$suffix.lock
where $suffix is a 6-character string uniquely determined by the name and
nickname of this Girocco instance. The pid of the running jobd instance will
be written to the lock file.

=item B<--all-once>

Instead of perpetually processing all projects over and over again, process
them just once and then exit.

=item B<--one PRJNAME>

Process only the given project (given as just the project name without the
C<.git> suffix) and then exit (see the example after this list).

=back
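
For example, to pause queueing new jobs while the 1-minute load average is
above 10 and resume once it falls below 5 (the values are illustrative only):

   jobd.pl --load-triggers 10,5

Or, to process a single (hypothetical) project named C<myproject> once and
then exit:

   jobd.pl --one myproject
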

=head1 DESCRIPTION

jobd.pl is Girocco's repository maintenance servant; it periodically checks
all the repositories, updating mirrored repositories and repacking push-mode
repositories as needed.

=cut