#!/usr/bin/perl

# jobd - perform Girocco maintenance jobs
#
# Run with --help for details

use strict;
use warnings;

use Getopt::Long;
use Pod::Usage;
use POSIX ":sys_wait_h";

use Girocco::Config;
use Girocco::Project;
use Girocco::User;

# Options
my $quiet;
my $progress;
my $kill_after = 900;
my $max_par = 20;
my $max_par_intensive = 1;
my $load_triggers = '10,2';
my $lockfile = "/tmp/jobd.lock";
my $restart_delay = 60;
my $all_once;
my $one;

my ($load_trig, $load_untrig);

######### Jobs {{{1

sub update_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	if (-e get_project_path($p).".nofetch") {
		job_skip($job);
		return setup_gc($job);
	}
	if (-e get_project_path($p).".clone_in_progress") {
		if (-e get_project_path($p).".clone_failed") {
			job_skip($job, "initial mirroring failed");
			# Still need to gc non top-level clones even if they've failed
			# otherwise the objects copied into them from the parent will
			# just accumulate without bound
			setup_gc($job) if $p =~ m,/,;
		} else {
			job_skip($job, "initial mirroring not complete yet");
		}
		return;
	}
	if (my $ts = is_operation_uptodate($p, 'lastrefresh', $Girocco::Config::min_mirror_interval)) {
		job_skip($job, "not needed right now, last run at $ts");
		setup_gc($job);
		return;
	}
	if (is_svn_clone($p)) {
		# git svn can be very, very slow at times
		$job->{'timeout_factor'} = 3;
	}
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}

sub gc_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	if (my $ts = is_operation_uptodate($p, 'lastgc', $Girocco::Config::min_gc_interval)) {
		job_skip($job, "not needed right now, last run at $ts");
		return;
	}
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}

sub setup_gc {
	my $job = shift;
	queue_job(
		project => $job->{'project'},
		type => 'gc',
		command => \&gc_project,
		intensive => 1,
	);
}

sub check_project_exists {
	my $job = shift;
	my $p = $job->{'project'};
	if (!-d get_project_path($p)) {
		job_skip($job, "non-existent project");
		return 0;
	}
	1;
}

sub get_project_path {
	"$Girocco::Config::reporoot/".shift().".git/";
}
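
# Returns the recorded timestamp if the last "$which" run happened within the
# last $threshold seconds, undef otherwise.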
sub is_operation_uptodate {
	my ($project, $which, $threshold) = @_;
	my $path = get_project_path($project);
	my $timestamp = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.$which"`;
	my $unix_ts = `date +%s -d "$timestamp"`;
	(time - $unix_ts) <= $threshold ? $timestamp : undef;
}

sub is_svn_clone {
	my ($project) = @_;
	my $path = get_project_path($project);
	my $baseurl = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.baseurl"`;
	my $svnurl = `GIT_DIR="$path" $Girocco::Config::git_bin config "svn-remote.svn.url"`;
	return $baseurl =~ /^svn[:+]/i && $svnurl;
}

sub queue_one {
	my $project = shift;
	queue_job(
		project => $project,
		type => 'update',
		command => \&update_project,
		on_success => \&setup_gc,
		on_error => \&setup_gc,
	);
}

sub queue_all {
	queue_one($_) for (Girocco::Project->get_full_list());
}

######### Daemon operation {{{1

my @queue;
my @running;
my $perpetual = 1;
my $locked = 0;
my $jobs_executed;
my $jobs_skipped;
my @jobs_killed;

sub handle_softexit {
	error("Waiting for outstanding jobs to finish... ".
		"^C again to exit immediately");
	@queue = ();
	$perpetual = 0;
	$SIG{'INT'} = \&handle_exit;
}

sub handle_exit {
	error("Killing outstanding jobs...");
	$SIG{'TERM'} = 'IGNORE';
	for (@running) {
		kill 'KILL', -($_->{'pid'});
	}
	unlink $lockfile if ($locked);
	exit(0);
}
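
# A queued job is a hash with at least 'project', 'type' and 'command' (a code
# ref); 'intensive', 'on_success', 'on_error' and 'timeout_factor' are optional.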
sub queue_job {
	my %opts = @_;
	$opts{'queued_at'} = time;
	$opts{'dont_run'} = 0;
	$opts{'intensive'} = 0 unless exists $opts{'intensive'};
	push @queue, \%opts;
}

sub run_job {
	my $job = shift;

	push @running, $job;
	$job->{'command'}->($job);
	if ($job->{'dont_run'}) {
		pop @running;
		$jobs_skipped++;
		return;
	}
}

sub _job_name {
	my $job = shift;
	"[".$job->{'type'}."::".$job->{'project'}."]";
}

# Only one of those per job!
sub exec_job_command {
	my ($job, $command, $err_only) = @_;

	my $pid;
	if (!defined($pid = fork)) {
		error(_job_name($job) ." Can't fork job: $!");
		$job->{'finished'} = 1;
		return;
	}
	if (!$pid) {
		open STDIN, '/dev/null' or do {
			error(_job_name($job) ." Can't read from /dev/null: $!");
			$job->{'finished'} = 1;
			return;
		};
		if ($err_only) {
			open STDOUT, '>/dev/null' or do {
				error(_job_name($job) ." Can't write to /dev/null: $!");
				$job->{'finished'} = 1;
				return;
			};
		}
		# New process group so we can keep track of all of its children
		if (!defined(POSIX::setpgid(0, 0))) {
			error(_job_name($job) ." Can't create process group: $!");
			$job->{'finished'} = 1;
			return;
		}
		# "Prevent" races
		select(undef, undef, undef, 0.1);
		exec @$command;
		# Stop perl from complaining
		exit $?;
	}
	$job->{'pid'} = $pid;
	$job->{'finished'} = 0;
	$job->{'started_at'} = time;
}

sub job_skip {
	my ($job, $msg) = @_;
	$job->{'dont_run'} = 1;
	error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}

sub reap_hanging_jobs {
	for (@running) {
		my $factor = $_->{'timeout_factor'} || 1;
		if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
			$_->{'finished'} = 1;
			kill 'KILL', -($_->{'pid'});
			error(_job_name($_) ." KILLED due to timeout");
			push @jobs_killed, _job_name($_);
		}
	}
}

sub reap_finished_jobs {
	my $pid;
	my $finished_any = 0;
	while (1) {
		$pid = waitpid(-1, WNOHANG);
		last if $pid < 1;
		$finished_any = 1;

		my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
		if ($?) {
			# XXX- we currently don't care
		}
		if (@child && !$child[0]->{'finished'}) {
			$child[0]->{'on_success'}->($child[0]) if defined($child[0]->{'on_success'});
			$child[0]->{'finished'} = 1;
			$jobs_executed++;
		} elsif (@child) {
			$child[0]->{'on_error'}->($child[0]) if defined($child[0]->{'on_error'});
		}
	}
	@running = grep { $_->{'finished'} == 0 } @running;
	$finished_any;
}

sub have_intensive_jobs {
	grep { $_->{'intensive'} == 1 } @running;
}

sub ts {
	"[". scalar(localtime) ."] ";
}

sub get_load_info {
	if ($^O eq "linux") {
		# Read /proc/loadavg on Linux
		open(LOADAV, '<', '/proc/loadavg') or return undef;
		my $loadinfo = <LOADAV>;
		close LOADAV;
		return (split(/\s/, $loadinfo, 4))[0..2];
	} else {
		# Read the output of uptime everywhere else (works on Linux too)
		open(LOADAV, '-|', 'uptime') or return undef;
		my $loadinfo = <LOADAV>;
		close LOADAV;
		$loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
		return ($1, $2, $3);
	}
}
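
# Process the queue until both the queue and the running list are empty,
# respecting $max_par, $max_par_intensive and the load triggers.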
sub run_queue {
	my $last_progress = time;
	my $last_checkload = time - 5;
	my $current_load = $load_trig;
	my $overloaded = 0;
	my $load_info = '';
	$jobs_executed = 0;
	$jobs_skipped = 0;
	@jobs_killed = ();
	if ($progress) {
		ferror("--- Processing %d queued jobs", scalar(@queue));
	}
	$SIG{'INT'} = \&handle_softexit;
	$SIG{'TERM'} = \&handle_exit;
	while (@queue || @running) {
		reap_hanging_jobs();
		my $proceed_immediately = reap_finished_jobs();
		# Check current system load
		if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
			my $current_load = $loadinfo[0];
			if ($current_load > $load_trig && !$overloaded) {
				$overloaded = 1;
				error("PAUSE: system load is at $current_load > $load_trig") if $progress;
			} elsif ($current_load < $load_untrig && $overloaded) {
				$overloaded = 0;
				error("RESUME: system load is at $current_load < $load_untrig") if $progress;
			}
			if ($overloaded) {
				$load_info = ', paused (load '. $current_load .')';
			} else {
				$load_info = ', load '. $current_load;
			}
			$last_checkload = time;
		}
		# Status output
		if ($progress && (time - $last_progress) >= 60) {
			ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
			if (@running) {
				my @run_status;
				for (@running) {
					push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
				}
				error("STATUS: currently running: ". join(', ', @run_status));
			}
			$last_progress = time;
		}
		# Back off if we're too busy
		if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
			sleep 1 unless $proceed_immediately;
			next;
		}
		# Run next
		run_job(shift(@queue)) if @queue;
	}
	if ($progress) {
		ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
	}
}

sub run_perpetually {
	if (-e $lockfile) {
		die "Lockfile exists. Please make sure no other instance of jobd is running.";
	}
	open LOCK, '>', $lockfile or die "Cannot create lockfile $lockfile: $!";
	print LOCK $$;
	close LOCK;
	$locked = 1;

	while ($perpetual) {
		queue_all();
		run_queue();
		sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
	}
	unlink $lockfile;
}

######### Helpers {{{1

sub error($) {
	print STDERR ts().shift()."\n";
}
sub ferror(@) {
	error(sprintf($_[0], @_[1..$#_]));
}
sub fatal($) {
	error(shift);
	exit 1;
}

######### Main {{{1

# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
	'help|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
	'quiet|q' => \$quiet,
	'progress|P' => \$progress,
	'kill-after|k=i' => \$kill_after,
	'max-parallel|p=i' => \$max_par,
	'max-intensive-parallel|i=i' => \$max_par_intensive,
	'load-triggers=s' => \$load_triggers,
	'restart-delay|d=i' => \$restart_delay,
	'lockfile|l=s' => \$lockfile,
	'all-once|a' => \$all_once,
	'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
	if ($all_once && $one);

unless ($quiet) {
	$ENV{'show_progress'} = '1';
	$progress = 1;
}

$load_triggers = '0,0' if (!-f '/proc/loadavg');
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
	queue_one($one);
	run_queue();
	exit;
}

if ($all_once) {
	queue_all();
	run_queue();
	exit;
}

run_perpetually();

########## Documentation {{{1

__END__

=head1 NAME

jobd - Perform Girocco maintenance jobs

=head1 SYNOPSIS

 jobd [options]

 Options:
   -h | --help                           detailed instructions
   -q | --quiet                          run quietly
   -P | --progress                       show occasional status updates
   -k SECONDS | --kill-after SECONDS     how long to wait before killing jobs
   -p NUM | --max-parallel NUM           how many jobs to run at the same time
   -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
                                         at the same time
   --load-triggers TRIG,UNTRIG           stop queueing jobs at load above
                                         TRIG and resume at load below UNTRIG
   -d SECONDS | --restart-delay SECONDS  wait for this many seconds between
                                         queue runs
   -l FILE | --lockfile FILE             create a lockfile in the given
                                         location
   -a | --all-once                       process the list only once
   -o PRJNAME | --one PRJNAME            process only one project

=head1 OPTIONS

=over 8

=item B<--help>

Print the full description of jobd's options.

=item B<--quiet>

Suppress non-error messages, e.g. for use when running this task as a cronjob.

=item B<--progress>

Show information about the current status of the job queue occasionally. This
is automatically enabled if --quiet is not given.

=item B<--kill-after SECONDS>

Kill supervised jobs after a certain time to avoid hanging the daemon.

=item B<--max-parallel NUM>

Run no more than that many jobs at the same time.

=item B<--max-intensive-parallel NUM>

Run no more than that many resource-hungry jobs at the same time. Right now,
this refers to repacking jobs.

=item B<--load-triggers TRIG,UNTRIG>

If the first system load average (1 minute average) exceeds TRIG, don't queue
any more jobs until it goes below UNTRIG. This is currently supported only on
Linux and on other platforms that provide an uptime command with load average
output (see the example invocations below).

If both values are zero, load checks are disabled. Note that this is not the
default.

=item B<--restart-delay SECONDS>

After processing the queue, wait this many seconds before the queue is
restarted.

=item B<--lockfile FILE>

For perpetual operation, create a lockfile in that place and clean it up after
finishing/aborting.

=item B<--all-once>

Instead of perpetually processing all projects over and over again, process
them just once and then exit.

=item B<--one PRJNAME>

Process only the given project (given as just the project name without the
C<.git> suffix) and then exit.

=back
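
=head1 EXAMPLES

The following invocations are illustrative only; the project name, lockfile
path and limits shown here are placeholders, not defaults.

 # Process every project once and then exit, running at most 5 jobs at a time
 jobd --all-once --max-parallel 5

 # Run perpetually, pausing new jobs when the load rises above 8 and
 # resuming once it falls below 4
 jobd --lockfile /var/run/jobd.lock --load-triggers 8,4

 # Refresh (and then gc) a single project, quietly
 jobd --quiet --one exampleproject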

=head1 DESCRIPTION

jobd is Girocco's repository maintenance servant; it periodically checks all
repositories, updating mirrored repositories and repacking push-mode
repositories as needed.

=cut