#!/usr/bin/perl

# jobd - perform Girocco maintenance jobs
#
# Run with --help for details

use strict;
use warnings;

use Getopt::Long;
use Pod::Usage;
use POSIX ":sys_wait_h";
use File::Basename;

use lib dirname($0);
use Girocco::Config;
use Girocco::Project;
use Girocco::User;
use Girocco::Util;
BEGIN {noFatalsToBrowser}

# Options
my $quiet;
my $progress;
my $cpus = online_cpus;
my $kill_after = 900;
my $max_par = $cpus ? $cpus * 2 : 8;
my $max_par_intensive = 1;
my $load_triggers = $cpus ? sprintf("%g,%g", $cpus * 1.5, $cpus * 0.75) : "6,3";
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 300;
my $all_once;
my $one;

my ($load_trig, $load_untrig);

######### Jobs {{{1

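# Update (fetch/refresh) a single mirrored project by running jobd/update.sh,
# skipping the work when a marker file (.nofetch, .bypass, .bypass_fetch,
# .clone_in_progress, .clone_failed) or a recent enough last refresh says so,
# and queueing a follow-up gc job where appropriate.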
sub update_project {
    my $job = shift;
    my $p = $job->{'project'};
    check_project_exists($job) || return;
    if (-e get_project_path($p).".nofetch" || -e get_project_path($p).".bypass" ||
        -e get_project_path($p).".bypass_fetch") {
        job_skip($job);
        return setup_gc($job);
    }
    if (-e get_project_path($p).".clone_in_progress" && ! -e get_project_path($p).".clone_failed") {
        job_skip($job, "initial mirroring not complete yet");
        return;
    }
    if (-e get_project_path($p).".clone_failed") {
        job_skip($job, "initial mirroring failed");
        # Still need to gc non top-level clones even if they've failed
        # otherwise the objects copied into them from the parent will
        # just accumulate without bound
        setup_gc($job) if $p =~ m,/,;
        return;
    }
    if (my $ts = is_operation_uptodate($p, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
        job_skip($job, "not needed right now, last run at $ts");
        setup_gc($job);
        return;
    }
    if (is_svn_clone($p)) {
        # git svn can be very, very slow at times
        $job->{'timeout_factor'} = 3;
    }
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}

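# Garbage-collect a single project by running jobd/gc.sh, unless a marker
# file (.nogc, .bypass, or .delaygc without .allowgc/.needsgc) or a recent
# enough last gc says it is not needed right now.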
sub gc_project {
    my $job = shift;
    my $p = $job->{'project'};
    check_project_exists($job) || return;
    my $projpath = get_project_path($p);
    if (-e "$projpath.nogc" || -e "$projpath.bypass" ||
        (-e "$projpath.delaygc" && ! -e "$projpath.allowgc" && ! -e "$projpath.needsgc")) {
        job_skip($job);
        return;
    }
    my $ts;
    if (! -e "$projpath.needsgc" &&
        ($ts = is_operation_uptodate($p, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval)))) {
        job_skip($job, "not needed right now, last run at $ts");
        return;
    }
    # allow garbage collection to run for longer than an update
    $job->{'timeout_factor'} = 2;
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}

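# Queue a gc job for the same project; used as the follow-up to update jobs.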
sub setup_gc {
    my $job = shift;
    queue_job(
        project => $job->{'project'},
        type => 'gc',
        command => \&gc_project,
        intensive => 1,
    );
}

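# Skip the job and return false if the project directory no longer exists.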
sub check_project_exists {
    my $job = shift;
    my $p = $job->{'project'};
    if (!-d get_project_path($p)) {
        job_skip($job, "non-existent project");
        return 0;
    }
    1;
}

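# Map a project name to its repository path under $Girocco::Config::reporoot.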
sub get_project_path {
    "$Girocco::Config::reporoot/".shift().".git/";
}

sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $path = get_project_path($project);
    my $timestamp = get_git("--git-dir=$path", 'config', "gitweb.$which");
    defined($timestamp) or $timestamp = '';
    chomp $timestamp;
    my $unix_ts = parse_rfc2822_date($timestamp) || 0;
    (time - $unix_ts) <= $threshold ? $timestamp : undef;
}

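# True if the project is mirrored from Subversion (git svn), which is given
# a larger timeout because it can be very slow.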
sub is_svn_clone {
    my ($project) = @_;
    my $path = get_project_path($project);
    my $baseurl = get_git("--git-dir=$path", 'config', 'gitweb.baseurl');
    defined($baseurl) or $baseurl = '';
    chomp $baseurl;
    my $svnurl = get_git("--git-dir=$path", 'config', 'svn-remote.svn.url');
    defined($svnurl) or $svnurl = '';
    chomp $svnurl;
    return $baseurl =~ /^svn[:+]/i && $svnurl;
}

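# Queue an update job for one project (with gc queued afterwards either way),
# or for every project known to Girocco.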
sub queue_one {
    my $project = shift;
    queue_job(
        project => $project,
        type => 'update',
        command => \&update_project,
        on_success => \&setup_gc,
        on_error => \&setup_gc,
    );
}

sub queue_all {
    queue_one($_) for (Girocco::Project->get_full_list());
}

######### Daemon operation {{{1

my @queue;
my @running;
my $perpetual = 1;
my $locked = 0;
my $jobs_executed;
my $jobs_skipped;
my @jobs_killed;

# Kills and reaps the specified pid. Returns the exit status ($?) on success,
# otherwise undef if the process could not be killed or reaped.
# First sends SIGTERM and, if the process does not exit within 15 seconds, SIGKILL.
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
    my $targ = shift;
    my $use_pg = shift || 0;
    # Note that the docs for Perl's kill state that a negative signal
    # number should be used to kill process groups and that while a
    # negative process id (and positive signal number) may also do that
    # on some platforms, that's not portable.
    my $pg = $use_pg ? -1 : 1;
    my $harsh = time() + 15; # SIGKILL after this delay
    my $count = kill(15*$pg, $targ); # SIGTERM is 15
    my $reaped = waitpid($targ, WNOHANG);
    return undef if $reaped < 0;
    return $? if $reaped == $targ;
    while ($count && time() < $harsh) {
        select(undef, undef, undef, 0.2);
        $reaped = waitpid($targ, WNOHANG);
        return undef if $reaped < 0;
        return $? if $reaped == $targ;
    }
    $harsh = time() + 2;
    $count = kill(9*$pg, $targ); # SIGKILL is 9
    $reaped = waitpid($targ, WNOHANG);
    return undef if $reaped < 0;
    return $? if $reaped == $targ;
    # We should not need to wait to reap a SIGKILL, however, just in case
    # the system doesn't make a SIGKILL'd process immediately reapable
    # (perhaps under extremely heavy load) we accommodate a brief delay
    while ($count && time() < $harsh) {
        select(undef, undef, undef, 0.2);
        $reaped = waitpid($targ, WNOHANG);
        return undef if $reaped < 0;
        return $? if $reaped == $targ;
    }
    return undef;
}

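# Signal handlers: the first SIGINT empties the queue and lets running jobs
# finish; a second SIGINT (or SIGTERM) kills outstanding jobs and exits.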
sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ".
        "^C again to exit immediately");
    @queue = ();
    $perpetual = 0;
    $SIG{'INT'} = \&handle_exit;
}

sub handle_exit {
    error("Killing outstanding jobs, please be patient...");
    $SIG{'TERM'} = 'IGNORE';
    for (@running) {
        kill_gently($_->{'pid'}, 1);
    }
    unlink $lockfile if ($locked);
    exit(0);
}

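# Add a job to the queue; a job is a hash with at least 'project', 'type'
# and 'command', plus optional 'on_success', 'on_error' and 'intensive'.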
sub queue_job {
    my %opts = @_;
    $opts{'queued_at'} = time;
    $opts{'dont_run'} = 0;
    $opts{'intensive'} = 0 unless exists $opts{'intensive'};
    push @queue, \%opts;
}

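# Start a queued job; jobs that mark themselves 'dont_run' (via job_skip)
# are counted as skipped and removed from the running list again.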
sub run_job {
    my $job = shift;

    push @running, $job;
    $job->{'command'}->($job);
    if ($job->{'dont_run'}) {
        pop @running;
        $jobs_skipped++;
        return;
    }
}

sub _job_name {
    my $job = shift;
    "[".$job->{'type'}."::".$job->{'project'}."]";
}

# Only one of those per job!
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid;
    $job->{'finished'} = 0;
    delete $job->{'pid'};
    if (!defined($pid = fork)) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        # "Prevent" races
        select(undef, undef, undef, 0.1);

        open STDIN, '<', '/dev/null' or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            exit 71; # EX_OSERR
        };
        if ($err_only) {
            open STDOUT, '>', '/dev/null' or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 71; # EX_OSERR
            };
        }

        # New process group so we can keep track of all of its children
        if (!defined(POSIX::setpgid(0, 0))) {
            error(_job_name($job) ." Can't create process group: $!");
            exit 71; # EX_OSERR
        }

        exec @$command;
        # Stop perl from complaining
        exit 71; # EX_OSERR
    }
    $job->{'pid'} = $pid;
    $job->{'started_at'} = time;
}

sub job_skip {
    my ($job, $msg) = @_;
    $job->{'dont_run'} = 1;
    error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}

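# Kill any running job that has exceeded its timeout ($kill_after seconds,
# scaled by the job's 'timeout_factor') and record it as killed.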
sub reap_hanging_jobs {
    for (@running) {
        my $factor = $_->{'timeout_factor'} || 1;
        if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
            $_->{'finished'} = 1;
            my $exitcode = kill_gently($_->{'pid'}, 1);
            delete $_->{'pid'};
            $_->{'killed'} = 1;
            error(_job_name($_) ." KILLED due to timeout" .
                (($exitcode & 0x7f) == 9 ? " with SIGKILL": ""));
            push @jobs_killed, _job_name($_);
        }
    }
}

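# Run the job's on_success callback the first time it is reaped, or on_error
# if the job was already marked finished (killed or non-zero exit status).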
sub reap_one_job {
    my $job = shift;
    if (!$job->{'finished'}) {
        $job->{'on_success'}->($job) if defined($job->{'on_success'});
        $job->{'finished'} = 1;
        $jobs_executed++;
    } else {
        $job->{'on_error'}->($job) if defined($job->{'on_error'});
    }
}

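# Collect killed jobs and any children that have exited, run their callbacks
# and drop them from the running list. Returns true if anything was reaped.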
sub reap_finished_jobs {
    my $pid;
    my $finished_any = 0;
    foreach my $child (grep { !$_->{'pid'} && $_->{'killed'} } @running) {
        delete $child->{'killed'};
        reap_one_job($child);
        $finished_any = 1;
    }
    while (1) {
        $pid = waitpid(-1, WNOHANG);
        last if $pid <= 0;
        $finished_any = 1;

        my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        if ($?) {
            # any non-zero exit status should trigger on_error
            $child[0]->{'finished'} = 1 if @child;
        }
        if (@child) {
            delete $child[0]->{'pid'};
            reap_one_job($child[0]);
        }
    }
    @running = grep { $_->{'finished'} == 0 } @running;
    $finished_any;
}

sub have_intensive_jobs {
    grep { $_->{'intensive'} == 1 } @running;
}

sub ts {
    "[". scalar(localtime) ."] ";
}

sub get_load_info {
    if ($^O eq "linux") {
        # Read /proc/loadavg on Linux
        open(LOADAV, '<', '/proc/loadavg') or return undef;
        my $loadinfo = <LOADAV>;
        close LOADAV;
        return (split(/\s/, $loadinfo, 4))[0..2];
    } else {
        # Read the output of uptime everywhere else (works on Linux too)
        open(LOADAV, '-|', 'uptime') or return undef;
        my $loadinfo = <LOADAV>;
        close LOADAV;
        $loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
        return ($1, $2, $3);
    }
}

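# Main scheduling loop: process the queue while respecting the parallelism
# limits and load triggers, reaping hung and finished jobs as it goes and
# printing periodic status output when --progress is enabled.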
sub run_queue {
    my $last_progress = time;
    my $last_checkload = time - 5;
    my $current_load = $load_trig;
    my $overloaded = 0;
    my $load_info = '';
    $jobs_executed = 0;
    $jobs_skipped = 0;
    @jobs_killed = ();
    if ($progress) {
        my $s = @queue == 1 ? '' : 's';
        ferror("--- Processing %d queued job$s", scalar(@queue));
    }
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;
    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();
        # Check current system load
        if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
            my $current_load = $loadinfo[0];
            if ($current_load > $load_trig && !$overloaded) {
                $overloaded = 1;
                error("PAUSE: system load is at $current_load > $load_trig") if $progress;
            } elsif ($current_load < $load_untrig && $overloaded) {
                $overloaded = 0;
                error("RESUME: system load is at $current_load < $load_untrig") if $progress;
            }
            if ($overloaded) {
                $load_info = ', paused (load '. $current_load .')';
            } else {
                $load_info = ', load '. $current_load;
            }
            $last_checkload = time;
        }
        # Status output
        if ($progress && (time - $last_progress) >= 60) {
            ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
            if (@running) {
                my @run_status;
                for (@running) {
                    push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
                }
                error("STATUS: currently running: ". join(', ', @run_status));
            }
            $last_progress = time;
        }
        # Back off if we're too busy
        if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
            sleep 1 unless $proceed_immediately;
            next;
        }
        # Run next
        run_job(shift(@queue)) if @queue;
    }
    if ($progress) {
        my $s = $jobs_executed == 1 ? '' : 's';
        ferror("--- Queue processed. %d job$s executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
    }
}

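# Perpetual mode: take the lock file, then queue and process all projects in
# a loop, sleeping $restart_delay seconds between passes.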
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
    }
    open LOCK, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
    print LOCK $$;
    close LOCK;
    $locked = 1;

    while ($perpetual) {
        # touch ctime of lockfile to prevent it from being removed by /tmp cleaning
        chmod 0444, $lockfile;
        chmod 0644, $lockfile;
        queue_all();
        run_queue();
        sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
    }
    unlink $lockfile;
}

######### Helpers {{{1

sub error($) {
    print STDERR ts().shift()."\n";
}
sub ferror(@) {
    error(sprintf($_[0], @_[1..$#_]));
}
sub fatal($) {
    error(shift);
    exit 1;
}

######### Main {{{1

close(DATA) if fileno(DATA);
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|?|h' => sub { pod2usage(-verbose => 2, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'max-intensive-parallel|i=i' => \$max_par_intensive,
    'load-triggers=s' => \$load_triggers,
    'restart-delay|d=i' => \$restart_delay,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

run_perpetually();

########## Documentation {{{1

__END__

=head1 NAME

jobd.pl - Perform Girocco maintenance jobs

=head1 SYNOPSIS

jobd.pl [options]

 Options:
   -h | --help                           detailed instructions
   -q | --quiet                          run quietly
   -P | --progress                       show occasional status updates
   -k SECONDS | --kill-after SECONDS     how long to wait before killing jobs
   -p NUM | --max-parallel NUM           how many jobs to run at the same time
   -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
                                         at the same time
   --load-triggers TRIG,UNTRIG           stop queueing jobs at load above
                                         TRIG and resume at load below UNTRIG
   -d SECONDS | --restart-delay SECONDS  wait this many seconds between
                                         queue runs
   -l FILE | --lockfile FILE             create a lockfile in the given
                                         location
   -a | --all-once                       process the list only once
   -o PRJNAME | --one PRJNAME            process only one project

=head1 OPTIONS

=over 8

=item B<--help>

Print the full description of jobd.pl's options.

=item B<--quiet>

Suppress non-error messages, e.g. when running this task as a cronjob.

=item B<--progress>

Show information about the current status of the job queue occasionally. This
is automatically enabled if --quiet is not given.

=item B<--kill-after SECONDS>

Kill supervised jobs after SECONDS seconds to avoid hanging the daemon.

=item B<--max-parallel NUM>

Run no more than that many jobs at the same time. The default is the number
of CPUs * 2. If the number of CPUs cannot be determined, the default is 8.

=item B<--max-intensive-parallel NUM>

Run no more than that many resource-hungry jobs at the same time. Right now,
this refers to repacking jobs. The default is 1.

=item B<--load-triggers TRIG,UNTRIG>

If the one-minute system load average exceeds TRIG, don't queue any more jobs
until it drops below UNTRIG. This is currently supported on Linux and on any
other platform that provides an uptime command with load average output.

If both values are zero, load checks are disabled. The default is the number
of CPUs * 1.5 for TRIG and half that for UNTRIG. If the number of CPUs cannot
be determined, the default is 6,3.

=item B<--restart-delay SECONDS>

After processing the queue, wait this many seconds before the queue is
restarted. The default is 300 seconds.

=item B<--lockfile FILE>

For perpetual operation, specify the full path to a lock file to create and
then remove after finishing/aborting. The default is /tmp/jobd-$suffix.lock
where $suffix is a 6-character string uniquely determined by the name and
nickname of this Girocco instance. The pid of the running jobd instance will
be written to the lock file.

=item B<--all-once>

Instead of perpetually processing all projects over and over again, process
them just once and then exit.

=item B<--one PRJNAME>

Process only the given project (given as just the project name, without the
C<.git> suffix) and then exit.

=back

=head1 DESCRIPTION

jobd.pl is Girocco's repository maintenance servant; it periodically checks
all the repositories, updating mirrored repositories and repacking push-mode
repositories as needed.

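For example, a single once-through pass over all projects with occasional
status output might be started manually like this:

    jobd.pl --all-once --progress
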
=cut