mob.html: personal mob updates do NOT generate CIA/JSON notifications
[girocco.git] / jobd / jobd.pl
blob0cbebc5259000f34c588166e395de378ebb83bbd
#!/usr/bin/perl

# jobd - perform Girocco maintenance jobs
#
# Run with --help for details

use strict;
use warnings;

use Getopt::Long;
use Pod::Usage;
use POSIX ":sys_wait_h";
use File::Basename;

use lib dirname($0);
use Girocco::Config;
use Girocco::Project;
use Girocco::User;
use Girocco::Util;
BEGIN {noFatalsToBrowser}
# Options (set from the command line by GetOptions below)
my $quiet;                      # suppress non-error output
my $progress;                   # emit periodic status lines
my $cpus = online_cpus;         # may be 0/undef if cpu count cannot be determined
my $kill_after = 900;           # seconds before a supervised job is killed
my $max_par = $cpus ? $cpus * 2 : 8;            # max simultaneous jobs
my $max_par_intensive = 1;                      # max simultaneous resource-hungry jobs
# load-average pause/resume thresholds, "TRIG,UNTRIG"
my $load_triggers = $cpus ? sprintf("%g,%g", $cpus * 1.5, $cpus * 0.75) : "6,3";
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 300;        # seconds to sleep between perpetual queue runs
my $all_once;                   # --all-once: process the project list one time
my $one;                        # --one PRJNAME: process a single project

my ($load_trig, $load_untrig);  # parsed from $load_triggers after option parsing
37 ######### Jobs {{{1
# Job command for 'update' jobs: refresh a mirrored project by running
# jobd/update.sh, unless one of the marker files or the freshness check
# says it should be skipped.  Skipped paths may still schedule a gc job.
sub update_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	# .nofetch/.bypass/.bypass_fetch suppress fetching but not gc
	if (-e get_project_path($p).".nofetch" || -e get_project_path($p).".bypass" ||
	    -e get_project_path($p).".bypass_fetch") {
		job_skip($job);
		return setup_gc($job);
	}
	if (-e get_project_path($p).".clone_in_progress" && ! -e get_project_path($p).".clone_failed") {
		job_skip($job, "initial mirroring not complete yet");
		return;
	}
	if (-e get_project_path($p).".clone_failed") {
		job_skip($job, "initial mirroring failed");
		# Still need to gc non top-level clones even if they've failed
		# otherwise the objects copied into them from the parent will
		# just accumulate without bound
		setup_gc($job) if $p =~ m,/,;
		return;
	}
	if (my $ts = is_operation_uptodate($p, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
		job_skip($job, "not needed right now, last run at $ts");
		setup_gc($job);
		return;
	}
	if (is_svn_clone($p)) {
		# git svn can be very, very slow at times
		$job->{'timeout_factor'} = 3;
	}
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}
# Job command for 'gc' jobs: repack/garbage-collect a project by running
# jobd/gc.sh, unless marker files or the freshness check say otherwise.
sub gc_project {
	my $job = shift;
	my $p = $job->{'project'};
	check_project_exists($job) || return;
	my $projpath = get_project_path($p);
	# .nogc/.bypass suppress gc entirely; .delaygc defers it unless
	# explicitly re-allowed or a gc has been flagged as needed
	if (-e "$projpath.nogc" || -e "$projpath.bypass" ||
	    (-e "$projpath.delaygc" && ! -e "$projpath.allowgc" && ! -e "$projpath.needsgc")) {
		job_skip($job);
		return;
	}
	my $ts;
	# .needsgc forces a gc regardless of how recently one ran
	if (! -e "$projpath.needsgc" &&
	    ($ts = is_operation_uptodate($p, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval)))) {
		job_skip($job, "not needed right now, last run at $ts");
		return;
	}
	# allow garbage collection to run for longer than an update
	$job->{'timeout_factor'} = 2;
	exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}
