jobd: when running job callbacks, properly pass the job info structure
[girocco/testingthisout.git] / jobd / jobd.pl
blob3c96a070a84fd8d703d0213e8e3af51e088f1658
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
12 use POSIX ":sys_wait_h";
14 use Girocco::Config;
15 use Girocco::Project;
16 use Girocco::User;
# Command-line option storage. The defaults below stay in effect unless
# GetOptions overrides them while parsing the command line.
my ($quiet, $progress, $all_once, $one);   # flags / project name, unset by default
my ($kill_after, $max_par) = (900, 20);    # job timeout in seconds, max parallel jobs
my $lockfile = '/tmp/jobd.lock';           # lockfile used in perpetual mode
27 ######### Jobs {{{1
# Job command for 'update' jobs.
#
# Takes the job hash ref. Does nothing further if the project directory is
# missing (check_project_exists marks the job as skipped). If the project
# carries a .nofetch marker, the update is skipped and a gc job is queued
# directly; otherwise update.sh is started for the project.
sub update_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    if (-e "$Girocco::Config::reporoot/$name.git/.nofetch") {
        # Nothing to fetch for this project: skip straight to gc.
        job_skip($job);
        setup_gc($job);
        return;
    }

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $name], $quiet);
}
# Job command for 'gc' jobs.
#
# Takes the job hash ref and starts gc.sh for the job's project, unless the
# project directory does not exist.
sub gc_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/gc.sh", $name], $quiet);
}
# Queue a follow-up 'gc' job for the project of the given job.
#
# The gc job is flagged as intensive, which makes run_queue hold back other
# jobs while it is running.
sub setup_gc {
    my ($job) = @_;

    my %gc_job = (
        project   => $job->{'project'},
        type      => 'gc',
        command   => \&gc_project,
        intensive => 1,
    );
    queue_job(%gc_job);
}
# Verify that the repository directory for a job's project exists.
#
# Takes the job hash ref. Returns 1 if "$reporoot/<project>.git" is a
# directory. Otherwise prints a warning (unless --quiet), marks the job as
# skipped and returns 0.
sub check_project_exists {
    my ($job) = @_;
    my $name = $job->{'project'};

    if (!-d "$Girocco::Config::reporoot/$name.git") {
        error("Warning: skipping non-existent project: $job->{project}")
            unless $quiet;
        # The job must be passed along: job_skip() without an argument flags
        # a throw-away hash, leaving the real job stuck in the running list.
        job_skip($job);
        return 0;
    }

    return 1;
}
# Queue an 'update' job for a single project (given by name).
#
# A gc job is scheduled afterwards regardless of whether the update job is
# reported as succeeded or failed.
sub queue_one {
    my ($project) = @_;

    my %update_job = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    queue_job(%update_job);
}
# Queue an 'update' job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    for my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
85 ######### Daemon operation {{{1
# Scheduler state shared by the subs in this section.
my (@queue, @running);                 # jobs waiting to run / jobs handed to run_job
my ($perpetual, $locked) = (1, 0);     # keep looping? / lockfile written by us?
my ($jobs_executed, $jobs_skipped);    # counters, reset at the start of run_queue
my @jobs_killed;                       # names of jobs killed due to timeout
# SIGINT handler installed by run_queue: first ^C.
#
# Drops all jobs that have not been started yet, stops the perpetual loop
# and re-arms SIGINT so that a second ^C exits immediately via handle_exit.
sub handle_softexit {
    my $notice = "Waiting for outstanding jobs to finish... "
               . "^C again to exit immediately";
    error($notice);

    $perpetual = 0;
    @queue = ();
    $SIG{'INT'} = \&handle_exit;
}
# Hard-exit handler (SIGTERM, or second SIGINT).
#
# Sends SIGKILL to every running job that has a child process, removes the
# lockfile if this instance created it, and exits with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");
    $SIG{'TERM'} = 'IGNORE';

    for my $job (@running) {
        # Entries without a forked child have no pid. Signalling an
        # undefined pid would be treated as pid 0, i.e. the whole process
        # group, so those entries are skipped.
        next unless $job->{'pid'};
        kill 'KILL', $job->{'pid'};
    }

    unlink $lockfile if ($locked);
    exit(0);
}
# Append a job to the queue.
#
# Takes the job description as key/value pairs (project, type, command and
# optional on_success / on_error / intensive). 'queued_at' and 'dont_run'
# are always (re)initialised here; 'intensive' defaults to 0 when the
# caller did not supply the key.
sub queue_job {
    my %job = @_;

    @job{qw(queued_at dont_run)} = (time, 0);
    $job{'intensive'} = 0 if !exists $job{'intensive'};

    push @queue, \%job;
}
# Start a single job.
#
# The job is added to the running list and its command callback is invoked
# with the job hash ref. If the callback flagged the job as 'dont_run', the
# job is taken off the running list again and counted as skipped.
sub run_job {
    my ($job) = @_;

    push @running, $job;

    my $command = $job->{'command'};
    $command->($job);

    if ($job->{'dont_run'}) {
        $jobs_skipped++;
        pop @running;
        return;
    }
}
# Build the "[type::project]" label used in log messages for a job.
sub _job_name {
    my ($job) = @_;
    return sprintf '[%s::%s]', $job->{'type'}, $job->{'project'};
}
138 # Only one of those per job!
# Fork a child process that runs an external command for a job.
#
# Parameters:
#   $job      - job hash ref; 'pid', 'finished' and 'started_at' are set on it
#   $command  - array ref with the program and its arguments
#   $err_only - if true, the child's STDOUT is redirected to /dev/null
#
# If fork fails, the error is reported and the job is marked finished. In
# the parent, the child's pid and start time are recorded and the sub
# returns at once; the child is collected later by reap_finished_jobs.
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined $pid) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }

    if (!$pid) {
        # Child process. On any failure it must exit rather than return:
        # returning would drop the child back into the daemon's own loop.
        # Low-precedence 'or' is required here; with '||' the check binds
        # to the filename string and the handler can never run.
        open STDIN, '<', '/dev/null' or do {
            error(_job_name($job) ." Can't read from /dev/null: $!");
            exit 1;
        };
        if ($err_only) {
            open STDOUT, '>', '/dev/null' or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 1;
            };
        }
        # "Prevent" races
        select(undef, undef, undef, 0.1);
        # exec only returns on failure; report it and exit non-zero
        # instead of exiting with whatever $? happened to hold.
        exec @$command
            or error(_job_name($job) ." Can't exec $command->[0]: $!");
        exit 1;
    }

    # Parent process: remember the child so it can be reaped or timed out.
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Flag a job as skipped: run_job takes jobs with 'dont_run' set off the
# running list again and counts them as skipped.
sub job_skip {
    my ($job) = @_;
    $job->{'dont_run'} = 1;
}
# Kill running jobs that exceeded the --kill-after timeout.
#
# Only jobs with a recorded start time are considered. A timed-out job is
# marked finished, its child gets SIGKILL, a notice goes to STDERR and the
# job's name is recorded in the killed list.
sub reap_hanging_jobs {
    for my $job (@running) {
        next unless defined($job->{'started_at'});
        next unless (time - $job->{'started_at'}) > $kill_after;

        my $label = _job_name($job);
        $job->{'finished'} = 1;
        kill 'KILL', $job->{'pid'};
        print STDERR $label ." KILLED due to timeout\n";
        push @jobs_killed, $label;
    }
}
# Collect all children that have exited, without blocking.
#
# For each reaped pid the matching running job is looked up. A job not yet
# marked finished gets its on_success callback, is marked finished and
# counted as executed; a job already marked finished (set by the timeout
# killer) gets its on_error callback. Finally all finished jobs are dropped
# from the running list.
sub reap_finished_jobs {
    while ((my $reaped = waitpid(-1, WNOHANG)) >= 1) {
        my ($job) = grep { $_->{'pid'} && $_->{'pid'} == $reaped } @running;

        # XXX- the child's exit status ($?) is currently not looked at

        next unless $job;

        if (!$job->{'finished'}) {
            my $on_success = $job->{'on_success'};
            $on_success->($job) if defined($on_success);
            $job->{'finished'} = 1;
            $jobs_executed++;
        }
        else {
            my $on_error = $job->{'on_error'};
            $on_error->($job) if defined($on_error);
        }
    }

    @running = grep { $_->{'finished'} == 0 } @running;
}
# Return the running jobs flagged as intensive (their count when called in
# scalar/boolean context).
sub have_intensive_jobs {
    return grep { $_->{'intensive'} == 1 } @running;
}
# Process the job queue until it is empty and no job is running.
#
# Resets the per-run counters, installs the INT/TERM handlers, then loops:
# kill timed-out jobs, reap finished ones, and start the next queued job
# unless the scheduler is busy (parallel limit reached, an intensive job is
# running, or nothing is queued). While busy it sleeps one second per round
# and, with progress enabled, prints a status report at most once a minute.
sub run_queue {
    my $last_progress = time;
    ($jobs_executed, $jobs_skipped) = (0, 0);
    @jobs_killed = ();

    printf STDERR "--- Processing %d queued jobs\n", scalar(@queue)
        if $progress;

    $SIG{'INT'} = \&handle_softexit;
    $SIG{'TERM'} = \&handle_exit;

    while (@queue || @running) {
        reap_hanging_jobs();
        reap_finished_jobs();

        my $busy = @running >= $max_par || have_intensive_jobs() || !@queue;
        if (!$busy) {
            # Run next
            run_job(shift(@queue)) if @queue;
            next;
        }

        # Back off if we're too busy
        sleep 1;
        next unless $progress && (time - $last_progress) >= 60;

        printf STDERR "STATUS: %d queued, %d running, %d finished, %d skipped, %d killed\n", scalar(@queue), scalar(@running), $jobs_executed, $jobs_skipped, scalar(@jobs_killed);
        if (@running) {
            my @run_status =
                map { _job_name($_)." ". (time - $_->{'started_at'}) ."s" } @running;
            error("STATUS: currently running: ". join(', ', @run_status));
        }
        $last_progress = time;
    }

    printf STDERR "--- Queue processed. %d jobs executed, %d skipped, %d killed. Now restarting.\n", $jobs_executed, $jobs_skipped, scalar(@jobs_killed)
        if $progress;
}
# Run the daemon loop: queue all projects and process the queue, over and
# over, until $perpetual is cleared (see handle_softexit).
#
# Dies if the lockfile already exists or cannot be created. The lockfile
# receives this process's pid and is removed again when the loop ends.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile exists. Please make sure no other instance of jobd is running.";
    }

    # Low-precedence 'or' is required: with '||' the die attaches to
    # $lockfile (always true) and an open failure goes unnoticed.
    open(my $lock_fh, '>', $lockfile)
        or die "Cannot create lockfile $lockfile: $!";
    print {$lock_fh} $$;
    close $lock_fh;
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
    }

    unlink $lockfile;
}
266 ######### Helpers {{{1
# Print a single message, followed by a newline, to STDERR.
sub error($) {
    my ($message) = @_;
    print STDERR "$message\n";
}
# Report a message via error() and terminate the program with exit status 1.
sub fatal($) {
    my ($message) = @_;
    error($message);
    exit 1;
}
276 ######### Main {{{1
# Parse options
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    # 'h' is listed explicitly: the usage text documents -h, and with
    # bundling enabled a single-letter option is only accepted if declared.
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

