jobd: make progress output time-dependent
[girocco/msimkins.git] / jobd / jobd.pl
blobcaf1bcd22a7ab00485370be0e82afa110d4c67f2
1 #!/usr/bin/perl
3 # jobd - perform Girocco maintenance jobs
5 # Run with --help for details
7 use strict;
8 use warnings;
10 use Getopt::Long;
11 use Pod::Usage;
13 use Girocco::Config;
14 use Girocco::Project;
15 use Girocco::User;
# Options
# Settings below may be overridden from the command line (see the
# GetOptions call in the main section).
my ($quiet, $progress);             # output verbosity switches
my ($all_once, $one);               # one-shot modes; undef means perpetual operation
my $kill_after = 900;               # seconds a job may run before it is killed
my $max_par    = 3;                 # maximum number of jobs run in parallel
my $lockfile   = "/tmp/jobd.lock";  # lockfile used for perpetual operation
26 ######### Jobs {{{1
# Job command for 'update' jobs: runs jobd/update.sh on the job's project.
# The job is skipped (see job_skip) when the project repository is missing
# or when it carries a .nofetch marker file.
sub update_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    if (-e "$Girocco::Config::reporoot/$name.git/.nofetch") {
        job_skip($job);
        return;
    }

    exec_job_command($job, ["$Girocco::Config::basedir/jobd/update.sh", $name], $quiet);
}
# Job command for 'gc' jobs: runs jobd/gc.sh on the job's project, unless
# the project repository does not exist.
sub gc_project {
    my ($job) = @_;
    my $name = $job->{'project'};

    return unless check_project_exists($job);

    my @gc_command = ("$Girocco::Config::basedir/jobd/gc.sh", $name);
    exec_job_command($job, \@gc_command, $quiet);
}
# Completion hook: queues a follow-up 'gc' job for the project of the job
# that has just finished.
sub setup_gc {
    my ($finished_job) = @_;
    my @gc_spec = (
        project => $finished_job->{'project'},
        type    => 'gc',
        command => \&gc_project,
    );
    queue_job(@gc_spec);
}
# Verify that the repository directory of the job's project exists.
#
# Returns 1 when "$reporoot/<project>.git" is a directory.  Otherwise a
# warning is printed (unless --quiet), the job is skipped and 0 is returned.
#
# The job must be handed on to job_skip(): it is job_skip()/exec_job_command()
# that record 'pid', 'finished' and 'started_at' on the job hash.  Calling
# job_skip() without the job left those fields unset on the job that is
# already sitting in @running.
sub check_project_exists {
    my ($job) = @_;
    my $project = $job->{'project'};

    return 1 if -d "$Girocco::Config::reporoot/$project.git";

    error("Warning: skipping non-existent project: $project")
        unless $quiet;
    job_skip($job);
    return 0;
}
# Queue an 'update' job for a single project.  A gc job is scheduled
# afterwards regardless of whether the update succeeded or failed.
sub queue_one {
    my ($project) = @_;
    my @update_spec = (
        project    => $project,
        type       => 'update',
        command    => \&update_project,
        on_success => \&setup_gc,
        on_error   => \&setup_gc,
    );
    queue_job(@update_spec);
}
# Queue an update job for every project returned by
# Girocco::Project->get_full_list().
sub queue_all {
    foreach my $project (Girocco::Project->get_full_list()) {
        queue_one($project);
    }
}
82 ######### Daemon operation {{{1
# Shared daemon state.
my (@queue, @running);  # jobs waiting to be started / jobs handed to run_job
my @jobs_killed;        # names of jobs killed due to timeout in the current run
my $jobs_executed;      # number of children reaped in the current run
my $perpetual = 1;      # cleared by handle_softexit to stop the main loop
my $locked    = 0;      # set once the lockfile has been written
# Soft shutdown: drop everything still queued, stop the perpetual loop and
# let the running jobs finish.  INT is re-armed with handle_exit so that a
# second interrupt terminates immediately.
sub handle_softexit {
    error("Waiting for outstanding jobs to finish... ^C again to exit immediately");
    $perpetual = 0;
    @queue = ();
    $SIG{'INT'} = \&handle_exit;
}
# Hard shutdown: SIGKILL every job in @running, remove the lockfile if this
# process created it, and exit with status 0.
sub handle_exit {
    error("Killing outstanding jobs...");

    # Ignore CHLD and TERM from here on.
    $SIG{$_} = 'IGNORE' for ('CHLD', 'TERM');

    foreach my $job (@running) {
        kill 'KILL', $job->{'pid'};
    }

    if ($locked) {
        unlink $lockfile;
    }
    exit(0);
}
# SIGCHLD handler: reaps one child with wait() and, if it belongs to a job
# in @running, marks that job as finished (state 2).  Every reaped child
# increments $jobs_executed.
sub handle_childgone {
    my $reaped = wait;
    unless ($reaped == -1) {
        # XXX- the child's exit status ($?) is currently not looked at
        my ($job) = grep { $_->{'pid'} == $reaped } @running;
        $job->{'finished'} = 2 if $job;
        $jobs_executed++;
    }
    # Just to be safe, install the handler again
    $SIG{'CHLD'} = \&handle_childgone;
}
# Append a job to @queue.  Takes the job description as key/value pairs;
# the time of queueing is stored under 'queued_at'.
sub queue_job {
    my $job = { @_, 'queued_at' => time };
    push @queue, $job;
}
# Start a job: record it in @running, then invoke its 'command' callback
# with the job itself as the only argument.
sub run_job {
    my ($job) = @_;
    push @running, $job;
    my $command = $job->{'command'};
    $command->($job);
}
# Human-readable job label of the form "[type::project]".
sub _job_name {
    my ($job) = @_;
    return '[' . join('::', $job->{'type'}, $job->{'project'}) . ']';
}
# Only one of those per job!
#
# Fork a child that executes @$command on behalf of $job.
#
#   $job      - job hash; 'pid', 'finished' (0) and 'started_at' are set on
#               it once the child has been forked
#   $command  - array ref: program followed by its arguments
#   $err_only - when true, the child's STDOUT is redirected to /dev/null
#
# If fork fails, the job is marked as failed ('finished' = 1) and nothing is
# started.  Failures inside the child (redirect or exec) are reported on
# STDERR and make the child exit with a non-zero status; the child must
# never return from this function, or it would go on executing the parent's
# queue loop.
#
# NOTE(review): a child that exits before 'pid' is stored below may be
# reaped by the CHLD handler without being matched to its job -- confirm
# whether SIGCHLD should be blocked around the fork.
sub exec_job_command {
    my ($job, $command, $err_only) = @_;

    my $pid = fork;
    if (!defined $pid) {
        error(_job_name($job) ." Can't fork job: $!");
        $job->{'finished'} = 1;
        return;
    }

    if (!$pid) {
        # Child process
        if ($err_only) {
            # 'or' (not '||') so that the check applies to open() itself
            open(STDOUT, '>', '/dev/null') or do {
                error(_job_name($job) ." Can't write to /dev/null: $!");
                exit 1;
            };
        }
        exec(@$command)
            or error(_job_name($job) ." Can't exec $command->[0]: $!");
        exit 127;
    }

    # Parent process
    $job->{'pid'} = $pid;
    $job->{'finished'} = 0;
    $job->{'started_at'} = time;
}
# Skip a job by running /bin/false in its place, so that the job still goes
# through exec_job_command like any other.
sub job_skip {
    my ($job) = @_;
    my @noop_command = ('/bin/false');
    exec_job_command($job, \@noop_command);
}
# Kill every running job that has been running for more than $kill_after
# seconds and mark it as failed ('finished' = 1).  The names of killed jobs
# are collected in @jobs_killed.
#
# Jobs that are already finished are left alone, so that a job is neither
# signalled nor counted twice.  Jobs without a pid were never forked; they
# are marked as failed without sending a signal, because kill() with an
# undefined (i.e. 0) pid would signal the whole process group.
sub reap_hanging_jobs {
    for my $job (@running) {
        next if $job->{'finished'};

        if (!defined $job->{'pid'}) {
            # Never started: nothing to kill, just let it be reaped
            $job->{'finished'} = 1;
            next;
        }

        next unless (time - $job->{'started_at'}) > $kill_after;

        $job->{'finished'} = 1;
        kill 'KILL', $job->{'pid'};
        print STDERR _job_name($job) ." KILLED due to timeout\n";
        push @jobs_killed, _job_name($job);
    }
}
# Run the completion hook of every job that is no longer running and then
# drop those jobs from @running.  'finished' state 1 triggers 'on_error',
# state 2 triggers 'on_success'; a missing hook is simply not called.
sub reap_finished_jobs {
    foreach my $job (@running) {
        my $state = $job->{'finished'};
        next if $state == 0;

        my $hook = $state == 1 ? $job->{'on_error'}
                 : $state == 2 ? $job->{'on_success'}
                 :               undef;
        $hook->($job) if defined($hook);
    }
    @running = grep { $_->{'finished'} == 0 } @running;
}
# Process @queue until both the queue and the set of running jobs are empty.
#
# Each iteration reaps timed-out and finished jobs and then either starts
# the next queued job or waits:
#   - for 10 seconds when $max_par jobs are already running;
#   - for 1 second when nothing is left to start but jobs are still running
#     (without this the loop would spin at full speed until the last job
#     exits).
# While waiting, a status report is printed at most once per 60 seconds,
# unless --quiet was given without --progress.
sub run_queue {
    my $last_progress = time;
    $jobs_executed = 0;
    @jobs_killed = ();
    unless ($quiet) {
        printf STDERR "--- Processing %d queued jobs\n", scalar(@queue);
    }
    $SIG{'CHLD'} = \&handle_childgone;
    while (@queue || @running) {
        reap_hanging_jobs();
        reap_finished_jobs();

        # Back off if we're too busy, or if we only wait for jobs to finish
        my $saturated = (@running >= $max_par);
        my $draining  = (@running && !@queue);
        if ($saturated || $draining) {
            sleep($saturated ? 10 : 1);
            unless (($quiet && !$progress) || (time - $last_progress) < 60) {
                printf STDERR "STATUS: %d queued, %d running, %d finished, %d killed\n", scalar(@queue), scalar(@running), $jobs_executed, scalar(@jobs_killed);
                if (@running) {
                    my @run_status;
                    for my $job (@running) {
                        push @run_status, _job_name($job)." ". (time - $job->{'started_at'}) ."s";
                    }
                    error("STATUS: currently running: ". join(', ', @run_status));
                }
                $last_progress = time;
            }
            next;
        }

        # Run next
        run_job(shift(@queue)) if @queue;
    }
    unless ($quiet) {
        printf STDERR "--- Queue processed. %d jobs executed, %d killed due to timeouts. Now restarting.\n", $jobs_executed, scalar(@jobs_killed);
    }
}
# Perpetual operation: take the lockfile, then queue and process all
# projects over and over until $perpetual is cleared.  The lockfile holds
# this process' pid and is removed again on the way out.
#
# Dies if the lockfile already exists or cannot be written.
sub run_perpetually {
    if (-e $lockfile) {
        die "Lockfile exists. Please make sure no other instance of jobd is running.";
    }

    # 'or' (not '||') so that a failing open() is actually detected
    open(my $lock_fh, '>', $lockfile)
        or die "Cannot create lockfile $lockfile: $!";
    print {$lock_fh} $$;
    close($lock_fh) or do {
        my $reason = $!;
        unlink $lockfile;
        die "Cannot create lockfile $lockfile: $reason";
    };
    $locked = 1;

    while ($perpetual) {
        queue_all();
        run_queue();
    }
    unlink $lockfile;
}
248 ######### Helpers {{{1
# Print a message, followed by a newline, to STDERR.
sub error($) {
    my ($message) = @_;
    print STDERR $message . "\n";
}
# Print a message to STDERR via error() and terminate with exit status 1.
sub fatal($) {
    my ($message) = @_;
    error($message);
    exit 1;
}
258 ######### Main {{{1
# Parse options
# Bundling is enabled, so every single-letter form has to be listed
# explicitly; 'h' is included because the synopsis documents -h.
Getopt::Long::Configure('bundling');
my $parse_res = GetOptions(
    'help|h|?' => sub { pod2usage(-verbose => 1, -exitval => 0); },
    'quiet|q' => \$quiet,
    'progress|P' => \$progress,
    'kill-after|k=i' => \$kill_after,
    'max-parallel|p=i' => \$max_par,
    'lockfile|l=s' => \$lockfile,
    'all-once|a' => \$all_once,
    'one|o=s' => \$one,
) || pod2usage(2);
fatal("Error: can only use one out of --all-once and --one")
    if ($all_once && $one);

