jobd.pl: avoid uninitialized warning when no lastgc/lastrefresh
[girocco.git] / jobd / jobd.pl
blobfa7a9c852d305d822fe602a7e7ef0b980a29eff5
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
14 use Girocco::Config;
15 use Girocco::Project;
16 use Girocco::User;
17 use Girocco::Util;
19 # Options
# Defaults for the command line options (see the option parsing in Main).
my $quiet;                      # --quiet: suppress non-error messages
my $progress;                   # --progress: print occasional queue status
my $kill_after = 900;           # --kill-after: seconds before a job is killed
my $max_par = 20;               # --max-parallel: concurrent job limit
my $max_par_intensive = 1;      # --max-intensive-parallel: limit for intensive jobs
my $load_triggers = '10,2';     # --load-triggers: "TRIG,UNTRIG" load thresholds
my $lockfile = "/tmp/jobd-$Girocco::Config::tmpsuffix.lock";   # --lockfile
my $restart_delay = 60;         # --restart-delay: seconds between queue runs
my $all_once;                   # --all-once: process every project once, then exit
my $one;                        # --one: name of the single project to process

# Numeric thresholds split out of $load_triggers once options are parsed.
my ($load_trig, $load_untrig);
33 ######### Jobs {{{1
# Job command for 'update' jobs.
#
# Takes the job hash reference. The job is marked as skipped when the project
# directory does not exist, a .nofetch marker is present, the initial clone is
# still in progress (or has failed), or the last refresh is recent enough.
# In several of those cases a gc job is queued instead. Otherwise update.sh is
# started for the project via exec_job_command.
sub update_project {
    my $job = shift;
    my $p = $job->{'project'};
    return unless check_project_exists($job);

    my $dir = get_project_path($p);

    if (-e "${dir}.nofetch") {
        job_skip($job);
        return setup_gc($job);
    }

    if (-e "${dir}.clone_in_progress") {
        unless (-e "${dir}.clone_failed") {
            job_skip($job, "initial mirroring not complete yet");
            return;
        }
        job_skip($job, "initial mirroring failed");
        # Still need to gc non top-level clones even if they've failed
        # otherwise the objects copied into them from the parent will
        # just accumulate without bound
        setup_gc($job) if $p =~ m,/,;
        return;
    }

    my $ts = is_operation_uptodate($p, 'lastrefresh', $Girocco::Config::min_mirror_interval);
    if ($ts) {
        job_skip($job, "not needed right now, last run at $ts");
        setup_gc($job);
        return;
    }

    # git svn can be very, very slow at times, so allow such jobs three
    # times the usual timeout
    $job->{'timeout_factor'} = 3 if is_svn_clone($p);

    return exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $p], $quiet);
}
# Job command for 'gc' jobs.
#
# Takes the job hash reference. Skips the job when the project directory is
# missing or when the last gc is recent enough; otherwise starts gc.sh for
# the project via exec_job_command.
sub gc_project {
    my ($job) = @_;
    my $p = $job->{'project'};
    return unless check_project_exists($job);

    my $ts = is_operation_uptodate($p, 'lastgc', $Girocco::Config::min_gc_interval);
    if ($ts) {
        job_skip($job, "not needed right now, last run at $ts");
        return;
    }

    return exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $p], $quiet);
}
# Queue an intensive 'gc' job for the project of the given job.
sub setup_gc {
    my ($job) = @_;
    my %gc_job = (
        project => $job->{'project'},
        type => 'gc',
        command => \&gc_project,
        intensive => 1,
    );
    return queue_job(%gc_job);
}
# Check that the repository directory of the job's project exists.
#
# Returns 1 when it does. Otherwise the job is marked as skipped with the
# message "non-existent project" and 0 is returned.
sub check_project_exists {
    my ($job) = @_;
    return 1 if -d get_project_path($job->{'project'});

    job_skip($job, "non-existent project");
    return 0;
}
# Build the repository path for a project name:
# "<reporoot>/<name>.git/" (note the trailing slash).
sub get_project_path {
    my $name = shift;
    return join('', $Girocco::Config::reporoot, '/', $name, '.git/');
}
# Tell whether an operation on a project was performed recently enough.
#
#   $project   - project name (without the .git suffix)
#   $which     - name of the gitweb.* config key holding the timestamp of
#                the last run (callers pass 'lastrefresh' or 'lastgc')
#   $threshold - maximum age in seconds for the last run to count as current
#
# Returns the recorded timestamp string (without its trailing newline) when
# the last run is at most $threshold seconds old, undef otherwise. A missing
# or unparsable timestamp is treated as time 0.
sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $path = get_project_path($project);
    my $timestamp = `GIT_DIR="$path" $Girocco::Config::git_bin config "gitweb.$which"`;
    my $unix_ts = parse_date($timestamp) || 0;
    # Backticks keep the newline that ends the command output; drop it so
    # callers can embed the value in a one-line log message.
    chomp $timestamp if defined $timestamp;
    (time - $unix_ts) <= $threshold ? $timestamp : undef;
}
# Tell whether a project looks like a git-svn mirror: its gitweb.baseurl
# config value starts with "svn:" or "svn+" (case-insensitively) and an
# svn-remote.svn.url config value is set.
sub is_svn_clone {
    my $project = shift;
    my $git_dir = get_project_path($project);
    my $baseurl = `GIT_DIR="$git_dir" $Girocco::Config::git_bin config "gitweb.baseurl"`;
    my $svnurl = `GIT_DIR="$git_dir" $Girocco::Config::git_bin config "svn-remote.svn.url"`;
    my $has_svn_base = $baseurl =~ /^svn[:+]/i;
    return $has_svn_base && $svnurl;
}
# Queue an 'update' job for one project. Whether the update succeeds or
# fails, a gc job is set up for the project afterwards.
sub queue_one {
    my ($project) = @_;
    my %update_job = (
        project => $project,
        type => 'update',
        command => \&update_project,
        on_success => \&setup_gc,
        on_error => \&setup_gc,
    );
    return queue_job(%update_job);
}
# Queue an update job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $name (Girocco::Project->get_full_list()) {
        queue_one($name);
    }
}
133 ######### Daemon operation {{{1
# Shared state of the job scheduler.
my @queue;              # jobs (hash refs) waiting to be started
my @running;            # jobs that have been started and not yet reaped
my $perpetual = 1;      # cleared to end the perpetual loop after this run
my $locked = 0;         # set once this process has written the lockfile
my $jobs_executed;      # counters for the current queue run,
my $jobs_skipped;       # reset at the start of run_queue
my @jobs_killed;        # names of jobs killed because of a timeout
# First-stage SIGINT handler: drop everything still queued, stop the
# perpetual loop, and let running jobs finish. A second SIGINT is routed
# to handle_exit.
sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ^C again to exit immediately");
    $perpetual = 0;
    @queue = ();
    $SIG{'INT'} = \&handle_exit;
}
# Hard-exit handler: kill the process groups of all running jobs, remove the
# lockfile if this process created it, and exit with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");
    $SIG{'TERM'} = 'IGNORE';
    for my $job (@running) {
        # A job can sit in @running without a pid (not forked yet, or the
        # fork failed). Negating an undefined pid yields 0, and killing
        # process 0 would signal jobd's own process group, so skip those.
        next unless $job->{'pid'};
        kill 'KILL', -($job->{'pid'});
    }
    unlink $lockfile if ($locked);
    exit(0);
}
# Append a job to the queue.
#
# Takes the job as a key/value list (project, type, command and optionally
# intensive, on_success, on_error). queued_at is set to the current time,
# dont_run is reset to 0, and intensive defaults to 0 when not given.
sub queue_job {
    my %job = (intensive => 0, @_, queued_at => time, dont_run => 0);
    return push @queue, \%job;
}
# Start a job: register it as running and invoke its command callback.
# When the callback marked the job as skipped (dont_run), the job is taken
# off the running list again and counted as skipped.
sub run_job {
    my ($job) = @_;

    push @running, $job;
    $job->{'command'}->($job);
    return unless $job->{'dont_run'};

    pop @running;
    $jobs_skipped++;
    return;
}
# Format a job as "[type::project]" for log messages.
sub _job_name {
    my ($job) = @_;
    return sprintf('[%s::%s]', $job->{'type'}, $job->{'project'});
}
186 # Only one of those per job!
# Fork a child process that runs the external command of a job.
#
#   $job      - job hash reference; pid, finished and started_at are set
#   $command  - array reference with the program and its arguments
#   $err_only - when true, the child's STDOUT is sent to /dev/null
#
# In the parent the job is marked as started, or as finished when the fork
# itself fails. The child moves into a process group of its own and then
# execs the command. If anything goes wrong in the child it reports the
# problem and exits with status 71 (EX_OSERR) rather than returning, since
# returning would leave a second copy of the daemon running.
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined($pid)) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }

    if (!$pid) {
        # Child process. Use low-precedence "or" so that the error branch
        # belongs to open() and not to the file name.
        open(STDIN, '<', '/dev/null') or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            exit 71;
        };
        if ($err_only) {
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 71;
            };
        }
        # New process group so we can keep track of all of its children
        if (!defined(POSIX::setpgid(0, 0))) {
            error(_job_name($job) ." Can't create process group: $!");
            exit 71;
        }
        # "Prevent" races (0.1 second pause)
        select(undef, undef, undef, 0.1);
        exec @$command
            or error(_job_name($job) ." Can't exec job command: $!");
        # Only reached when exec failed; never report success in that case
        exit 71;
    }

    # Parent process: record what is needed to supervise the child
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Mark a job as not to be run. When a message is given and quiet mode is
# off, the reason is logged.
sub job_skip {
    my ($job, $msg) = @_;
    $job->{'dont_run'} = 1;
    if (!$quiet && $msg) {
        error(_job_name($job) ." Skipping job: $msg");
    }
}
# Kill every started job that has been running for longer than
# $kill_after seconds, scaled by the job's timeout_factor (default 1).
# Killed jobs are flagged as finished and their names are recorded in
# @jobs_killed.
sub reap_hanging_jobs {
    foreach my $job (@running) {
        my $factor = $job->{'timeout_factor'} || 1;
        next unless defined($job->{'started_at'});
        next unless (time - $job->{'started_at'}) > ($kill_after * $factor);

        $job->{'finished'} = 1;
        # A negative pid signals the whole process group of the job
        kill 'KILL', -($job->{'pid'});
        error(_job_name($job) ." KILLED due to timeout");
        push @jobs_killed, _job_name($job);
    }
}
# Collect all child processes that have exited, without blocking.
#
# For a child that belongs to a running job: if the job was not yet flagged
# as finished, its on_success callback (if any) is called, it is flagged as
# finished and counted as executed; if it was already flagged as finished,
# its on_error callback (if any) is called instead. Finished jobs are then
# removed from @running.
#
# Returns 1 if at least one child was reaped, 0 otherwise.
sub reap_finished_jobs {
    my $reaped_any = 0;

    while ((my $pid = waitpid(-1, WNOHANG)) >= 1) {
        $reaped_any = 1;

        # XXX- the exit status in $? is currently not looked at
        my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        next unless $job;

        if ($job->{'finished'}) {
            $job->{'on_error'}->($job) if defined($job->{'on_error'});
        } else {
            $job->{'on_success'}->($job) if defined($job->{'on_success'});
            $job->{'finished'} = 1;
            $jobs_executed++;
        }
    }

    @running = grep { $_->{'finished'} == 0 } @running;
    return $reaped_any;
}
# Running jobs flagged as intensive: the jobs themselves in list context,
# their number in scalar context.
sub have_intensive_jobs {
    my @intensive = grep { $_->{'intensive'} == 1 } @running;
    return @intensive;
}
# Current local time as a log prefix: "[<localtime string>] ".
sub ts {
    return sprintf('[%s] ', scalar(localtime));
}
# Fetch the system load averages.
#
# On Linux the first three fields of /proc/loadavg are returned; elsewhere
# the three numbers following "load average" in the output of uptime.
# Returns undef when the source cannot be opened or parsed.
sub get_load_info {
    my $fh;

    if ($^O ne "linux") {
        # Read the output of uptime everywhere else (works on Linux too)
        open($fh, '-|', 'uptime') or return undef;
        my $line = <$fh>;
        close $fh;
        $line =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/iso or return undef;
        return ($1, $2, $3);
    }

    # Read /proc/loadavg on Linux
    open($fh, '<', '/proc/loadavg') or return undef;
    my $line = <$fh>;
    close $fh;
    return (split(/\s/, $line, 4))[0..2];
}
# Process the job queue until no job is queued or running any more.
#
# Each iteration kills jobs that exceeded their timeout, reaps finished
# children, re-checks the system load at most every 5 seconds (when load
# triggers are enabled), prints a status report at most every 60 seconds
# (when progress output is enabled), and then either starts the next queued
# job or backs off. While running, SIGINT is routed to handle_softexit and
# SIGTERM to handle_exit.
sub run_queue {
    my $last_progress = time;
    my $last_checkload = time - 5;
    my $overloaded = 0;
    my $load_info = '';
    $jobs_executed = 0;
    $jobs_skipped = 0;
    @jobs_killed = ();
    if ($progress) {
        ferror("--- Processing %d queued jobs", scalar(@queue));
    }
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;
    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();

        # Check current system load
        if ($load_trig && (time - $last_checkload) >= 5) {
            my @loadinfo = get_load_info();
            # The check time is only updated when a load value was obtained
            if (defined($loadinfo[0])) {
                my $current_load = $loadinfo[0];
                if ($current_load > $load_trig && !$overloaded) {
                    $overloaded = 1;
                    error("PAUSE: system load is at $current_load > $load_trig") if $progress;
                } elsif ($current_load < $load_untrig && $overloaded) {
                    $overloaded = 0;
                    error("RESUME: system load is at $current_load < $load_untrig") if $progress;
                }
                if ($overloaded) {
                    $load_info = ', paused (load '. $current_load .')';
                } else {
                    $load_info = ', load '. $current_load;
                }
                $last_checkload = time;
            }
        }

        # Status output
        if ($progress && (time - $last_progress) >= 60) {
            ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
            if (@running) {
                my @run_status;
                for my $job (@running) {
                    push @run_status, _job_name($job)." ". (time - $job->{'started_at'}) ."s";
                }
                error("STATUS: currently running: ". join(', ', @run_status));
            }
            $last_progress = time;
        }

        # Back off if we're too busy (or there is nothing left to start)
        if (@running >= $max_par || have_intensive_jobs() >= $max_par_intensive || !@queue || $overloaded) {
            sleep 1 unless $proceed_immediately;
            next;
        }

        # Run next
        run_job(shift(@queue)) if @queue;
    }
    if ($progress) {
        ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
    }
}
# Run as a daemon: write the lockfile, then queue and process all projects
# over and over, sleeping $restart_delay seconds between runs, until
# $perpetual is cleared. The lockfile is removed on the way out.
#
# Dies when the lockfile already exists or cannot be created.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile exists. Please make sure no other instance of jobd is running.";
    }
    # Low-precedence "or" so that a failed open really dies; with "||" the
    # die would be attached to the (always true) file name.
    open(my $lock_fh, '>', $lockfile)
        or die "Cannot create lockfile $lockfile: $!";
    print {$lock_fh} $$;
    close $lock_fh;
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
        sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
    }
    unlink $lockfile;
}
369 ######### Helpers {{{1
# Print a message to STDERR, prefixed with a timestamp and followed by a
# newline.
sub error($) {
    my $line = ts() . shift() . "\n";
    print STDERR $line;
}
# Like error(), but takes a sprintf format string followed by its arguments.
sub ferror(@) {
    my ($format, @args) = @_;
    error(sprintf($format, @args));
}
# Log a message with error() and terminate with exit status 1.
sub fatal($) {
    my ($msg) = @_;
    error($msg);
    exit 1;
}
382 ######### Main {{{1
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    # -h is advertised in the synopsis, so accept it next to --help and -?
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'max-intensive-parallel|i=i' => \$max_par_intensive,
    'load-triggers=s' => \$load_triggers,
    'restart-delay|d=i' => \$restart_delay,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

