jobd: increase number of intensive jobs run simultaneously
[girocco.git] / jobd / jobd.pl
blob515b7054ceeb778e478296e4ebd7a375011316da
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
14 use Girocco::Config;
15 use Girocco::Project;
16 use Girocco::User;
# Options
# Defaults for the settings below; all but $max_par_intensive are bound to
# command line switches by the GetOptions() call in the Main section.
my ($quiet, $progress);              # output control flags
my ($all_once, $one);                # single-pass / single-project modes
my $kill_after = 900;                # seconds a job may run before being killed
my $max_par = 20;                    # upper bound on simultaneously running jobs
my $max_par_intensive = 3; # no command line option right now
my $lockfile = "/tmp/jobd.lock";     # lockfile used for perpetual operation
28 ######### Jobs {{{1
# Job command for 'update' jobs.
#
# Does nothing further if the project directory is missing. If the
# repository contains a ".nofetch" marker file, the job is marked as skipped
# and a gc job is queued instead. Otherwise update.sh is launched for the
# project via exec_job_command(), silencing its STDOUT when $quiet is set.
#
#   $job - job hash; its 'project' entry names the project
sub update_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    if (-e "$Girocco::Config::reporoot/$name.git/.nofetch") {
        job_skip($job);
        setup_gc($job);
        return;
    }

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $name], $quiet);
}
# Job command for 'gc' jobs: launch gc.sh for the job's project, provided
# the project directory exists.
#
#   $job - job hash; its 'project' entry names the project
sub gc_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $name], $quiet);
}
# Queue an intensive 'gc' job for the same project as the given job.
#
#   $job - job hash whose 'project' entry is copied into the new job
sub setup_gc {
    my ($job) = @_;

    my @gc_spec = (
        project   => $job->{'project'},
        type      => 'gc',
        command   => \&gc_project,
        intensive => 1,
    );
    queue_job(@gc_spec);
}
# Verify that the repository directory of the job's project exists.
#
#   $job - job hash; its 'project' entry names the project
#
# Returns 1 when "$reporoot/<project>.git" is a directory. Otherwise a
# warning is printed (unless $quiet), the job is marked as skipped and 0 is
# returned.
sub check_project_exists {
    my $job = shift;
    my $p = $job->{'project'};

    return 1 if -d "$Girocco::Config::reporoot/$p.git";

    error("Warning: skipping non-existent project: $job->{project}")
        unless $quiet;
    # The job must be passed along: job_skip() flags the hash it is given,
    # and run_job() only drops the job from @running when that flag is set.
    job_skip($job);
    return 0;
}
# Queue an 'update' job for one project. A gc job is queued afterwards
# through setup_gc, which is registered as both the success and the error
# callback.
#
#   $project - project name
sub queue_one {
    my ($project) = @_;

    my @update_spec = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    queue_job(@update_spec);
}
# Queue an update job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
86 ######### Daemon operation {{{1
# Shared daemon state.
my (@queue, @running);               # jobs waiting to run / jobs currently tracked as running
my $perpetual = 1;                   # cleared by handle_softexit to stop the main loop
my $locked = 0;                      # set once the lockfile has been written
my ($jobs_executed, $jobs_skipped);  # counters, reset at the start of run_queue
my @jobs_killed;                     # names of jobs killed due to timeout
# First-stage interrupt handler (installed for INT by run_queue): drop all
# queued jobs, stop perpetual operation and let the running jobs finish.
# A second INT is routed to handle_exit.
sub handle_softexit {
    my $notice = "Waiting for outstanding jobs to finish... "
        . "^C again to exit immediately";
    error($notice);
    @queue = ();
    $perpetual = 0;
    $SIG{'INT'} = \&handle_exit;
}
# Hard-exit handler: SIGKILL every running job, remove the lockfile if this
# process created it, and exit with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");
    $SIG{'TERM'} = 'IGNORE';
    for my $job (@running) {
        # A job whose fork failed sits in @running without a pid until the
        # next reap. An undefined pid numifies to 0, and kill() on pid 0
        # signals the whole process group - including this process.
        next unless $job->{'pid'};
        kill 'KILL', $job->{'pid'};
    }
    unlink $lockfile if ($locked);
    exit(0);
}
# Append a job to the queue.
#
# Takes the job description as key/value pairs. 'queued_at' (current time)
# and 'dont_run' (0) are always set; 'intensive' defaults to 0 when the
# caller does not supply it.
sub queue_job {
    my %job = (
        intensive => 0,      # default, overridden by the caller's value if any
        @_,
        queued_at => time,
        dont_run  => 0,
    );
    push @queue, \%job;
}
# Start a job: register it in @running and invoke its 'command' callback.
# If the callback flagged the job as 'dont_run', it is removed from @running
# again and counted as skipped.
#
#   $job - job hash as created by queue_job
sub run_job {
    my ($job) = @_;

    push @running, $job;
    $job->{'command'}->($job);

    return unless $job->{'dont_run'};

    # The job was pushed last, so popping removes exactly this job.
    pop @running;
    $jobs_skipped++;
    return;
}
# Build the display label "[type::project]" for a job.
#
#   $job - job hash with 'type' and 'project' entries
sub _job_name {
    my ($job) = @_;
    return sprintf('[%s::%s]', $job->{'type'}, $job->{'project'});
}
# Fork a child that runs the given command on behalf of a job.
# Only one of those per job!
#
#   $job      - job hash; on success 'pid', 'finished' (0) and 'started_at'
#               are recorded in it
#   $command  - array reference: program followed by its arguments
#   $err_only - when true, the child's STDOUT is redirected to /dev/null
#
# If fork fails, an error is printed and the job is marked finished so that
# it gets dropped from @running. The parent returns right after forking; the
# child is collected later by reap_finished_jobs().
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined $pid) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }

    if (!$pid) {
        # Child process. It must never return into the caller: doing so
        # would leave a second copy of the daemon working on the queue.
        # Hence every failure path below exits.
        # "or" is required here; "||" would bind to the filename string and
        # the failure branch could never run.
        open(STDIN, '<', '/dev/null') or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            exit 1;
        };
        if ($err_only) {
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 1;
            };
        }
        # "Prevent" races
        # (a 0.1s pause; presumably gives the parent time to record the pid)
        select(undef, undef, undef, 0.1);
        # exec only returns on failure.
        exec @$command
            or error(_job_name($job) ." Can't exec $command->[0]: $!");
        exit 1;
    }

    # Parent process: remember the child so it can be reaped or timed out.
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Flag a job as not to be run; run_job() checks this flag after invoking
# the job's command callback.
#
#   $job - job hash to flag
sub job_skip {
    my ($job) = @_;
    $job->{'dont_run'} = 1;
}
# Kill every running job that was started more than $kill_after seconds ago.
# Such jobs are marked finished, sent SIGKILL, reported on STDERR and
# recorded in @jobs_killed.
sub reap_hanging_jobs {
    for my $job (@running) {
        next unless defined($job->{'started_at'});
        next unless (time - $job->{'started_at'}) > $kill_after;

        $job->{'finished'} = 1;
        kill 'KILL', $job->{'pid'};
        print STDERR _job_name($job) ." KILLED due to timeout\n";
        push @jobs_killed, _job_name($job);
    }
}
# Collect all children that have exited, without blocking.
#
# For each reaped pid that belongs to a job in @running: if the job was not
# yet marked finished, its 'on_success' callback (if any) is run, the job is
# marked finished and counted as executed; if it was already marked finished
# (as reap_hanging_jobs does before killing), its 'on_error' callback (if
# any) is run. Finished jobs are then removed from @running.
#
# Returns 1 if at least one child was reaped, 0 otherwise.
sub reap_finished_jobs {
    my $reaped_any = 0;

    while ((my $pid = waitpid(-1, WNOHANG)) >= 1) {
        $reaped_any = 1;

        my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $pid } @running;
        # XXX- the child's exit status ($?) is currently not looked at
        next unless $job;

        if (!$job->{'finished'}) {
            $job->{'on_success'}->($job) if defined($job->{'on_success'});
            $job->{'finished'} = 1;
            $jobs_executed++;
        } else {
            $job->{'on_error'}->($job) if defined($job->{'on_error'});
        }
    }

    @running = grep { $_->{'finished'} == 0 } @running;
    return $reaped_any;
}
# Return the running jobs flagged as intensive; in scalar context (as used
# by run_queue) this yields their count.
sub have_intensive_jobs {
    my @intensive = grep { $_->{'intensive'} == 1 } @running;
    return @intensive;
}
# Process the job queue until both @queue and @running are empty.
#
# Resets the job counters, installs the INT/TERM handlers and then loops:
# hanging and finished jobs are reaped first; a new job is started only
# while the parallelism limits ($max_par, $max_par_intensive) allow it.
# Otherwise the loop backs off for a second (unless a job was just reaped)
# and, with $progress set, prints a status report at most once a minute.
sub run_queue {
    my $last_progress = time;
    ($jobs_executed, $jobs_skipped) = (0, 0);
    @jobs_killed = ();

    printf STDERR "--- Processing %d queued jobs\n", scalar(@queue)
        if $progress;

    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;

    while (@queue || @running) {
        reap_hanging_jobs();
        my $proceed_immediately = reap_finished_jobs();

        my $too_busy = @running >= $max_par
            || have_intensive_jobs() >= $max_par_intensive
            || !@queue;

        unless ($too_busy) {
            # Run next
            run_job(shift(@queue)) if @queue;
            next;
        }

        # Back off if we're too busy
        sleep 1 unless $proceed_immediately;
        next unless $progress && (time - $last_progress) >= 60;

        printf STDERR "STATUS: %d queued, %d running, %d finished, %d skipped, %d killed\n", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed);
        if (@running) {
            my @run_status = map {
                _job_name($_)." ". (time - $_->{'started_at'}) ."s"
            } @running;
            error("STATUS: currently running: ". join(', ', @run_status));
        }
        $last_progress = time;
    }

    printf STDERR "--- Queue processed. %d jobs executed, %d skipped, %d killed. Now restarting.\n", $jobs_executed, $jobs_skipped, scalar(@jobs_killed)
        if $progress;
}
# Daemon mode: take the lockfile, then queue and process all projects over
# and over until $perpetual is cleared (see handle_softexit). The lockfile
# is removed on the way out.
#
# Dies if the lockfile already exists or cannot be written.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile exists. Please make sure no other instance of jobd is running.";
    }
    # "or", not "||": the latter binds to $lockfile, so a failed open would
    # go unnoticed.
    open(my $lock, '>', $lockfile)
        or die "Cannot create lockfile $lockfile: $!";
    print $lock $$;
    # Buffered write errors only surface at close.
    close($lock)
        or die "Cannot write lockfile $lockfile: $!";
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
    }
    unlink $lockfile;
}
270 ######### Helpers {{{1
# Print a message to STDERR, followed by a newline.
#
# Declared without a prototype: the sub is defined after most of its call
# sites, where a prototype cannot be applied and only produces "called too
# early to check prototype" warnings.
#
#   $message - text to print
sub error {
    my ($message) = @_;
    print STDERR $message."\n";
}
# Print a message to STDERR via error() and terminate with exit status 1.
#
# Declared without a prototype: the sub is defined after its call site,
# where a prototype cannot be applied and only produces a warning.
#
#   $message - text to print before exiting
sub fatal {
    my ($message) = @_;
    error($message);
    exit 1;
}
280 ######### Main {{{1
# Parse options
# With 'bundling' enabled, single-dash short options must be declared
# explicitly; 'h' is listed so that the documented -h switch works.
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

