update-all-config: new utility to update projects' config
[girocco.git] / jobd / jobd.pl
blob68fbc1ccb1e4a49e91af1355bd1e978972892f62
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
13 use File::Basename;
15 use lib dirname($0);
16 use Girocco::Config;
17 use Girocco::Project;
18 use Girocco::User;
19 use Girocco::Util;
# Options
my $quiet;                 # suppress non-error output
my $progress;              # emit periodic status (implied unless --quiet)
my $kill_after = 900;      # seconds a job may run before being killed
my $max_par = 20;          # maximum simultaneous jobs
my $max_par_intensive = 1; # maximum simultaneous resource-intensive jobs
my $cpus = online_cpus;
# Pause queueing when 1-minute load exceeds the first value, resume below
# the second; fall back to "6,3" when the cpu count cannot be determined.
my $load_triggers = $cpus ? sprintf("%g,%g", $cpus * 1.5, $cpus * 0.75) : "6,3";
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 300;   # seconds between queue runs in perpetual mode
my $all_once;              # process the whole list once, then exit
my $one;                   # process a single named project, then exit

my ($load_trig, $load_untrig);
36 ######### Jobs {{{1
# Job command for an 'update' job: refresh a mirrored project unless some
# marker file or the refresh interval says not to.
sub update_project {
	my $job = shift;
	my $proj = $job->{'project'};
	check_project_exists($job) or return;
	my $path = get_project_path($proj);
	# .nofetch/.bypass/.bypass_fetch suppress fetching, but gc may still run.
	if (-e "$path.nofetch" || -e "$path.bypass" || -e "$path.bypass_fetch") {
		job_skip($job);
		return setup_gc($job);
	}
	if (-e "$path.clone_in_progress" && !-e "$path.clone_failed") {
		job_skip($job, "initial mirroring not complete yet");
		return;
	}
	if (-e "$path.clone_failed") {
		job_skip($job, "initial mirroring failed");
		# Still need to gc non top-level clones even if they've failed
		# otherwise the objects copied into them from the parent will
		# just accumulate without bound
		setup_gc($job) if $proj =~ m{/};
		return;
	}
	if (my $ts = is_operation_uptodate($proj, 'lastrefresh',
			rand_adjust($Girocco::Config::min_mirror_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		setup_gc($job);
		return;
	}
	# git svn can be very, very slow at times
	$job->{'timeout_factor'} = 3 if is_svn_clone($proj);
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $proj], $quiet);
}
# Job command for a 'gc' job: repack/garbage-collect a project unless a
# marker file or the gc interval says not to.
sub gc_project {
	my $job = shift;
	my $proj = $job->{'project'};
	check_project_exists($job) or return;
	my $path = get_project_path($proj);
	if (-e "$path.nogc" || -e "$path.bypass") {
		job_skip($job);
		return;
	}
	my $ts = is_operation_uptodate($proj, 'lastgc',
		rand_adjust($Girocco::Config::min_gc_interval));
	if ($ts) {
		job_skip($job, "not needed right now, last run at $ts");
		return;
	}
	# allow garbage collection to run for longer than an update
	$job->{'timeout_factor'} = 2;
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $proj], $quiet);
}
# Queue a follow-up gc job for the same project; gc counts as an
# intensive (resource-hungry) job.
sub setup_gc {
	my $job = shift;
	queue_job(
		project   => $job->{'project'},
		type      => 'gc',
		command   => \&gc_project,
		intensive => 1,
	);
}
# Return 1 when the job's project directory exists; otherwise mark the
# job skipped and return 0.
sub check_project_exists {
	my $job = shift;
	return 1 if -d get_project_path($job->{'project'});
	job_skip($job, "non-existent project");
	return 0;
}
# Absolute path (with trailing slash) of a project's bare repository.
sub get_project_path {
	my $proj = shift;
	return "$Girocco::Config::reporoot/$proj.git/";
}
# If the timestamp stored in git config key gitweb.$which is newer than
# $threshold seconds ago, return that timestamp string; otherwise undef.
sub is_operation_uptodate {
	my ($project, $which, $threshold) = @_;
	my $gitdir = get_project_path($project);
	my $stamp = get_git("--git-dir=$gitdir", 'config', "gitweb.$which");
	$stamp = '' unless defined $stamp;
	chomp $stamp;
	my $when = parse_rfc2822_date($stamp) || 0;
	return (time - $when) <= $threshold ? $stamp : undef;
}
# True when the project is a git-svn mirror (svn baseurl plus a
# configured svn-remote url).
sub is_svn_clone {
	my ($project) = @_;
	my $gitdir = get_project_path($project);
	my $baseurl = get_git("--git-dir=$gitdir", 'config', 'gitweb.baseurl');
	my $svnurl = get_git("--git-dir=$gitdir", 'config', 'svn-remote.svn.url');
	for ($baseurl, $svnurl) {
		defined($_) or $_ = '';
		chomp;
	}
	return $baseurl =~ /^svn[:+]/i && $svnurl;
}
# Queue an update job for one project; a gc job is chained afterwards
# whether the update succeeds or fails.
sub queue_one {
	my $project = shift;
	queue_job(
		project    => $project,
		type       => 'update',
		command    => \&update_project,
		on_success => \&setup_gc,
		on_error   => \&setup_gc,
	);
}
# Queue update jobs for every known project.
sub queue_all {
	queue_one($_) foreach Girocco::Project->get_full_list();
}
149 ######### Daemon operation {{{1
my @queue;         # jobs waiting to be started
my @running;       # jobs currently executing
my $perpetual = 1; # cleared by SIGINT to finish the current queue and exit
my $locked = 0;    # whether we currently own $lockfile
my $jobs_executed; # per-queue-run counters, reset by run_queue
my $jobs_skipped;
my @jobs_killed;
# Kills and reaps the specified pid. Returns exit status ($?) on success
# otherwise undef if process could not be killed or reaped
# First sends SIGTERM and if process does not exit within 15 seconds then SIGKILL
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
	my $targ = shift;
	my $use_pg = shift || 0;
	# Note that the docs for Perl's kill state that a negative signal
	# number should be used to kill process groups and that while a
	# a negative process id (and positive signal number) may also do that
	# on some platforms, that's not portable.
	my $sigmul = $use_pg ? -1 : 1;
	my $deadline = time() + 15;                  # escalate to SIGKILL after this
	my $signalled = kill(15 * $sigmul, $targ);   # SIGTERM is 15
	my $reaped = waitpid($targ, WNOHANG);
	return undef if $reaped < 0;
	return $? if $reaped == $targ;
	while ($signalled && time() < $deadline) {
		select(undef, undef, undef, 0.2);    # 200ms poll interval
		$reaped = waitpid($targ, WNOHANG);
		return undef if $reaped < 0;
		return $? if $reaped == $targ;
	}
	$deadline = time() + 2;
	$signalled = kill(9 * $sigmul, $targ);       # SIGKILL is 9
	$reaped = waitpid($targ, WNOHANG);
	return undef if $reaped < 0;
	return $? if $reaped == $targ;
	# We should not need to wait to reap a SIGKILL, however, just in case
	# the system doesn't make a SIGKILL'd process immediately reapable
	# (perhaps under extremely heavy load) we accommodate a brief delay
	while ($signalled && time() < $deadline) {
		select(undef, undef, undef, 0.2);
		$reaped = waitpid($targ, WNOHANG);
		return undef if $reaped < 0;
		return $? if $reaped == $targ;
	}
	return undef;
}
# First ^C: drain the queue gracefully; a second ^C exits immediately.
sub handle_softexit {
	error("Waiting for outstanding jobs to finish... ".
		"^C again to exit immediately");
	@queue = ();
	$perpetual = 0;
	$SIG{'INT'} = \&handle_exit;
}
# Hard exit: kill every running job (and its process group), drop the
# lockfile if we own it, and terminate.
sub handle_exit {
	error("Killing outstanding jobs, please be patient...");
	$SIG{'TERM'} = 'IGNORE'; # don't let our own signals take us down
	kill_gently($_->{'pid'}, 1) for @running;
	unlink $lockfile if $locked;
	exit(0);
}
# Append a job (given as key/value options) to the queue, stamping the
# bookkeeping fields every job must carry.
sub queue_job {
	my %job = @_;
	$job{'queued_at'} = time;
	$job{'dont_run'} = 0;
	$job{'intensive'} = 0 unless exists $job{'intensive'};
	push @queue, \%job;
}
# Start a queued job.  If the job's command declines to run (job_skip
# sets dont_run) it is retracted from @running and counted as skipped.
sub run_job {
	my $job = shift;

	push @running, $job;
	$job->{'command'}->($job);
	return unless $job->{'dont_run'};
	pop @running;
	$jobs_skipped++;
}
# Human-readable job tag used in log messages, e.g. "[update::proj]".
sub _job_name {
	my $job = shift;
	return sprintf("[%s::%s]", $job->{'type'}, $job->{'project'});
}
# Only one of those per job!
# Fork and exec the job's external command, recording pid/started_at on
# the job.  When $err_only is true the child's stdout is discarded.
sub exec_job_command {
	my ($job, $command, $err_only) = @_;

	my $pid;
	if (!defined($pid = fork)) {
		error(_job_name($job) ." Can't fork job: $!");
		$job->{'finished'} = 1;
		return;
	}
	if (!$pid) {
		# Child process.  Note "or", not "||" on the opens below: with
		# "||" the test binds to the filename (always true) and the
		# error branch can never be taken.  Also the child must never
		# "return" here -- that would resume the parent's control flow
		# in a duplicate process -- so failures exit instead.
		open STDIN, '<', '/dev/null' or do {
			error(_job_name($job) ."Can't read from /dev/null: $!");
			exit 71; # EX_OSERR
		};
		if ($err_only) {
			open STDOUT, '>', '/dev/null' or do {
				error(_job_name($job) ." Can't write to /dev/null: $!");
				exit 71; # EX_OSERR
			};
		}
		# New process group so we can keep track of all of its children
		if (!defined(POSIX::setpgid(0, 0))) {
			error(_job_name($job) ." Can't create process group: $!");
			exit 71; # EX_OSERR
		}
		# "Prevent" races
		select(undef, undef, undef, 0.1);
		exec @$command;
		# Stop perl from complaining
		exit $?;
	}
	$job->{'pid'} = $pid;
	$job->{'finished'} = 0;
	$job->{'started_at'} = time;
}
# Mark a job as not-to-run, logging the reason unless quiet or no reason
# was given.
sub job_skip {
	my ($job, $msg) = @_;
	$job->{'dont_run'} = 1;
	return if $quiet || !$msg;
	error(_job_name($job) ." Skipping job: $msg");
}
# Kill any running job that has exceeded its (possibly scaled) timeout;
# reap_finished_jobs picks the corpses up afterwards via the 'killed' flag.
sub reap_hanging_jobs {
	for my $job (@running) {
		my $factor = $job->{'timeout_factor'} || 1;
		next unless defined($job->{'started_at'}) &&
			(time - $job->{'started_at'}) > ($kill_after * $factor);
		$job->{'finished'} = 1;
		my $exitcode = kill_gently($job->{'pid'}, 1);
		delete $job->{'pid'};
		$job->{'killed'} = 1;
		error(_job_name($job) ." KILLED due to timeout" .
			(($exitcode & 0x7f) == 9 ? " with SIGKILL": ""));
		push @jobs_killed, _job_name($job);
	}
}
# Finish one job: a job already flagged 'finished' was killed, so run its
# error callback; otherwise run the success callback and count it.
sub reap_one_job {
	my $job = shift;
	if ($job->{'finished'}) {
		$job->{'on_error'}->($job) if defined($job->{'on_error'});
	} else {
		$job->{'on_success'}->($job) if defined($job->{'on_success'});
		$job->{'finished'} = 1;
		$jobs_executed++;
	}
}
# Collect completed jobs: first those killed by reap_hanging_jobs (their
# pid is already gone), then any children reported by waitpid.  Returns
# true when at least one job finished this pass.
sub reap_finished_jobs {
	my $finished_any = 0;
	foreach my $corpse (grep { !$_->{'pid'} && $_->{'killed'} } @running) {
		delete $corpse->{'killed'};
		reap_one_job($corpse);
		$finished_any = 1;
	}
	for (;;) {
		my $pid = waitpid(-1, WNOHANG);
		last if $pid <= 0;
		$finished_any = 1;

		my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
		if ($?) {
			# XXX- we currently don't care
		}
		if ($job) {
			delete $job->{'pid'};
			reap_one_job($job);
		}
	}
	@running = grep { $_->{'finished'} == 0 } @running;
	return $finished_any;
}
# In scalar context: how many currently running jobs are intensive.
sub have_intensive_jobs {
	return grep { $_->{'intensive'} == 1 } @running;
}
# Timestamp prefix for log lines, e.g. "[Mon Jan  1 00:00:00 2024] ".
sub ts {
	return "[" . scalar(localtime) . "] ";
}
# Return the three load averages (1/5/15 minute) or undef when they
# cannot be determined on this platform.
sub get_load_info {
	if ($^O eq "linux") {
		# Read /proc/loadavg on Linux
		open(my $fh, '<', '/proc/loadavg') or return undef;
		my $line = <$fh>;
		close $fh;
		return (split(/\s/, $line, 4))[0 .. 2];
	}
	# Read the output of uptime everywhere else (works on Linux too)
	open(my $fh, '-|', 'uptime') or return undef;
	my $line = <$fh>;
	close $fh;
	return undef unless $line =~
		/load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso;
	return ($1, $2, $3);
}
# Main scheduling loop: starts queued jobs while respecting the
# parallelism limits and the system-load pause/resume thresholds, reaps
# hung and finished jobs, and emits periodic status when --progress.
sub run_queue {
	my $last_progress = time;
	my $last_checkload = time - 5;
	my $current_load = $load_trig;
	my $overloaded = 0;
	my $load_info = '';
	$jobs_executed = 0;
	$jobs_skipped = 0;
	@jobs_killed = ();
	ferror("--- Processing %d queued jobs", scalar(@queue)) if $progress;
	$SIG{'INT'} = \&handle_softexit;
	$SIG{'TERM'} = \&handle_exit;
	while (@queue || @running) {
		reap_hanging_jobs();
		my $proceed_immediately = reap_finished_jobs();
		# Check current system load (at most every 5 seconds)
		if ($load_trig && (time - $last_checkload) >= 5 &&
		    defined((my @loadinfo = get_load_info())[0])) {
			$current_load = $loadinfo[0];
			if ($current_load > $load_trig && !$overloaded) {
				$overloaded = 1;
				error("PAUSE: system load is at $current_load > $load_trig") if $progress;
			} elsif ($current_load < $load_untrig && $overloaded) {
				$overloaded = 0;
				error("RESUME: system load is at $current_load < $load_untrig") if $progress;
			}
			$load_info = $overloaded
				? ', paused (load '. $current_load .')'
				: ', load '. $current_load;
			$last_checkload = time;
		}
		# Status output (at most once a minute)
		if ($progress && (time - $last_progress) >= 60) {
			ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info",
				scalar(@queue), scalar(@running), $jobs_executed,
				$jobs_skipped, scalar(@jobs_killed));
			if (@running) {
				my @run_status;
				push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s"
					for @running;
				error("STATUS: currently running: ". join(', ', @run_status));
			}
			$last_progress = time;
		}
		# Back off if we're too busy
		if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive ||
		    !@queue || $overloaded) {
			sleep 1 unless $proceed_immediately;
			next;
		}
		# Run next
		run_job(shift(@queue)) if @queue;
	}
	ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.",
		$jobs_executed, $jobs_skipped, scalar(@jobs_killed)) if $progress;
}
# Acquire the lockfile and loop forever (until handle_softexit clears
# $perpetual), queueing and running all projects each pass.
sub run_perpetually {
	if (-e $lockfile) {
		die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
	}
	# Note "or", not "||": with "||" the test binds to $lockfile (always
	# true) so a failed open could never trigger the die.  A lexical
	# handle also replaces the old bareword LOCK.
	open my $lock, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
	print $lock $$;
	close $lock;
	$locked = 1;

	while ($perpetual) {
		queue_all();
		run_queue();
		sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
	}
	unlink $lockfile;
}
441 ######### Helpers {{{1
# Print a timestamped message to STDERR.
sub error($) {
	my $msg = shift;
	print STDERR ts() . $msg . "\n";
}
# printf-style variant of error().
sub ferror(@) {
	my ($fmt, @args) = @_;
	error(sprintf($fmt, @args));
}
# Log the message and terminate with a non-zero exit status.
sub fatal($) {
	error(shift);
	exit 1;
}
454 ######### Main {{{1
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
	'help|?'                     => sub { pod2usage(-verbose => 1, -exitval => 0); },
	'quiet|q'                    => \$quiet,
	'progress|P'                 => \$progress,
	'kill-after|k=i'             => \$kill_after,
	'max-parallel|p=i'           => \$max_par,
	'max-intensive-parallel|i=i' => \$max_par_intensive,
	'load-triggers=s'            => \$load_triggers,
	'restart-delay|d=i'          => \$restart_delay,
	'lockfile|l=s'               => \$lockfile,
	'all-once|a'                 => \$all_once,
	'one|o=s'                    => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
	if $all_once && $one;

