#!/usr/bin/perl

# jobd - perform Girocco maintenance jobs
#
# Run with --help for details

use strict;
use warnings;

use Getopt::Long;
use Pod::Usage;
use POSIX ":sys_wait_h";
use File::Basename;

use lib dirname($0);
use Girocco::Config;
use Girocco::Project;
use Girocco::User;
use Girocco::Util;
BEGIN {noFatalsToBrowser}

# Options
my $quiet;
my $progress;
my $cpus = online_cpus;
my $kill_after = 900;
my $max_par = $cpus ? $cpus * 2 : 8;
my $max_par_intensive = 1;
my $load_triggers = $cpus ? sprintf("%g,%g", $cpus * 1.5, $cpus * 0.75) : "6,3";
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 300;
my $all_once;
my $one;

my ($load_trig, $load_untrig);

######### Jobs {{{1

sub update_project {
    my $job = shift;
    my $p = $job->{'project'};
    check_project_exists($job) || return;
    if (-e get_project_path($p).".nofetch" || -e get_project_path($p).".bypass" ||
        -e get_project_path($p).".bypass_fetch") {
        job_skip($job);
        return setup_gc($job);
    }
    if (-e get_project_path($p).".clone_in_progress" && ! -e get_project_path($p).".clone_failed") {
        job_skip($job, "initial mirroring not complete yet");
        return;
    }
    if (-e get_project_path($p).".clone_failed") {
        job_skip($job, "initial mirroring failed");
        # Still need to gc non top-level clones even if they've failed
        # otherwise the objects copied into them from the parent will
        # just accumulate without bound
        setup_gc($job) if $p =~ m,/,;
        return;
    }
    if (my $ts = is_operation_uptodate($p, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
        job_skip($job, "not needed right now, last run at $ts");
        setup_gc($job);
        return;
    }
    if (is_svn_clone($p)) {
        # git svn can be very, very slow at times
        $job->{'timeout_factor'} = 3;
    }
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}

sub gc_project {
    my $job = shift;
    my $p = $job->{'project'};
    check_project_exists($job) || return;
    my $projpath = get_project_path($p);
    if (-e "$projpath.nogc" || -e "$projpath.bypass" ||
        (-e "$projpath.delaygc" && ! -e "$projpath.allowgc" && ! -e "$projpath.needsgc")) {
        job_skip($job);
        return;
    }
    my $ts;
    if (! -e "$projpath.needsgc" &&
        ($ts = is_operation_uptodate($p, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval)))) {
        job_skip($job, "not needed right now, last run at $ts");
        return;
    }
    # allow garbage collection to run for longer than an update
    $job->{'timeout_factor'} = 2;
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}

sub setup_gc {
    my $job = shift;
    queue_job(
        project => $job->{'project'},
        type => 'gc',
        command => \&gc_project,
        intensive => 1,
    );
}

sub check_project_exists {
    my $job = shift;
    my $p = $job->{'project'};
    if (!-d get_project_path($p)) {
        job_skip($job, "non-existent project");
        return 0;
    }
    # callers rely on a true value when the project directory exists
    return 1;
}

sub get_project_path {
    "$Girocco::Config::reporoot/".shift().".git/";
}

sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $path = get_project_path($project);
    my $timestamp = get_git("--git-dir=$path", 'config', "gitweb.$which");
    defined($timestamp) or $timestamp = '';
    chomp $timestamp;
    my $unix_ts = parse_rfc2822_date($timestamp) || 0;
    (time - $unix_ts) <= $threshold ? $timestamp : undef;
}

sub is_svn_clone {
    my ($project) = @_;
    my $path = get_project_path($project);
    my $baseurl = get_git("--git-dir=$path", 'config', 'gitweb.baseurl');
    defined($baseurl) or $baseurl = '';
    chomp $baseurl;
    my $svnurl = get_git("--git-dir=$path", 'config', 'svn-remote.svn.url');
    defined($svnurl) or $svnurl = '';
    chomp $svnurl;
    return $baseurl =~ /^svn[:+]/i && $svnurl;
}

sub queue_one {
    my $project = shift;
    queue_job(
        project => $project,
        type => 'update',
        command => \&update_project,
        on_success => \&setup_gc,
        on_error => \&setup_gc,
    );
}

sub queue_all {
    queue_one($_) for (Girocco::Project->get_full_list());
}

######### Daemon operation {{{1

my @queue;
my @running;
my $perpetual = 1;
my $locked = 0;
my $jobs_executed;
my $jobs_skipped;
my @jobs_killed;

# Kills and reaps the specified pid. Returns exit status ($?) on success,
# otherwise undef if the process could not be killed or reaped.
# First sends SIGINT and if the process does not exit within 15 seconds then SIGKILL.
# We used to send SIGTERM instead of SIGINT, but by using SIGINT we can take
# advantage of "tee -i" in our update scripts and really anything we're killing
# should respond the same to either SIGINT or SIGTERM and exit gracefully.
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
    my $targ = shift;
    my $use_pg = shift || 0;
    # Note that the docs for Perl's kill state that a negative signal
    # number should be used to kill process groups and that while a
    # negative process id (and positive signal number) may also do that
    # on some platforms, that's not portable.
    my $pg = $use_pg ? -1 : 1;
    my $harsh = time() + 15; # SIGKILL after this delay
    my $count = kill(2*$pg, $targ); # SIGINT is 2
    my $reaped = waitpid($targ, WNOHANG);
    return undef if $reaped < 0;
    return $? if $reaped == $targ;
    while ($count && time() < $harsh) {
        select(undef, undef, undef, 0.2);
        $reaped = waitpid($targ, WNOHANG);
        return undef if $reaped < 0;
        return $? if $reaped == $targ;
    }
    $harsh = time() + 2;
    $count = kill(9*$pg, $targ); # SIGKILL is 9
    $reaped = waitpid($targ, WNOHANG);
    return undef if $reaped < 0;
    return $? if $reaped == $targ;
    # We should not need to wait to reap a SIGKILL, however, just in case
    # the system doesn't make a SIGKILL'd process immediately reapable
    # (perhaps under extremely heavy load) we accommodate a brief delay
    while ($count && time() < $harsh) {
        select(undef, undef, undef, 0.2);
        $reaped = waitpid($targ, WNOHANG);
        return undef if $reaped < 0;
        return $? if $reaped == $targ;
    }
    return undef;
}

sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ".
          "^C again to exit immediately");
    @queue = ();
    $perpetual = 0;
    $SIG{'INT'} = \&handle_exit;
}

sub handle_exit {
    error("Killing outstanding jobs, please be patient...");
    $SIG{'TERM'} = 'IGNORE';
    for (@running) {
        kill_gently($_->{'pid'}, 1);
    }
    unlink $lockfile if ($locked);
    exit(0);
}

sub queue_job {
    my %opts = @_;
    $opts{'queued_at'} = time;
    $opts{'dont_run'} = 0;
    $opts{'intensive'} = 0 unless exists $opts{'intensive'};
    push @queue, \%opts;
}

sub run_job {
    my $job = shift;

    push @running, $job;
    $job->{'command'}->($job);
    if ($job->{'dont_run'}) {
        pop @running;
        $jobs_skipped++;
        return;
    }
}

sub _job_name {
    my $job = shift;
    "[".$job->{'type'}."::".$job->{'project'}."]";
}

# Only one of those per job!
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid;
    $job->{'finished'} = 0;
    delete $job->{'pid'};
    if (!defined($pid = fork)) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        # "Prevent" races
        select(undef, undef, undef, 0.1);

        # use "or" (not "||") so the error handler actually runs if open fails
        open STDIN, '<', '/dev/null' or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            exit 71; # EX_OSERR
        };
        if ($err_only) {
            open STDOUT, '>', '/dev/null' or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 71; # EX_OSERR
            };
        }
        # New process group so we can keep track of all of its children
        if (!defined(POSIX::setpgid(0, 0))) {
            error(_job_name($job) ." Can't create process group: $!");
            exit 71; # EX_OSERR
        }

        exec @$command;
        # Stop perl from complaining
        exit 71; # EX_OSERR
    }
    $job->{'pid'} = $pid;
    $job->{'started_at'} = time;
}

sub job_skip {
    my ($job, $msg) = @_;
    $job->{'dont_run'} = 1;
    error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}

sub reap_hanging_jobs {
    for (@running) {
        my $factor = $_->{'timeout_factor'} || 1;
        if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
            $_->{'finished'} = 1;
            my $exitcode = kill_gently($_->{'pid'}, 1);
            delete $_->{'pid'};
            $_->{'killed'} = 1;
            error(_job_name($_) ." KILLED due to timeout" .
                  (defined($exitcode) && ($exitcode & 0x7f) == 9 ? " with SIGKILL" : ""));
            push @jobs_killed, _job_name($_);
        }
    }
}

sub reap_one_job {
    my $job = shift;
    if (!$job->{'finished'}) {
        $job->{'on_success'}->($job) if defined($job->{'on_success'});
        $job->{'finished'} = 1;
        $jobs_executed++;
    } else {
        $job->{'on_error'}->($job) if defined($job->{'on_error'});
    }
}

sub reap_finished_jobs {
    my $pid;
    my $finished_any = 0;
    foreach my $child (grep { !$_->{'pid'} && $_->{'killed'} } @running) {
        delete $child->{'killed'};
        reap_one_job($child);
        $finished_any = 1;
    }
    while (1) {
        $pid = waitpid(-1, WNOHANG);
        last if $pid <= 0;
        $finished_any = 1;

        my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        if ($?) {
            # any non-zero exit status should trigger on_error
            $child[0]->{'finished'} = 1 if @child;
        }
        if (@child) {
            delete $child[0]->{'pid'};
            reap_one_job($child[0]);
        }
    }
    @running = grep { $_->{'finished'} == 0 } @running;
    $finished_any;
}

sub have_intensive_jobs {
    grep { $_->{'intensive'} == 1 } @running;
}

sub ts {
    "[". scalar(localtime) ."] ";
}

sub get_load_info {
    if ($^O eq "linux") {
        # Read /proc/loadavg on Linux
        open(LOADAV, '<', '/proc/loadavg') or return undef;
        my $loadinfo = <LOADAV>;
        close LOADAV;
        return (split(/\s/, $loadinfo, 4))[0..2];
    } else {
        # Read the output of uptime everywhere else (works on Linux too)
        open(LOADAV, '-|', 'uptime') or return undef;
        my $loadinfo = <LOADAV>;
        close LOADAV;
        $loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
        return ($1, $2, $3);
    }
}

sub run_queue {
    my $last_progress = time;
    my $last_checkload = time - 5;
    my $current_load = $load_trig;
    my $overloaded = 0;
    my $load_info = '';
    $jobs_executed = 0;
    $jobs_skipped = 0;
    @jobs_killed = ();
    if ($progress) {
        my $s = @queue == 1 ? '' : 's';
        ferror("--- Processing %d queued job$s", scalar(@queue));
    }
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;
    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();
        # Check current system load
        if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
            my $current_load = $loadinfo[0];
            if ($current_load > $load_trig && !$overloaded) {
                $overloaded = 1;
                error("PAUSE: system load is at $current_load > $load_trig") if $progress;
            } elsif ($current_load < $load_untrig && $overloaded) {
                $overloaded = 0;
                error("RESUME: system load is at $current_load < $load_untrig") if $progress;
            }
            if ($overloaded) {
                $load_info = ', paused (load '. $current_load .')';
            } else {
                $load_info = ', load '. $current_load;
            }
            $last_checkload = time;
        }
        # Status output
        if ($progress && (time - $last_progress) >= 60) {
            ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
            if (@running) {
                my @run_status;
                for (@running) {
                    push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
                }
                error("STATUS: currently running: ". join(', ', @run_status));
            }
            $last_progress = time;
        }
        # Back off if we're too busy
        if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
            sleep 1 unless $proceed_immediately;
            next;
        }
        # Run next
        run_job(shift(@queue)) if @queue;
    }
    if ($progress) {
        my $s = $jobs_executed == 1 ? '' : 's';
        ferror("--- Queue processed. %d job$s executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
    }
}

sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
    }
    open LOCK, '>', $lockfile or die "Cannot create lockfile '$lockfile': $!";
    print LOCK $$;
    close LOCK;
    $locked = 1;

    while ($perpetual) {
        # touch ctime of lockfile to prevent it from being removed by /tmp cleaning
        chmod 0444, $lockfile;
        chmod 0644, $lockfile;
        queue_all();
        run_queue();
        sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
    }
    unlink $lockfile;
}

######### Helpers {{{1

sub error($) {
    print STDERR ts().shift()."\n";
}
sub ferror(@) {
    error(sprintf($_[0], @_[1..$#_]));
}
sub fatal($) {
    error(shift);
    exit 1;
}

######### Main {{{1

chdir "/";
close(DATA) if fileno(DATA);
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|?|h' => sub { pod2usage(-verbose => 2, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'max-intensive-parallel|i=i' => \$max_par_intensive,
    'load-triggers=s' => \$load_triggers,
    'restart-delay|d=i' => \$restart_delay,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

run_perpetually();

########## Documentation {{{1

__END__

=head1 NAME

jobd.pl - Perform Girocco maintenance jobs

=head1 SYNOPSIS

 jobd.pl [options]

 Options:
   -h | --help                            detailed instructions
   -q | --quiet                           run quietly
   -P | --progress                        show occasional status updates
   -k SECONDS | --kill-after SECONDS      how long to wait before killing jobs
   -p NUM | --max-parallel NUM            how many jobs to run at the same time
   -i NUM | --max-intensive-parallel NUM  how many resource-hungry jobs to run
                                            at the same time
   --load-triggers TRIG,UNTRIG            stop queueing jobs at load above
                                            TRIG and resume at load below UNTRIG
   -d SECONDS | --restart-delay SECONDS   wait for this many seconds between
                                            queue runs
   -l FILE | --lockfile FILE              create a lockfile in the given
                                            location
   -a | --all-once                        process the list only once
   -o PRJNAME | --one PRJNAME             process only one project
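
As a rough illustration only (the project name C<myproject> below is just a
placeholder, not a real project), typical invocations look like this:

   jobd.pl --all-once --progress
   jobd.pl --one myproject --quiet
   jobd.pl --max-parallel 4 --load-triggers 6,3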

=head1 OPTIONS

=over 8

=item B<--help>

Print the full description of jobd.pl's options.

=item B<--quiet>

Suppress non-error messages, e.g. for use when running this task as a cronjob.

=item B<--progress>

Show information about the current status of the job queue occasionally. This
is automatically enabled if --quiet is not given.

=item B<--kill-after SECONDS>

Kill supervised jobs after a certain time to avoid hanging the daemon.
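
Note that some jobs are internally allowed a multiple of this timeout (a
C<timeout_factor> of 2 for garbage collection and 3 for git-svn mirror
updates, as set in the job handlers above), so with the default of 900
seconds the effective limits work out to:

   900 seconds x 2 (gc job)              = 1800 seconds
   900 seconds x 3 (git-svn mirror job)  = 2700 seconds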

=item B<--max-parallel NUM>

Run no more than that many jobs at the same time. The default is the number
of CPUs * 2, or 8 if the number of CPUs cannot be determined.

=item B<--max-intensive-parallel NUM>

Run no more than that many resource-hungry jobs at the same time. Right now,
this refers to repacking jobs. The default is 1.

=item B<--load-triggers TRIG,UNTRIG>

If the 1-minute system load average exceeds TRIG, don't queue any more jobs
until it drops below UNTRIG. Load checking is currently supported on Linux
and on any other platform that provides an uptime command with load average
output.

If both values are zero, load checks are disabled. The default is the number
of CPUs * 1.5 for TRIG and half that for UNTRIG, or 6,3 if the number of CPUs
cannot be determined.
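
For example, on a 4-CPU machine the computed default is 6,3 (4 * 1.5 = 6 for
TRIG and half of that, 3, for UNTRIG); the same thresholds could be set
explicitly with:

   jobd.pl --load-triggers 6,3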

=item B<--restart-delay SECONDS>

After processing the queue, wait this many seconds until the queue is
restarted. The default is 300 seconds.

=item B<--lockfile FILE>

For perpetual operation, specify the full path to a lock file to create and
then remove after finishing/aborting. The default is /tmp/jobd-$suffix.lock,
where $suffix is a 6-character string uniquely determined by the name and
nickname of this Girocco instance. The pid of the running jobd instance will
be written to the lock file.

=item B<--all-once>

Instead of perpetually processing all projects over and over again, process
them just once and then exit.

=item B<--one PRJNAME>

Process only the given project (given as just the project name without the
C<.git> suffix) and then exit.

=back

=head1 DESCRIPTION

jobd.pl is Girocco's repository maintenance servant; it periodically checks
all of the repositories, updating mirrored repositories and repacking
push-mode repositories when needed.
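
Whether a particular update or repack is currently "needed" is decided by
comparing the C<gitweb.lastrefresh> and C<gitweb.lastgc> timestamps read from
each repository's git config against the configured minimum intervals (see
C<is_operation_uptodate> above). As a sketch, with F</path/to/repo.git>
standing in for a real repository path, those timestamps can be inspected
with:

   git --git-dir=/path/to/repo.git config gitweb.lastrefresh
   git --git-dir=/path/to/repo.git config gitweb.lastgc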

=cut