jobd.pl: avoid spawning git to read config info
[girocco.git] / jobd / jobd.pl
blobe2ad8224922bdbdbc779d3155f4898537c7fa2b1
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
13 use File::Basename;
15 use lib dirname($0);
16 use Girocco::Config;
17 use Girocco::Project;
18 use Girocco::User;
19 use Girocco::Util;
20 BEGIN {noFatalsToBrowser}
22 # Options
23 my $quiet;
24 my $progress;
25 my $cpus = online_cpus;
26 my $kill_after = 900;
27 my $max_par = $cpus ? $cpus * 2 : 8;
28 my $max_par_intensive = 1;
29 my $load_triggers = $cpus ? sprintf("%g,%g", $cpus * 1.5, $cpus * 0.75) : "6,3";
30 my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
31 my $restart_delay = 300;
32 my $all_once;
33 my $one;
35 my ($load_trig, $load_untrig);
37 ######### Jobs {{{1
# Run (or skip) a mirror-update job for $job->{'project'}.
# Administrative marker files next to the repository directory control
# skipping; a gc job is scheduled in the cases noted below.
sub update_project {
	my $job = shift;
	my $proj = $job->{'project'};
	check_project_exists($job) || return;
	my $base = get_project_path($proj);
	if (-e "$base.nofetch" || -e "$base.bypass" || -e "$base.bypass_fetch") {
		# fetching is administratively disabled, but gc may still run
		job_skip($job);
		return setup_gc($job);
	}
	if (-e "$base.clone_in_progress" && ! -e "$base.clone_failed") {
		job_skip($job, "initial mirroring not complete yet");
		return;
	}
	if (-e "$base.clone_failed") {
		job_skip($job, "initial mirroring failed");
		# Still need to gc non top-level clones even if they've failed
		# otherwise the objects copied into them from the parent will
		# just accumulate without bound
		setup_gc($job) if $proj =~ m,/,;
		return;
	}
	if (my $ts = is_operation_uptodate($proj, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		setup_gc($job);
		return;
	}
	# git svn can be very, very slow at times
	$job->{'timeout_factor'} = 3 if is_svn_clone($proj);
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $proj], $quiet);
}
# Run (or skip) a garbage-collection job for $job->{'project'}.
# Honors the .nogc/.bypass/.delaygc marker files; .needsgc forces a run
# even when the last gc was recent.
sub gc_project {
	my $job = shift;
	my $proj = $job->{'project'};
	check_project_exists($job) || return;
	my $projpath = get_project_path($proj);
	if (-e "$projpath.nogc" || -e "$projpath.bypass" ||
	    (-e "$projpath.delaygc" && ! -e "$projpath.allowgc" && ! -e "$projpath.needsgc")) {
		job_skip($job);
		return;
	}
	my $ts;
	if (! -e "$projpath.needsgc" &&
	    ($ts = is_operation_uptodate($proj, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval)))) {
		job_skip($job, "not needed right now, last run at $ts");
		return;
	}
	# allow garbage collection to run for longer than an update
	$job->{'timeout_factor'} = 2;
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $proj], $quiet);
}
# Enqueue a gc job for the same project as $job.
# gc jobs are flagged intensive so they are throttled separately.
sub setup_gc {
	my $job = shift;
	queue_job(
		project   => $job->{'project'},
		type      => 'gc',
		command   => \&gc_project,
		intensive => 1,
	);
}
# Return 1 when the project's repository directory exists; otherwise
# mark the job skipped and return 0.
sub check_project_exists {
	my $job = shift;
	return 1 if -d get_project_path($job->{'project'});
	job_skip($job, "non-existent project");
	return 0;
}
# Absolute path (with trailing slash) of the named project's repository.
sub get_project_path {
	my $proj = shift;
	return "$Girocco::Config::reporoot/$proj.git/";
}
# Single-entry cache of the most recently parsed project config file,
# keyed by its path plus a dev:ino:size:mtime identity string so that
# on-disk changes are detected cheaply.
my $_cfgcache_path;
my $_cfgcache_id;
my $_cfgcache;
BEGIN {
	$_cfgcache_path = "";
	$_cfgcache_id = "";
	$_cfgcache = {};
}