# Progress reporting is implied whenever --quiet is not given; the
# environment variable is exported for the job commands started later.
unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

# Single-project mode.
if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

# Single-pass mode.
if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

# Default: daemon mode.
run_perpetually();
316 ########## Documentation {{{1
318 __END__
320 =head1 NAME
322 jobd - Perform Girocco maintenance jobs
324 =head1 SYNOPSIS
326 jobd [options]
328 Options:
329 -h | --help detailed instructions
330 -q | --quiet run quietly
331 -P | --progress show occasional status updates
332 -k SECONDS | --kill-after=SECONDS how long to wait before killing jobs
333 -p NUM | --max-parallel=NUM how many jobs to run at the same time
334 -l FILE | --lockfile=FILE create a lockfile in the given location
335 -a | --all-once process the list only once
336 -o PRJNAME | --one=PRJNAME process only one project
338 =head1 OPTIONS
340 =over 8
342 =item B<--help>
344 Print the full description of jobd's options.
346 =item B<--quiet>
348 Suppress non-error messages, e.g. for use when running this task as a cronjob.
350 =item B<--progress>
352 Show information about the current status of the job queue occasionally. This
353 is automatically enabled if --quiet is not given.
355 =item B<--kill-after=SECONDS>
357 Kill supervised jobs after a certain time to avoid hanging the daemon.
359 =item B<--max-parallel=NUM>
361 Run no more than that many jobs at the same time.
363 =item B<--lockfile=FILE>
365 For perpetual operation, create a lockfile in that place and clean it up after
366 finishing/aborting.
368 =item B<--all-once>
Instead of perpetually processing all projects over and over again, process
them just once and then exit.
373 =item B<--one=PRJNAME>
375 Process only the given project (given as just the project name without C<.git>
376 suffix) and then exit.
378 =back
380 =head1 DESCRIPTION
jobd is Girocco's repository maintenance servant; it periodically checks all
the repositories and updates mirrored repositories and repacks push-mode
repositories when needed.
386 =cut