jobd: check for finished jobs in main loop
[girocco.git] / jobd / jobd.pl
blob475c18549c3e890a145dd026a22623f71bc996e1
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
13 use Girocco::Config;
14 use Girocco::Project;
15 use Girocco::User;
# Options (filled in from the command line in the main section)
my ($quiet, $progress, $all_once, $one);
my $kill_after = 900;               # seconds a job may run before it is killed
my $max_par    = 3;                 # upper bound on simultaneously running jobs
my $lockfile   = "/tmp/jobd.lock";  # lockfile used for perpetual operation
26 ######### Jobs {{{1
# Job command for 'update' jobs: run update.sh for the job's project.
# A project carrying a .nofetch marker file is not updated; the job is
# flagged as skipped and a gc job is queued for it instead.
sub update_project {
    my ($job) = @_;
    my $project = $job->{'project'};

    return unless check_project_exists($job);

    if (-e "$Girocco::Config::reporoot/$project.git/.nofetch") {
        job_skip($job);
        setup_gc($job);
        return;
    }

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $project], $quiet);
}
# Job command for 'gc' jobs: run gc.sh for the job's project, provided the
# project's repository directory exists.
sub gc_project {
    my ($job) = @_;
    my $project = $job->{'project'};

    return unless check_project_exists($job);

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $project], $quiet);
}
# Queue a follow-up 'gc' job for the project of the given job. The gc job
# is marked intensive.
sub setup_gc {
    my ($job) = @_;

    my %gc_job = (
        project   => $job->{'project'},
        type      => 'gc',
        command   => \&gc_project,
        intensive => 1,
    );
    queue_job(%gc_job);
}
# Verify that the repository directory of the job's project exists.
#   $job - job hashref; its 'project' entry names the project
# Returns 1 when "$reporoot/<project>.git" is a directory. Otherwise the
# job is flagged as skipped (a warning is printed unless --quiet is set)
# and 0 is returned.
sub check_project_exists {
    my ($job) = @_;
    my $project = $job->{'project'};

    return 1 if -d "$Girocco::Config::reporoot/$project.git";

    error("Warning: skipping non-existent project: $job->{project}")
        unless $quiet;
    # Pass the job along: without an argument job_skip() flags a throwaway
    # hash, and run_job() would keep this job in the running list forever.
    job_skip($job);
    return 0;
}
# Queue an 'update' job for a single project. Whether the update succeeds
# or fails, a gc job is queued afterwards through the callbacks.
sub queue_one {
    my ($project) = @_;

    my %update_job = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    queue_job(%update_job);
}
# Queue an update job for every project reported by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
84 ######### Daemon operation {{{1
# Job hashrefs waiting to run / currently running, plus the names of the
# jobs killed for exceeding the timeout during the current queue run.
my (@queue, @running, @jobs_killed);
# Counters, reset at the start of every run_queue() call.
my ($jobs_executed, $jobs_skipped);
# Loop condition of run_perpetually(); cleared by handle_softexit().
my $perpetual = 1;
# Set once run_perpetually() has written the lockfile.
my $locked = 0;
# First-stage interrupt handler: drop everything still queued, stop the
# perpetual loop and let running jobs finish. A second interrupt is routed
# to handle_exit().
sub handle_softexit {
    my $notice = "Waiting for outstanding jobs to finish... ".
        "^C again to exit immediately";
    error($notice);

    $perpetual = 0;
    @queue = ();
    $SIG{'INT'} = \&handle_exit;
}
# Hard-exit handler: SIGKILL every running job, remove the lockfile if this
# process created it, and exit with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");
    $SIG{'TERM'} = 'IGNORE';
    for my $job (@running) {
        # A job whose child was never forked has no pid. kill() would treat
        # the undefined pid as 0 and signal our whole process group, which
        # would take this process down before the lockfile is removed.
        next unless $job->{'pid'};
        kill 'KILL', $job->{'pid'};
    }
    unlink $lockfile if ($locked);
    exit(0);
}
# Append a job, given as key/value pairs, to the queue. 'queued_at' and
# 'dont_run' are always (re)initialised here; 'intensive' defaults to 0
# when the caller does not supply it.
sub queue_job {
    my %job = (@_, queued_at => time, dont_run => 0);
    $job{'intensive'} = 0 unless exists $job{'intensive'};
    push @queue, \%job;
}
# Start a job: register it as running and invoke its command callback.
# If the callback flagged the job as skipped ('dont_run'), the job is taken
# off the running list again and counted as skipped.
sub run_job {
    my ($job) = @_;

    push @running, $job;
    $job->{'command'}->($job);
    return unless $job->{'dont_run'};

    pop @running;
    $jobs_skipped++;
    return;
}
# Build the "[type::project]" label used for a job in log messages.
sub _job_name {
    my ($job) = @_;
    return sprintf('[%s::%s]', $job->{'type'}, $job->{'project'});
}
# Fork a child process that runs a command on behalf of a job.
# Only one of those per job!
#   $job      - job hashref; 'pid', 'finished' (0) and 'started_at' are
#               recorded on it once the child has been forked
#   $command  - arrayref holding the program and its arguments
#   $err_only - when true, the child's STDOUT is redirected to /dev/null
# If fork fails, an error is printed, the job is flagged as finished (1)
# and no child is started.
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined $pid) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        # Child process. Every path below must end in exec or exit:
        # returning from here would leave a second copy of the daemon
        # running the main loop.
        if ($err_only) {
            # 'or' rather than '||': '||' binds to the file name string,
            # so a failed open would go unnoticed.
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 1;
            };
        }
        # "Prevent" races
        select(undef, undef, undef, 0.25);
        exec(@$command) or do {
            # Only reached when the exec itself failed.
            error(_job_name($job) ." Can't exec $command->[0]: $!");
            exit 127;
        };
    }
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Flag a job as skipped by setting its 'dont_run' entry; run_job() checks
# this flag after invoking the job's command.
sub job_skip {
    my ($job) = @_;
    $job->{'dont_run'} = 1;
}
# Kill every running job whose child has been running for more than
# $kill_after seconds. Such a job is flagged as finished (1), reported on
# STDERR and recorded in @jobs_killed.
sub reap_hanging_jobs {
    for my $job (@running) {
        # No child process behind this entry (e.g. the fork failed), so
        # there is no pid or start time. kill() would treat the undefined
        # pid as 0 and signal our own process group.
        next unless $job->{'pid'};
        # Already flagged as finished or killed: do not kill or count twice.
        next if $job->{'finished'};
        next unless (time - $job->{'started_at'}) > $kill_after;

        $job->{'finished'} = 1;
        kill 'KILL', $job->{'pid'};
        print STDERR _job_name($job) ." KILLED due to timeout\n";
        push @jobs_killed, _job_name($job);
    }
}
# Collect all children that have exited, without blocking, and finish the
# bookkeeping for their jobs:
#   - a job not yet flagged is marked as finished normally (2);
#   - 'on_error' is invoked for jobs flagged 1 (killed), 'on_success' for
#     jobs flagged 2, each with the job hashref as its only argument;
#   - $jobs_executed is incremented for every reaped job.
# Finally every job with a non-zero 'finished' flag is removed from
# @running.
sub reap_finished_jobs {
    # Compile-time import; provides the WNOHANG constant used below.
    use POSIX qw(WNOHANG);

    while (1) {
        my $pid = waitpid(-1, WNOHANG);
        # -1: no children at all; 0: children exist but none has exited
        # yet. Stopping only on -1 would spin here until a child exits.
        last if $pid <= 0;

        my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        # Not one of ours, or already dropped from the running list.
        next unless $job;

        # XXX- the child's exit status ($?) is currently not examined
        $job->{'finished'} = 2 unless $job->{'finished'};
        my $status = $job->{'finished'};
        if ($status == 1 && defined($job->{'on_error'})) {
            $job->{'on_error'}->($job);
        } elsif ($status == 2 && defined($job->{'on_success'})) {
            $job->{'on_success'}->($job);
        }
        $jobs_executed++;
    }
    @running = grep { $_->{'finished'} == 0 } @running;
}
# Return the running jobs marked as intensive (their number when called in
# scalar context).
sub have_intensive_jobs {
    my @intensive = grep { $_->{'intensive'} == 1 } @running;
    return @intensive;
}
# Work through the queue until no job is queued or running any more.
# Each pass first reaps timed-out and finished jobs. It then either starts
# the next queued job or, when the parallel limit is reached, an intensive
# job is running or nothing is queued, sleeps for a second. With progress
# reporting enabled a status summary is printed at most once a minute
# while backing off.
sub run_queue {
    my $last_progress = time;
    ($jobs_executed, $jobs_skipped) = (0, 0);
    @jobs_killed = ();
    printf STDERR "--- Processing %d queued jobs\n", scalar(@queue)
        if $progress;
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;

    while (@queue || @running) {
        reap_hanging_jobs();
        reap_finished_jobs();

        my $busy = @running >= $max_par || have_intensive_jobs() || !@queue;
        unless ($busy) {
            # Run next
            run_job(shift(@queue)) if @queue;
            next;
        }

        # Back off, we're too busy
        sleep 1;
        next unless $progress && (time - $last_progress) >= 60;

        printf STDERR "STATUS: %d queued, %d running, %d finished, %d skipped, %d killed\n",
            scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed);
        if (@running) {
            my @run_status =
                map { _job_name($_)." ". (time - $_->{'started_at'}) ."s" } @running;
            error("STATUS: currently running: ". join(', ', @run_status));
        }
        $last_progress = time;
    }

    printf STDERR "--- Queue processed. %d jobs executed, %d skipped, %d killed. Now restarting.\n",
        $jobs_executed, $jobs_skipped, scalar(@jobs_killed)
        if $progress;
}
# Daemon mode: take the lockfile, then queue and process all projects over
# and over until $perpetual is cleared, and remove the lockfile at the end.
# Dies if the lockfile already exists or cannot be created.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile exists. Please make sure no other instance of jobd is running.";
    }
    # 'or' rather than '||': '||' binds to $lockfile, so a failed open
    # would never reach the die.
    open(my $lock_fh, '>', $lockfile)
        or die "Cannot create lockfile $lockfile: $!";
    print {$lock_fh} $$;
    close($lock_fh);
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
    }
    unlink $lockfile;
}
262 ######### Helpers {{{1
# Print a message, followed by a newline, on STDERR.
sub error($) {
    my ($message) = @_;
    print STDERR $message, "\n";
}
# Report a message through error() and terminate with exit status 1.
sub fatal($) {
    my ($message) = @_;
    error($message);
    exit 1;
}
272 ######### Main {{{1
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    # 'h' is listed explicitly: the usage text advertises -h, and with
    # bundling enabled it is not accepted as an abbreviation of --help.
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);
# With a limit below 1 run_queue() would always consider itself too busy
# and never start (or finish) anything.
fatal("Error: --max-parallel must be at least 1")
    if ($max_par < 1);