# Fetch config key $name from "$projdir/config" without spawning git.
# Returns undef when the directory or config file is missing or empty;
# the parsed file is cached until its identity string changes.
sub get_git_config {
	my ($projdir, $name) = @_;
	defined($projdir) && -d $projdir && -f "$projdir/config" or return undef;
	my $cf = "$projdir/config";
	my @stat = stat($cf);
	@stat && $stat[7] && $stat[9] or return undef; # need non-empty file with an mtime
	my $id = join(":", $stat[0], $stat[1], $stat[7], $stat[9]); # dev,ino,size,mtime
	if ($_cfgcache_path ne $cf || $_cfgcache_id ne $id || ref($_cfgcache) ne 'HASH') {
		my $data = read_config_file_hash($cf);
		defined($data) or $data = {};
		# clear the identity first so a partially updated cache never matches
		$_cfgcache_path = $_cfgcache_id = "";
		$_cfgcache = $data;
		$_cfgcache_id = $id;
		$_cfgcache_path = $cf;
	}
	return $_cfgcache->{$name};
}
# If the gitweb.$which timestamp recorded in the project's config is
# within $threshold seconds of now, return that timestamp string;
# otherwise return undef (operation is due).
sub is_operation_uptodate {
	my ($project, $which, $threshold) = @_;
	my $stamp = get_git_config(get_project_path($project), "gitweb.$which");
	$stamp = '' unless defined $stamp;
	my $when = parse_rfc2822_date($stamp) || 0;
	return (time - $when) <= $threshold ? $stamp : undef;
}
# True when the project is a git-svn mirror: its recorded base URL is
# an svn:/svn+... URL and an svn-remote is configured.
sub is_svn_clone {
	my ($project) = @_;
	my $path = get_project_path($project);
	my $baseurl = get_git_config($path, 'gitweb.baseurl');
	my $svnurl = get_git_config($path, 'svn-remote.svn.url');
	$baseurl = '' unless defined $baseurl;
	$svnurl = '' unless defined $svnurl;
	return $baseurl =~ /^svn[:+]/i && $svnurl;
}
# Enqueue an update job for one project; a gc job is scheduled
# afterwards whether the update succeeds or fails.
sub queue_one {
	my $project = shift;
	queue_job(
		project    => $project,
		type       => 'update',
		command    => \&update_project,
		on_success => \&setup_gc,
		on_error   => \&setup_gc,
	);
}
# Enqueue an update job for every project Girocco knows about.
sub queue_all {
	queue_one($_) for Girocco::Project->get_full_list();
}
178 ######### Daemon operation {{{1
my @queue;		# jobs waiting to be started
my @running;		# jobs currently executing
my $perpetual = 1;	# cleared by SIGINT to stop after the current pass
my $locked = 0;		# true once we have created $lockfile
my $jobs_executed;	# per-pass counters, reset by run_queue()
my $jobs_skipped;
my @jobs_killed;
# Kills and reaps the specified pid. Returns exit status ($?) on success
# otherwise undef if process could not be killed or reaped
# First sends SIGINT and if process does not exit within 15 seconds then SIGKILL
# We used to send SIGTERM instead of SIGINT, but by using SIGINT we can take
# advantage of "tee -i" in our update scripts and really anything we're killing
# should respond the same to either SIGINT or SIGTERM and exit gracefully.
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
	my $victim = shift;
	my $whole_group = shift || 0;
	# Note that the docs for Perl's kill state that a negative signal
	# number should be used to kill process groups and that while a
	# negative process id (and positive signal number) may also do that
	# on some platforms, that's not portable.
	my $sgn = $whole_group ? -1 : 1;
	my $deadline = time() + 15; # SIGKILL after this delay
	my $delivered = kill(2*$sgn, $victim); # SIGINT is 2
	my $got = waitpid($victim, WNOHANG);
	return undef if $got < 0;
	return $? if $got == $victim;
	while ($delivered && time() < $deadline) {
		select(undef, undef, undef, 0.2); # ~200ms poll interval
		$got = waitpid($victim, WNOHANG);
		return undef if $got < 0;
		return $? if $got == $victim;
	}
	$deadline = time() + 2;
	$delivered = kill(9*$sgn, $victim); # SIGKILL is 9
	$got = waitpid($victim, WNOHANG);
	return undef if $got < 0;
	return $? if $got == $victim;
	# We should not need to wait to reap a SIGKILL, however, just in case
	# the system doesn't make a SIGKILL'd process immediately reapable
	# (perhaps under extremely heavy load) we accommodate a brief delay
	while ($delivered && time() < $deadline) {
		select(undef, undef, undef, 0.2);
		$got = waitpid($victim, WNOHANG);
		return undef if $got < 0;
		return $? if $got == $victim;
	}
	return undef;
}
# First ^C: drop everything still queued but let running jobs finish;
# a second ^C (rebound to handle_exit below) kills them immediately.
sub handle_softexit {
	error("Waiting for outstanding jobs to finish... ".
		"^C again to exit immediately");
	@queue = ();
	$perpetual = 0;
	$SIG{'INT'} = \&handle_exit;
}
# Hard exit: kill each running job's whole process group, remove the
# lockfile if we own it, and terminate.
sub handle_exit {
	error("Killing outstanding jobs, please be patient...");
	$SIG{'TERM'} = 'IGNORE';
	kill_gently($_->{'pid'}, 1) for @running;
	unlink $lockfile if $locked;
	exit(0);
}
# Append a job (hash of options) to the queue, recording the enqueue
# time and forcing dont_run off. 'intensive' defaults to 0 but a
# caller-supplied value wins.
sub queue_job {
	my %opts = (intensive => 0, @_);
	$opts{'queued_at'} = time;
	$opts{'dont_run'} = 0;
	push @queue, \%opts;
}
# Start one job: track it in @running and invoke its command. If the
# command marked it dont_run (via job_skip) it is removed again and
# counted as skipped.
sub run_job {
	my $job = shift;

	push @running, $job;
	$job->{'command'}->($job);
	return unless $job->{'dont_run'};
	pop @running;
	$jobs_skipped++;
}
# Human-readable "[type::project]" tag used in log messages.
sub _job_name {
	my $job = shift;
	return sprintf("[%s::%s]", $job->{'type'}, $job->{'project'});
}
# Only one of those per job!
# Fork and exec the job's @$command, recording the child's pid and
# start time in $job. With $err_only true the child's STDOUT is
# discarded. The child is placed in its own process group so
# kill_gently() can signal its entire process tree. On fork failure
# the job is marked finished and no pid is recorded.
sub exec_job_command {
	my ($job, $command, $err_only) = @_;

	my $pid;
	$job->{'finished'} = 0;
	delete $job->{'pid'};
	if (!defined($pid = fork)) {
		error(_job_name($job) ." Can't fork job: $!");
		$job->{'finished'} = 1;
		return;
	}
	if (!$pid) {
		# "Prevent" races
		select(undef, undef, undef, 0.1);

		# Use low-precedence "or" here: with "||" the error handler
		# could never run because || binds to the (truthy) filename,
		# not to open()'s return value.
		open STDIN, '<', '/dev/null' or do {
			error(_job_name($job) ." Can't read from /dev/null: $!");
			exit 71; # EX_OSERR
		};
		if ($err_only) {
			open STDOUT, '>', '/dev/null' or do {
				error(_job_name($job) ." Can't write to /dev/null: $!");
				exit 71; # EX_OSERR
			};
		}
		# New process group so we can keep track of all of its children
		if (!defined(POSIX::setpgid(0, 0))) {
			error(_job_name($job) ." Can't create process group: $!");
			exit 71; # EX_OSERR
		}
		exec @$command;
		# Stop perl from complaining
		exit 71; # EX_OSERR
	}
	$job->{'pid'} = $pid;
	$job->{'started_at'} = time;
}
# Mark $job as not-to-run; when a reason is given (and we are not
# quiet) log it.
sub job_skip {
	my ($job, $msg) = @_;
	$job->{'dont_run'} = 1;
	return if $quiet || !$msg;
	error(_job_name($job) ." Skipping job: $msg");
}
# Kill any running job that has exceeded its (possibly scaled) timeout
# and remember it in @jobs_killed. The job stays in @running, flagged
# 'killed', until reap_finished_jobs() collects it.
sub reap_hanging_jobs {
	for my $job (@running) {
		my $factor = $job->{'timeout_factor'} || 1;
		next unless defined($job->{'started_at'}) &&
			(time - $job->{'started_at'}) > ($kill_after * $factor);
		$job->{'finished'} = 1;
		my $exitcode = kill_gently($job->{'pid'}, 1);
		delete $job->{'pid'};
		$job->{'killed'} = 1;
		error(_job_name($job) ." KILLED due to timeout" .
			(($exitcode & 0x7f) == 9 ? " with SIGKILL": ""));
		push @jobs_killed, _job_name($job);
	}
}
# Finish bookkeeping for one reaped job. A job not yet marked finished
# completed normally: fire on_success and count it as executed. A job
# already marked finished (killed, or non-zero exit flagged by the
# caller) fires on_error instead.
sub reap_one_job {
	my $job = shift;
	if ($job->{'finished'}) {
		$job->{'on_error'}->($job) if defined($job->{'on_error'});
	} else {
		$job->{'on_success'}->($job) if defined($job->{'on_success'});
		$job->{'finished'} = 1;
		$jobs_executed++;
	}
}
# Collect all finished children: first the ones already waited for by
# reap_hanging_jobs() (flagged 'killed'), then anything waitpid() can
# reap. Prunes @running and returns true when at least one job ended.
sub reap_finished_jobs {
	my $finished_any = 0;

	# timeout-killed jobs were already reaped by kill_gently()
	foreach my $job (grep { !$_->{'pid'} && $_->{'killed'} } @running) {
		delete $job->{'killed'};
		reap_one_job($job);
		$finished_any = 1;
	}

	for (;;) {
		my $pid = waitpid(-1, WNOHANG);
		last if $pid <= 0;
		$finished_any = 1;

		my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
		if ($?) {
			# any non-zero exit status should trigger on_error
			$child[0]->{'finished'} = 1 if @child;
		}
		if (@child) {
			delete $child[0]->{'pid'};
			reap_one_job($child[0]);
		}
	}
	@running = grep { $_->{'finished'} == 0 } @running;
	return $finished_any;
}
# In scalar context: how many currently running jobs are intensive.
sub have_intensive_jobs {
	return grep { $_->{'intensive'} == 1 } @running;
}
# Timestamp prefix for log lines, e.g. "[Mon Jan  1 00:00:00 2024] ".
sub ts {
	my $now = scalar localtime;
	return "[$now] ";
}
# Return the 1, 5 and 15 minute load averages, or undef when they
# cannot be determined.
sub get_load_info {
	if ($^O eq "linux") {
		# Read /proc/loadavg on Linux
		open(my $fh, '<', '/proc/loadavg') or return undef;
		my $loadinfo = <$fh>;
		close($fh);
		return (split(/\s/, $loadinfo, 4))[0..2];
	}
	# Read the output of uptime everywhere else (works on Linux too)
	open(my $fh, '-|', 'uptime') or return undef;
	my $loadinfo = <$fh>;
	close($fh);
	return undef unless $loadinfo =~
		/load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso;
	return ($1, $2, $3);
}
# Drain the queue: start jobs while respecting $max_par,
# $max_par_intensive and the load triggers; reap hanging and finished
# jobs on every pass; emit periodic status lines when $progress is set.
# Returns once both @queue and @running are empty.
sub run_queue {
	my $last_progress = time;
	my $last_checkload = time - 5;
	my $current_load = $load_trig;
	my $overloaded = 0;
	my $load_info = '';
	$jobs_executed = 0;
	$jobs_skipped = 0;
	@jobs_killed = ();
	if ($progress) {
		my $s = @queue == 1 ? '' : 's';
		ferror("--- Processing %d queued job$s", scalar(@queue));
	}
	$SIG{'INT'} = \&handle_softexit;
	$SIG{'TERM'} = \&handle_exit;
	while (@queue || @running) {
		reap_hanging_jobs();
		my $proceed_immediately = reap_finished_jobs();
		# Check current system load (at most once every 5 seconds)
		if ($load_trig && (time - $last_checkload) >= 5 &&
		    defined((my @loadinfo = get_load_info())[0])) {
			my $current_load = $loadinfo[0];
			if ($current_load > $load_trig && !$overloaded) {
				$overloaded = 1;
				error("PAUSE: system load is at $current_load > $load_trig") if $progress;
			} elsif ($current_load < $load_untrig && $overloaded) {
				$overloaded = 0;
				error("RESUME: system load is at $current_load < $load_untrig") if $progress;
			}
			if ($overloaded) {
				$load_info = ', paused (load '. $current_load .')';
			} else {
				$load_info = ', load '. $current_load;
			}
			$last_checkload = time;
		}
		# Status output (at most once a minute)
		if ($progress && (time - $last_progress) >= 60) {
			ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info",
				scalar(@queue), scalar(@running), $jobs_executed,
				$jobs_skipped, scalar(@jobs_killed));
			if (@running) {
				my @run_status;
				for (@running) {
					push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
				}
				error("STATUS: currently running: ". join(', ', @run_status));
			}
			$last_progress = time;
		}
		# Back off if we're too busy
		if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive ||
		    !@queue || $overloaded) {
			sleep 1 unless $proceed_immediately;
			next;
		}
		# Run next
		run_job(shift(@queue)) if @queue;
	}
	if ($progress) {
		my $s = $jobs_executed == 1 ? '' : 's';
		ferror("--- Queue processed. %d job$s executed, %d skipped, %d killed.",
			$jobs_executed, $jobs_skipped, scalar(@jobs_killed));
	}
}
# Loop until SIGINT clears $perpetual: queue every project, run the
# queue, pause, repeat. A lockfile containing our pid guards against
# concurrent jobd instances and is removed on normal exit.
sub run_perpetually {
	if (-e $lockfile) {
		die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
	}
	# Use low-precedence "or" here: with "||" the die could never fire
	# because || binds to $lockfile, not to open()'s return value.
	open my $lock, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
	print $lock $$;
	close $lock;
	$locked = 1;

	while ($perpetual) {
		# touch ctime of lockfile to prevent it from being removed by /tmp cleaning
		chmod 0444, $lockfile;
		chmod 0644, $lockfile;
		queue_all();
		run_queue();
		sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
	}
	unlink $lockfile;
}
479 ######### Helpers {{{1
# Write a timestamped message to STDERR.
sub error($) {
	my $msg = shift;
	print STDERR ts() . $msg . "\n";
}