# Queue a follow-up 'gc' job for the same project as the given job.
# gc jobs are marked intensive so $max_par_intensive limits them.
sub setup_gc {
	my $job = shift;
	queue_job(
		project => $job->{'project'},
		type => 'gc',
		command => \&gc_project,
		intensive => 1,
	);
}
# Return 1 if the job's project directory exists, otherwise mark the
# job skipped and return 0 (callers do `check_project_exists($job) || return`).
sub check_project_exists {
	my $job = shift;
	my $p = $job->{'project'};
	if (!-d get_project_path($p)) {
		job_skip($job, "non-existent project");
		return 0;
	}
	return 1;
}
# Absolute path (with trailing slash) of the bare repository for the
# given project name, rooted at $Girocco::Config::reporoot.
sub get_project_path {
	"$Girocco::Config::reporoot/".shift().".git/";
}
# Check whether the timestamp stored in git config key "gitweb.$which"
# for $project is within $threshold seconds of now.  Returns the raw
# timestamp string if it is fresh enough, otherwise undef.
sub is_operation_uptodate {
	my ($project, $which, $threshold) = @_;
	my $path = get_project_path($project);
	my $timestamp = get_git("--git-dir=$path", 'config', "gitweb.$which");
	defined($timestamp) or $timestamp = '';
	chomp $timestamp;
	# an unparsable/missing timestamp yields 0, i.e. "very stale"
	my $unix_ts = parse_rfc2822_date($timestamp) || 0;
	(time - $unix_ts) <= $threshold ? $timestamp : undef;
}
# True if the project is a git-svn mirror: its gitweb.baseurl starts
# with svn:/svn+ AND an svn-remote.svn.url is configured.
sub is_svn_clone {
	my ($project) = @_;
	my $path = get_project_path($project);
	my $baseurl = get_git("--git-dir=$path", 'config', 'gitweb.baseurl');
	defined($baseurl) or $baseurl = '';
	chomp $baseurl;
	my $svnurl = get_git("--git-dir=$path", 'config', 'svn-remote.svn.url');
	defined($svnurl) or $svnurl = '';
	chomp $svnurl;
	return $baseurl =~ /^svn[:+]/i && $svnurl;
}
# Queue an 'update' job for one project; a gc job is scheduled
# afterwards whether the update succeeds or fails.
sub queue_one {
	my $project = shift;
	queue_job(
		project => $project,
		type => 'update',
		command => \&update_project,
		on_success => \&setup_gc,
		on_error => \&setup_gc,
	);
}
# Queue an update job for every known project.
sub queue_all {
	queue_one($_) for (Girocco::Project->get_full_list());
}
154 ######### Daemon operation {{{1
my @queue;          # jobs waiting to be run (hashrefs built by queue_job)
my @running;        # jobs currently running (have a 'pid' until reaped)
my $perpetual = 1;  # cleared by handle_softexit to stop the main loop
my $locked = 0;     # true once the lockfile has been created by us
my $jobs_executed;  # per-run counters, reset in run_queue
my $jobs_skipped;
my @jobs_killed;
# Kills and reaps the specified pid. Returns exit status ($?) on success
# otherwise undef if process could not be killed or reaped
# First sends SIGINT and if process does not exit within 15 seconds then SIGKILL
# We used to send SIGTERM instead of SIGINT, but by using SIGINT we can take
# advantage of "tee -i" in our update scripts and really anything we're killing
# should respond the same to either SIGINT or SIGTERM and exit gracefully.
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
	my $targ = shift;
	my $use_pg = shift || 0;
	# Note that the docs for Perl's kill state that a negative signal
	# number should be used to kill process groups and that while a
	# a negative process id (and positive signal number) may also do that
	# on some platforms, that's not portable.
	my $pg = $use_pg ? -1 : 1;
	my $harsh = time() + 15; # SIGKILL after this delay
	my $count = kill(2*$pg, $targ); # SIGINT is 2
	my $reaped = waitpid($targ, WNOHANG);
	return undef if $reaped < 0;
	return $? if $reaped == $targ;
	# poll every 0.2s until the SIGINT takes effect or the deadline passes
	while ($count && time() < $harsh) {
		select(undef, undef, undef, 0.2);
		$reaped = waitpid($targ, WNOHANG);
		return undef if $reaped < 0;
		return $? if $reaped == $targ;
	}
	$harsh = time() + 2;
	$count = kill(9*$pg, $targ); # SIGKILL is 9
	$reaped = waitpid($targ, WNOHANG);
	return undef if $reaped < 0;
	return $? if $reaped == $targ;
	# We should not need to wait to reap a SIGKILL, however, just in case
	# the system doesn't make a SIGKILL'd process immediately reapable
	# (perhaps under extremely heavy load) we accommodate a brief delay
	while ($count && time() < $harsh) {
		select(undef, undef, undef, 0.2);
		$reaped = waitpid($targ, WNOHANG);
		return undef if $reaped < 0;
		return $? if $reaped == $targ;
	}
	return undef;
}
# First ^C: drop all queued jobs, let running ones finish, and arm a
# second ^C to trigger the hard exit handler.
sub handle_softexit {
	error("Waiting for outstanding jobs to finish... ".
		"^C again to exit immediately");
	@queue = ();
	$perpetual = 0;
	$SIG{'INT'} = \&handle_exit;
}
# Hard exit: kill every running job's process group, remove the
# lockfile if we created it, and exit.
sub handle_exit {
	error("Killing outstanding jobs, please be patient...");
	$SIG{'TERM'} = 'IGNORE';
	for (@running) {
		kill_gently($_->{'pid'}, 1);
	}
	unlink $lockfile if ($locked);
	exit(0);
}
# Append a job (described by key => value pairs) to @queue, stamping it
# with its queue time and defaulting 'dont_run' and 'intensive'.
sub queue_job {
	my %opts = @_;
	$opts{'queued_at'} = time;
	$opts{'dont_run'} = 0;
	$opts{'intensive'} = 0 unless exists $opts{'intensive'};
	push @queue, \%opts;
}
# Start a queued job: add it to @running and invoke its command.  The
# command may mark the job 'dont_run' (via job_skip), in which case it
# is immediately removed again and counted as skipped.
sub run_job {
	my $job = shift;

	push @running, $job;
	$job->{'command'}->($job);
	if ($job->{'dont_run'}) {
		pop @running;
		$jobs_skipped++;
		return;
	}
}
# Human-readable job tag for log messages, e.g. "[update::myproj]".
sub _job_name {
	my $job = shift;
	"[".$job->{'type'}."::".$job->{'project'}."]";
}
# Fork and exec the job's external command. Call at most once per job!
# Records the child's pid and start time in the job; on fork failure the
# job is marked finished.  With $err_only the child's stdout is
# discarded (only stderr reaches the daemon's output).
sub exec_job_command {
	my ($job, $command, $err_only) = @_;

	my $pid;
	$job->{'finished'} = 0;
	delete $job->{'pid'};
	if (!defined($pid = fork)) {
		error(_job_name($job) ." Can't fork job: $!");
		$job->{'finished'} = 1;
		return;
	}
	if (!$pid) {
		# "Prevent" races
		select(undef, undef, undef, 0.1);

		# NOTE: must be low-precedence "or" here; with "||" the
		# operator binds to '/dev/null' (always true) and the error
		# handler could never run.
		open STDIN, '<', '/dev/null' or do {
			error(_job_name($job) ." Can't read from /dev/null: $!");
			exit 71; # EX_OSERR
		};
		if ($err_only) {
			open STDOUT, '>', '/dev/null' or do {
				error(_job_name($job) ." Can't write to /dev/null: $!");
				exit 71; # EX_OSERR
			};
		}
		# New process group so we can keep track of all of its children
		if (!defined(POSIX::setpgid(0, 0))) {
			error(_job_name($job) ." Can't create process group: $!");
			exit 71; # EX_OSERR
		}
		exec @$command;
		# Stop perl from complaining
		exit 71; # EX_OSERR
	}
	$job->{'pid'} = $pid;
	$job->{'started_at'} = time;
}
# Mark a job as not-to-be-run and (unless quiet or no message was
# given) log the reason.
sub job_skip {
	my ($job, $msg) = @_;
	$job->{'dont_run'} = 1;
	error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}