# Progress reporting is implied unless --quiet was given.
unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

# One-shot mode for a single project
if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

# One-shot mode for all projects
if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

# Default: keep processing all projects until told to stop
run_perpetually();
308 ########## Documentation {{{1
310 __END__
312 =head1 NAME
314 jobd - Perform Girocco maintenance jobs
316 =head1 SYNOPSIS
318 jobd [options]
320 Options:
321 -h | --help detailed instructions
322 -q | --quiet run quietly
323 -P | --progress show occasional status updates
324 -k SECONDS | --kill-after=SECONDS how long to wait before killing jobs
325 -p NUM | --max-parallel=NUM how many jobs to run at the same time
326 -l FILE | --lockfile=FILE create a lockfile in the given location
327 -a | --all-once process the list only once
328 -o PRJNAME | --one=PRJNAME process only one project
330 =head1 OPTIONS
332 =over 8
334 =item B<--help>
336 Print the full description of jobd's options.
338 =item B<--quiet>
340 Suppress non-error messages, e.g. for use when running this task as a cronjob.
342 =item B<--progress>
344 Show information about the current status of the job queue occasionally. This
345 is automatically enabled if --quiet is not given.
347 =item B<--kill-after=SECONDS>
349 Kill supervised jobs after a certain time to avoid hanging the daemon.
351 =item B<--max-parallel=NUM>
353 Run no more than that many jobs at the same time.
355 =item B<--lockfile=FILE>
357 For perpetual operation, create a lockfile in that place and clean it up after
358 finishing/aborting.
360 =item B<--all-once>
362 Instead of perpetually processing all projects over and over again, process
363 them just once and then exit.
365 =item B<--one=PRJNAME>
367 Process only the given project (given as just the project name without C<.git>
368 suffix) and then exit.
370 =back
372 =head1 DESCRIPTION
374 jobd is Girocco's repositories maintenance servant; it periodically checks all
375 the repositories and updates mirrored repositories and repacks push-mode
376 repositories when needed.
378 =cut