jobd.pl: eliminate unnecessary use of the shell
[girocco.git] / jobd / jobd.pl
blob11e3ba05087c6bff73c3510b0fe86f428ff01ee2
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
13 use File::Basename;
15 use lib dirname($0);
16 use Girocco::Config;
17 use Girocco::Project;
18 use Girocco::User;
19 use Girocco::Util;
# Options (defaults; all overridable from the command line, see POD below)
my $quiet;                   # suppress non-error messages
my $progress;                # emit occasional status updates
my $kill_after = 900;        # seconds a supervised job may run before being killed
my $max_par = 20;            # maximum number of jobs to run concurrently
my $max_par_intensive = 1;   # maximum number of resource-intensive jobs to run concurrently
my $load_triggers = '10,2';  # "TRIG,UNTRIG": pause queueing above TRIG load, resume below UNTRIG
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 60;      # seconds to wait between perpetual queue runs
my $all_once;                # process the full project list once, then exit
my $one;                     # process only this one project, then exit

# Parsed form of $load_triggers (set in the main section below)
my ($load_trig, $load_untrig);
35 ######### Jobs {{{1
# Update (fetch/mirror) a single project, then arrange for gc as appropriate.
# Skips the update when the project is marked .nofetch/.bypass/.bypass_fetch,
# when initial mirroring is still in progress or has failed, or when the last
# refresh is recent enough.  Otherwise runs jobd/update.sh via exec_job_command.
sub update_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	if (-e get_project_path($p).".nofetch" || -e get_project_path($p).".bypass" ||
	    -e get_project_path($p).".bypass_fetch") {
		job_skip($job);
		return setup_gc($job);
	}
	if (-e get_project_path($p).".clone_in_progress" && ! -e get_project_path($p).".clone_failed") {
		job_skip($job, "initial mirroring not complete yet");
		return;
	}
	if (-e get_project_path($p).".clone_failed") {
		job_skip($job, "initial mirroring failed");
		# Still need to gc non top-level clones even if they've failed
		# otherwise the objects copied into them from the parent will
		# just accumulate without bound
		setup_gc($job) if $p =~ m,/,;
		return;
	}
	if (my $ts = is_operation_uptodate($p, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		setup_gc($job);
		return;
	}
	if (is_svn_clone($p)) {
		# git svn can be very, very slow at times
		$job->{'timeout_factor'} = 3;
	}
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}
# Garbage-collect a single project unless it is marked .nogc/.bypass or its
# last gc is recent enough.  Runs jobd/gc.sh via exec_job_command with a
# doubled timeout since repacking may take longer than an update.
sub gc_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	if (-e get_project_path($p).".nogc" || -e get_project_path($p).".bypass") {
		job_skip($job);
		return;
	}
	if (my $ts = is_operation_uptodate($p, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		return;
	}
	# allow garbage collection to run for longer than an update
	$job->{'timeout_factor'} = 2;
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}
# Queue a gc job for the same project as the given job.  gc jobs are marked
# intensive so run_queue limits how many run concurrently.
sub setup_gc {
	my $job = shift;
	queue_job(
		project => $job->{'project'},
		type => 'gc',
		command => \&gc_project,
		intensive => 1,
	);
}
# Return true if the job's project directory exists; otherwise mark the job
# skipped and return 0.
sub check_project_exists {
	my $job = shift;
	my $p = $job->{'project'};
	if (!-d get_project_path($p)) {
		job_skip($job, "non-existent project");
		return 0;
	}
	1;
}
# Return the absolute filesystem path (with trailing slash) of the bare
# repository for the given project name.
sub get_project_path {
	"$Girocco::Config::reporoot/".shift().".git/";
}
# Check whether the gitweb.$which timestamp recorded in the project's git
# config is within $threshold seconds of now.  Returns the recorded timestamp
# string when up to date, undef otherwise.
sub is_operation_uptodate {
	my ($project, $which, $threshold) = @_;
	my $path = get_project_path($project);
	my $timestamp = get_git("--git-dir=$path", 'config', "gitweb.$which");
	defined($timestamp) or $timestamp = '';
	chomp $timestamp;
	my $unix_ts = parse_rfc2822_date($timestamp) || 0;
	(time - $unix_ts) <= $threshold ? $timestamp : undef;
}
# Return true if the project is a git-svn mirror: its gitweb.baseurl starts
# with "svn:" or "svn+" and an svn-remote.svn.url is configured.
sub is_svn_clone {
	my ($project) = @_;
	my $path = get_project_path($project);
	my $baseurl = get_git("--git-dir=$path", 'config', 'gitweb.baseurl');
	defined($baseurl) or $baseurl = '';
	chomp $baseurl;
	my $svnurl = get_git("--git-dir=$path", 'config', 'svn-remote.svn.url');
	defined($svnurl) or $svnurl = '';
	chomp $svnurl;
	return $baseurl =~ /^svn[:+]/i && $svnurl;
}
# Queue an update job for one project; a gc job is set up afterwards whether
# the update succeeds or fails.
sub queue_one {
	my $project = shift;
	queue_job(
		project => $project,
		type => 'update',
		command => \&update_project,
		on_success => \&setup_gc,
		on_error => \&setup_gc,
	);
}
# Queue an update job for every known project.
sub queue_all {
	queue_one($_) for (Girocco::Project->get_full_list());
}
148 ######### Daemon operation {{{1
my @queue;         # jobs waiting to be started
my @running;       # jobs currently running (hashrefs with at least 'pid')
my $perpetual = 1; # cleared on SIGINT so the queue loop stops requeueing
my $locked = 0;    # true once the lockfile has been created by run_perpetually()
my $jobs_executed; # per-run counters/records, reset by run_queue()
my $jobs_skipped;
my @jobs_killed;
# Kills and reaps the specified pid. Returns exit status ($?) on success
# otherwise undef if process could not be killed or reaped
# First sends SIGTERM and if process does not exit within 15 seconds then SIGKILL
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
	my $targ = shift;
	my $use_pg = shift || 0;
	# Note that the docs for Perl's kill state that a negative signal
	# number should be used to kill process groups and that while a
	# negative process id (and positive signal number) may also do that
	# on some platforms, that's not portable.
	my $pg = $use_pg ? -1 : 1;
	my $harsh = time() + 15; # SIGKILL after this delay
	my $count = kill(15*$pg, $targ); # SIGTERM is 15
	my $reaped = waitpid($targ, WNOHANG);
	return undef if $reaped < 0;
	return $? if $reaped == $targ;
	while ($count && time() < $harsh) {
		select(undef, undef, undef, 0.2);
		$reaped = waitpid($targ, WNOHANG);
		return undef if $reaped < 0;
		return $? if $reaped == $targ;
	}
	$harsh = time() + 2;
	$count = kill(9*$pg, $targ); # SIGKILL is 9
	$reaped = waitpid($targ, WNOHANG);
	return undef if $reaped < 0;
	return $? if $reaped == $targ;
	# We should not need to wait to reap a SIGKILL, however, just in case
	# the system doesn't make a SIGKILL'd process immediately reapable
	# (perhaps under extremely heavy load) we accommodate a brief delay
	while ($count && time() < $harsh) {
		select(undef, undef, undef, 0.2);
		$reaped = waitpid($targ, WNOHANG);
		return undef if $reaped < 0;
		return $? if $reaped == $targ;
	}
	return undef;
}
# First ^C: drop queued jobs and let running ones finish; a second ^C
# triggers the hard exit handler instead.
sub handle_softexit {
	error("Waiting for outstanding jobs to finish... ".
		"^C again to exit immediately");
	@queue = ();
	$perpetual = 0;
	$SIG{'INT'} = \&handle_exit;
}
# Hard exit: kill every running job's process group, clean up the lockfile
# (if we created one), and exit.
sub handle_exit {
	error("Killing outstanding jobs, please be patient...");
	$SIG{'TERM'} = 'IGNORE';
	for (@running) {
		kill_gently($_->{'pid'}, 1);
	}
	unlink $lockfile if ($locked);
	exit(0);
}
# Append a job (a hash of options: project, type, command, on_success,
# on_error, intensive, ...) to the queue, stamping it with its queue time.
sub queue_job {
	my %opts = @_;
	$opts{'queued_at'} = time;
	$opts{'dont_run'} = 0;
	$opts{'intensive'} = 0 unless exists $opts{'intensive'};
	push @queue, \%opts;
}
# Start a job: invoke its command callback (which may mark it dont_run via
# job_skip, or fork a child via exec_job_command).  Skipped jobs are removed
# from @running again and counted.
sub run_job {
	my $job = shift;

	push @running, $job;
	$job->{'command'}->($job);
	if ($job->{'dont_run'}) {
		pop @running;
		$jobs_skipped++;
		return;
	}
}
# Human-readable job identifier for log messages, e.g. "[update::proj]".
sub _job_name {
	my $job = shift;
	"[".$job->{'type'}."::".$job->{'project'}."]";
}
# Only one of those per job!
# Fork and exec the job's external command in its own process group,
# optionally discarding the child's stdout ($err_only).  Records pid and
# start time in the job on success.
sub exec_job_command {
	my ($job, $command, $err_only) = @_;

	my $pid;
	if (!defined($pid = fork)) {
		error(_job_name($job) ." Can't fork job: $!");
		$job->{'finished'} = 1;
		return;
	}
	if (!$pid) {
		# NOTE(review): originally `open ... || do {...}` — || bound to the
		# filename (always true) so these handlers never ran; `or` fixes the
		# precedence.  The `return` here still only unwinds in the child;
		# confirm whether an exit would be more appropriate.
		open STDIN, '<', '/dev/null' or do {
			error(_job_name($job) ."Can't read from /dev/null: $!");
			$job->{'finished'} = 1;
			return;
		};
		if ($err_only) {
			open STDOUT, '>', '/dev/null' or do {
				error(_job_name($job) ." Can't write to /dev/null: $!");
				$job->{'finished'} = 1;
				return;
			};
		}
		# New process group so we can keep track of all of its children
		if (!defined(POSIX::setpgid(0, 0))) {
			error(_job_name($job) ." Can't create process group: $!");
			$job->{'finished'} = 1;
			return;
		}
		# "Prevent" races
		select(undef, undef, undef, 0.1);
		exec @$command;
		# Stop perl from complaining
		exit $?;
	}
	$job->{'pid'} = $pid;
	$job->{'finished'} = 0;
	$job->{'started_at'} = time;
}
# Mark a job as not-to-run and optionally log the reason (suppressed by --quiet).
sub job_skip {
	my ($job, $msg) = @_;
	$job->{'dont_run'} = 1;
	error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}
