3 # daemon to watch the zebraqueue and update zebra as needed
6 #use warnings; FIXME - Bug 2505
8 # find Koha's Perl modules
9 # test carefully before changing this
11 eval { require "$FindBin::Bin/kohalib.pl" };
14 use POE
qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream Driver::SysRW);
15 use Unix
::Syslog
qw(:macros);
20 use C4
::AuthoritiesMarc
;
26 # wait periods governing connection attempts
27 my $min_connection_wait = 1; # start off at 1 second
28 my $max_connection_wait = 1024; # max about 17 minutes
30 # keep separate wait period for bib and authority Zebra databases
31 my %zoom_connection_waits = ();
33 my $db_connection_wait = $min_connection_wait;
35 # ZOOM and Z39.50 errors that are potentially
36 # resolvable by connecting again and retrying
38 my %retriable_zoom_errors = (
39 10000 => 'ZOOM_ERROR_CONNECT',
40 10001 => 'ZOOM_ERROR_MEMORY',
41 10002 => 'ZOOM_ERROR_ENCODE',
42 10003 => 'ZOOM_ERROR_DECODE',
43 10004 => 'ZOOM_ERROR_CONNECTION_LOST',
44 10005 => 'ZOOM_ERROR_INIT',
45 10006 => 'ZOOM_ERROR_INTERNAL',
46 10007 => 'ZOOM_ERROR_TIMEOUT',
49 # structure to store updates that have
50 # failed and are to be retried. The
51 # structure is a hashref of hashrefs,
54 # $postponed_updates->{$server}->{$record_number} = 1;
56 # If an operation is attempted and fails because
57 # of a retriable error (see above), the daemon
58 # will try several times to recover as follows:
60 # 1. close and reopen the connection to the
61 # Zebra server, unless the error was a timeout,
63 # 2. retry the operation
65 # If, after trying this five times, the operation still
66 # fails, the daemon will mark the record number as
67 # postponed, and try to process other entries in
68 # zebraqueue. When an update is postponed, the
69 # error will be reported to syslog.
71 # If more than 100 postponed updates are
72 # accumulated, the daemon will assume that
73 # something is seriously wrong, complain loudly,
74 # and abort. If running under the daemon(1) command,
75 # this means that the daemon will respawn.
77 my $num_postponed_updates = 0;
78 my $postponed_updates = {};
80 my $max_operation_attempts = 5;
81 my $max_postponed_updates = 100;
83 # Zebra connection timeout
84 my $zconn_timeout = 30;
85 my $zconn_timeout_multiplier = 1.5;
86 my $max_zconn_timeout = 120;
88 my $ident = "Koha Zebraqueue ";
91 Unix
::Syslog
::openlog
$ident, LOG_PID
, LOG_LOCAL0
;
93 Unix
::Syslog
::syslog LOG_INFO
, "Starting Zebraqueue log at " . scalar localtime(time) . "\n";
97 # Starts session. Only ever called once only really used to set an alias
99 my ( $kernel, $heap, $session ) = @_[ KERNEL
, HEAP
, SESSION
];
101 my $time = localtime(time);
102 Unix
::Syslog
::syslog LOG_INFO
, "$time POE Session ", $session->ID, " has started.\n";
105 # $kernel->yield('status_check');
106 $kernel->yield('sleep');
111 # can be used to slow down loop execution if needed
112 my ( $kernel, $heap, $session ) = @_[ KERNEL
, HEAP
, SESSION
];
113 use Time
::HiRes qw
(sleep);
114 Time
::HiRes
::sleep(0.5);
116 $kernel->yield('status_check');
120 # check if we need to do anything, at the moment just checks the zebraqueue, it could check other things too
121 my ( $kernel, $heap, $session ) = @_[ KERNEL
, HEAP
, SESSION
];
122 my $dbh = get_db_connection
();
123 my $sth = $dbh->prepare("SELECT count(*) AS opcount FROM zebraqueue WHERE done = 0");
125 my $data = $sth->fetchrow_hashref();
126 if ($data->{'opcount'} > 0) {
127 Unix
::Syslog
::syslog LOG_INFO
, "$data->{'opcount'} operations waiting to be run\n";
129 $dbh->commit(); # needed so that we get current state of zebraqueue next time
130 # we enter handler_check
131 $kernel->yield('do_ops');
135 $dbh->commit(); # needed so that we get current state of zebraqueue next time
136 # we enter handler_check
137 $kernel->yield('sleep');
142 # execute operations waiting in the zebraqueue
143 my ( $kernel, $heap, $session ) = @_[ KERNEL
, HEAP
, SESSION
];
144 my $dbh = get_db_connection
();
145 my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0 ORDER BY id DESC");
147 Unix
::Syslog
::syslog LOG_INFO
, "Executing zebra operations\n";
149 my $completed_updates = {};
150 ZEBRAQUEUE
: while (my $data = $readsth->fetchrow_hashref()) {
151 warn "Inside while loop" if $debug;
153 my $id = $data->{'id'};
154 my $op = $data->{'operation'};
155 $op = 'recordDelete' if $op =~ /delete/i; # delete ops historically have been coded
156 # either delete_record or recordDelete
157 my $record_number = $data->{'biblio_auth_number'};
158 my $server = $data->{'server'};
160 next ZEBRAQUEUE
if exists $postponed_updates->{$server}->{$record_number};
161 next ZEBRAQUEUE
if exists $completed_updates->{$server}->{$record_number}->{$op};
165 if ($op eq 'recordDelete') {
166 $ok = process_delete
($dbh, $server, $record_number);
169 $ok = process_update
($dbh, $server, $record_number, $id);
172 mark_done
($dbh, $record_number, $op, $server);
173 $completed_updates->{$server}->{$record_number}->{$op} = 1;
174 if ($op eq 'recordDelete') {
175 $completed_updates->{$server}->{$record_number}->{'specialUpdate'} = 1;
181 $kernel->yield('sleep');
187 my $record_number = shift;
192 warn "Searching for record to delete" if $debug;
193 # 1st read the record in zebra, we have to get it from zebra as its no longer in the db
194 my $Zconn = get_zebra_connection
($server);
195 my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$record_number);
196 $results->option(elementSetName
=> 'marcxml');
197 $record = $results->record(0)->raw();
200 # this doesn't exist, so no need to wail on zebra to delete it
201 if ($@
->code() eq 13) {
204 # caught a ZOOM::Exception
205 my $message = _format_zoom_error_message
($@
);
206 postpone_update
($server, $record_number, $message);
209 # then, delete the record
210 warn "Deleting record" if $debug;
211 $ok = zebrado
($record, 'recordDelete', $server, $record_number);
219 my $record_number = shift;
225 warn "Updating record" if $debug;
228 if ($server eq "biblioserver") {
229 my $marc = GetMarcBiblio
($record_number);
230 $marcxml = $marc->as_xml_record() if $marc;
232 elsif ($server eq "authorityserver") {
233 $marcxml = C4
::AuthoritiesMarc
::GetAuthorityXML
($record_number);
235 # check it's XML, just in case
237 my $hashed = XMLin
($marcxml);
238 }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
239 ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
241 Unix
::Syslog
::syslog LOG_ERR
, "$server record $record_number is malformed: $@";
242 mark_done_by_id
($dbh, $id, $server);
245 # ok, we have everything, do the operation in zebra !
246 $ok = zebrado
($marcxml, 'specialUpdate', $server, $record_number);
251 sub mark_done_by_id
{
255 my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ? AND server = ? AND done = 0");
256 $delsth->execute($id, $server);
261 my $record_number = shift;
266 if ($op eq 'recordDelete') {
267 # if it's a deletion, we can delete every request on this biblio : in case the user
268 # did a modif (or item deletion) just before biblio deletion, there are some specialUpdate
269 # that are pending and can't succeed, as we don't have the XML anymore
270 # so, delete everything for this biblionumber
271 $delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done = 1
272 WHERE biblio_auth_number = ?
275 $delsth->execute($record_number, $server);
277 # if it's not a deletion, delete every pending specialUpdate for this biblionumber
278 # in case the user add biblio, then X items, before this script runs
279 # this avoid indexing X+1 times where just 1 is enough.
280 $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1
281 WHERE biblio_auth_number = ?
282 AND operation = 'specialUpdate'
285 $delsth->execute($record_number, $server);
290 ###Accepts a $server variable thus we can use it to update biblios, authorities or other zebra dbs
291 my ($record, $op, $server, $record_number) = @_;
294 my $message = "error updating index for $server $record $record_number: no source record";
295 postpone_update
($server, $record_number, $message);
301 ATTEMPT
: while ($attempts < $max_operation_attempts) {
303 warn "Attempt $attempts for $op for $server $record_number" if $debug;
304 my $Zconn = get_zebra_connection
($server);
306 my $Zpackage = $Zconn->package();
307 $Zpackage->option(action
=> $op);
308 $Zpackage->option(record
=> $record);
310 eval { $Zpackage->send("update") };
311 if ($@
&& $@
->isa("ZOOM::Exception")) {
312 my $message = _format_zoom_error_message
($@
);
313 my $error = $@
->code();
314 if (exists $retriable_zoom_errors{$error}) {
315 warn "reattempting operation $op for $server $record_number" if $debug;
316 warn "last Zebra error was $message" if $debug;
317 $Zpackage->destroy();
319 if ($error == 10007 and $zconn_timeout < $max_zconn_timeout) {
320 # bump up connection timeout
321 $zconn_timeout = POSIX
::ceil
($zconn_timeout * $zconn_timeout_multiplier);
322 $zconn_timeout = $max_zconn_timeout if $zconn_timeout > $max_zconn_timeout;
323 Unix
::Syslog
::syslog LOG_INFO
, "increased Zebra connection timeout to $zconn_timeout\n";
324 warn "increased Zebra connection timeout to $zconn_timeout" if $debug;
328 postpone_update
($server, $record_number, $message);
331 # FIXME - would be more efficient to send a ES commit
332 # after a batch of records, rather than commiting after
333 # each one - Zebra handles updates relatively slowly.
334 eval { $Zpackage->send('commit'); };
336 # operation succeeded, but commit
337 # did not - we have a problem
338 my $message = _format_zoom_error_message
($@
);
339 postpone_update
($server, $record_number, $message);
347 my $message = "Made $attempts attempts to index $server record $record_number without success";
348 postpone_update
($server, $record_number, $message);
354 sub postpone_update
{
355 my ($server, $record_number, $message) = @_;
356 warn $message if $debug;
357 $message .= "\n" unless $message =~ /\n$/;
358 Unix
::Syslog
::syslog LOG_ERR
, $message;
359 $postponed_updates->{$server}->{$record_number} = 1;
361 $num_postponed_updates++;
362 if ($num_postponed_updates > $max_postponed_updates) {
363 warn "exiting, over $max_postponed_updates postponed indexing updates";
364 Unix
::Syslog
::syslog LOG_ERR
, "exiting, over $max_postponed_updates postponed indexing updates";
365 Unix
::Syslog
::closelog
;
372 my $time = localtime(time);
373 Unix
::Syslog
::syslog LOG_INFO
, "$time Session ", $_[SESSION
]->ID, " has stopped.\n";
374 delete $heap->{session
};
377 # get a DB connection
378 sub get_db_connection
{
381 $db_connection_wait = $min_connection_wait unless defined $db_connection_wait;
384 # note that C4::Context caches the
385 # DB handle; C4::Context->dbh() will
386 # check that handle first before returning
387 # it. If the connection is bad, it
388 # then tries (once) to create a new one.
389 $dbh = C4
::Context
->dbh();
393 # C4::Context->dbh dies if it cannot
394 # establish a connection
395 $db_connection_wait = $min_connection_wait;
396 $dbh->{AutoCommit
} = 0; # do this to reduce number of
397 # commits to zebraqueue
402 my $error = "failed to connect to DB: $DBI::errstr";
403 warn $error if $debug;
404 Unix
::Syslog
::syslog LOG_ERR
, $error;
405 sleep $db_connection_wait;
406 $db_connection_wait *= 2 unless $db_connection_wait >= $max_connection_wait;
410 # get a Zebra connection
411 sub get_zebra_connection
{
414 # start connection retry wait queue if necessary
415 $zoom_connection_waits{$server} = $min_connection_wait unless exists $zoom_connection_waits{$server};
417 # try to connect to Zebra forever until we succeed
419 # what follows assumes that C4::Context->Zconn
420 # makes only one attempt to create a new connection;
421 my $Zconn = C4
::Context
->Zconn($server, 0, 1, '', 'xml');
422 $Zconn->option('timeout' => $zconn_timeout);
424 # it is important to note that if the existing connection
425 # stored by C4::Context has an error (any type of error)
426 # from the last transaction, C4::Context->Zconn closes
427 # it and establishes a new one. Therefore, the
428 # following check will succeed if we have a new, good
429 # connection or we're using a previously established
430 # connection that has experienced no errors.
431 if ($Zconn->errcode() == 0) {
432 $zoom_connection_waits{$server} = $min_connection_wait;
437 my $error = _format_zoom_error_message
($Zconn);
438 warn $error if $debug;
439 Unix
::Syslog
::syslog LOG_ERR
, $error;
440 sleep $zoom_connection_waits{$server};
441 $zoom_connection_waits{$server} *= 2 unless $zoom_connection_waits{$server} >= $max_connection_wait;
445 # given a ZOOM::Exception or
446 # ZOOM::Connection object, generate
447 # a human-readable error message
448 sub _format_zoom_error_message
{
452 if (ref($err) eq 'ZOOM::Connection') {
453 $message = $err->errmsg() . " (" . $err->diagset . " " . $err->errcode() . ") " . $err->addinfo();
454 } elsif (ref($err) eq 'ZOOM::Exception') {
455 $message = $err->message() . " (" . $err->diagset . " " . $err->code() . ") " . $err->addinfo();
460 POE
::Session
->create(
462 _start
=> \
&handler_start
,
463 sleep => \
&handler_sleep
,
464 status_check
=> \
&handler_check
,
466 _stop
=> \
&handler_stop
,
473 Unix
::Syslog
::closelog
;