new tickets from slaven
[andk-cpan-tools.git] / bin / poesmoe.pl
blob3423e707149dffd0dcb8caecbd81d357ee824586
1 #!/usr/bin/perl -w
2 use strict;
4 use warnings;
5 use CPAN::DistnameInfo;
6 use File::Basename qw(fileparse dirname);
7 use POE qw(Component::JobQueue Component::DebugShell);
9 use Time::HiRes qw(sleep);
10 use YAML::Syck;
12 use lib "/home/k/sources/rersyncrecent/lib/";
13 require File::Rsync::Mirror::Recentfile;
# Job queue component: at most two concurrent workers, each started by
# spawn_a_worker.  Passive mode: jobs arrive via 'enqueue' posts (see
# harvest_from_queue) rather than being polled from a generator.
POE::Component::JobQueue->spawn
  ( Alias       => 'passive',          # defaults to 'queuer'
    WorkerLimit => 2,                  # defaults to 8
    Worker      => \&spawn_a_worker,   # code which will start a session
    Passive     =>
    { Prioritizer => \&job_comparer,   # defaults to sub { 1 } # FIFO
    },
  );
# Prioritizer for the passive job queue: treating every pair of jobs as
# equal keeps the queue in plain insertion (FIFO) order.
sub job_comparer { return 1 }
# Worker factory for the job queue: called by POE::Component::JobQueue
# with a postback (to report results) and the job parameters that were
# posted via 'enqueue' -- here a single hashref with {perl,path}.
sub spawn_a_worker {
    my ($postback, @job_params) = @_;   # same parameters as posted
    my $first = $job_params[0];         # job hashref: { perl => ..., path => ... }
    POE::Session->create
      ( inline_states =>
        { _start => sub {
              my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
              # NOTE(review): Time::HiRes::sleep here blocks the whole
              # POE kernel, not just this worker -- placeholder workload.
              print "sleeping 5\n";
              sleep 5;
              print "slept 5\n";
          },
          _stop => sub {},
        },
        args => [ $postback,    # $postback->(@results) to return results
                  @job_params,  # parameters of this job
                ],
      );
    print "firstpath[$first->{path}]firstperl[$first->{perl}]\n";
}
# _start handler of the main session: set up the recentfile reader and
# the per-heap configuration, then kick off the first read/count cycle.
sub work_handler_start {
    my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
    $heap->{rf} = File::Rsync::Mirror::Recentfile->new
        (
         canonize  => "naive_path_normalize",
         localroot => "/home/ftp/pub/PAUSE/authors/id/",
         interval  => q(2d),
        );
    # list of additional perl binaries lives next to this script
    $heap->{otherperls} = "$0.otherperls";
    my $bbname = fileparse($0, qr{\.pl});
    $heap->{historyfile} = "$ENV{HOME}/.cpan/$bbname.history.yml";
    # planning to $x{join"|",$dist,$perl} = join"|",time,$state;
    # where state in started, "ret[$ret]";
    $heap->{rx} = qr!(?i:\.(tar.gz|tar.bz2|zip|tgz|tbz))$!;  # dist tarball suffixes
    $heap->{basedir} = "/home/sand/CPAN-SVN/logs";
    $kernel->yield('do_read_recent');
    $kernel->yield('increment');
}
# 'do_read_recent' handler: refresh the list of recent upload events,
# but at most once every 15 seconds; if called too early, reschedule
# itself and return without touching the cached events.
sub sub_read_recent_events {
    my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
    my $since_last_time = time - ($heap->{have_read_recent_events_at}||0);
    if ($since_last_time < 15) {
        my $delay = int(15 - $since_last_time + 1);
        print "Delaying $delay seconds\n";
        $kernel->delay("do_read_recent", $delay);
        return;
    } else {
        $kernel->yield("increment");
    }
    my($rf) = $heap->{rf};
    my($rx) = $heap->{rx};
    my $recent_events = $rf->recent_events;
    # keep only paths that look like dist tarballs
    $recent_events = [ grep { $_->{path} =~ $rx } @$recent_events ];
    # keep only the first (newest) event per dist name; events without a
    # parsable dist name are dropped
    my %seen;
    $recent_events = [ grep {
        my $path = $_->{path};
        my $d = CPAN::DistnameInfo->new($path);
        my $dist = $d->dist;
        # warn "no dist for path[$path]" unless $dist;
        $dist ? !$seen{$dist}++ : "";
    } @$recent_events ];
    $heap->{recent_events} = $recent_events;
    $heap->{have_read_recent_events_at} = time;
}
# different from loop_...: we allow only perls from otherperls here
# because we want no overlap
#
# Returns an arrayref of executable perl paths.  If $otherperls can be
# opened it is read as a newline-separated list (with #-comments and
# blank lines allowed); otherwise the newest megainstall.*.d run under
# $basedir that has a perl-V.txt is consulted for its -Dprefix.
sub determine_perls {
    my($basedir,$otherperls) = @_;
    my @perls;
    if (open my $fh2, '<', $otherperls) {   # 3-arg open: path is data, not a mode
        while (<$fh2>) {
            chomp;
            s/#.*//;            # remove comments
            next if /^\s*$/;    # remove empty/white lines
            next unless -x $_;  # keep only executable candidates
            push @perls, $_;
        }
    } else {
        opendir my $dh, $basedir or die "opendir $basedir: $!";
        @perls = sort grep { /^megainstall\..*\.d$/ } readdir $dh;
        # drop trailing runs that never produced a perl-V.txt
        pop @perls while ! -e "$basedir/$perls[-1]/perl-V.txt";
        # keep only the newest remaining run
        shift @perls while @perls > 1;
        open my $fh, '<', "$basedir/@perls/perl-V.txt"
            or die "open $basedir/@perls/perl-V.txt: $!";
        while (<$fh>) {
            next unless /-Dprefix=(\S+)/;
            @perls = "$1/bin/perl";   # single perl derived from -Dprefix
            last;
        }
        close $fh;
    }
    # drop leading entries that are not executable
    shift @perls while @perls && ! -x $perls[0];
    \@perls;
}
# 'increment' handler: step through the raw queue one entry per tick.
# When the counter runs past the end (or no queue exists yet), rebuild
# the raw queue from the cached recent events: one entry per
# (perl, new-upload) pair.  When the rebuilt queue is also exhausted,
# reset and go back to reading recent events.
sub work_handler_inc {
    my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
    my $cnt = $heap->{count}++;
    print "Session ", $session->ID, " counted to $cnt.\n";
    if (!$heap->{raw_queue}
        || $cnt > $#{$heap->{raw_queue}}
       ) {
        my $raw_queue = $heap->{raw_queue} = [];
        my $perls;
        my $recent_events = $heap->{recent_events};
      UPLOADITEM: for my $upload (reverse @$recent_events) {
            next UPLOADITEM unless $upload->{path} =~ $heap->{rx};
            next UPLOADITEM unless $upload->{type} eq "new";
            # determine perls lazily, only once we have actual work
            $perls ||= determine_perls($heap->{basedir},$heap->{otherperls});
          PERL: for my $perl (@$perls) {
                push @$raw_queue, { perl => $perl, path => $upload->{path} };
            }
        }
    }
    if ($cnt <= $#{$heap->{raw_queue}}) {
        print "cnt[$cnt]path[$heap->{raw_queue}[$cnt]{path}]perl[$heap->{raw_queue}[$cnt]{perl}]\n";
        $kernel->delay('increment', 0.1);
        $kernel->yield('harvest_from_queue');
    } else {
        # queue exhausted: start over with a fresh read of recent events
        $heap->{count} = 0;
        $kernel->yield('do_read_recent');
    }
}
# 'harvest_from_queue' handler: move one job from the raw queue into the
# passive JobQueue component, then re-arm ourselves to harvest the next.
sub harvest_from_queue {
    my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
    my $first = shift @{$heap->{raw_queue}} or return;
    my $rest = @{$heap->{raw_queue}};   # jobs still waiting
    print "rest[$rest]\n";
    $kernel->post( 'passive',    # post to 'passive' alias
                   'enqueue',    # 'enqueue' a job
                   'postback',   # which of our states is notified when it's done
                   $first,       # job parameters
                 );
    $kernel->delay('harvest_from_queue', 0.2);
}
# 'postback' handler: invoked by the job queue when a worker finished;
# currently only dumps its arguments to STDERR for debugging.
sub postback {
    require YAML::Syck; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . YAML::Syck::Dump(\@_); # XXX
}
# _stop handler of the main session: announce shutdown.
sub work_handler_stop {
    print "Session ", $_[SESSION]->ID, " has stopped.\n";
}
# Main session: wires the event names used throughout this file to
# their handler subs, then runs the POE kernel until all sessions stop.
POE::Session->create(
    inline_states => {
        _start             => \&work_handler_start,
        increment          => \&work_handler_inc,
        do_read_recent     => \&sub_read_recent_events,
        harvest_from_queue => \&harvest_from_queue,
        postback           => \&postback,
        _stop              => \&work_handler_stop,
    },
);