unless ($quiet) {
	# Progress output is the default; the environment variable also
	# turns it on for the child update.sh/gc.sh scripts.
	$ENV{'show_progress'} = '1';
	$progress = 1;
}

# Disable load checks entirely when load cannot be determined here.
$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
	queue_one($one);
	run_queue();
	exit;
}

if ($all_once) {
	queue_all();
	run_queue();
	exit;
}

run_perpetually();
496 ########## Documentation {{{1
498 __END__
500 =head1 NAME
502 jobd - Perform Girocco maintenance jobs
504 =head1 SYNOPSIS
506 jobd [options]
508 Options:
  -? | --help                         detailed instructions
510 -q | --quiet run quietly
511 -P | --progress show occasional status updates
512 -k SECONDS | --kill-after SECONDS how long to wait before killing jobs
513 -p NUM | --max-parallel NUM how many jobs to run at the same time
514 -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
515 at the same time
516 --load-triggers TRIG,UNTRIG stop queueing jobs at load above
517 TRIG and resume at load below UNTRIG
  -d SECONDS | --restart-delay SECONDS  wait for this many seconds between
                                        queue runs
520 -l FILE | --lockfile FILE create a lockfile in the given
521 location
522 -a | --all-once process the list only once
523 -o PRJNAME | --one PRJNAME process only one project
525 =head1 OPTIONS
527 =over 8
529 =item B<--help>
531 Print the full description of jobd's options.
533 =item B<--quiet>
535 Suppress non-error messages, e.g. for use when running this task as a cronjob.
537 =item B<--progress>
539 Show information about the current status of the job queue occasionally. This
540 is automatically enabled if --quiet is not given.
542 =item B<--kill-after SECONDS>
544 Kill supervised jobs after a certain time to avoid hanging the daemon.
546 =item B<--max-parallel NUM>
548 Run no more than that many jobs at the same time.
550 =item B<--max-intensive-parallel NUM>
552 Run no more than that many resource-hungry jobs at the same time. Right now,
553 this refers to repacking jobs.
555 =item B<--load-triggers TRIG,UNTRIG>
557 If the first system load average (1 minute average) exceeds TRIG, don't queue
558 any more jobs until it goes below UNTRIG. This is currently only supported on
559 Linux and any other platforms that provide an uptime command with load average
560 output.
562 If both values are zero, load checks are disabled. The default is the number
563 of cpus * 1.5 for TRIG and half that for UNTRIG. If the number of cpus cannot
564 be determined, the default is 6,3.
566 =item B<--restart-delay NUM>
568 After processing the queue, wait this many seconds until the queue is
569 restarted. The default is 300 seconds.
571 =item B<--lockfile FILE>
573 For perpetual operation, create a lockfile in that place and clean it up after
574 finishing/aborting.
576 =item B<--all-once>
Instead of perpetually processing all projects over and over again, process
them just once and then exit.
581 =item B<--one PRJNAME>
583 Process only the given project (given as just the project name without C<.git>
584 suffix) and then exit.
586 =back
588 =head1 DESCRIPTION
590 jobd is Girocco's repositories maintenance servant; it periodically checks all
591 the repositories and updates mirrored repositories and repacks push-mode
592 repositories when needed.
594 =cut