# Progress output is always on unless --quiet was given
unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

# Disable load checks only when no load average can be obtained at all;
# get_load_info falls back to uptime where /proc/loadavg is not used.
my @load_probe = get_load_info();
$load_triggers = '0,0' unless defined($load_probe[0]);
($load_trig, $load_untrig) = split(/,/, $load_triggers);
# With a single value given, resume at the same load that triggers the
# pause; an undefined UNTRIG would never let a paused queue resume.
$load_untrig = $load_trig
    unless defined($load_untrig) && length($load_untrig);

# Process a single project and exit
if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

# Process every project once and exit
if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

run_perpetually();
424 ########## Documentation {{{1
426 __END__
428 =head1 NAME
430 jobd - Perform Girocco maintenance jobs
432 =head1 SYNOPSIS
434 jobd [options]
436 Options:
437 -h | --help detailed instructions
438 -q | --quiet run quietly
439 -P | --progress show occasional status updates
440 -k SECONDS | --kill-after SECONDS how long to wait before killing jobs
441 -p NUM | --max-parallel NUM how many jobs to run at the same time
442 -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
443 at the same time
444 --load-triggers TRIG,UNTRIG stop queueing jobs at load above
445 TRIG and resume at load below UNTRIG
446 -d SECONDS | --restart-delay SECONDS wait for this many seconds between
447 queue runs
448 -l FILE | --lockfile FILE create a lockfile in the given
449 location
450 -a | --all-once process the list only once
451 -o PRJNAME | --one PRJNAME process only one project
453 =head1 OPTIONS
455 =over 8
457 =item B<--help>
459 Print the full description of jobd's options.
461 =item B<--quiet>
463 Suppress non-error messages, e.g. for use when running this task as a cronjob.
465 =item B<--progress>
467 Show information about the current status of the job queue occasionally. This
468 is automatically enabled if --quiet is not given.
470 =item B<--kill-after SECONDS>
472 Kill supervised jobs after a certain time to avoid hanging the daemon.
474 =item B<--max-parallel NUM>
476 Run no more than that many jobs at the same time.
478 =item B<--max-intensive-parallel NUM>
480 Run no more than that many resource-hungry jobs at the same time. Right now,
481 this refers to repacking jobs.
483 =item B<--load-triggers TRIG,UNTRIG>
485 If the first system load average (1 minute average) exceeds TRIG, don't queue
486 any more jobs until it goes below UNTRIG. This is currently only supported on
487 Linux and any other platforms that provide an uptime command with load average
488 output.
490 If both values are zero, load checks are disabled. Note that this is not the
491 default.
493 =item B<--restart-delay SECONDS>
495 After processing the queue, wait this many seconds until the queue is
496 restarted.
498 =item B<--lockfile FILE>
500 For perpetual operation, create a lockfile in that place and clean it up after
501 finishing/aborting.
503 =item B<--all-once>
505 Instead of perpetually processing all projects over and over again, process
506 them just once and then exit.
508 =item B<--one PRJNAME>
510 Process only the given project (given as just the project name without C<.git>
511 suffix) and then exit.
513 =back
515 =head1 DESCRIPTION
517 jobd is Girocco's repository maintenance servant; it periodically checks all
518 the repositories and updates mirrored repositories and repacks push-mode
519 repositories when needed.
521 =cut