#!/usr/bin/perl

# jobd - perform Girocco maintenance jobs
#
# Run with --help for details

use strict;
use warnings;

use Getopt::Long;
use Pod::Usage;
use POSIX ":sys_wait_h";
use File::Basename;

use lib dirname($0);
use Girocco::Config;
use Girocco::Project;
use Girocco::User;
use Girocco::Util;

# Options
my $quiet;
my $progress;
my $kill_after = 900;
my $max_par = 20;
my $max_par_intensive = 1;
my $load_triggers = '10,2';
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 60;
my $all_once;
my $one;

my ($load_trig, $load_untrig);

######### Jobs {{{1
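
# update_project and gc_project honor marker files inside a project's
# repository directory:
#   .nofetch, .bypass, .bypass_fetch   skip the update (gc is still queued)
#   .clone_in_progress                 initial mirror not finished, skip update
#   .clone_failed                      initial mirror failed, skip update
#   .nogc, .bypass                     skip garbage collection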

sub update_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	if (-e get_project_path($p).".nofetch" || -e get_project_path($p).".bypass" ||
	    -e get_project_path($p).".bypass_fetch") {
		job_skip($job);
		return setup_gc($job);
	}
	if (-e get_project_path($p).".clone_in_progress" && ! -e get_project_path($p).".clone_failed") {
		job_skip($job, "initial mirroring not complete yet");
		return;
	}
	if (-e get_project_path($p).".clone_failed") {
		job_skip($job, "initial mirroring failed");
		# Still need to gc non top-level clones even if they've failed
		# otherwise the objects copied into them from the parent will
		# just accumulate without bound
		setup_gc($job) if $p =~ m,/,;
		return;
	}
	if (my $ts = is_operation_uptodate($p, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		setup_gc($job);
		return;
	}
	if (is_svn_clone($p)) {
		# git svn can be very, very slow at times
		$job->{'timeout_factor'} = 3;
	}
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}

sub gc_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	if (-e get_project_path($p).".nogc" || -e get_project_path($p).".bypass") {
		job_skip($job);
		return;
	}
	if (my $ts = is_operation_uptodate($p, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		return;
	}
	# allow garbage collection to run for longer than an update
	$job->{'timeout_factor'} = 2;
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}

sub setup_gc {
	my $job = shift;
	queue_job(
		project => $job->{'project'},
		type => 'gc',
		command => \&gc_project,
		intensive => 1,
	);
}

sub check_project_exists {
	my $job = shift;
	my $p = $job->{'project'};
	if (!-d get_project_path($p)) {
		job_skip($job, "non-existent project");
		return 0;
	}
	return 1;
}

sub get_project_path {
	"$Girocco::Config::reporoot/".shift().".git/";
}

sub is_operation_uptodate {
	my ($project, $which, $threshold) = @_;
	my $path = get_project_path($project);
	my $timestamp = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.$which"`;
	my $unix_ts = parse_rfc2822_date($timestamp) || 0;
	(time - $unix_ts) <= $threshold ? $timestamp : undef;
}

sub is_svn_clone {
	my ($project) = @_;
	my $path = get_project_path($project);
	my $baseurl = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.baseurl"`;
	my $svnurl = `GIT_DIR="$path" $Girocco::Config::git_bin config "svn-remote.svn.url"`;
	return $baseurl =~ /^svn[:+]/i && $svnurl;
}

sub queue_one {
	my $project = shift;
	queue_job(
		project => $project,
		type => 'update',
		command => \&update_project,
		on_success => \&setup_gc,
		on_error => \&setup_gc,
	);
}

sub queue_all {
	queue_one($_) for (Girocco::Project->get_full_list());
}

######### Daemon operation {{{1

my @queue;
my @running;
my $perpetual = 1;
my $locked = 0;
my $jobs_executed;
my $jobs_skipped;
my @jobs_killed;

sub handle_softexit {
	error("Waiting for outstanding jobs to finish... ".
		"^C again to exit immediately");
	@queue = ();
	$perpetual = 0;
	$SIG{'INT'} = \&handle_exit;
}

sub handle_exit {
	error("Killing outstanding jobs...");
	$SIG{'TERM'} = 'IGNORE';
	for (@running) {
		kill 'KILL', -($_->{'pid'});
	}
	unlink $lockfile if ($locked);
	exit(0);
}
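
# A job is a plain hash.  Callers supply 'project', 'type' ('update' or 'gc'),
# 'command' (a sub invoked with the job as its argument), optional 'on_success'
# and 'on_error' callbacks, an 'intensive' flag and optionally a
# 'timeout_factor' multiplying $kill_after.  queue_job, run_job and
# exec_job_command add the bookkeeping keys 'queued_at', 'dont_run', 'pid',
# 'started_at' and 'finished'.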

sub queue_job {
	my %opts = @_;
	$opts{'queued_at'} = time;
	$opts{'dont_run'} = 0;
	$opts{'intensive'} = 0 unless exists $opts{'intensive'};
	push @queue, \%opts;
}

sub run_job {
	my $job = shift;

	push @running, $job;
	$job->{'command'}->($job);
	if ($job->{'dont_run'}) {
		pop @running;
		$jobs_skipped++;
		return;
	}
}

sub _job_name {
	my $job = shift;
	"[".$job->{'type'}."::".$job->{'project'}."]";
}
# Only one of those per job!
sub exec_job_command {
	my ($job, $command, $err_only) = @_;

	my $pid;
	if (!defined($pid = fork)) {
		error(_job_name($job) ." Can't fork job: $!");
		$job->{'finished'} = 1;
		return;
	}
	if (!$pid) {
		open STDIN, '/dev/null' or do {
			error(_job_name($job) ." Can't read from /dev/null: $!");
			$job->{'finished'} = 1;
			return;
		};
		if ($err_only) {
			open STDOUT, '>/dev/null' or do {
				error(_job_name($job) ." Can't write to /dev/null: $!");
				$job->{'finished'} = 1;
				return;
			};
		}
		# New process group so we can keep track of all of its children
		if (!defined(POSIX::setpgid(0, 0))) {
			error(_job_name($job) ." Can't create process group: $!");
			$job->{'finished'} = 1;
			return;
		}
		# "Prevent" races
		select(undef, undef, undef, 0.1);
		exec @$command;
		# Stop perl from complaining
		exit $?;
	}
	$job->{'pid'} = $pid;
	$job->{'finished'} = 0;
	$job->{'started_at'} = time;
}

sub job_skip {
	my ($job, $msg) = @_;
	$job->{'dont_run'} = 1;
	error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}

sub reap_hanging_jobs {
	for (@running) {
		my $factor = $_->{'timeout_factor'} || 1;
		if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
			$_->{'finished'} = 1;
			kill 'KILL', -($_->{'pid'});
			error(_job_name($_) ." KILLED due to timeout");
			push @jobs_killed, _job_name($_);
		}
	}
}

sub reap_finished_jobs {
	my $pid;
	my $finished_any = 0;
	while (1) {
		$pid = waitpid(-1, WNOHANG);
		last if $pid < 1;
		$finished_any = 1;

		my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
		if ($?) {
			# XXX- we currently don't care
		}
		if (@child && !$child[0]->{'finished'}) {
			$child[0]->{'on_success'}->($child[0]) if defined($child[0]->{'on_success'});
			$child[0]->{'finished'} = 1;
			$jobs_executed++;
		} elsif (@child) {
			$child[0]->{'on_error'}->($child[0]) if defined($child[0]->{'on_error'});
		}
	}
	@running = grep { $_->{'finished'} == 0 } @running;
	$finished_any;
}

sub have_intensive_jobs {
	grep { $_->{'intensive'} == 1 } @running;
}

sub ts {
	"[". scalar(localtime) ."] ";
}

sub get_load_info {
	if ($^O eq "linux") {
		# Read /proc/loadavg on Linux
		open(LOADAV, '<', '/proc/loadavg') or return undef;
		my $loadinfo = <LOADAV>;
		close LOADAV;
		return (split(/\s/, $loadinfo, 4))[0..2];
	} else {
		# Read the output of uptime everywhere else (works on Linux too)
		open(LOADAV, '-|', 'uptime') or return undef;
		my $loadinfo = <LOADAV>;
		close LOADAV;
		$loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
		return ($1, $2, $3);
	}
}
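
# Main scheduling loop: kill jobs that exceed their timeout, reap finished
# children, pause dispatching while the load average is above $load_trig
# (resuming once it drops below $load_untrig), and start queued jobs as long
# as fewer than $max_par jobs ($max_par_intensive intensive ones) are running.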

sub run_queue {
	my $last_progress = time;
	my $last_checkload = time - 5;
	my $current_load = $load_trig;
	my $overloaded = 0;
	my $load_info = '';
	$jobs_executed = 0;
	$jobs_skipped = 0;
	@jobs_killed = ();
	if ($progress) {
		ferror("--- Processing %d queued jobs", scalar(@queue));
	}
	$SIG{'INT'} = \&handle_softexit;
	$SIG{'TERM'} = \&handle_exit;
	while (@queue || @running) {
		reap_hanging_jobs();
		my $proceed_immediately = reap_finished_jobs();
		# Check current system load
		if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
			my $current_load = $loadinfo[0];
			if ($current_load > $load_trig && !$overloaded) {
				$overloaded = 1;
				error("PAUSE: system load is at $current_load > $load_trig") if $progress;
			} elsif ($current_load < $load_untrig && $overloaded) {
				$overloaded = 0;
				error("RESUME: system load is at $current_load < $load_untrig") if $progress;
			}
			if ($overloaded) {
				$load_info = ', paused (load '. $current_load .')';
			} else {
				$load_info = ', load '. $current_load;
			}
			$last_checkload = time;
		}
		# Status output
		if ($progress && (time - $last_progress) >= 60) {
			ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
			if (@running) {
				my @run_status;
				for (@running) {
					push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
				}
				error("STATUS: currently running: ". join(', ', @run_status));
			}
			$last_progress = time;
		}
		# Back off if we're too busy
		if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
			sleep 1 unless $proceed_immediately;
			next;
		}
		# Run next
		run_job(shift(@queue)) if @queue;
	}
	if ($progress) {
		ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
	}
}

sub run_perpetually {
	if (-e $lockfile) {
		die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
	}
	open LOCK, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
	print LOCK $$;
	close LOCK;
	$locked = 1;

	while ($perpetual) {
		queue_all();
		run_queue();
		sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
	}
	unlink $lockfile;
}

######### Helpers {{{1

sub error($) {
	print STDERR ts().shift()."\n";
}

sub ferror(@) {
	error(sprintf($_[0], @_[1..$#_]));
}

sub fatal($) {
	error(shift);
	exit 1;
}

######### Main {{{1

# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
	'help|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
	'quiet|q' => \$quiet,
	'progress|P' => \$progress,
	'kill-after|k=i' => \$kill_after,
	'max-parallel|p=i' => \$max_par,
	'max-intensive-parallel|i=i' => \$max_par_intensive,
	'load-triggers=s' => \$load_triggers,
	'restart-delay|d=i' => \$restart_delay,
	'lockfile|l=s' => \$lockfile,
	'all-once|a' => \$all_once,
	'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
	if ($all_once && $one);

unless ($quiet) {
	$ENV{'show_progress'} = '1';
	$progress = 1;
}
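
# Fall back to disabled load checks when the load average cannot be read.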
$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
	queue_one($one);
	run_queue();
	exit;
}

if ($all_once) {
	queue_all();
	run_queue();
	exit;
}

run_perpetually();

########## Documentation {{{1

__END__

=head1 NAME

jobd - Perform Girocco maintenance jobs

=head1 SYNOPSIS

jobd [options]

 Options:
   -? | --help                           detailed instructions
   -q | --quiet                          run quietly
   -P | --progress                       show occasional status updates
   -k SECONDS | --kill-after SECONDS     how long to wait before killing jobs
   -p NUM | --max-parallel NUM           how many jobs to run at the same time
   -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
                                         at the same time
   --load-triggers TRIG,UNTRIG           stop queueing jobs at load above
                                         TRIG and resume at load below UNTRIG
   -d SECONDS | --restart-delay SECONDS  wait for this many seconds between
                                         queue runs
   -l FILE | --lockfile FILE             create a lockfile in the given
                                         location
   -a | --all-once                       process the list only once
   -o PRJNAME | --one PRJNAME            process only one project
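
For example (illustrative invocations only; C<myproject> stands for any
project name):

 jobd                        # process all projects perpetually
 jobd --all-once --progress  # one pass over all projects with status output
 jobd --one myproject -q     # update (and gc) a single project quietly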

=head1 OPTIONS

=over 8

=item B<--help>

Print the full description of jobd's options.

=item B<--quiet>

Suppress non-error messages, e.g. for use when running this task as a cronjob.

=item B<--progress>

Show information about the current status of the job queue occasionally. This
is automatically enabled if --quiet is not given.

=item B<--kill-after SECONDS>

Kill supervised jobs after a certain time to avoid hanging the daemon.

=item B<--max-parallel NUM>

Run no more than that many jobs at the same time.

=item B<--max-intensive-parallel NUM>

Run no more than that many resource-hungry jobs at the same time. Right now,
this refers to repacking jobs.

=item B<--load-triggers TRIG,UNTRIG>

If the first system load average (1 minute average) exceeds TRIG, don't queue
any more jobs until it goes below UNTRIG. This is currently only supported on
Linux and any other platforms that provide an uptime command with load average
output.

If both values are zero, load checks are disabled. Note that this is not the
default.
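
For example (the first line matches the built-in default):

 jobd --load-triggers 10,2   # pause queueing above load 10, resume below 2
 jobd --load-triggers 0,0    # disable load checking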

=item B<--restart-delay SECONDS>

After processing the queue, wait this many seconds until the queue is
restarted.

=item B<--lockfile FILE>

For perpetual operation, create a lockfile in that place and clean it up after
finishing/aborting.

=item B<--all-once>

Instead of perpetually processing all projects over and over again, process
them just once and then exit.

=item B<--one PRJNAME>

Process only the given project (given as just the project name without C<.git>
suffix) and then exit.

=back

=head1 DESCRIPTION

jobd is Girocco's repository maintenance servant; it periodically checks all
the repositories, updates mirrored repositories and repacks push-mode
repositories when needed.

=cut