Merge branch 'master' into rorcz
[girocco.git] / jobd / jobd.pl
blobe170e263c2082217288df7d7a611207f9402e3a7
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
14 use Girocco::Config;
15 use Girocco::Project;
16 use Girocco::User;
18 # Options
# Command-line tunables.  The values assigned here are the defaults; the
# GetOptions() call in the Main section may override them.
my ($quiet, $progress);                 # --quiet / --progress flags
my $kill_after        = 900;            # seconds a job may run before it is killed
my $max_par           = 20;             # upper bound on concurrently running jobs
my $max_par_intensive = 1;              # upper bound on concurrently running intensive jobs
my $load_triggers     = '10,2';         # "TRIG,UNTRIG" load thresholds (split in Main)
my $lockfile          = "/tmp/jobd.lock";
my $restart_delay     = 60;             # seconds to sleep between queue runs
my ($all_once, $one);                   # --all-once flag / --one PRJNAME

# Numeric halves of $load_triggers; filled in once the options are parsed.
my ($load_trig, $load_untrig);
32 ######### Jobs {{{1
# Job handler for 'update' jobs: runs jobd/update.sh for the job's project.
#
# The job is skipped (nothing is forked) when:
#   - the project directory does not exist,
#   - a ".nofetch" marker exists in the repository (a gc job is queued instead),
#   - a ".clone_in_progress" marker exists in the repository, or
#   - the last refresh is within $Girocco::Config::min_mirror_interval
#     (a gc job is queued in that case as well).
sub update_project {
    my ($job) = @_;
    my $project = $job->{'project'};

    return unless check_project_exists($job);

    my $repo_dir = get_project_path($project);
    if (-e $repo_dir . ".nofetch") {
        job_skip($job);
        return setup_gc($job);
    }
    if (-e $repo_dir . ".clone_in_progress") {
        job_skip($job, "initial mirroring not complete yet");
        return;
    }

    my $last_run = is_operation_uptodate($project, 'lastrefresh', $Girocco::Config::min_mirror_interval);
    if ($last_run) {
        job_skip($job, "not needed right now, last run at $last_run");
        setup_gc($job);
        return;
    }

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $project], $quiet);
}
# Job handler for 'gc' jobs: runs jobd/gc.sh for the job's project unless the
# project is missing or its last gc is within $Girocco::Config::min_gc_interval.
sub gc_project {
    my ($job) = @_;
    my $project = $job->{'project'};

    return unless check_project_exists($job);

    my $last_run = is_operation_uptodate($project, 'lastgc', $Girocco::Config::min_gc_interval);
    if ($last_run) {
        job_skip($job, "not needed right now, last run at $last_run");
        return;
    }

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $project], $quiet);
}
# Queue an intensive 'gc' job for the project of the given job.
sub setup_gc {
    my ($job) = @_;
    my %gc_job = (
        project   => $job->{'project'},
        type      => 'gc',
        command   => \&gc_project,
        intensive => 1,
    );
    queue_job(%gc_job);
}
# Return 1 when the job's project directory exists; otherwise mark the job as
# skipped ("non-existent project") and return 0.
sub check_project_exists {
    my ($job) = @_;

    return 1 if -d get_project_path($job->{'project'});

    job_skip($job, "non-existent project");
    return 0;
}
# Map a project name to its repository directory below
# $Girocco::Config::reporoot.  The result ends with a slash.
sub get_project_path {
    my ($name) = @_;
    return sprintf('%s/%s.git/', $Girocco::Config::reporoot, $name);
}
# Check whether an operation was performed on a project recently enough.
#
# Parameters:
#   $project   - project name (without the .git suffix)
#   $which     - name of the gitweb.* config key holding the last-run timestamp
#   $threshold - maximum age in seconds for the operation to count as up to date
#
# Returns the stored timestamp string (without trailing newline) when the
# operation ran no more than $threshold seconds ago, and a false value when
# it is older, was never recorded, or the timestamp cannot be parsed.
sub is_operation_uptodate {
    my ($project, $which, $threshold) = @_;
    my $path = get_project_path($project);

    # Hand the repository location to git through the environment rather
    # than splicing it into the shell command line.
    my $timestamp = do {
        local $ENV{'GIT_DIR'} = $path;
        `$Girocco::Config::git_bin config "gitweb.$which"`;
    };
    return unless defined $timestamp;
    chomp $timestamp;
    return if $timestamp eq '';

    # Convert via date(1) using the list form of open so the stored value
    # never passes through a shell.
    my $unix_ts;
    if (open(my $date_fh, '-|', 'date', '+%s', '-d', $timestamp)) {
        $unix_ts = <$date_fh>;
        close $date_fh;
    }
    return unless defined $unix_ts && $unix_ts =~ /\A\s*(-?\d+)\s*\z/;
    $unix_ts = $1;

    return (time - $unix_ts) <= $threshold ? $timestamp : undef;
}
# Queue an 'update' job for a single project.  A gc job is queued afterwards
# through both the on_success and the on_error hook.
sub queue_one {
    my ($project) = @_;
    my %update_job = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    queue_job(%update_job);
}
# Queue an update job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
112 ######### Daemon operation {{{1
# Shared daemon state.
my (@queue, @running);      # jobs waiting to run / jobs handed to run_job
my $perpetual = 1;          # cleared by handle_softexit to stop the perpetual loop
my $locked    = 0;          # set once run_perpetually has written the lockfile