# POE::Component::DebugShell->spawn();

POE::Kernel->run();
exit;
194 __END__
=head1 DESCRIPTION
199 The result shall do exactly the same as loop_over_recentfile but with
200 different perls. The three important perls run over there, the
201 rest runs here. Different conf and run files, separate console.
In the zeroth approximation we call jobs that run "echo" and pass
204 their output on to STDOUT. They only say what they would do. So this
205 approximation only needs a different run file, not different perls.
What is fascinating is that POE programs need no locks; they simply
delegate some work to an event handler.
210 The simple mechanism that is used so far with $max_epoch_worked_on is
211 good enough for sending jobs to the queue but in the queue itself we
212 must take more precautions to avoid duplicate work. In other words:
213 It's probably OK to stuff everything potentially interesting into a
214 queue and let the queue decide if the thing is really still of
215 interest. We must protect against what happens when we get killed and
216 must restart. We need not protect against a concurrent run of this
217 program. It's not about locking, just about delaying decisions until
218 one needs to be reached. But still: if a job has already been done in
219 a previous run we should not stuff it into a queue again ever. And
220 here is why we need to decide in the very last moment again: our
221 policy is that if Foo-3.14 is uploaded we won't test Foo-3.13 at all.
222 So we must delay the ultimate decision to the last moment. An earlier
223 decision is just a luxury. And we can implement luxury later.
225 Next steps are POE::Component::JobQueue and a status file that
226 prevents duplicate work and filters items that have a higher version
227 number counterpart. It's OK when the job does nothing but "echo hello
228 world" for this next step but it should do it with a JobQueue.
230 Recap: POE:C:CP:YS is based on Barbie CP:YS
232 Why JobQueue: because I want to prioritize 5.10 and newest 5.8 and
233 balance the old perls out such that the queue never runs out without
234 getting really large.