# Without --quiet, progress reporting is always on; the environment
# variable is exported for the child commands.
unless ($quiet) {
    $ENV{'show_progress'} = '1';
    $progress = 1;
}

# --one: process a single project, then exit.
if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

# --all-once: process every project exactly once, then exit.
if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

# Default: daemon mode.
run_perpetually();
312 ########## Documentation {{{1
314 __END__
316 =head1 NAME
318 jobd - Perform Girocco maintenance jobs
320 =head1 SYNOPSIS
322 jobd [options]
324 Options:
325 -h | --help detailed instructions
326 -q | --quiet run quietly
327 -P | --progress show occasional status updates
328 -k SECONDS | --kill-after=SECONDS how long to wait before killing jobs
329 -p NUM | --max-parallel=NUM how many jobs to run at the same time
330 -l FILE | --lockfile=FILE create a lockfile in the given location
331 -a | --all-once process the list only once
332 -o PRJNAME | --one=PRJNAME process only one project
334 =head1 OPTIONS
336 =over 8
338 =item B<--help>
340 Print the full description of jobd's options.
342 =item B<--quiet>
344 Suppress non-error messages, e.g. for use when running this task as a cronjob.
346 =item B<--progress>
348 Show information about the current status of the job queue occasionally. This
349 is automatically enabled if --quiet is not given.
351 =item B<--kill-after=SECONDS>
353 Kill supervised jobs after a certain time to avoid hanging the daemon.
355 =item B<--max-parallel=NUM>
357 Run no more than that many jobs at the same time.
359 =item B<--lockfile=FILE>
361 For perpetual operation, create a lockfile in that place and clean it up after
362 finishing/aborting.
364 =item B<--all-once>
366 Instead of perpetually processing all projects over and over again, process
367 them just once and then exit.
369 =item B<--one=PRJNAME>
371 Process only the given project (given as just the project name without C<.git>
372 suffix) and then exit.
374 =back
376 =head1 DESCRIPTION
378 jobd is Girocco's repository maintenance servant; it periodically checks all
379 the repositories and updates mirrored repositories and repacks push-mode
380 repositories when needed.
382 =cut