jobd: properly skip unnecessary jobs
[girocco/msimkins.git] / jobd / jobd.pl
blob2fd6eb6a787a8ba26c8a96d22863ce24fb85e50e
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
13 use Girocco::Config;
14 use Girocco::Project;
15 use Girocco::User;
17 # Options
18 my $quiet; # set by --quiet/-q; suppresses non-error output
19 my $progress; # set by --progress/-P; periodic queue status output
20 my $kill_after = 900; # seconds a job may run before reap_hanging_jobs kills it
21 my $max_par = 3; # run_queue backs off once this many jobs are running
22 my $lockfile = "/tmp/jobd.lock"; # lock file written by run_perpetually
23 my $all_once; # set by --all-once/-a; queue all projects once, then exit
24 my $one; # set by --one/-o; name of the single project to process
26 ######### Jobs {{{1
# Job command for 'update' jobs: runs update.sh for the job's project.
# If the project directory is missing the job is skipped by
# check_project_exists(). If the repository contains a .nofetch marker the
# job is skipped and a gc job is queued instead.
sub update_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    if (-e "$Girocco::Config::reporoot/$name.git/.nofetch") {
        job_skip($job);
        setup_gc($job);
        return;
    }

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $name], $quiet);
}
# Job command for 'gc' jobs: runs gc.sh for the job's project, provided the
# project directory exists.
sub gc_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $name], $quiet);
}
# Queue a 'gc' job for the same project as the given job.
# Used as the on_success/on_error callback of update jobs.
sub setup_gc {
    my ($job) = @_;
    my @gc_spec = (
        project => $job->{'project'},
        type    => 'gc',
        command => \&gc_project,
    );
    queue_job(@gc_spec);
}
# Verify that the repository directory of the job's project exists.
#
# Returns 1 if "$reporoot/<project>.git" is a directory. Otherwise prints a
# warning (unless --quiet), marks the job as skipped and returns 0.
sub check_project_exists {
    my ($job) = @_;
    my $p = $job->{'project'};

    return 1 if -d "$Girocco::Config::reporoot/$p.git";

    error("Warning: skipping non-existent project: $job->{project}")
        unless $quiet;
    # The job must be passed on: job_skip() without an argument marks
    # nothing, and run_job() would then leave a job without pid or
    # start time in @running.
    job_skip($job);
    return 0;
}
# Queue an 'update' job for a single project. Whether the update succeeds
# or fails, a gc job is queued afterwards via setup_gc().
sub queue_one {
    my ($project) = @_;
    my @update_spec = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    queue_job(@update_spec);
}
# Queue an update job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
83 ######### Daemon operation {{{1
85 my @queue; # job hashrefs waiting to be run (filled by queue_job)
86 my @running; # job hashrefs pushed by run_job, pruned by reap_finished_jobs
87 my $perpetual = 1; # run_perpetually loops while true; cleared by handle_softexit
88 my $locked = 0; # set to 1 once run_perpetually has written the lockfile
89 my $jobs_executed; # per-run counter, incremented in handle_childgone
90 my $jobs_skipped; # per-run counter, incremented in run_job
91 my @jobs_killed; # names of jobs killed by reap_hanging_jobs in this run
# SIGINT handler (first ^C): drop all queued jobs, stop the perpetual loop
# and let running jobs finish. A second ^C is routed to handle_exit().
sub handle_softexit {
    my $notice = "Waiting for outstanding jobs to finish... "
               . "^C again to exit immediately";
    error($notice);
    @queue = ();
    $perpetual = 0;
    $SIG{'INT'} = \&handle_exit;
}
# Hard-exit handler (SIGTERM, or second ^C): kill all running jobs, remove
# the lockfile if this process created it, and exit with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");
    $SIG{'CHLD'} = 'IGNORE';
    $SIG{'TERM'} = 'IGNORE';
    for my $job (@running) {
        # A job may sit in @running without a pid (not forked yet, or
        # fork failed). Killing an undefined pid would signal pid 0,
        # i.e. our whole process group, so skip those entries.
        next unless $job->{'pid'};
        kill 'KILL', $job->{'pid'};
    }
    unlink $lockfile if ($locked);
    exit(0);
}
use POSIX ();

