jobd.pl: allow gc longer before timing out
#!/usr/bin/perl

# jobd - perform Girocco maintenance jobs

# Run with --help for details

use strict;
use warnings;

use Getopt::Long;
use Pod::Usage;
use POSIX ":sys_wait_h";

use Girocco::Config;
use Girocco::Project;
use Girocco::User;
use Girocco::Util;

# Options
my $quiet;
my $progress;
my $kill_after = 900;
my $max_par = 20;
my $max_par_intensive = 1;
my $load_triggers = '10,2';
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 60;
my $all_once;
my $one;

my ($load_trig, $load_untrig);

######### Jobs {{{1

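# Refresh a single mirror project.  Skips projects marked .nofetch or whose
# initial clone is still running or has failed, skips projects refreshed
# within the minimum mirror interval, gives git-svn mirrors extra time and
# then hands off to jobd/update.sh.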
sub update_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	if (-e get_project_path($p).".nofetch") {
		job_skip($job);
		return setup_gc($job);
	}
	if (-e get_project_path($p).".clone_in_progress" && ! -e get_project_path($p).".clone_failed") {
		job_skip($job, "initial mirroring not complete yet");
		return;
	}
	if (-e get_project_path($p).".clone_failed") {
		job_skip($job, "initial mirroring failed");
		# Still need to gc non top-level clones even if they've failed
		# otherwise the objects copied into them from the parent will
		# just accumulate without bound
		setup_gc($job) if $p =~ m,/,;
		return;
	}
	if (my $ts = is_operation_uptodate($p, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		setup_gc($job);
		return;
	}
	if (is_svn_clone($p)) {
		# git svn can be very, very slow at times
		$job->{'timeout_factor'} = 3;
	}
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}

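# Repack/garbage-collect a single project.  Skips projects collected within
# the minimum gc interval and runs jobd/gc.sh with a doubled timeout, since
# gc can legitimately take much longer than a fetch.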
sub gc_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	if (my $ts = is_operation_uptodate($p, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		return;
	}
	# allow garbage collection to run for longer than an update
	$job->{'timeout_factor'} = 2;
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}

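# Queue a gc job for the project; gc jobs are flagged "intensive" so that at
# most $max_par_intensive of them run at once.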
sub setup_gc {
	my $job = shift;
	queue_job(
		project => $job->{'project'},
		type => 'gc',
		command => \&gc_project,
		intensive => 1,
	);
}

sub check_project_exists {
	my $job = shift;
	my $p = $job->{'project'};
	if (!-d get_project_path($p)) {
		job_skip($job, "non-existent project");
		return 0;
	}
	return 1;
}

sub get_project_path {
	"$Girocco::Config::reporoot/".shift().".git/";
}

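# Timestamps of the last refresh/gc are stored as RFC 2822 dates in the
# project's gitweb.* config keys; return the stored date if the operation
# happened within $threshold seconds, undef otherwise.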
sub is_operation_uptodate {
	my ($project, $which, $threshold) = @_;
	my $path = get_project_path($project);
	my $timestamp = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.$which"`;
	my $unix_ts = parse_rfc2822_date($timestamp) || 0;
	(time - $unix_ts) <= $threshold ? $timestamp : undef;
}

sub is_svn_clone {
	my ($project) = @_;
	my $path = get_project_path($project);
	my $baseurl = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.baseurl"`;
	my $svnurl = `GIT_DIR="$path" $Girocco::Config::git_bin config "svn-remote.svn.url"`;
	return $baseurl =~ /^svn[:+]/i && $svnurl;
}

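# Queue an update job for one project; a gc job is chained afterwards via
# the on_success/on_error hooks.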
sub queue_one {
	my $project = shift;
	queue_job(
		project => $project,
		type => 'update',
		command => \&update_project,
		on_success => \&setup_gc,
		on_error => \&setup_gc,
	);
}

sub queue_all {
	queue_one($_) for (Girocco::Project->get_full_list());
}

######### Daemon operation {{{1

my @queue;
my @running;
my $perpetual = 1;
my $locked = 0;
my $jobs_executed;
my $jobs_skipped;
my @jobs_killed;

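# Signal handlers: the first SIGINT drains the queue and lets running jobs
# finish; a second SIGINT (or SIGTERM) kills the outstanding job process
# groups and removes the lockfile.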
sub handle_softexit {
	error("Waiting for outstanding jobs to finish... ".
		"^C again to exit immediately");
	@queue = ();
	$perpetual = 0;
	$SIG{'INT'} = \&handle_exit;
}

sub handle_exit {
	error("Killing outstanding jobs...");
	$SIG{'TERM'} = 'IGNORE';
	for (@running) {
		kill 'KILL', -($_->{'pid'});
	}
	unlink $lockfile if ($locked);
	exit(0);
}

sub queue_job {
	my %opts = @_;
	$opts{'queued_at'} = time;
	$opts{'dont_run'} = 0;
	$opts{'intensive'} = 0 unless exists $opts{'intensive'};
	push @queue, \%opts;
}

sub run_job {
	my $job = shift;

	push @running, $job;
	$job->{'command'}->($job);
	if ($job->{'dont_run'}) {
		pop @running;
		$jobs_skipped++;
		return;
	}
}

sub _job_name {
	my $job = shift;
	"[".$job->{'type'}."::".$job->{'project'}."]";
}

# Only one of those per job!
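# Forks the job into its own process group (so the whole process tree can be
# killed on timeout), wires stdin to /dev/null (and stdout too when $err_only
# is set), then execs the given command.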
sub exec_job_command {
	my ($job, $command, $err_only) = @_;

	my $pid;
	if (!defined($pid = fork)) {
		error(_job_name($job) ." Can't fork job: $!");
		$job->{'finished'} = 1;
		return;
	}
	if (!$pid) {
		open STDIN, '/dev/null' or do {
			error(_job_name($job) ." Can't read from /dev/null: $!");
			$job->{'finished'} = 1;
			return;
		};
		if ($err_only) {
			open STDOUT, '>/dev/null' or do {
				error(_job_name($job) ." Can't write to /dev/null: $!");
				$job->{'finished'} = 1;
				return;
			};
		}
		# New process group so we can keep track of all of its children
		if (!defined(POSIX::setpgid(0, 0))) {
			error(_job_name($job) ." Can't create process group: $!");
			$job->{'finished'} = 1;
			return;
		}
		# "Prevent" races
		select(undef, undef, undef, 0.1);
		exec @$command;
		# Stop perl from complaining
		exit $?;
	}
	$job->{'pid'} = $pid;
	$job->{'finished'} = 0;
	$job->{'started_at'} = time;
}

sub job_skip {
	my ($job, $msg) = @_;
	$job->{'dont_run'} = 1;
	error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}

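# SIGKILL the process group of any job that has run longer than $kill_after
# seconds multiplied by its timeout factor.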
sub reap_hanging_jobs {
	for (@running) {
		my $factor = $_->{'timeout_factor'} || 1;
		if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
			$_->{'finished'} = 1;
			kill 'KILL', -($_->{'pid'});
			error(_job_name($_) ." KILLED due to timeout");
			push @jobs_killed, _job_name($_);
		}
	}
}

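# Non-blocking waitpid() loop: fires on_success for reaped jobs, on_error for
# jobs that were already marked finished (e.g. killed on timeout), and prunes
# finished jobs from @running.  Returns true if anything was reaped.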
sub reap_finished_jobs {
	my $pid;
	my $finished_any = 0;
	while (1) {
		$pid = waitpid(-1, WNOHANG);
		last if $pid < 1;
		$finished_any = 1;

		my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
		if ($?) {
			# XXX- we currently don't care
		}
		if (@child && !$child[0]->{'finished'}) {
			$child[0]->{'on_success'}->($child[0]) if defined($child[0]->{'on_success'});
			$child[0]->{'finished'} = 1;
			$jobs_executed++;
		} elsif (@child) {
			$child[0]->{'on_error'}->($child[0]) if defined($child[0]->{'on_error'});
		}
	}
	@running = grep { $_->{'finished'} == 0 } @running;
	$finished_any;
}

sub have_intensive_jobs {
	grep { $_->{'intensive'} == 1 } @running;
}

sub ts {
	"[". scalar(localtime) ."] ";
}

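# Return the 1, 5 and 15 minute load averages, read from /proc/loadavg on
# Linux or parsed from uptime(1) output elsewhere; undef if neither works.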
sub get_load_info {
	if ($^O eq "linux") {
		# Read /proc/loadavg on Linux
		open(LOADAV, '<', '/proc/loadavg') or return undef;
		my $loadinfo = <LOADAV>;
		close LOADAV;
		return (split(/\s/, $loadinfo, 4))[0..2];
	} else {
		# Read the output of uptime everywhere else (works on Linux too)
		open(LOADAV, '-|', 'uptime') or return undef;
		my $loadinfo = <LOADAV>;
		close LOADAV;
		$loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
		return ($1, $2, $3);
	}
}

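# Main scheduling loop: reap hung and finished jobs, pause job starts while
# the system load is above the trigger, print periodic status when --progress
# is on, and start queued jobs while under the parallelism limits.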
sub run_queue {
	my $last_progress = time;
	my $last_checkload = time - 5;
	my $current_load = $load_trig;
	my $overloaded = 0;
	my $load_info = '';
	$jobs_executed = 0;
	$jobs_skipped = 0;
	@jobs_killed = ();
	if ($progress) {
		ferror("--- Processing %d queued jobs", scalar(@queue));
	}
	$SIG{'INT'} = \&handle_softexit;
	$SIG{'TERM'} = \&handle_exit;
	while (@queue || @running) {
		reap_hanging_jobs();
		my $proceed_immediately = reap_finished_jobs();
		# Check current system load
		if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
			my $current_load = $loadinfo[0];
			if ($current_load > $load_trig && !$overloaded) {
				$overloaded = 1;
				error("PAUSE: system load is at $current_load > $load_trig") if $progress;
			} elsif ($current_load < $load_untrig && $overloaded) {
				$overloaded = 0;
				error("RESUME: system load is at $current_load < $load_untrig") if $progress;
			}
			if ($overloaded) {
				$load_info = ', paused (load '. $current_load .')';
			} else {
				$load_info = ', load '. $current_load;
			}
			$last_checkload = time;
		}
		# Status output
		if ($progress && (time - $last_progress) >= 60) {
			ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
			if (@running) {
				my @run_status;
				for (@running) {
					push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
				}
				error("STATUS: currently running: ". join(', ', @run_status));
			}
			$last_progress = time;
		}
		# Back off if we're too busy
		if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
			sleep 1 unless $proceed_immediately;
			next;
		}
		# Run next
		run_job(shift(@queue)) if @queue;
	}
	if ($progress) {
		ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
	}
}

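# Take the lockfile, then queue and process all projects over and over,
# sleeping $restart_delay seconds between passes.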
sub run_perpetually {
	if (-e $lockfile) {
		die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
	}
	open LOCK, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
	print LOCK $$;
	close LOCK;
	$locked = 1;

	while ($perpetual) {
		queue_all();
		run_queue();
		sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
	}
	unlink $lockfile;
}

######### Helpers {{{1

sub error($) {
	print STDERR ts().shift()."\n";
}
sub ferror(@) {
	error(sprintf($_[0], @_[1..$#_]));
}
sub fatal($) {
	error(shift);
	exit 1;
}

######### Main {{{1

# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
	'help|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
	'quiet|q' => \$quiet,
	'progress|P' => \$progress,
	'kill-after|k=i' => \$kill_after,
	'max-parallel|p=i' => \$max_par,
	'max-intensive-parallel|i=i' => \$max_par_intensive,
	'load-triggers=s' => \$load_triggers,
	'restart-delay|d=i' => \$restart_delay,
	'lockfile|l=s' => \$lockfile,
	'all-once|a' => \$all_once,
	'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
	if ($all_once && $one);

unless ($quiet) {
	$ENV{'show_progress'} = '1';
	$progress = 1;
}

$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
	queue_one($one);
	run_queue();
	exit;
}

if ($all_once) {
	queue_all();
	run_queue();
	exit;
}

run_perpetually();

########## Documentation {{{1

__END__

=head1 NAME

jobd - Perform Girocco maintenance jobs

=head1 SYNOPSIS

jobd [options]

 Options:
   -h | --help                           detailed instructions
   -q | --quiet                          run quietly
   -P | --progress                       show occasional status updates
   -k SECONDS | --kill-after SECONDS     how long to wait before killing jobs
   -p NUM | --max-parallel NUM           how many jobs to run at the same time
   -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
                                         at the same time
   --load-triggers TRIG,UNTRIG           stop queueing jobs at load above
                                         TRIG and resume at load below UNTRIG
   -d SECONDS | --restart-delay SECONDS  wait for this many seconds between
                                         queue runs
   -l FILE | --lockfile FILE             create a lockfile in the given
                                         location
   -a | --all-once                       process the list only once
   -o PRJNAME | --one PRJNAME            process only one project

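A typical one-shot run (the flag values here are purely illustrative)
processes every project once, with progress output and at most four jobs in
parallel:

 jobd --all-once --progress --max-parallel 4
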
=head1 OPTIONS

=over 8

=item B<--help>

Print the full description of jobd's options.

=item B<--quiet>

Suppress non-error messages, e.g. for use when running this task as a cronjob.

=item B<--progress>

Show information about the current status of the job queue occasionally. This
is automatically enabled if --quiet is not given.

=item B<--kill-after SECONDS>

Kill supervised jobs after this many seconds to avoid hanging the daemon.
Jobs that are expected to take longer (garbage collection and git-svn
updates) are allowed a correspondingly larger multiple of this limit.

=item B<--max-parallel NUM>

Run no more than that many jobs at the same time.

=item B<--max-intensive-parallel NUM>

Run no more than that many resource-hungry jobs at the same time. Right now,
this refers to repacking jobs.

=item B<--load-triggers TRIG,UNTRIG>

If the system's 1-minute load average exceeds TRIG, don't start any more jobs
until it drops below UNTRIG. This is currently only supported on Linux and on
other platforms whose uptime command reports load averages.

If both values are zero, load checks are disabled. Note that this is not the
default.

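For example, the built-in default of C<--load-triggers 10,2> stops starting
new jobs once the 1-minute load average rises above 10 and resumes once it
falls back below 2.
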
=item B<--restart-delay SECONDS>

After processing the queue, wait this many seconds before the queue is
restarted.

=item B<--lockfile FILE>

For perpetual operation, create a lockfile in that place and clean it up after
finishing/aborting.

=item B<--all-once>

Instead of perpetually processing all projects over and over again, process
them just once and then exit.

=item B<--one PRJNAME>

Process only the given project (given as just the project name without C<.git>
suffix) and then exit.

=back

=head1 DESCRIPTION

jobd is Girocco's repository maintenance servant; it periodically checks all
of the repositories, updating mirrored repositories and repacking push-mode
repositories when needed.

=cut