gc.sh: attempt to recover from a stale gc.pid.lock file
[girocco.git] / jobd / jobd.pl
blob dcd79479976c2c6bc1fa7fbb1b63df40d9775b9c
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
14 use Girocco::Config;
15 use Girocco::Project;
16 use Girocco::User;
17 use Girocco::Util;
# Options
# Defaults for the command-line switches; GetOptions() in the main section
# overwrites them.
my $quiet;                      # --quiet
my $progress;                   # --progress
my $kill_after        = 900;    # --kill-after (seconds)
my $max_par           = 20;     # --max-parallel
my $max_par_intensive = 1;      # --max-intensive-parallel
my $load_triggers     = '10,2'; # --load-triggers "TRIG,UNTRIG"
my $lockfile          = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay     = 60;     # --restart-delay (seconds)
my $all_once;                   # --all-once
my $one;                        # --one PRJNAME

# The two halves of $load_triggers, split apart in the main section.
my $load_trig;
my $load_untrig;
33 ######### Jobs {{{1
# Job callback for 'update' jobs.
#
# $job - job hash as built by queue_job(); $job->{'project'} names the project.
#
# Decides from marker files in the project directory and from the time of the
# last refresh whether the update has to run.  When it is skipped the job is
# flagged through job_skip(); most skip paths still queue a gc job.  Otherwise
# update.sh is started for the project via exec_job_command().
sub update_project {
    my ($job) = @_;
    my $name = $job->{'project'};
    return unless check_project_exists($job);

    my $dir = get_project_path($name);

    # Fetching disabled or bypassed: only gc may still be needed.
    if (grep { -e "$dir$_" } qw(.nofetch .bypass .bypass_fetch)) {
        job_skip($job);
        return setup_gc($job);
    }

    my $clone_failed = -e "$dir.clone_failed";
    if (!$clone_failed && -e "$dir.clone_in_progress") {
        job_skip($job, "initial mirroring not complete yet");
        return;
    }
    if ($clone_failed) {
        job_skip($job, "initial mirroring failed");
        # Still need to gc non top-level clones even if they've failed
        # otherwise the objects copied into them from the parent will
        # just accumulate without bound
        setup_gc($job) if $name =~ m,/,;
        return;
    }

    # NOTE(review): rand_adjust() comes from outside this file; presumably it
    # jitters the configured interval -- confirm.
    my $interval = rand_adjust($Girocco::Config::min_mirror_interval);
    my $last_run = is_operation_uptodate($name, 'lastrefresh', $interval);
    if ($last_run) {
        job_skip($job, "not needed right now, last run at $last_run");
        setup_gc($job);
        return;
    }

    # git svn can be very, very slow at times
    $job->{'timeout_factor'} = 3 if is_svn_clone($name);

    my @command = ("$Girocco::Config::basedir/jobd/update.sh", $name);
    return exec_job_command($job, \@command, $quiet);
}
# Job callback for 'gc' jobs.
#
# $job - job hash as built by queue_job(); $job->{'project'} names the project.
#
# Skips the job when the project is missing, when a .nogc or .bypass marker
# exists in the project directory, or when the last gc is recent enough.
# Otherwise gc.sh is started for the project via exec_job_command().
sub gc_project {
    my ($job) = @_;
    my $name = $job->{'project'};
    return unless check_project_exists($job);

    my $dir = get_project_path($name);
    if (-e "$dir.nogc" || -e "$dir.bypass") {
        job_skip($job);
        return;
    }

    my $interval = rand_adjust($Girocco::Config::min_gc_interval);
    my $last_run = is_operation_uptodate($name, 'lastgc', $interval);
    if ($last_run) {
        job_skip($job, "not needed right now, last run at $last_run");
        return;
    }

    # allow garbage collection to run for longer than an update
    $job->{'timeout_factor'} = 2;
    my @command = ("$Girocco::Config::basedir/jobd/gc.sh", $name);
    return exec_job_command($job, \@command, $quiet);
}
# Queue a gc job for the project of the given job.
#
# $job - job hash; only $job->{'project'} is read.
#
# The new job is marked as intensive, so it counts against the
# --max-intensive-parallel limit checked in run_queue().
sub setup_gc {
    my ($job) = @_;
    my %gc_job = (
        project   => $job->{'project'},
        type      => 'gc',
        command   => \&gc_project,
        intensive => 1,
    );
    return queue_job(%gc_job);
}
# Check that the job's project directory exists.
#
# $job - job hash; $job->{'project'} names the project.
#
# Returns 1 when the directory exists.  Otherwise the job is flagged as
# skipped ("non-existent project") and 0 is returned.
sub check_project_exists {
    my ($job) = @_;
    return 1 if -d get_project_path($job->{'project'});
    job_skip($job, "non-existent project");
    return 0;
}
# Build the repository directory path for a project name.
#
# Returns "<reporoot>/<project>.git/" (with trailing slash), so callers can
# append marker file names such as ".nogc" directly.
sub get_project_path {
    my ($project) = @_;
    return "$Girocco::Config::reporoot/$project.git/";
}
# Check whether an operation was performed on a project recently enough.
#
# $project   - project name (without the .git suffix)
# $which     - name of the gitweb.* config key holding the RFC 2822 timestamp
#              of the last run (callers pass 'lastrefresh' or 'lastgc')
# $threshold - maximum age in seconds for the last run to count as up to date
#
# Returns the stored timestamp string (without trailing newline) when the
# last run is at most $threshold seconds old, and undef otherwise -- also
# when no timestamp is stored or git could not be run.
#
# NOTE(review): $path and $which are interpolated into a shell command line;
# project names containing shell metacharacters would be a problem.
sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $path = get_project_path($project);
    my $timestamp = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.$which"`;
    # Backticks yield undef when the command cannot be started at all.
    return unless defined $timestamp;
    # Drop the line terminator so the value can be embedded in log messages.
    chomp $timestamp;
    return if $timestamp eq '';
    my $unix_ts = parse_rfc2822_date($timestamp) || 0;
    return (time - $unix_ts) <= $threshold ? $timestamp : undef;
}
# Tell whether a project is a git-svn mirror.
#
# $project - project name (without the .git suffix)
#
# Returns a true value (the configured svn-remote.svn.url) when
# gitweb.baseurl starts with "svn:" or "svn+" (case-insensitive) and an svn
# remote URL is configured; a false value otherwise.
sub is_svn_clone {
    my ($project) = @_;
    my $path = get_project_path($project);
    my $git = $Girocco::Config::git_bin;
    my $baseurl = `GIT_DIR="$path" $git config "gitweb.baseurl"`;
    my $svnurl = `GIT_DIR="$path" $git config "svn-remote.svn.url"`;
    my $has_svn_baseurl = $baseurl =~ /^svn[:+]/i;
    return $has_svn_baseurl && $svnurl;
}
# Queue an update job for a single project.
#
# $project - project name (without the .git suffix)
#
# Whether the update ends up on the success or the error path in
# reap_finished_jobs(), a gc job is queued afterwards.
sub queue_one {
    my ($project) = @_;
    my %update_job = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    return queue_job(%update_job);
}
# Queue an update job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
140 ######### Daemon operation {{{1
# Shared daemon state, used by the subs in this section.
my @queue;              # jobs waiting to be started (see queue_job)
my @running;            # jobs handed to run_job and not yet finished
my $perpetual = 1;      # cleared by handle_softexit to end run_perpetually
my $locked = 0;         # set once run_perpetually has written the lockfile
my $jobs_executed;      # per-run counters, reset by run_queue
my $jobs_skipped;
my @jobs_killed;        # names of jobs killed by reap_hanging_jobs
# SIGINT handler installed by run_queue(): stop gracefully.
#
# Empties the queue and stops the perpetual loop so running jobs can finish,
# then re-arms SIGINT with handle_exit() so a second ^C exits immediately.
sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ".
        "^C again to exit immediately");
    $perpetual = 0;
    @queue = ();
    $SIG{'INT'} = \&handle_exit;
}
# SIGTERM handler (and second-SIGINT handler): exit immediately.
#
# Sends SIGKILL to the process group of every running job, removes the
# lockfile if this instance created it, and exits with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");
    # Ignore further TERM signals while shutting down.
    $SIG{'TERM'} = 'IGNORE';
    for my $job (@running) {
        # A job sits in @running before its child has been forked (and stays
        # there briefly when fork failed).  Without a pid, -(undef) would be
        # 0, and signalling 0 hits our own process group -- killing jobd
        # before the lockfile is removed.
        next unless $job->{'pid'};
        kill 'KILL', -($job->{'pid'});
    }
    unlink $lockfile if ($locked);
    exit(0);
}
# Append a job to the queue.
#
# Takes the job description as a key/value list (project, type, command,
# optionally intensive, on_success, on_error).  'queued_at' and 'dont_run'
# are always set here; 'intensive' defaults to 0 when not given.
sub queue_job {
    my %job = (
        intensive => 0,
        @_,
        queued_at => time,
        dont_run  => 0,
    );
    return push @queue, \%job;
}
# Start a job: register it as running and invoke its command callback.
#
# $job - job hash taken from the queue
#
# If the callback flagged the job as skipped (dont_run), the job is taken
# off @running again and counted in $jobs_skipped.
sub run_job {
    my ($job) = @_;

    push @running, $job;
    $job->{'command'}->($job);
    return unless $job->{'dont_run'};

    # The job was pushed last, so popping removes exactly this job.
    pop @running;
    $jobs_skipped++;
    return;
}
# Format a job for log messages as "[<type>::<project>]".
sub _job_name {
    my ($job) = @_;
    return sprintf('[%s::%s]', $job->{'type'}, $job->{'project'});
}
# Fork a child process that runs the command of a job.
# Only one of those per job!
#
# $job      - job hash; 'pid', 'finished' and 'started_at' are set here
# $command  - array ref with the program and its arguments
# $err_only - when true, the child's STDOUT is redirected to /dev/null
#
# In the parent this returns right after the fork.  If the fork fails, the
# job is marked finished and no pid is recorded.  The child never returns
# from this sub: it either execs the command or exits non-zero, so a failed
# setup cannot fall back into the daemon loop as a second jobd.
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined($pid)) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        # Child.  'or' (not '||') so the handler is tied to open() itself
        # rather than to the always-true filename.
        open(STDIN, '<', '/dev/null') or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            exit 1;
        };
        if ($err_only) {
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 1;
            };
        }
        # New process group so we can keep track of all of its children
        if (!defined(POSIX::setpgid(0, 0))) {
            error(_job_name($job) ." Can't create process group: $!");
            exit 1;
        }
        # "Prevent" races
        select(undef, undef, undef, 0.1);
        # exec only returns on failure; the 'or' form avoids perl's
        # "Statement unlikely to be reached" warning.
        exec(@$command)
            or error(_job_name($job) ." Can't exec ". $command->[0] .": $!");
        exit 127;
    }
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Flag a job as skipped so run_job() will not treat it as running.
#
# $job - job hash; its 'dont_run' entry is set to 1
# $msg - optional reason; logged unless --quiet is in effect
sub job_skip {
    my ($job, $msg) = @_;
    $job->{'dont_run'} = 1;
    return if $quiet;
    return unless $msg;
    error(_job_name($job) ." Skipping job: $msg");
}
# Kill running jobs that exceeded their time limit.
#
# The limit is $kill_after seconds multiplied by the job's 'timeout_factor'
# (1 when unset).  Jobs without 'started_at' have no child yet and are left
# alone.  A job over the limit is marked finished, its process group gets
# SIGKILL, and its name is recorded in @jobs_killed.
sub reap_hanging_jobs {
    foreach my $job (@running) {
        next unless defined($job->{'started_at'});
        my $allowed = $kill_after * ($job->{'timeout_factor'} || 1);
        next unless (time - $job->{'started_at'}) > $allowed;

        $job->{'finished'} = 1;
        kill 'KILL', -($job->{'pid'});
        error(_job_name($job) ." KILLED due to timeout");
        push @jobs_killed, _job_name($job);
    }
}
# Collect exited children without blocking and update job bookkeeping.
#
# For each reaped pid that belongs to a job in @running:
#  - if the job was not yet marked finished, its on_success callback (if
#    any) runs, it is marked finished and counted in $jobs_executed;
#  - if it was already marked finished (as reap_hanging_jobs() does when
#    killing it), its on_error callback (if any) runs.
# Afterwards every finished job is dropped from @running.
#
# Returns 1 if at least one child was reaped, 0 otherwise.
sub reap_finished_jobs {
    my $reaped_any = 0;

    while ((my $pid = waitpid(-1, WNOHANG)) >= 1) {
        $reaped_any = 1;

        # XXX- the exit status in $? is currently not looked at
        my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        next unless $job;

        if ($job->{'finished'}) {
            $job->{'on_error'}->($job) if defined($job->{'on_error'});
        } else {
            $job->{'on_success'}->($job) if defined($job->{'on_success'});
            $job->{'finished'} = 1;
            $jobs_executed++;
        }
    }

    @running = grep { $_->{'finished'} == 0 } @running;
    return $reaped_any;
}
# Return the running jobs flagged as intensive.
# In scalar context (as in run_queue's comparison) this is their count.
sub have_intensive_jobs {
    my @intensive = grep { $_->{'intensive'} == 1 } @running;
    return @intensive;
}
# Return the log line prefix: the current local time in square brackets,
# followed by a single space.
sub ts {
    my $now = localtime;
    return "[$now] ";
}
# Read the system load averages.
#
# Returns a list of three values (the 1, 5 and 15 minute load averages as
# reported by /proc/loadavg on Linux, or by `uptime` elsewhere).  On failure
# a single undef is returned; callers test element [0] with defined(), so
# the explicit undef is kept for compatibility.
sub get_load_info {
    my $loadinfo;
    if ($^O eq "linux") {
        # Read /proc/loadavg on Linux
        open(my $fh, '<', '/proc/loadavg') or return undef;
        $loadinfo = <$fh>;
        close $fh;
        # An empty read would otherwise cause warnings in split below.
        return undef unless defined $loadinfo;
        return (split(/\s/, $loadinfo, 4))[0..2];
    } else {
        # Read the output of uptime everywhere else (works on Linux too)
        open(my $fh, '-|', 'uptime') or return undef;
        $loadinfo = <$fh>;
        close $fh;
        return undef unless defined $loadinfo;
        $loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/is or return undef;
        return ($1, $2, $3);
    }
}
# Process the queue until no job is queued or running any more.
#
# Resets the per-run counters, installs the INT/TERM handlers and then loops:
# timed-out jobs are killed, exited children are collected, the system load
# is sampled at most every 5 seconds (when load triggers are enabled), a
# status report is printed about once a minute (with --progress), and the
# next job is started unless a limit applies.
sub run_queue {
    my $last_progress = time;
    # Back-dated so the very first iteration already samples the load.
    my $last_checkload = time - 5;
    my $overloaded = 0;
    my $load_info = '';
    $jobs_executed = 0;
    $jobs_skipped = 0;
    @jobs_killed = ();
    if ($progress) {
        ferror("--- Processing %d queued jobs", scalar(@queue));
    }
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;
    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();

        # Check current system load
        if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
            my $current_load = $loadinfo[0];
            if ($current_load > $load_trig && !$overloaded) {
                $overloaded = 1;
                error("PAUSE: system load is at $current_load > $load_trig") if $progress;
            } elsif ($current_load < $load_untrig && $overloaded) {
                $overloaded = 0;
                error("RESUME: system load is at $current_load < $load_untrig") if $progress;
            }
            if ($overloaded) {
                $load_info = ', paused (load '. $current_load .')';
            } else {
                $load_info = ', load '. $current_load;
            }
            $last_checkload = time;
        }

        # Status output
        if ($progress && (time - $last_progress) >= 60) {
            # $load_info is passed as an argument, not spliced into the
            # format, so it can never be misread as a conversion.
            ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed%s",
                scalar(@queue), scalar(@running), $jobs_executed,
                $jobs_skipped, scalar(@jobs_killed), $load_info);
            if (@running) {
                my @run_status;
                for my $job (@running) {
                    push @run_status, _job_name($job)." ". (time - $job->{'started_at'}) ."s";
                }
                error("STATUS: currently running: ". join(', ', @run_status));
            }
            $last_progress = time;
        }

        # Back off if we're too busy
        if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
            sleep 1 unless $proceed_immediately;
            next;
        }

        # Run next (the back-off test above guarantees @queue is non-empty)
        run_job(shift(@queue));
    }
    if ($progress) {
        ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
    }
}
# Daemon mode: queue and process all projects over and over.
#
# Refuses to start (dies) when the lockfile already exists or cannot be
# written.  The lockfile holds this process's pid and is removed when the
# loop ends.  Between runs the daemon sleeps for $restart_delay seconds.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
    }
    # 'or' (not '||') so a failed open is actually reported; '||' would bind
    # to $lockfile, which is always true.
    open(my $lock, '>', $lockfile) or die "Cannot create lockfile '$lockfile': $!";
    my $written = print {$lock} $$;
    my $closed = close $lock;
    unless ($written && $closed) {
        my $err = $!;
        # Do not leave a half-written lockfile behind.
        unlink $lockfile;
        die "Cannot write lockfile '$lockfile': $err";
    }
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
        sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
    }
    unlink $lockfile;
}
376 ######### Helpers {{{1
# Print a message to STDERR, prefixed with the current timestamp and
# terminated with a newline.
#
# Declared without a prototype: this sub is called from code that appears
# earlier in the file, where a prototype could not be checked anyway and
# only triggers "called too early to check prototype" warnings.
sub error {
    my ($msg) = @_;
    print STDERR ts().$msg."\n";
}
# printf-style variant of error(): the first argument is the format, the
# rest are its values.
#
# Declared without a prototype, because it is called from code earlier in
# the file where the prototype could not be checked.  The format is split
# off explicitly since sprintf(@_) would use the element count as format.
sub ferror {
    my ($format, @values) = @_;
    error(sprintf($format, @values));
}
# Report a message via error() and terminate with exit status 1.
# Declared without a prototype, consistent with plain argument unpacking.
sub fatal {
    my ($msg) = @_;
    error($msg);
    exit 1;
}
389 ######### Main {{{1
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    # -h is what the usage text advertises; -? is kept for compatibility.
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'max-intensive-parallel|i=i' => \$max_par_intensive,
    'load-triggers=s' => \$load_triggers,
    'restart-delay|d=i' => \$restart_delay,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