# Per-run statistics, reset at the start of run_queue.
my ($jobs_executed, $jobs_skipped);
my @jobs_killed;
# First-stage interrupt handler (installed for SIGINT by run_queue): drop all
# queued jobs, stop the perpetual loop and let the running jobs finish.
# A second SIGINT is routed to handle_exit.
sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ^C again to exit immediately");
    @queue     = ();
    $perpetual = 0;
    $SIG{'INT'} = \&handle_exit;
}
# Hard-exit handler: SIGKILL the process group of every running job, remove
# the lockfile if this process created it, and exit with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");
    $SIG{'TERM'} = 'IGNORE';
    for my $job (@running) {
        # A job whose fork failed has no pid; negating undef would give
        # pid 0 and signal our own process group.
        next unless $job->{'pid'};
        # A negative pid addresses the job's whole process group.
        kill 'KILL', -($job->{'pid'});
    }
    unlink $lockfile if ($locked);
    exit(0);
}
# Append a job to the queue.  Takes the job description as key/value pairs;
# 'intensive' defaults to 0, while 'queued_at' and 'dont_run' are always
# (re)initialised here.
sub queue_job {
    my %job = (
        intensive => 0,
        @_,
        queued_at => time,
        dont_run  => 0,
    );
    push @queue, \%job;
}
# Start a job: register it as running and invoke its command callback.  If
# the callback flagged the job via job_skip, take it off the running list
# again and count it as skipped.
sub run_job {
    my ($job) = @_;

    push @running, $job;
    $job->{'command'}->($job);
    return unless $job->{'dont_run'};

    pop @running;
    $jobs_skipped++;
    return;
}
# Format a job as "[type::project]" for log messages.
sub _job_name {
    my ($job) = @_;
    return sprintf('[%s::%s]', $job->{'type'}, $job->{'project'});
}
# Fork a child process that runs an external command on behalf of a job.
#
# Parameters:
#   $job      - job hash; gains 'pid', 'finished' and 'started_at' entries
#   $command  - array ref holding the program and its arguments
#   $err_only - when true, the child's STDOUT is redirected to /dev/null
#
# In the parent this returns after recording the child's pid; when fork
# fails the job is marked finished instead.  The child never returns from
# this function: it either execs the command or exits with a non-zero status.
#
# Only one of those per job!
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined $pid) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        # Child process.  It must not fall back into the daemon's main
        # loop, so every failure path reports the problem and exits.
        my $abort = sub {
            error(_job_name($job) ." $_[0]");
            exit 1;
        };
        # Note the low-precedence "or": with "||" the alternative would
        # bind to the file name and never run.
        open(STDIN, '<', '/dev/null')
            or $abort->("Can't read from /dev/null: $!");
        if ($err_only) {
            open(STDOUT, '>', '/dev/null')
                or $abort->("Can't write to /dev/null: $!");
        }
        # New process group so we can keep track of all of its children
        defined(POSIX::setpgid(0, 0))
            or $abort->("Can't create process group: $!");
        # "Prevent" races
        select(undef, undef, undef, 0.1);
        # exec only returns on failure.
        exec(@$command)
            or $abort->("Can't exec $command->[0]: $!");
    }
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Flag a job so that run_job treats it as skipped.  The optional reason is
# logged unless --quiet is in effect or no (or an empty) reason was given.
sub job_skip {
    my ($job, $msg) = @_;

    $job->{'dont_run'} = 1;
    return if $quiet || !$msg;
    error(_job_name($job) ." Skipping job: $msg");
}
# SIGKILL the process group of every running job that has been running for
# more than $kill_after seconds, mark it finished and remember its name in
# @jobs_killed.
sub reap_hanging_jobs {
    foreach my $job (@running) {
        next unless defined $job->{'started_at'};
        next unless (time - $job->{'started_at'}) > $kill_after;

        $job->{'finished'} = 1;
        kill 'KILL', -($job->{'pid'});
        error(_job_name($job) ." KILLED due to timeout");
        push @jobs_killed, _job_name($job);
    }
}
# Collect every child that has exited (non-blocking) and update the matching
# job.  A job not yet marked finished gets its on_success hook and is counted
# as executed; a job that was already marked finished (e.g. killed on
# timeout) gets its on_error hook.  Afterwards all finished jobs are removed
# from @running.
#
# Returns 1 if at least one child was reaped, 0 otherwise.
#
# NOTE(review): the final grep also drops killed jobs that have not been
# reaped yet, so their on_error hook can be missed -- confirm whether that
# is intended.
sub reap_finished_jobs {
    my $reaped_any = 0;

    while ((my $pid = waitpid(-1, WNOHANG)) >= 1) {
        $reaped_any = 1;

        my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        # XXX- the child's exit status ($?) is not looked at; we currently
        # don't care
        next unless $job;

        if ($job->{'finished'}) {
            $job->{'on_error'}->($job) if defined($job->{'on_error'});
        }
        else {
            $job->{'on_success'}->($job) if defined($job->{'on_success'});
            $job->{'finished'} = 1;
            $jobs_executed++;
        }
    }

    @running = grep { $_->{'finished'} == 0 } @running;
    return $reaped_any;
}
# Running jobs flagged as intensive: the jobs themselves in list context,
# their count in scalar context.
sub have_intensive_jobs {
    my @intensive = grep { $_->{'intensive'} == 1 } @running;
    return @intensive;
}
# Log-line prefix: the current local time in brackets, followed by a space.
sub ts {
    return sprintf('[%s] ', scalar(localtime));
}
# Fetch the system load averages.
#
# Returns the three load averages as a list.  On failure (source cannot be
# opened, nothing could be read, or the uptime output does not match) a
# single undef is returned, so callers can test the first element.
sub get_load_info {
    if ($^O eq "linux") {
        # Read /proc/loadavg on Linux
        open(my $loadavg_fh, '<', '/proc/loadavg') or return undef;
        my $loadinfo = <$loadavg_fh>;
        close $loadavg_fh;
        return undef unless defined $loadinfo;
        return (split(/\s/, $loadinfo, 4))[0..2];
    }

    # Read the output of uptime everywhere else (works on Linux too)
    open(my $uptime_fh, '-|', 'uptime') or return undef;
    my $loadinfo = <$uptime_fh>;
    close $uptime_fh;
    return undef unless defined $loadinfo;
    $loadinfo =~ /load average[^0-9.]*([0-9.]+)[^0-9.]+([0-9.]+)[^0-9.]+([0-9.]+)/is or return undef;
    return ($1, $2, $3);
}
# Process the queue until no job is queued or running.
#
# Each iteration kills overdue jobs, reaps finished children, refreshes the
# load status (at most every 5 seconds, only when $load_trig is set), prints
# a status report (at most every 60 seconds, only with --progress) and then
# either backs off or starts the next queued job.
sub run_queue {
    my $last_progress  = time;
    my $last_checkload = time - 5;
    my $overloaded     = 0;
    my $load_info      = '';

    $jobs_executed = 0;
    $jobs_skipped  = 0;
    @jobs_killed   = ();

    ferror("--- Processing %d queued jobs", scalar(@queue)) if $progress;

    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;

    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();

        # Check current system load
        if ($load_trig && (time - $last_checkload) >= 5) {
            my @loadinfo = get_load_info();
            if (defined $loadinfo[0]) {
                my $load_now = $loadinfo[0];
                if ($load_now > $load_trig && !$overloaded) {
                    $overloaded = 1;
                    error("PAUSE: system load is at $load_now > $load_trig") if $progress;
                }
                elsif ($load_now < $load_untrig && $overloaded) {
                    $overloaded = 0;
                    error("RESUME: system load is at $load_now < $load_untrig") if $progress;
                }
                $load_info = $overloaded
                    ? ', paused (load '. $load_now .')'
                    : ', load '. $load_now;
                $last_checkload = time;
            }
        }

        # Status output
        if ($progress && (time - $last_progress) >= 60) {
            ferror("STATUS: %d queued, %d running, %d finished, %d skipped, %d killed$load_info", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
            if (@running) {
                my @run_status = map {
                    _job_name($_)." ". (time - $_->{'started_at'}) ."s"
                } @running;
                error("STATUS: currently running: ". join(', ', @run_status));
            }
            $last_progress = time;
        }

        # Back off if we're too busy
        my $must_wait = @running >= $max_par
            || have_intensive_jobs() >= $max_par_intensive
            || !@queue
            || $overloaded;
        if ($must_wait) {
            sleep 1 unless $proceed_immediately;
            next;
        }

        # Run next
        run_job(shift(@queue)) if @queue;
    }

    if ($progress) {
        ferror("--- Queue processed. %d jobs executed, %d skipped, %d killed.", $jobs_executed, $jobs_skipped, scalar(@jobs_killed));
    }
}
# Daemon mode: write our pid to the lockfile, then keep queueing and
# processing all projects until $perpetual is cleared, sleeping
# $restart_delay seconds between runs.  The lockfile is removed on the way
# out.
#
# Dies if the lockfile already exists or cannot be written.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile exists. Please make sure no other instance of jobd is running.";
    }
    # "or" rather than "||": the latter binds to $lockfile, so the die
    # could never trigger.
    open(my $lock_fh, '>', $lockfile)
        or die "Cannot create lockfile $lockfile: $!";
    print {$lock_fh} $$;
    close($lock_fh)
        or die "Cannot write lockfile $lockfile: $!";
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
        sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
    }
    unlink $lockfile;
}
347 ######### Helpers {{{1
# Print a message to STDERR, prefixed with a timestamp and followed by a
# newline.  Only the first argument is used.
#
# Deliberately declared without a prototype: most callers appear earlier in
# the file than this definition, where a prototype cannot be checked and
# only provokes "called too early to check prototype" warnings.
sub error {
    my ($message) = @_;
    print STDERR ts() . $message . "\n";
}
# printf-style variant of error(): the first argument is the format string,
# the remaining arguments are its values.
#
# Declared without a prototype because callers precede this definition, so
# a prototype could not be checked for them.
sub ferror {
    my ($format, @values) = @_;
    error(sprintf($format, @values));
}
# Report a message through error() and terminate with exit status 1.
#
# Declared without a prototype, in line with the other logging helpers;
# a prototype performs no argument validation.
sub fatal {
    my ($message) = @_;
    error($message);
    exit 1;
}
360 ######### Main {{{1
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    # -h is what the SYNOPSIS advertises; -? is kept for compatibility.
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'max-intensive-parallel|i=i' => \$max_par_intensive,
    'load-triggers=s' => \$load_triggers,
    'restart-delay|d=i' => \$restart_delay,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