# printf-style wrapper around error().
sub ferror(@) {
	my ($fmt, @args) = @_;
	error(sprintf($fmt, @args));
}

# Log a message and terminate with a non-zero exit status.
sub fatal($) {
	my $msg = shift;
	error($msg);
	exit 1;
}
492 ######### Main {{{1
chdir "/";
close(DATA) if fileno(DATA);

# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
	'help|?|h' => sub { pod2usage(-verbose => 2, -exitval => 0); },
	'quiet|q' => \$quiet,
	'progress|P' => \$progress,
	'kill-after|k=i' => \$kill_after,
	'max-parallel|p=i' => \$max_par,
	'max-intensive-parallel|i=i' => \$max_par_intensive,
	'load-triggers=s' => \$load_triggers,
	'restart-delay|d=i' => \$restart_delay,
	'lockfile|l=s' => \$lockfile,
	'all-once|a' => \$all_once,
	'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
	if ($all_once && $one);

# Without --quiet, report progress here and in the child scripts.
unless ($quiet) {
	$ENV{'show_progress'} = '1';
	$progress = 1;
}

# Disable load checking entirely when load averages are unavailable.
$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

# --one / --all-once run a single pass; otherwise loop forever.
if ($one) {
	queue_one($one);
	run_queue();
	exit;
}