# Without --quiet, progress output is always on; show_progress is exported
# through the environment to the job scripts.
unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

# Disable load checks when the load cannot be determined on this system.
$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);
# Tolerate "--load-triggers TRIG" without the UNTRIG part: resume as soon as
# the load drops below TRIG again instead of comparing against undef.
$load_untrig = $load_trig
    unless defined($load_untrig) && length($load_untrig);

if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

run_perpetually();
431 ########## Documentation {{{1
433 __END__
435 =head1 NAME
437 jobd - Perform Girocco maintenance jobs
439 =head1 SYNOPSIS
441 jobd [options]
443 Options:
444 -h | --help detailed instructions
445 -q | --quiet run quietly
446 -P | --progress show occasional status updates
447 -k SECONDS | --kill-after SECONDS how long to wait before killing jobs
448 -p NUM | --max-parallel NUM how many jobs to run at the same time
449 -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
450 at the same time
451 --load-triggers TRIG,UNTRIG stop queueing jobs at load above
452 TRIG and resume at load below UNTRIG
453 -d SECONDS | --restart-delay SECONDS wait for this many seconds between
454 queue runs
455 -l FILE | --lockfile FILE create a lockfile in the given
456 location
457 -a | --all-once process the list only once
458 -o PRJNAME | --one PRJNAME process only one project
460 =head1 OPTIONS
462 =over 8
464 =item B<--help>
466 Print the full description of jobd's options.
468 =item B<--quiet>
470 Suppress non-error messages, e.g. for use when running this task as a cronjob.
472 =item B<--progress>
474 Show information about the current status of the job queue occasionally. This
475 is automatically enabled if --quiet is not given.
477 =item B<--kill-after SECONDS>
479 Kill supervised jobs after a certain time to avoid hanging the daemon.
481 =item B<--max-parallel NUM>
483 Run no more than that many jobs at the same time.
485 =item B<--max-intensive-parallel NUM>
487 Run no more than that many resource-hungry jobs at the same time. Right now,
488 this refers to repacking jobs.
490 =item B<--load-triggers TRIG,UNTRIG>
492 If the first system load average (1 minute average) exceeds TRIG, don't queue
493 any more jobs until it goes below UNTRIG. This is currently only supported on
494 Linux and any other platforms that provide an uptime command with load average
495 output.
497 If both values are zero, load checks are disabled. Note that this is not the
498 default.
500 =item B<--restart-delay SECONDS>
502 After processing the queue, wait this many seconds until the queue is
503 restarted.
505 =item B<--lockfile FILE>
507 For perpetual operation, create a lockfile in that place and clean it up after
508 finishing/aborting.
510 =item B<--all-once>
512 Instead of perpetually processing all projects over and over again, process
513 them just once and then exit.
515 =item B<--one PRJNAME>
517 Process only the given project (given as just the project name without C<.git>
518 suffix) and then exit.
520 =back
522 =head1 DESCRIPTION
524 jobd is Girocco's repositories maintenance servant; it periodically checks all
525 the repositories and updates mirrored repositories and repacks push-mode
526 repositories when needed.
528 =cut