jobd: take a short break after each queue run
[girocco.git] / jobd / jobd.pl
blob54d26a0bd7c14d38dddcb1c442a651ab2bead192
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
14 use Girocco::Config;
15 use Girocco::Project;
16 use Girocco::User;
# Options (defaults; most can be overridden on the command line)
my $quiet;                              # --quiet: suppress non-error messages
my $progress;                           # --progress: periodic status output
my $kill_after        = 900;            # --kill-after: seconds before a job is killed
my $max_par           = 20;             # --max-parallel: concurrent jobs
my $max_par_intensive = 1;              # --max-intensive-parallel: concurrent intensive jobs
my $lockfile          = '/tmp/jobd.lock'; # --lockfile: used in perpetual mode
my $restart_delay     = 10;             # not currently configurable
my $all_once;                           # --all-once: process the list once, then exit
my $one;                                # --one: process just this project, then exit
29 ######### Jobs {{{1
# Job command for 'update' jobs: refresh one mirrored project.
#
# The job is skipped (no child process is started) when the project does
# not exist, when it carries a .nofetch marker, when its initial clone is
# still in progress, or when the last refresh is recent enough.  In the
# .nofetch and "recent enough" cases a gc job is queued right away.
# Otherwise update.sh is started for the project.
sub update_project {
	my ($job) = @_;
	my $name = $job->{'project'};
	return unless check_project_exists($job);

	my $repo = get_project_path($name);
	if (-e "${repo}.nofetch") {
		job_skip($job);
		return setup_gc($job);
	}
	if (-e "${repo}.clone_in_progress") {
		job_skip($job, "initial mirroring not complete yet");
		return;
	}

	my $ts = is_operation_uptodate($name, 'lastrefresh', $Girocco::Config::min_mirror_interval);
	if ($ts) {
		job_skip($job, "not needed right now, last run at $ts");
		setup_gc($job);
		return;
	}

	return exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $name], $quiet);
}
# Job command for 'gc' jobs: start gc.sh for one project unless the project
# is missing or its last gc run is recent enough, in which case the job is
# skipped.
sub gc_project {
	my ($job) = @_;
	my $name = $job->{'project'};
	return unless check_project_exists($job);

	my $ts = is_operation_uptodate($name, 'lastgc', $Girocco::Config::min_gc_interval);
	if ($ts) {
		job_skip($job, "not needed right now, last run at $ts");
		return;
	}

	return exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $name], $quiet);
}
# Queue an intensive 'gc' job for the project of the given job.
# Returns whatever queue_job() returns.
sub setup_gc {
	my ($job) = @_;
	my @gc_job = (
		project   => $job->{'project'},
		type      => 'gc',
		command   => \&gc_project,
		intensive => 1,
	);
	return queue_job(@gc_job);
}
# Check that the job's project directory exists.
#
# Returns 1 when it does; otherwise marks the job as skipped (with a
# "non-existent project" message) and returns 0.
# NOTE(review): the true return on success is not visible in this view of
# the file; it is assumed because callers do
# `check_project_exists($job) || return` -- confirm against the original.
sub check_project_exists {
	my ($job) = @_;
	return 1 if -d get_project_path($job->{'project'});

	job_skip($job, "non-existent project");
	return 0;
}
# Map a project name to its repository directory below
# $Girocco::Config::reporoot.  The result ends in ".git/".
sub get_project_path {
	my ($name) = @_;
	return join('', $Girocco::Config::reporoot, '/', $name, '.git/');
}
# Decide whether an operation on a project was performed recently enough.
#
#   $project   - project name (without the .git suffix)
#   $which     - name of the gitweb.* config key holding the time of the
#                last run (callers pass 'lastrefresh' or 'lastgc')
#   $threshold - maximum age in seconds for the last run to count as recent
#
# Returns the stored timestamp (without trailing newline) when the last run
# is at most $threshold seconds old, undef otherwise -- including when no
# timestamp is stored or when date(1) cannot parse it.
sub is_operation_uptodate {
	my ($project, $which, $threshold) = @_;

	# Hand the repository to git through the environment instead of
	# splicing the path into a shell command line.
	local $ENV{'GIT_DIR'} = get_project_path($project);
	my $timestamp = `$Girocco::Config::git_bin config "gitweb.$which"`;
	return unless defined $timestamp;
	chomp $timestamp;
	return if $timestamp eq '';

	# List-form pipe open: the timestamp never passes through a shell.
	open(my $date_fh, '-|', 'date', '+%s', '-d', $timestamp) or return;
	my $unix_ts = <$date_fh>;
	close($date_fh);
	return unless defined $unix_ts;
	chomp $unix_ts;
	return unless $unix_ts =~ /\A-?\d+\z/;

	return (time - $unix_ts) <= $threshold ? $timestamp : undef;
}
# Queue an 'update' job for a single project.  A gc job is queued via
# setup_gc once the update job has been reaped, whether through the
# on_success or the on_error callback.
sub queue_one {
	my ($project) = @_;
	my @update_job = (
		project    => $project,
		type       => 'update',
		command    => \&update_project,
		on_success => \&setup_gc,
		on_error   => \&setup_gc,
	);
	return queue_job(@update_job);
}
# Queue an update job for every project reported by
# Girocco::Project->get_full_list().
sub queue_all {
	foreach my $project (Girocco::Project->get_full_list()) {
		queue_one($project);
	}
}
109 ######### Daemon operation {{{1
my @queue;              # jobs waiting to be started (hashrefs built by queue_job)
my @running;            # jobs that have been started and not yet cleaned up
my $perpetual = 1;      # cleared by handle_softexit to stop the perpetual loop
my $locked    = 0;      # true while run_perpetually holds the lockfile
my $jobs_executed;      # per-queue-run counters, reset by run_queue
my $jobs_skipped;
my @jobs_killed;        # names of jobs killed due to timeout in this queue run
# First-stage interrupt handler (installed for INT by run_queue): drop
# everything still queued, stop perpetual operation and let the running
# jobs finish.  A second INT is routed to handle_exit.
sub handle_softexit {
	my $notice = "Waiting for outstanding jobs to finish... "
		. "^C again to exit immediately";
	error($notice);
	@queue     = ();
	$perpetual = 0;
	$SIG{'INT'} = \&handle_exit;
}
# Hard-exit handler: kill the process group of every running job, remove
# the lockfile if this process created it, and exit with status 0.
sub handle_exit {
	error("Killing outstanding jobs...");
	# Presumably so that a further TERM cannot interrupt the cleanup.
	$SIG{'TERM'} = 'IGNORE';
	for my $job (@running) {
		# A job is pushed onto @running before its child is forked, so
		# it may not have a pid yet.  Negating a missing pid yields 0,
		# and kill(KILL, 0) would signal our own process group.
		next unless $job->{'pid'};
		kill 'KILL', -($job->{'pid'});
	}
	unlink $lockfile if ($locked);
	exit(0);
}
# Append a job to @queue.
#
# Takes the job description as key/value pairs (callers pass project, type,
# command and optionally intensive, on_success, on_error).  The queueing
# time is recorded in 'queued_at', 'dont_run' is reset to 0 and 'intensive'
# defaults to 0 when the caller did not supply it.
sub queue_job {
	my $job = { @_ };
	$job->{'queued_at'} = time;
	$job->{'dont_run'}  = 0;
	if (!exists $job->{'intensive'}) {
		$job->{'intensive'} = 0;
	}
	push @queue, $job;
}
# Start a job: register it in @running and invoke its command callback.
# If the callback marked the job as skipped ('dont_run'), the job is taken
# off @running again and counted in $jobs_skipped.
sub run_job {
	my ($job) = @_;

	push @running, $job;
	$job->{'command'}->($job);
	return unless $job->{'dont_run'};

	pop @running;
	$jobs_skipped++;
	return;
}
# Label used in log messages for a job: "[<type>::<project>]".
sub _job_name {
	my ($job) = @_;
	return sprintf('[%s::%s]', $job->{'type'}, $job->{'project'});
}
# Fork a child that runs a job's external command.
# Only one of those per job!
#
#   $job      - job hashref; 'pid', 'finished' and 'started_at' are set here
#   $command  - arrayref with the program and its arguments, passed to exec
#   $err_only - when true, the child's STDOUT is redirected to /dev/null
#
# In the parent: if fork fails the job is marked finished and nothing else
# happens; otherwise the child's pid and the start time are recorded.
#
# In the child: STDIN is redirected from /dev/null, the child is moved into
# a process group of its own and the command is exec'ed.  Any failure along
# the way is reported and the child exits with status 1 -- it must never
# return into the caller, which would leave a second copy of the daemon
# working through the queue.
sub exec_job_command {
	my ($job, $command, $err_only) = @_;

	my $pid = fork;
	if (!defined $pid) {
		error(_job_name($job) ." Can't fork job: $!");
		$job->{'finished'} = 1;
		return;
	}
	if (!$pid) {
		# 'or', not '||': the latter binds to the file name and would
		# never see open's return value.
		open(STDIN, '<', '/dev/null') or do {
			error(_job_name($job) ." Can't read from /dev/null: $!");
			exit 1;
		};
		if ($err_only) {
			open(STDOUT, '>', '/dev/null') or do {
				error(_job_name($job) ." Can't write to /dev/null: $!");
				exit 1;
			};
		}
		# New process group so we can keep track of all of its children
		if (!defined(POSIX::setpgid(0, 0))) {
			error(_job_name($job) ." Can't create process group: $!");
			exit 1;
		}
		# "Prevent" races
		select(undef, undef, undef, 0.1);
		# exec only returns on failure
		exec(@$command)
			or error(_job_name($job) ." Can't exec $command->[0]: $!");
		exit 1;
	}
	$job->{'pid'} = $pid;
	$job->{'finished'} = 0;
	$job->{'started_at'} = time;
}
# Mark a job as not to be run.  When a message is given and --quiet is not
# in effect, the reason is logged.
sub job_skip {
	my ($job, $msg) = @_;
	$job->{'dont_run'} = 1;
	return if $quiet || !$msg;
	error(_job_name($job) ." Skipping job: $msg");
}
# Kill every running job that has been running for more than $kill_after
# seconds: the job is marked finished, its process group gets SIGKILL, and
# its name is logged and remembered in @jobs_killed.
sub reap_hanging_jobs {
	foreach my $job (@running) {
		next unless defined($job->{'started_at'});
		next unless (time - $job->{'started_at'}) > $kill_after;

		$job->{'finished'} = 1;
		kill 'KILL', -($job->{'pid'});
		print STDERR _job_name($job) ." KILLED due to timeout\n";
		push @jobs_killed, _job_name($job);
	}
}
# Collect every child that has exited (without blocking) and fire the
# matching job callbacks.
#
# A reaped job that was not yet marked finished gets its on_success
# callback, is marked finished and is counted in $jobs_executed.  A reaped
# job that was already marked finished (reap_hanging_jobs does that when it
# kills a job) gets its on_error callback instead.  The child's exit status
# is not looked at.  Afterwards every finished job is dropped from @running.
#
# Returns true if at least one child was reaped.
sub reap_finished_jobs {
	my $reaped_any = 0;
	while ((my $pid = waitpid(-1, WNOHANG)) >= 1) {
		$reaped_any = 1;

		# XXX- the exit status in $? is currently ignored
		my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
		next unless $job;

		if ($job->{'finished'}) {
			$job->{'on_error'}->($job) if defined($job->{'on_error'});
		} else {
			$job->{'on_success'}->($job) if defined($job->{'on_success'});
			$job->{'finished'} = 1;
			$jobs_executed++;
		}
	}
	@running = grep { $_->{'finished'} == 0 } @running;
	return $reaped_any;
}
# The running jobs flagged as intensive; in scalar context (as used by
# run_queue) this is their number.
sub have_intensive_jobs {
	my @intensive = grep { $_->{'intensive'} == 1 } @running;
	return @intensive;
}
# Log prefix with the current local time: "[<localtime string>] ".
sub ts {
	return sprintf('[%s] ', scalar(localtime));
}
# Work through @queue until both @queue and @running are empty.
#
# Resets the per-run counters, installs the INT (soft exit) and TERM (hard
# exit) handlers, then loops: kill overdue jobs, reap finished ones, and
# either start the next queued job or -- when the parallelism limits are
# reached or nothing is queued -- wait a second.  With $progress set, a
# status line is printed at most once a minute while waiting, plus a
# summary before and after the run.
sub run_queue {
	my $last_progress = time;
	($jobs_executed, $jobs_skipped) = (0, 0);
	@jobs_killed = ();
	printf STDERR ts() ."--- Processing %d queued jobs\n", scalar(@queue)
		if $progress;
	@SIG{'INT', 'TERM'} = (\&handle_softexit, \&handle_exit);

	while (@queue || @running) {
		reap_hanging_jobs();
		my $proceed_immediately = reap_finished_jobs();

		my $too_busy = @running >= $max_par
			|| have_intensive_jobs() >= $max_par_intensive
			|| !@queue;
		if (!$too_busy) {
			# Run next
			run_job(shift(@queue)) if @queue;
			next;
		}

		# Back off if we're too busy
		sleep 1 unless $proceed_immediately;
		next unless $progress && (time - $last_progress) >= 60;

		printf STDERR ts() ."STATUS: %d queued, %d running, %d finished, %d skipped, %d killed\n", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed);
		if (@running) {
			my @run_status = map {
				_job_name($_)." ". (time - $_->{'started_at'}) ."s"
			} @running;
			error("STATUS: currently running: ". join(', ', @run_status));
		}
		$last_progress = time;
	}

	printf STDERR ts() ."--- Queue processed. %d jobs executed, %d skipped, %d killed.\n", $jobs_executed, $jobs_skipped, scalar(@jobs_killed)
		if $progress;
}
# Perpetual operation: take the lockfile, then queue and process all
# projects over and over until $perpetual is cleared (handle_softexit does
# that), pausing $restart_delay seconds between runs.  The lockfile holds
# our pid and is removed on the way out.
#
# Dies if the lockfile already exists or cannot be written.
# NOTE: the existence check followed by the open is not atomic.
sub run_perpetually {
	if (-e $lockfile) {
		die "Lockfile exists. Please make sure no other instance of jobd is running.";
	}
	# 'or', not '||': the latter binds to $lockfile, so a failed open
	# would go unnoticed.
	open(my $lock_fh, '>', $lockfile)
		or die "Cannot create lockfile $lockfile: $!";
	my $written = print {$lock_fh} $$;
	if (!close($lock_fh) || !$written) {
		my $err = $!;
		unlink $lockfile;
		die "Cannot write lockfile $lockfile: $err";
	}
	$locked = 1;

	while ($perpetual) {
		queue_all();
		run_queue();
		sleep($restart_delay) if $perpetual; # Let the system breathe for a moment
	}
	unlink $lockfile;
	# The lock is gone; handle_exit must not remove a file we no longer own.
	$locked = 0;
}
305 ######### Helpers {{{1
# Print a message, followed by a newline, to STDERR.
#
# Declared without a prototype: the sub is defined after most of its call
# sites, where a prototype cannot take effect anyway and only produces
# "called too early to check prototype" warnings.
sub error {
	my ($msg) = @_;
	print STDERR $msg."\n";
}
# Print a message to STDERR via error() and exit with status 1.
#
# Declared without a prototype; all callers pass a single parenthesized
# argument, so none is needed.
sub fatal {
	my ($msg) = @_;
	error($msg);
	exit 1;
}
315 ######### Main {{{1
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
	# With bundling enabled, single-letter options must be declared
	# explicitly; 'h' makes the documented -h work.
	'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
	'quiet|q' => \$quiet,
	'progress|P' => \$progress,
	'kill-after|k=i' => \$kill_after,
	'max-parallel|p=i' => \$max_par,
	'max-intensive-parallel|i=i' => \$max_par_intensive,
	'lockfile|l=s' => \$lockfile,
	'all-once|a' => \$all_once,
	'one|o=s' => \$one,
) || pod2usage(2);