if ($all_once) {
	queue_all();
	run_queue();
	exit;
}

run_perpetually();
536 ########## Documentation {{{1
538 __END__
540 =head1 NAME
542 jobd.pl - Perform Girocco maintenance jobs
544 =head1 SYNOPSIS
546 jobd.pl [options]
548 Options:
549 -h | --help detailed instructions
550 -q | --quiet run quietly
551 -P | --progress show occasional status updates
552 -k SECONDS | --kill-after SECONDS how long to wait before killing jobs
553 -p NUM | --max-parallel NUM how many jobs to run at the same time
554 -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
555 at the same time
556 --load-triggers TRIG,UNTRIG stop queueing jobs at load above
557 TRIG and resume at load below UNTRIG
558   -d SECONDS | --restart-delay SECONDS wait for this many seconds between
559 queue runs
560 -l FILE | --lockfile FILE create a lockfile in the given
561 location
562 -a | --all-once process the list only once
563 -o PRJNAME | --one PRJNAME process only one project
565 =head1 OPTIONS
567 =over 8
569 =item B<--help>
571 Print the full description of jobd.pl's options.
573 =item B<--quiet>
575 Suppress non-error messages, e.g. for use when running this task as a cronjob.
577 =item B<--progress>
579 Show information about the current status of the job queue occasionally. This
580 is automatically enabled if --quiet is not given.
582 =item B<--kill-after SECONDS>
584 Kill supervised jobs after a certain time to avoid hanging the daemon.
586 =item B<--max-parallel NUM>
588 Run no more than that many jobs at the same time. The default is the number
589 of cpus * 2. If the number of cpus cannot be determined, the default is 8.
591 =item B<--max-intensive-parallel NUM>
593 Run no more than that many resource-hungry jobs at the same time. Right now,
594 this refers to repacking jobs. The default is 1.
596 =item B<--load-triggers TRIG,UNTRIG>
598 If the first system load average (1 minute average) exceeds TRIG, don't queue
599 any more jobs until it goes below UNTRIG. This is currently only supported on
600 Linux and any other platforms that provide an uptime command with load average
601 output.
603 If both values are zero, load checks are disabled. The default is the number
604 of cpus * 1.5 for TRIG and half that for UNTRIG. If the number of cpus cannot
605 be determined, the default is 6,3.
607 =item B<--restart-delay SECONDS>
609 After processing the queue, wait this many seconds until the queue is
610 restarted. The default is 300 seconds.
612 =item B<--lockfile FILE>
614 For perpetual operation, specify the full path to a lock file to create and
615 then remove after finishing/aborting. The default is /tmp/jobd-$suffix.lock
616 where $suffix is a 6-character string uniquely determined by the name and
617 nickname of this Girocco instance. The pid of the running jobd instance will
618 be written to the lock file.
620 =item B<--all-once>
622 Instead of perpetually processing all projects over and over again, process
623 them just once and then exit.
625 =item B<--one PRJNAME>
627 Process only the given project (given as just the project name without C<.git>
628 suffix) and then exit.
630 =back
632 =head1 DESCRIPTION
634 jobd.pl is Girocco's repositories maintenance servant; it periodically checks
635 all the repositories and updates mirrored repositories and repacks push-mode
636 repositories when needed.
638 =cut