# Kill any running job that has exceeded its timeout ($kill_after scaled by
# the job's timeout_factor) and record it as killed.
sub reap_hanging_jobs {
	for (@running) {
		my $factor = $_->{'timeout_factor'} || 1;
		if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
			$_->{'finished'} = 1;
			my $exitcode = kill_gently($_->{'pid'}, 1);
			delete $_->{'pid'};
			$_->{'killed'} = 1;
			# guard against undef from kill_gently (could not kill/reap)
			error(_job_name($_) ." KILLED due to timeout" .
				(defined($exitcode) && ($exitcode & 0x7f) == 9 ? " with SIGKILL": ""));
			push @jobs_killed, _job_name($_);
		}
	}
}
# Finalize one job: run its on_success callback (and count it) the first time
# it is reaped, or on_error if it was already marked finished (i.e. killed).
sub reap_one_job {
	my $job = shift;
	if (!$job->{'finished'}) {
		$job->{'on_success'}->($job) if defined($job->{'on_success'});
		$job->{'finished'} = 1;
		$jobs_executed++;
	} else {
		$job->{'on_error'}->($job) if defined($job->{'on_error'});
	}
}
# Reap killed jobs and any children that exited on their own, finalize them,
# and drop finished jobs from @running.  Returns true if anything finished.
sub reap_finished_jobs {
	my $pid;
	my $finished_any = 0;
	foreach my $child (grep { !$_->{'pid'} && $_->{'killed'} } @running) {
		delete $child->{'killed'};
		reap_one_job($child);
		$finished_any = 1;
	}
	while (1) {
		$pid = waitpid(-1, WNOHANG);
		last if $pid <= 0;
		$finished_any = 1;

		my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
		if ($?) {
			# XXX- we currently don't care
		}
		if (@child) {
			delete $child[0]->{'pid'};
			reap_one_job($child[0]);
		}
	}
	@running = grep { $_->{'finished'} == 0 } @running;
	$finished_any;
}
# Count of currently running resource-intensive jobs (in scalar context).
sub have_intensive_jobs {
	grep { $_->{'intensive'} == 1 } @running;
}
# Timestamp prefix for log lines, e.g. "[Mon Jan  1 00:00:00 2024] ".
sub ts {
	"[". scalar(localtime) ."] ";
}
# Return the 1-, 5- and 15-minute load averages, or undef if they cannot be
# determined.  Uses /proc/loadavg on Linux and parses `uptime` elsewhere.
sub get_load_info {
	if ($^O eq "linux") {
		# Read /proc/loadavg on Linux (lexical handle instead of a
		# package-global bareword one)
		open(my $loadav, '<', '/proc/loadavg') or return undef;
		my $loadinfo = <$loadav>;
		close $loadav;
		return (split(/\s/, $loadinfo, 4))[0..2];
	} else {
		# Read the output of uptime everywhere else (works on Linux too)
		open(my $loadav, '-|', 'uptime') or return undef;
		my $loadinfo = <$loadav>;
		close $loadav;
		$loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
		return ($1, $2, $3);
	}
}
# Main scheduling loop: run queued jobs up to the parallelism limits, kill
# hung ones, pause while the system load is above the trigger threshold, and
# emit periodic status output until both queue and running set are empty.
sub run_queue {
	my $last_progress = time;
	my $last_checkload = time - 5;
	my $current_load = $load_trig;
	my $overloaded = 0;
	my $load_info = '';
	$jobs_executed = 0;
	$jobs_skipped = 0;
	@jobs_killed = ();
	if ($progress) {
		ferror("--- Processing %d queued jobs", scalar(@queue));
	}
	$SIG{'INT'} = \&handle_softexit;
	$SIG{'TERM'} = \&handle_exit;
	while (@queue || @running) {
		reap_hanging_jobs();
		my $proceed_immediately = reap_finished_jobs();
		# Check current system load (at most every 5 seconds)
		if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
			# assign (not redeclare) so the outer $current_load is updated
			$current_load = $loadinfo[0];
			if ($current_load > $load_trig && !$overloaded) {
				$overloaded = 1;
				error("PAUSE: system load is at $current_load > $load_trig") if $progress;
			} elsif ($current_load < $load_untrig && $overloaded) {
				$overloaded = 0;
				error("RESUME: system load is at $current_load < $load_untrig") if $progress;
			}
			if ($overloaded) {
				$load_info = ', paused (load '. $current_load .')';
			} else {
				$load_info = ', load '. $current_load;
			}
			$last_checkload = time;
		}
		# Status output (at most every 60 seconds)
		if ($progress && (time - $last_progress) >= 60) {
			ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
			if (@running) {
				my @run_status;
				for (@running) {
					push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
				}
				error("STATUS: currently running: ". join(', ', @run_status));
			}
			$last_progress = time;
		}
		# Back off if we're too busy
		if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
			sleep 1 unless $proceed_immediately;
			next;
		}
		# Run next
		run_job(shift(@queue)) if @queue;
	}
	if ($progress) {
		ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
	}
}
# Run the full queue over and over (with $restart_delay pauses) until
# $perpetual is cleared, guarding against concurrent instances via $lockfile.
sub run_perpetually {
	if (-e $lockfile) {
		die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
	}
	# `or die` instead of `|| die`: || bound to $lockfile (always true) so
	# the original never reported a failed open.  Lexical handle, too.
	open my $lock, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
	print {$lock} $$;
	close $lock;
	$locked = 1;

	while ($perpetual) {
		queue_all();
		run_queue();
		sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
	}
	unlink $lockfile;
}
440 ######### Helpers {{{1
# Print a timestamped message to STDERR.
sub error($) {
	print STDERR ts().shift()."\n";
}
# printf-style wrapper around error().
sub ferror(@) {
	error(sprintf($_[0], @_[1..$#_]));
}
# Log the message and exit with a failure status.
sub fatal($) {
	error(shift);
	exit 1;
}
######### Main {{{1

# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
	# 'h' alias added so the documented "-h | --help" actually works
	'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
	'quiet|q' => \$quiet,
	'progress|P' => \$progress,
	'kill-after|k=i' => \$kill_after,
	'max-parallel|p=i' => \$max_par,
	'max-intensive-parallel|i=i' => \$max_par_intensive,
	'load-triggers=s' => \$load_triggers,
	'restart-delay|d=i' => \$restart_delay,
	'lockfile|l=s' => \$lockfile,
	'all-once|a' => \$all_once,
	'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
	if ($all_once && $one);

unless ($quiet) {
	$ENV{'show_progress'} = '1';
	$progress = 1;
}

# Disable the load checks entirely when no load information is obtainable
$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
	queue_one($one);
	run_queue();
	exit;
}

if ($all_once) {
	queue_all();
	run_queue();
	exit;
}

run_perpetually();
495 ########## Documentation {{{1
497 __END__
499 =head1 NAME
501 jobd - Perform Girocco maintenance jobs
503 =head1 SYNOPSIS
505 jobd [options]
507 Options:
508 -h | --help detailed instructions
509 -q | --quiet run quietly
510 -P | --progress show occasional status updates
511 -k SECONDS | --kill-after SECONDS how long to wait before killing jobs
512 -p NUM | --max-parallel NUM how many jobs to run at the same time
513 -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
514 at the same time
515 --load-triggers TRIG,UNTRIG stop queueing jobs at load above
516 TRIG and resume at load below UNTRIG
517     -d SECONDS | --restart-delay SECONDS  wait for this many seconds between
518                                       queue runs
519 -l FILE | --lockfile FILE create a lockfile in the given
520 location
521 -a | --all-once process the list only once
522 -o PRJNAME | --one PRJNAME process only one project
524 =head1 OPTIONS
526 =over 8
528 =item B<--help>
530 Print the full description of jobd's options.
532 =item B<--quiet>
534 Suppress non-error messages, e.g. for use when running this task as a cronjob.
536 =item B<--progress>
538 Show information about the current status of the job queue occasionally. This
539 is automatically enabled if --quiet is not given.
541 =item B<--kill-after SECONDS>
543 Kill supervised jobs after a certain time to avoid hanging the daemon.
545 =item B<--max-parallel NUM>
547 Run no more than that many jobs at the same time.
549 =item B<--max-intensive-parallel NUM>
551 Run no more than that many resource-hungry jobs at the same time. Right now,
552 this refers to repacking jobs.
554 =item B<--load-triggers TRIG,UNTRIG>
556 If the first system load average (1 minute average) exceeds TRIG, don't queue
557 any more jobs until it goes below UNTRIG. This is currently only supported on
558 Linux and any other platforms that provide an uptime command with load average
559 output.
561 If both values are zero, load checks are disabled. Note that this is not the
562 default.
564 =item B<--restart-delay NUM>
566 After processing the queue, wait this many seconds until the queue is
567 restarted.
569 =item B<--lockfile FILE>
571 For perpetual operation, create a lockfile in that place and clean it up after
572 finishing/aborting.
574 =item B<--all-once>
576 Instead of perpetually processing all projects over and over again, process
577 them just once and then exit.
579 =item B<--one PRJNAME>
581 Process only the given project (given as just the project name without C<.git>
582 suffix) and then exit.
584 =back
586 =head1 DESCRIPTION
588 jobd is Girocco's repositories maintenance servant; it periodically checks all
589 the repositories and updates mirrored repositories and repacks push-mode
590 repositories when needed.
592 =cut