# A project name such as "0" is false in boolean context, so test for a
# non-empty value rather than for truth.
my $have_one = defined($one) && $one ne '';
fatal("Error: can only use one out of --all-once and --one")
	if ($all_once && $have_one);

# Progress reporting is on unless --quiet was given; the environment
# variable is set for the child processes started later.
unless ($quiet) {
	$ENV{'show_progress'} = '1';
	$progress = 1;
}

# --one: process a single project, then exit
if ($have_one) {
	queue_one($one);
	run_queue();
	exit;
}

# --all-once: process every project once, then exit
if ($all_once) {
	queue_all();
	run_queue();
	exit;
}

run_perpetually();
352 ########## Documentation {{{1
354 __END__
356 =head1 NAME
358 jobd - Perform Girocco maintenance jobs
360 =head1 SYNOPSIS
362 jobd [options]
364 Options:
365 -h | --help detailed instructions
366 -q | --quiet run quietly
367 -P | --progress show occasional status updates
368 -k SECONDS | --kill-after=SECONDS how long to wait before killing jobs
369 -p NUM | --max-parallel=NUM how many jobs to run at the same time
370 -i NUM | --max-intensive-parallel=NUM how many resource-hungry jobs to run
371 at the same time
372 -l FILE | --lockfile=FILE create a lockfile in the given
373 location
374 -a | --all-once process the list only once
375 -o PRJNAME | --one=PRJNAME process only one project
377 =head1 OPTIONS
379 =over 8
381 =item B<--help>
383 Print the full description of jobd's options.
385 =item B<--quiet>
387 Suppress non-error messages, e.g. for use when running this task as a cronjob.
389 =item B<--progress>
391 Show information about the current status of the job queue occasionally. This
392 is automatically enabled if --quiet is not given.
394 =item B<--kill-after=SECONDS>
396 Kill supervised jobs after a certain time to avoid hanging the daemon.
398 =item B<--max-parallel=NUM>
400 Run no more than that many jobs at the same time.
402 =item B<--max-intensive-parallel=NUM>
404 Run no more than that many resource-hungry jobs at the same time. Right now,
405 this refers to repacking jobs.
407 =item B<--lockfile=FILE>
409 For perpetual operation, create a lockfile in that place and clean it up after
410 finishing/aborting.
412 =item B<--all-once>
414 Instead of perpetually processing all projects over and over again, process
415 them just once and then exit.
417 =item B<--one=PRJNAME>
419 Process only the given project (given as just the project name without C<.git>
420 suffix) and then exit.
422 =back
424 =head1 DESCRIPTION
426 jobd is Girocco's repository maintenance servant; it periodically checks all
427 the repositories and updates mirrored repositories and repacks push-mode
428 repositories when needed.
430 =cut