# SIGCHLD handler: reap every child that has exited and mark the matching
# job in @running as finished (state 2). The exit status is currently
# ignored. $jobs_executed is incremented once per reaped child.
sub handle_childgone {
    # Do not let the handler clobber status variables of interrupted code.
    local ($!, $?);

    # Several children may exit while only one SIGCHLD is delivered, so
    # keep reaping without blocking until nothing is left.
    while ((my $pid = waitpid(-1, POSIX::WNOHANG())) > 0) {
        my ($child) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        # XXX- we currently don't care about the exit status
        $child->{'finished'} = 2 if $child;
        $jobs_executed++;
    }

    # Just to be safe
    $SIG{'CHLD'} = \&handle_childgone;
}
# Append a job to the queue. Arguments are key/value pairs describing the
# job; queued_at (current time) and dont_run (0) are always set here and
# override any values passed in.
sub queue_job {
    my %job = (@_, 'queued_at' => time, 'dont_run' => 0);
    push @queue, \%job;
}
# Start a job: register it in @running and invoke its command callback.
# If the callback flagged the job as dont_run, take it off @running again
# and count it as skipped.
sub run_job {
    my ($job) = @_;

    push @running, $job;
    $job->{'command'}->($job);
    return unless $job->{'dont_run'};

    pop @running;
    $jobs_skipped++;
    return;
}
# Return the display name of a job in the form "[type::project]".
sub _job_name {
    my ($job) = @_;
    return sprintf('[%s::%s]', $job->{'type'}, $job->{'project'});
}
# Only one of those per job!
#
# Fork a child and exec the given command for a job.
#   $job      - job hashref; pid, finished and started_at are set here
#   $command  - array ref: program followed by its arguments
#   $err_only - if true, the child's STDOUT is redirected to /dev/null
# If fork fails the job is marked finished with error state (1).
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined $pid) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }
    if (!$pid) {
        # Child process. It must never return into the caller, otherwise
        # a second copy of the daemon loop would keep running.
        if ($err_only) {
            # 'or' (not '||') so that the check applies to open() itself.
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 1;
            };
        }
        # Indirect-object form guarantees no shell is involved.
        exec { $command->[0] } @$command
            or error(_job_name($job) ." Can't exec $command->[0]: $!");
        exit 1;
    }
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Flag a job as skipped; run_job() checks dont_run after the job's command
# callback returns.
sub job_skip {
    my ($job) = @_;
    $job->{'dont_run'} = 1;
}
# Kill running jobs whose child has been alive for more than $kill_after
# seconds. Killed jobs are put into error state (finished = 1) and their
# names are recorded in @jobs_killed.
sub reap_hanging_jobs {
    for my $job (@running) {
        # Only jobs with a live child can hang. Entries without a pid or
        # start time (e.g. fork failed) must not be "killed": an undefined
        # pid would signal pid 0, i.e. our own process group.
        next unless defined $job->{'pid'} && defined $job->{'started_at'};
        # Already finished; its pid may have been reused by now.
        next if $job->{'finished'};
        next unless (time - $job->{'started_at'}) > $kill_after;

        $job->{'finished'} = 1;
        kill 'KILL', $job->{'pid'};
        print STDERR _job_name($job) ." KILLED due to timeout\n";
        push @jobs_killed, _job_name($job);
    }
}
# Run the completion callback of every finished job and drop finished jobs
# from @running. State 1 triggers on_error, state 2 triggers on_success;
# a missing callback is silently ignored.
sub reap_finished_jobs {
    foreach my $job (@running) {
        my $state = $job->{'finished'};
        next if $state == 0;

        my $callback = $state == 1 ? $job->{'on_error'}
                     : $state == 2 ? $job->{'on_success'}
                     :               undef;
        $callback->($job) if defined $callback;
    }
    @running = grep { $_->{'finished'} == 0 } @running;
}
# Process the job queue until both @queue and @running are empty.
# Resets the per-run counters, installs the signal handlers, and starts
# jobs one at a time while fewer than $max_par are running. Prints a
# status line at most once a minute unless --quiet is given without
# --progress.
sub run_queue {
    my $last_progress = time;
    $jobs_executed = 0;
    $jobs_skipped = 0;
    @jobs_killed = ();
    unless ($quiet) {
        printf STDERR "--- Processing %d queued jobs\n", scalar(@queue);
    }
    $SIG{'CHLD'} = \&handle_childgone;
    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;
    while (@queue || @running) {
        reap_hanging_jobs();
        reap_finished_jobs();
        # Back off if we're too busy. Also back off when nothing is left
        # to start but jobs are still running; otherwise this loop would
        # spin at full speed until the last child exits.
        if (@running >= $max_par || (!@queue && @running)) {
            sleep 10;
            unless (($quiet && !$progress) || (time - $last_progress) < 60) {
                printf STDERR "STATUS: %d queued, %d running, %d finished, %d skipped, %d killed\n", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed);
                if (@running) {
                    my @run_status;
                    for (@running) {
                        push @run_status, _job_name($_)." ". (time - $_->{'started_at'}) ."s";
                    }
                    error("STATUS: currently running: ". join(', ', @run_status));
                }
                $last_progress = time;
            }
            next;
        }
        # Run next
        run_job(shift(@queue)) if @queue;
    }
    unless ($quiet) {
        printf STDERR "--- Queue processed. %d jobs executed, %d skipped, %d killed. Now restarting.\n", $jobs_executed, $jobs_skipped, scalar(@jobs_killed);
    }
}
# Daemon mode: create the lockfile (containing our pid), then queue and
# process all projects over and over until $perpetual is cleared. The
# lockfile is removed on normal termination.
# Dies if the lockfile already exists or cannot be created.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile exists. Please make sure no other instance of jobd is running.";
    }
    # 'or' (not '||'): with '||' the die was attached to $lockfile and a
    # failed open went unnoticed.
    open(my $lock_fh, '>', $lockfile)
        or die "Cannot create lockfile $lockfile: $!";
    print {$lock_fh} $$;
    close $lock_fh;
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
    }
    unlink $lockfile;
}
259 ######### Helpers {{{1
# Print a message followed by a newline to STDERR.
# Declared without a prototype: every caller appears earlier in the file
# than this definition, so a prototype only produced "called too early to
# check prototype" warnings.
sub error {
    my ($message) = @_;
    print STDERR $message."\n";
}
# Print a message to STDERR via error() and exit with status 1.
# Declared without a prototype; a prototype does not validate arguments
# and is not needed by any call site.
sub fatal {
    my ($message) = @_;
    error($message);
    exit 1;
}
269 ######### Main {{{1
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    # 'h' is listed explicitly: the usage text documents -h, and with
    # bundling enabled single-dash options are not auto-abbreviated.
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