# Exported to the environment of the job scripts unless running quietly
unless ($quiet) {
    $ENV{'show_progress'} = '1';
}

# One-shot mode: a single project
if ($one) {
    queue_one($one);
    run_queue();
    exit;
}

# One-shot mode: every project, once
if ($all_once) {
    queue_all();
    run_queue();
    exit;
}

# Default: keep going until told to stop
run_perpetually();
293 ########## Documentation {{{1
295 __END__
297 =head1 NAME
299 jobd - Perform Girocco maintenance jobs
301 =head1 SYNOPSIS
303 jobd [options]
305 Options:
306 -h | --help detailed instructions
307 -q | --quiet run quietly
308 -P | --progress show occasional status updates
309 -k SECONDS | --kill-after=SECONDS how long to wait before killing jobs
310 -p NUM | --max-parallel=NUM how many jobs to run at the same time
311 -l FILE | --lockfile=FILE create a lockfile in the given location
312 -a | --all-once process the list only once
313 -o PRJNAME | --one=PRJNAME process only one project
315 =head1 OPTIONS
317 =over 8
319 =item B<--help>
321 Print the full description of jobd's options.
323 =item B<--quiet>
325 Suppress non-error messages, e.g. for use when running this task as a cronjob.
327 =item B<--progress>
329 Show information about the current status of the job queue occasionally. This
330 is automatically enabled if --quiet is not given.
332 =item B<--kill-after=SECONDS>
334 Kill supervised jobs after a certain time to avoid hanging the daemon.
336 =item B<--max-parallel=NUM>
338 Run no more than that many jobs at the same time.
340 =item B<--lockfile=FILE>
342 For perpetual operation, create a lockfile in that place and clean it up after
343 finishing/aborting.
345 =item B<--all-once>
347 Instead of perpetually processing all projects over and over again, process
348 them just once and then exit.
350 =item B<--one=PRJNAME>
352 Process only the given project (given as just the project name without C<.git>
353 suffix) and then exit.
355 =back
357 =head1 DESCRIPTION
359 jobd is Girocco's repository maintenance servant; it periodically checks all
360 the repositories and updates mirrored repositories and repacks push-mode
361 repositories when needed.
363 =cut