# Test definedness so that a project literally named "0" is still honoured.
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && defined($one));

# Progress output is on unless --quiet was given.
unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

# NOTE(review): this disables load checks wherever /proc/loadavg is absent,
# although get_load_info() can fall back to uptime -- confirm the intent.
$load_triggers = '0,0' if (!-f '/proc/loadavg');
# Require exactly two non-negative numbers: with a missing UNTRIG the queue
# would never resume once it had been paused.
my $load_number = qr/(?:\d+(?:\.\d*)?|\.\d+)/;
fatal("Error: --load-triggers expects two comma-separated numbers (TRIG,UNTRIG)")
    unless $load_triggers =~ /\A($load_number),($load_number)\z/;
($load_trig, $load_untrig) = ($1, $2);

# Single-project mode
if (defined($one)) {
    queue_one($one);
    run_queue();
    exit;
}

# One pass over all projects
if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

# Default: daemon mode
run_perpetually();
402 ########## Documentation {{{1
404 __END__
406 =head1 NAME
408 jobd - Perform Girocco maintenance jobs
410 =head1 SYNOPSIS
412 jobd [options]
414 Options:
415 -h | --help detailed instructions
416 -q | --quiet run quietly
417 -P | --progress show occasional status updates
418 -k SECONDS | --kill-after SECONDS how long to wait before killing jobs
419 -p NUM | --max-parallel NUM how many jobs to run at the same time
420 -i NUM | --max-intensive-parallel NUM how many resource-hungry jobs to run
421 at the same time
422 --load-triggers TRIG,UNTRIG stop queueing jobs at load above
423 TRIG and resume at load below UNTRIG
424 -d SECONDS | --restart-delay SECONDS wait for this many seconds between
425 queue runs
426 -l FILE | --lockfile FILE create a lockfile in the given
427 location
428 -a | --all-once process the list only once
429 -o PRJNAME | --one PRJNAME process only one project
431 =head1 OPTIONS
433 =over 8
435 =item B<--help>
437 Print the full description of jobd's options.
439 =item B<--quiet>
441 Suppress non-error messages, e.g. for use when running this task as a cronjob.
443 =item B<--progress>
445 Show information about the current status of the job queue occasionally. This
446 is automatically enabled if --quiet is not given.
448 =item B<--kill-after SECONDS>
450 Kill supervised jobs after a certain time to avoid hanging the daemon.
452 =item B<--max-parallel NUM>
454 Run no more than that many jobs at the same time.
456 =item B<--max-intensive-parallel NUM>
458 Run no more than that many resource-hungry jobs at the same time. Right now,
459 this refers to repacking jobs.
461 =item B<--load-triggers TRIG,UNTRIG>
463 If the first system load average (1 minute average) exceeds TRIG, don't queue
464 any more jobs until it goes below UNTRIG. This is currently only supported on
465 Linux and any other platforms that provide an uptime command with load average
466 output.
468 If both values are zero, load checks are disabled. Note that this is not the
469 default.
471 =item B<--restart-delay SECONDS>
473 After processing the queue, wait this many seconds before the queue is
474 processed again.
476 =item B<--lockfile FILE>
478 For perpetual operation, create a lockfile in that place and clean it up after
479 finishing/aborting.
481 =item B<--all-once>
483 Instead of perpetually processing all projects over and over again, process
484 them just once and then exit.
486 =item B<--one PRJNAME>
488 Process only the given project (given as just the project name without C<.git>
489 suffix) and then exit.
491 =back
493 =head1 DESCRIPTION
495 jobd is Girocco's repositories maintenance servant; it periodically checks all
496 the repositories and updates mirrored repositories and repacks push-mode
497 repositories when needed.
499 =cut