# Exported to the environment of the job scripts started later.
unless ($quiet) {
    $ENV{'show_progress'} = '1';
}

# Single-project mode
if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

# One pass over all projects
if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

# Default: daemon mode
run_perpetually();
304 ########## Documentation {{{1
306 __END__
308 =head1 NAME
310 jobd - Perform Girocco maintenance jobs
312 =head1 SYNOPSIS
314 jobd [options]
316 Options:
317 -h | --help detailed instructions
318 -q | --quiet run quietly
319 -P | --progress show occasional status updates
320 -k SECONDS | --kill-after=SECONDS how long to wait before killing jobs
321 -p NUM | --max-parallel=NUM how many jobs to run at the same time
322 -l FILE | --lockfile=FILE create a lockfile in the given location
323 -a | --all-once process the list only once
324 -o PRJNAME | --one=PRJNAME process only one project
326 =head1 OPTIONS
328 =over 8
330 =item B<--help>
332 Print the full description of jobd's options.
334 =item B<--quiet>
336 Suppress non-error messages, e.g. for use when running this task as a cronjob.
338 =item B<--progress>
340 Show information about the current status of the job queue occasionally. This
341 is automatically enabled if --quiet is not given.
343 =item B<--kill-after=SECONDS>
345 Kill supervised jobs after a certain time to avoid hanging the daemon.
347 =item B<--max-parallel=NUM>
349 Run no more than that many jobs at the same time.
351 =item B<--lockfile=FILE>
353 For perpetual operation, create a lockfile in that place and clean it up after
354 finishing/aborting.
356 =item B<--all-once>
358 Instead of perpetually processing all projects over and over again, process
359 them just once and then exit.
361 =item B<--one=PRJNAME>
363 Process only the given project (given as just the project name without C<.git>
364 suffix) and then exit.
366 =back
368 =head1 DESCRIPTION
370 jobd is Girocco's repository maintenance servant; it periodically checks all
371 the repositories and updates mirrored repositories and repacks push-mode
372 repositories when needed.
374 =cut