#!/usr/bin/perl

# jobd - perform Girocco maintenance jobs

# Run with --help for details

use strict;
use warnings;

use Getopt::Long;
use Pod::Usage;
use POSIX ":sys_wait_h";
use File::Basename;

use lib dirname($0);
use Girocco::Config;
use Girocco::Project;
use Girocco::User;
use Girocco::Util;

# Options
my $quiet;
my $progress;
my $kill_after = 900;
my $max_par = 20;
my $max_par_intensive = 1;
my $load_triggers = '10,2';
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";
my $restart_delay = 60;
my $all_once;
my $one;

my ($load_trig, $load_untrig);

######### Jobs {{{1
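
# Update (fetch) the given mirror project by running jobd/update.sh, skipping
# projects marked .nofetch/.bypass or whose initial mirroring has not finished,
# and queueing a follow-up gc job where appropriate.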
sub update_project {
    my $job = shift;
    my $p = $job->{'project'};
    check_project_exists($job) || return;
    if (-e get_project_path($p).".nofetch" || -e get_project_path($p).".bypass" ||
        -e get_project_path($p).".bypass_fetch") {
        job_skip($job);
        return setup_gc($job);
    }
    if (-e get_project_path($p).".clone_in_progress" && ! -e get_project_path($p).".clone_failed") {
        job_skip($job, "initial mirroring not complete yet");
        return;
    }
    if (-e get_project_path($p).".clone_failed") {
        job_skip($job, "initial mirroring failed");
        # Still need to gc non top-level clones even if they've failed
        # otherwise the objects copied into them from the parent will
        # just accumulate without bound
        setup_gc($job) if $p =~ m,/,;
        return;
    }
    if (my $ts = is_operation_uptodate($p, 'lastrefresh', rand_adjust($Girocco::Config::min_mirror_interval))) {
        job_skip($job, "not needed right now, last run at $ts");
        setup_gc($job);
        return;
    }
    if (is_svn_clone($p)) {
        # git svn can be very, very slow at times
        $job->{'timeout_factor'} = 3;
    }
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}
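
# Garbage-collect (repack) a project by running jobd/gc.sh unless gc is
# disabled for it or it was run recently enough.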
sub gc_project {
    my $job = shift;
    my $p = $job->{'project'};
    check_project_exists($job) || return;
    if (-e get_project_path($p).".nogc" || -e get_project_path($p).".bypass") {
        job_skip($job);
        return;
    }
    if (my $ts = is_operation_uptodate($p, 'lastgc', rand_adjust($Girocco::Config::min_gc_interval))) {
        job_skip($job, "not needed right now, last run at $ts");
        return;
    }
    # allow garbage collection to run for longer than an update
    $job->{'timeout_factor'} = 2;
    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}
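
# Queue a gc job for the same project; gc jobs are flagged 'intensive' so they
# are limited separately by --max-intensive-parallel.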
sub setup_gc {
    my $job = shift;
    queue_job(
        project => $job->{'project'},
        type => 'gc',
        command => \&gc_project,
        intensive => 1,
    );
}
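
# Skip the job and return false if the project's repository directory no
# longer exists.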
sub check_project_exists {
    my $job = shift;
    my $p = $job->{'project'};
    if (!-d get_project_path($p)) {
        job_skip($job, "non-existent project");
        return 0;
    }
    return 1;
}

sub get_project_path {
    "$Girocco::Config::reporoot/".shift().".git/";
}
sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $path = get_project_path($project);
    my $timestamp = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.$which"`;
    my $unix_ts = parse_rfc2822_date($timestamp) || 0;
    (time - $unix_ts) <= $threshold ? $timestamp : undef;
}
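
# True if the project is mirrored from an svn source (gitweb.baseurl starts
# with svn and svn-remote.svn.url is set).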
sub is_svn_clone {
    my ($project) = @_;
    my $path = get_project_path($project);
    my $baseurl = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.baseurl"`;
    my $svnurl = `GIT_DIR="$path" $Girocco::Config::git_bin config "svn-remote.svn.url"`;
    return $baseurl =~ /^svn[:+]/i && $svnurl;
}
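
# Queue an update job for a single project; a gc job is scheduled afterwards
# whether the update succeeds or fails.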
sub queue_one {
    my $project = shift;
    queue_job(
        project => $project,
        type => 'update',
        command => \&update_project,
        on_success => \&setup_gc,
        on_error => \&setup_gc,
    );
}

sub queue_all {
    queue_one($_) for (Girocco::Project->get_full_list());
}

######### Daemon operation {{{1
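
# Scheduler state: @queue holds jobs waiting to run and @running the jobs
# currently executing; the remaining variables track the lockfile state and
# per-run statistics used for status output.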
my @queue;
my @running;
my $perpetual = 1;
my $locked = 0;
my $jobs_executed;
my $jobs_skipped;
my @jobs_killed;

# Kills and reaps the specified pid. Returns the exit status ($?) on success,
# otherwise undef if the process could not be killed or reaped.
# First sends SIGTERM; if the process does not exit within 15 seconds, sends SIGKILL.
# Usage: my $exitcode = kill_gently($pid, $kill_process_group = 0);
sub kill_gently {
    my $targ = shift;
    my $use_pg = shift || 0;
    # Note that the docs for Perl's kill state that a negative signal
    # number should be used to kill process groups and that while a
    # negative process id (and positive signal number) may also do that
    # on some platforms, that's not portable.
    my $pg = $use_pg ? -1 : 1;
    my $harsh = time() + 15; # SIGKILL after this delay
    my $count = kill(15*$pg, $targ); # SIGTERM is 15
    my $reaped = waitpid($targ, WNOHANG);
    return undef if $reaped < 0;
    return $? if $reaped == $targ;
    while ($count && time() < $harsh) {
        select(undef, undef, undef, 0.2);
        $reaped = waitpid($targ, WNOHANG);
        return undef if $reaped < 0;
        return $? if $reaped == $targ;
    }
    $harsh = time() + 2;
    $count = kill(9*$pg, $targ); # SIGKILL is 9
    $reaped = waitpid($targ, WNOHANG);
    return undef if $reaped < 0;
    return $? if $reaped == $targ;
    # We should not need to wait to reap a SIGKILL, however, just in case
    # the system doesn't make a SIGKILL'd process immediately reapable
    # (perhaps under extremely heavy load) we accommodate a brief delay
    while ($count && time() < $harsh) {
        select(undef, undef, undef, 0.2);
        $reaped = waitpid($targ, WNOHANG);
        return undef if $reaped < 0;
        return $? if $reaped == $targ;
    }
    return undef;
}
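
# Signal handlers: the first SIGINT empties the queue and lets running jobs
# finish; a second SIGINT (or a SIGTERM) kills whatever is still running and
# exits.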
sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ".
        "^C again to exit immediately");
    @queue = ();
    $perpetual = 0;
    $SIG{'INT'} = \&handle_exit;
}

sub handle_exit {
    error("Killing outstanding jobs...");
    $SIG{'TERM'} = 'IGNORE';
    for (@running) {
        kill_gently($_->{'pid'}, 1);
    }
    unlink $lockfile if ($locked);
    exit(0);
}
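
# Append a job (a hash of project, type, command and optional on_success /
# on_error callbacks) to the queue.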
sub queue_job {
    my %opts = @_;
    $opts{'queued_at'} = time;
    $opts{'dont_run'} = 0;
    $opts{'intensive'} = 0 unless exists $opts{'intensive'};
    push @queue, \%opts;
}
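
# Start a queued job; if its command marks it as skipped (dont_run), it is
# removed from the running list again and counted as skipped.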
sub run_job {
    my $job = shift;

    push @running, $job;
    $job->{'command'}->($job);
    if ($job->{'dont_run'}) {
        pop @running;
        $jobs_skipped++;
        return;
    }
}

sub _job_name {
    my $job = shift;
    "[".$job->{'type'}."::".$job->{'project'}."]";
}

# Only one exec_job_command call per job!
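# Forks and runs the job's external command in its own process group,
# recording the child pid and start time so the job can be supervised.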
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid;
    if (!defined($pid = fork)) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        open(STDIN, '<', '/dev/null') or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            $job->{'finished'} = 1;
            return;
        };
        if ($err_only) {
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                $job->{'finished'} = 1;
                return;
            };
        }
        # New process group so we can keep track of all of its children
        if (!defined(POSIX::setpgid(0, 0))) {
            error(_job_name($job) ." Can't create process group: $!");
            $job->{'finished'} = 1;
            return;
        }
        # "Prevent" races
        select(undef, undef, undef, 0.1);
        exec @$command;
        # Stop perl from complaining
        exit $?;
    }
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}

sub job_skip {
    my ($job, $msg) = @_;
    $job->{'dont_run'} = 1;
    error(_job_name($job) ." Skipping job: $msg") unless $quiet || !$msg;
}
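
# Kill any running job that has exceeded its timeout ($kill_after scaled by
# the job's timeout_factor) and record it as killed.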
sub reap_hanging_jobs {
    for (@running) {
        my $factor = $_->{'timeout_factor'} || 1;
        if (defined($_->{'started_at'}) && (time - $_->{'started_at'}) > ($kill_after * $factor)) {
            $_->{'finished'} = 1;
            kill_gently($_->{'pid'}, 1);
            delete $_->{'pid'};
            $_->{'killed'} = 1;
            error(_job_name($_) ." KILLED due to timeout");
            push @jobs_killed, _job_name($_);
        }
    }
}
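
# Finish a reaped job: run on_success for a job that exited on its own, or
# on_error for one that was already marked finished (i.e. killed for hanging).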
sub reap_one_job {
    my $job = shift;
    if (!$job->{'finished'}) {
        $job->{'on_success'}->($job) if defined($job->{'on_success'});
        $job->{'finished'} = 1;
        $jobs_executed++;
    } else {
        $job->{'on_error'}->($job) if defined($job->{'on_error'});
    }
}
sub reap_finished_jobs {
    my $pid;
    my $finished_any = 0;
    foreach my $child (grep { !$_->{'pid'} && $_->{'killed'} } @running) {
        delete $child->{'killed'};
        reap_one_job($child);
        $finished_any = 1;
    }
    while (1) {
        $pid = waitpid(-1, WNOHANG);
        last if $pid <= 0;
        $finished_any = 1;

        my @child = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        if ($?) {
            # XXX- we currently don't care
        }
        if (@child) {
            delete $child[0]->{'pid'};
            reap_one_job($child[0]);
        }
    }
    @running = grep { $_->{'finished'} == 0 } @running;
    $finished_any;
}

sub have_intensive_jobs {
    grep { $_->{'intensive'} == 1 } @running;
}

sub ts {
    "[". scalar(localtime) ."] ";
}
sub get_load_info {
    if ($^O eq "linux") {
        # Read /proc/loadavg on Linux
        open(LOADAV, '<', '/proc/loadavg') or return undef;
        my $loadinfo = <LOADAV>;
        close LOADAV;
        return (split(/\s/, $loadinfo, 4))[0..2];
    } else {
        # Read the output of uptime everywhere else (works on Linux too)
        open(LOADAV, '-|', 'uptime') or return undef;
        my $loadinfo = <LOADAV>;
        close LOADAV;
        $loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
        return ($1, $2, $3);
    }
}
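
# Main scheduling loop: process the queue while reaping hung and finished
# jobs, pausing when the system load is too high and printing periodic status
# output, until both the queue and the running list are empty.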
sub run_queue {
    my $last_progress = time;
    my $last_checkload = time - 5;
    my $current_load = $load_trig;
    my $overloaded = 0;
    my $load_info = '';
    $jobs_executed = 0;
    $jobs_skipped = 0;
    @jobs_killed = ();
    if ($progress) {
        ferror("--- Processing %d queued jobs", scalar(@queue));
    }
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;
    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();
        # Check current system load
        if ($load_trig && (time - $last_checkload) >= 5 && defined((my @loadinfo = get_load_info())[0])) {
            my $current_load = $loadinfo[0];
            if ($current_load > $load_trig && !$overloaded) {
                $overloaded = 1;
                error("PAUSE: system load is at $current_load > $load_trig") if $progress;
            } elsif ($current_load < $load_untrig && $overloaded) {
                $overloaded = 0;
                error("RESUME: system load is at $current_load < $load_untrig") if $progress;
            }
            if ($overloaded) {
                $load_info = ', paused (load '. $current_load .')';
            } else {
                $load_info = ', load '. $current_load;
            }
            $last_checkload = time;
        }
        # Status output
        if ($progress && (time - $last_progress) >= 60) {
            ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
            if (@running) {
                my @run_status;
                for (@running) {
                    push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
                }
                error("STATUS: currently running: ". join(', ', @run_status));
            }
            $last_progress = time;
        }
        # Back off if we're too busy
        if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
            sleep 1 unless $proceed_immediately;
            next;
        }
        # Run next
        run_job(shift(@queue)) if @queue;
    }
    if ($progress) {
        ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
    }
}
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile '$lockfile' exists. Please make sure no other instance of jobd is running.";
    }
    open(LOCK, '>', $lockfile) or die "Cannot create lockfile '$lockfile': $!";
    print LOCK $$;
    close LOCK;
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
        sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
    }
    unlink $lockfile;
}

######### Helpers {{{1

sub error($) {
    print STDERR ts().shift()."\n";
}
sub ferror(@) {
    error(sprintf($_[0], @_[1..$#_]));
}
sub fatal($) {
    error(shift);
    exit 1;
}

######### Main {{{1

# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'max-intensive-parallel|i=i' => \$max_par_intensive,
    'load-triggers=s' => \$load_triggers,
    'restart-delay|d=i' => \$restart_delay,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

$load_triggers = '0,0' unless defined((get_load_info())[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);

if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

run_perpetually();

########## Documentation {{{1

__END__

=head1 NAME

jobd - Perform Girocco maintenance jobs

=head1 SYNOPSIS

jobd [options]

 Options:
   -? | --help                           detailed instructions
   -q | --quiet                          run quietly
   -P | --progress                       show occasional status updates
   -k SECONDS | --kill-after SECONDS     how long to wait before killing jobs
   -p NUM | --max-parallel NUM           how many jobs to run at the same time
   -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
                                         at the same time
   --load-triggers TRIG,UNTRIG           stop queueing jobs at load above
                                         TRIG and resume at load below UNTRIG
   -d SECONDS | --restart-delay SECONDS  wait for this many seconds between
                                         queue runs
   -l FILE | --lockfile FILE             create a lockfile in the given
                                         location
   -a | --all-once                       process the list only once
   -o PRJNAME | --one PRJNAME            process only one project
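
For example, to process every project just once and then exit, with progress
output:

 jobd --all-once --progress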

=head1 OPTIONS

=over 8

=item B<--help>

Print the full description of jobd's options.

=item B<--quiet>

Suppress non-error messages, e.g. for use when running this task as a cronjob.

=item B<--progress>

Show information about the current status of the job queue occasionally. This
is automatically enabled if --quiet is not given.

=item B<--kill-after SECONDS>

Kill supervised jobs after this many seconds to avoid hanging the daemon.

=item B<--max-parallel NUM>

Run no more than NUM jobs at the same time.

=item B<--max-intensive-parallel NUM>

Run no more than NUM resource-hungry jobs at the same time. Right now, this
refers to repacking (gc) jobs.

=item B<--load-triggers TRIG,UNTRIG>

If the 1-minute system load average exceeds TRIG, don't queue any more jobs
until it drops below UNTRIG. This is currently supported on Linux and on any
other platform whose uptime command reports load averages.

If both values are zero, load checks are disabled. Note that this is not the
default.
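
For example, to pause queueing once the load climbs above 8 and resume when it
falls back below 3 (illustrative values only):

 jobd --load-triggers 8,3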
557 =item B<--restart-delay NUM>
559 After processing the queue, wait this many seconds until the queue is
560 restarted.
562 =item B<--lockfile FILE>
564 For perpetual operation, create a lockfile in that place and clean it up after
565 finishing/aborting.
567 =item B<--all-once>
569 Instead of perpetuously processing all projects over and over again, process
570 them just once and then exit.
572 =item B<--one PRJNAME>
574 Process only the given project (given as just the project name without C<.git>
575 suffix) and then exit.
577 =back
579 =head1 DESCRIPTION
581 jobd is Girocco's repositories maintenance servant; it periodically checks all
582 the repositories and updates mirrored repositories and repacks push-mode
583 repositories when needed.
585 =cut