# Kill any running job that has exceeded its timeout ($kill_after times
# the job's timeout_factor).  Killed jobs keep a 'killed' flag so
# reap_finished_jobs can run their error callback.
sub reap_hanging_jobs {
	for (@running) {
		my $factor = $_->{'timeout_factor'} || 1;
		if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
			$_->{'finished'} = 1;
			my $exitcode = kill_gently($_->{'pid'}, 1);
			delete $_->{'pid'};
			$_->{'killed'} = 1;
			# kill_gently returns undef if the job could not be
			# reaped; guard against that before testing the signal
			error(_job_name($_) ." KILLED due to timeout" .
				((defined($exitcode) && ($exitcode & 0x7f) == 9) ? " with SIGKILL": ""));
			push @jobs_killed, _job_name($_);
		}
	}
}
# Finalize one reaped job.  A job whose 'finished' flag is still unset
# completed normally: run on_success and count it.  A job already
# flagged finished (killed, or non-zero exit status) gets on_error.
sub reap_one_job {
	my $job = shift;
	if (!$job->{'finished'}) {
		$job->{'on_success'}->($job) if defined($job->{'on_success'});
		$job->{'finished'} = 1;
		$jobs_executed++;
	} else {
		$job->{'on_error'}->($job) if defined($job->{'on_error'});
	}
}
# Collect all children that have exited (and all jobs already killed by
# reap_hanging_jobs), run their callbacks, and drop them from @running.
# Returns true if anything finished, so the caller can skip its sleep.
sub reap_finished_jobs {
	my $pid;
	my $finished_any = 0;
	# jobs killed by reap_hanging_jobs have no pid left, only 'killed'
	foreach my $child (grep { !$_->{'pid'} && $_->{'killed'} } @running) {
		delete $child->{'killed'};
		reap_one_job($child);
		$finished_any = 1;
	}
	while (1) {
		$pid = waitpid(-1, WNOHANG);
		last if $pid <= 0;
		$finished_any = 1;

		my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
		if ($?) {
			# any non-zero exit status should trigger on_error
			$child[0]->{'finished'} = 1 if @child;
		}
		if (@child) {
			delete $child[0]->{'pid'};
			reap_one_job($child[0]);
		}
	}
	@running = grep { $_->{'finished'} == 0 } @running;
	$finished_any;
}
# Number of currently running resource-intensive jobs (list in scalar
# context at the call site).
sub have_intensive_jobs {
	grep { $_->{'intensive'} == 1 } @running;
}
# Timestamp prefix for log lines, e.g. "[Mon Jan  1 00:00:00 2024] ".
sub ts {
	"[". scalar(localtime) ."] ";
}
# Return the 1-, 5- and 15-minute load averages, or undef if they
# cannot be determined.  Uses /proc/loadavg on Linux and parses the
# output of uptime(1) elsewhere.
sub get_load_info {
	if ($^O eq "linux") {
		# Read /proc/loadavg on Linux
		open(my $fh, '<', '/proc/loadavg') or return undef;
		my $loadinfo = <$fh>;
		close $fh;
		return (split(/\s/, $loadinfo, 4))[0..2];
	} else {
		# Read the output of uptime everywhere else (works on Linux too)
		open(my $fh, '-|', 'uptime') or return undef;
		my $loadinfo = <$fh>;
		close $fh;
		$loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
		return ($1, $2, $3);
	}
}
# Main scheduling loop: run queued jobs subject to the parallelism and
# load-average limits, reaping hung and finished jobs as it goes, until
# both @queue and @running are empty.  Emits status output if $progress.
sub run_queue {
	my $last_progress = time;
	my $last_checkload = time - 5;
	my $current_load = $load_trig;
	my $overloaded = 0;
	my $load_info = '';
	$jobs_executed = 0;
	$jobs_skipped = 0;
	@jobs_killed = ();
	if ($progress) {
		my $s = @queue == 1 ? '' : 's';
		ferror("--- Processing %d queued job$s", scalar(@queue));
	}
	$SIG{'INT'} = \&handle_softexit;
	$SIG{'TERM'} = \&handle_exit;
	while (@queue || @running) {
		reap_hanging_jobs();
		my $proceed_immediately = reap_finished_jobs();
		# Check current system load (at most every 5 seconds)
		if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
			# assign to the outer $current_load rather than
			# shadowing it with a fresh lexical
			$current_load = $loadinfo[0];
			if ($current_load > $load_trig && !$overloaded) {
				$overloaded = 1;
				error("PAUSE: system load is at $current_load > $load_trig") if $progress;
			} elsif ($current_load < $load_untrig && $overloaded) {
				$overloaded = 0;
				error("RESUME: system load is at $current_load < $load_untrig") if $progress;
			}
			if ($overloaded) {
				$load_info = ', paused (load '. $current_load .')';
			} else {
				$load_info = ', load '. $current_load;
			}
			$last_checkload = time;
		}
		# Status output
		if ($progress && (time - $last_progress) >= 60) {
			ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
			if (@running) {
				my @run_status;
				for (@running) {
					push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
				}
				error("STATUS: currently running: ". join(', ', @run_status));
			}
			$last_progress = time;
		}
		# Back off if we're too busy
		if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
			sleep 1 unless $proceed_immediately;
			next;
		}
		# Run next
		run_job(shift(@queue)) if @queue;
	}
	if ($progress) {
		my $s = $jobs_executed == 1 ? '' : 's';
		ferror("--- Queue processed. %d job$s executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
	}
}
# Perpetual mode: take the lockfile, then repeatedly queue every
# project and drain the queue, sleeping $restart_delay between runs,
# until handle_softexit clears $perpetual.
sub run_perpetually {
	if (-e $lockfile) {
		die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
	}
	# NOTE: must be low-precedence "or"; with "||" the operator binds
	# to $lockfile (always true) and an open failure would go unnoticed
	open my $lock_fh, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
	print $lock_fh $$;
	close $lock_fh;
	$locked = 1;

	while ($perpetual) {
		# touch ctime of lockfile to prevent it from being removed by /tmp cleaning
		chmod 0444, $lockfile;
		chmod 0644, $lockfile;
		queue_all();
		run_queue();
		sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
	}
	unlink $lockfile;
}
455 ######### Helpers {{{1
# Log a timestamped message to STDERR.
sub error($) {
	print STDERR ts().shift()."\n";
}
# printf-style wrapper around error(): ferror($format, @args).
sub ferror(@) {
	error(sprintf($_[0], @_[1..$#_]));
}
# Log a message and terminate with exit status 1.
sub fatal($) {
	error(shift);
	exit 1;
}
468 ######### Main {{{1
470 close(DATA) if fileno(DATA);
471 # Parse options
472 Getopt::Long::Configure('bundling');
473 my $parse_res = GetOptions(
474 'help|?|h' => sub { pod2usage(-verbose => 2, -exitval => 0); },
475 'quiet|q' => \$quiet,
476 'progress|P' => \$progress,
477 'kill-after|k=i' => \$kill_after,
478 'max-parallel|p=i' => \$max_par,
479 'max-intensive-parallel|i=i' => \$max_par_intensive,
480 'load-triggers=s' => \$load_triggers,
481 'restart-delay|d=i' => \$restart_delay,
482 'lockfile|l=s' => \$lockfile,
483 'all-once|a' => \$all_once,
484 'one|o=s' => \$one,
485 ) || pod2usage(2);
486 fatal("Error: can only use one out of --all-once and --one")
487 if ($all_once && $one);
489 unless ($quiet) {
490 $ENV{'show_progress'} = '1';
491 $progress = 1;
494 $load_triggers = '0,0' unless defined((get_load_info())[0]);
495 ($load_trig, $load_untrig) = split(/,/, $load_triggers);
497 if ($one) {
498 queue_one($one);
499 run_queue();
500 exit;
503 if ($all_once) {
504 queue_all();
505 run_queue();
506 exit;
509 run_perpetually();
511 ########## Documentation {{{1
513 __END__
515 =head1 NAME
517 jobd.pl - Perform Girocco maintenance jobs
519 =head1 SYNOPSIS
521 jobd.pl [options]
523 Options:
524 -h | --help detailed instructions
525 -q | --quiet run quietly
526 -P | --progress show occasional status updates
527 -k SECONDS | --kill-after SECONDS how long to wait before killing jobs
528 -p NUM | --max-parallel NUM how many jobs to run at the same time
529 -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
530 at the same time
531 --load-triggers TRIG,UNTRIG stop queueing jobs at load above
532 TRIG and resume at load below UNTRIG
533 -d NUM | --restart-delay SECONDS wait for this many seconds between
534 queue runs
535 -l FILE | --lockfile FILE create a lockfile in the given
536 location
537 -a | --all-once process the list only once
538 -o PRJNAME | --one PRJNAME process only one project
540 =head1 OPTIONS
542 =over 8
544 =item B<--help>
546 Print the full description of jobd.pl's options.
548 =item B<--quiet>
550 Suppress non-error messages, e.g. for use when running this task as a cronjob.
552 =item B<--progress>
554 Show information about the current status of the job queue occasionally. This
555 is automatically enabled if --quiet is not given.
557 =item B<--kill-after SECONDS>
559 Kill supervised jobs after a certain time to avoid hanging the daemon.
561 =item B<--max-parallel NUM>
563 Run no more than that many jobs at the same time. The default is the number
564 of cpus * 2. If the number of cpus cannot be determined, the default is 8.
566 =item B<--max-intensive-parallel NUM>
568 Run no more than that many resource-hungry jobs at the same time. Right now,
569 this refers to repacking jobs. The default is 1.
571 =item B<--load-triggers TRIG,UNTRIG>
573 If the first system load average (1 minute average) exceeds TRIG, don't queue
574 any more jobs until it goes below UNTRIG. This is currently only supported on
575 Linux and any other platforms that provide an uptime command with load average
576 output.
578 If both values are zero, load checks are disabled. The default is the number
579 of cpus * 1.5 for TRIG and half that for UNTRIG. If the number of cpus cannot
580 be determined, the default is 6,3.
582 =item B<--restart-delay NUM>
584 After processing the queue, wait this many seconds until the queue is
585 restarted. The default is 300 seconds.
587 =item B<--lockfile FILE>
589 For perpetual operation, specify the full path to a lock file to create and
590 then remove after finishing/aborting. The default is /tmp/jobd-$suffix.lock
591 where $suffix is a 6-character string uniquely determined by the name and
592 nickname of this Girocco instance. The pid of the running jobd instance will
593 be written to the lock file.
595 =item B<--all-once>
597 Instead of perpetually processing all projects over and over again, process
598 them just once and then exit.
600 =item B<--one PRJNAME>
602 Process only the given project (given as just the project name without C<.git>
603 suffix) and then exit.
605 =back
607 =head1 DESCRIPTION
609 jobd.pl is Girocco's repositories maintenance servant; it periodically checks
610 all the repositories and updates mirrored repositories and repacks push-mode
611 repositories when needed.
613 =cut