Bug 25184: (RM follow-up) Make DB update idempotent
[koha.git] / Koha / MetaSearcher.pm
blob280d1b24c0e39c90d993ced0b3ce2d19d5b390c6
1 package Koha::MetaSearcher;
3 # Copyright 2014 ByWater Solutions
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use Modern::Perl;
22 use base 'Class::Accessor';
24 use C4::Charset qw( MarcToUTF8Record );
25 use C4::Search qw(); # Purely for new_record_from_zebra
26 use DBIx::Class::ResultClass::HashRefInflator;
27 use IO::Select;
28 use Koha::Caches;
29 use Koha::Database;
30 use Koha::MetadataRecord;
31 use MARC::File::XML;
32 use Storable qw( store_fd fd_retrieve );
33 use Time::HiRes qw( clock_gettime CLOCK_MONOTONIC );
34 use UUID;
35 use ZOOM;
37 use sort 'stable';
39 __PACKAGE__->mk_accessors( qw( fetch offset on_error resultset ) );
# Constructor.
#
# $options is an optional hashref whose entries override the defaults below
# (offset, fetch, on_error, results, resultset). Unless overridden, every new
# searcher gets a freshly generated UUID string as its result set identifier.
sub new {
    my ( $class, $options ) = @_;

    # UUID::generate / UUID::unparse fill in their arguments in place.
    my $uuid;
    my $uuidstring;
    UUID::generate($uuid);
    UUID::unparse( $uuid, $uuidstring );

    my %attributes = (
        offset    => 0,
        fetch     => 100,
        on_error  => sub {},
        results   => [],
        resultset => $uuidstring,
    );

    # Caller-supplied options win over the defaults.
    if ($options) {
        @attributes{ keys %$options } = values %$options;
    }

    return bless \%attributes, $class;
}
# Record a single hit in $self->{results}.
#
# $index      - position of the hit, as supplied by the caller
# $server     - server description the hit came from
# $marcrecord - the record itself; it is wrapped in a Koha::MetadataRecord so
#               that a few display/sort fields can be pulled out of it.
sub handle_hit {
    my ( $self, $index, $server, $marcrecord ) = @_;

    my $record = Koha::MetadataRecord->new( { schema => 'marc', record => $marcrecord } );

    # Metadata key => Koha field it is read from.
    my @wanted = (
        [ title       => 'biblio.title' ],
        [ subtitle    => 'biblio.subtitle' ],
        [ seriestitle => 'biblio.seriestitle' ],
        [ author      => 'biblio.author' ],
        [ isbn        => 'biblioitems.isbn' ],
        [ issn        => 'biblioitems.issn' ],
        [ lccn        => 'biblioitems.lccn' ],               # LC control number (not call number)
        [ edition     => 'biblioitems.editionstatement' ],
        [ date        => 'biblio.copyrightdate' ],           # MARC21
        [ date2       => 'biblioitems.publicationyear' ],    # UNIMARC
    );

    my %metadata;
    foreach my $pair (@wanted) {
        my ( $key, $kohafield ) = @$pair;

        # Plain scalar assignment: getKohaField is called in scalar context.
        $metadata{$key} = $record->getKohaField($kohafield);
    }

    # Fall back to the publication year when there is no copyright date.
    $metadata{date} //= $metadata{date2};

    my %hit = (
        server   => $server,
        index    => $index,
        record   => $marcrecord,
        metadata => \%metadata,
    );

    push @{ $self->{results} }, \%hit;
}
# Run $query against every server in $server_ids and collect the hits.
#
# $server_ids - arrayref of server identifiers. Purely numeric ids are looked
#               up in the Z3950server table; anything else is treated as a
#               "special" server written as "type" or "type:extra".
# $query      - query string handed unchanged to the workers.
#
# Each server that has no cached result set is searched in a forked worker
# (see _start_worker); results come back through a pipe. Hits are recorded via
# _handle_hits, failures are reported through the on_error callback as
# ( $server, $error ).
#
# Returns a hashref with num_hits / num_fetched (both keyed by server id),
# total_hits, total_fetched and time (elapsed seconds).
sub search {
    my ( $self, $server_ids, $query ) = @_;

    my $resultset_expiry = 300;

    my $cache  = Koha::Caches->get_instance();
    my $schema = Koha::Database->new->schema;
    my $stats  = {
        num_fetched   => { map { ( $_ => 0 ) } @$server_ids },
        num_hits      => { map { ( $_ => 0 ) } @$server_ids },
        total_fetched => 0,
        total_hits    => 0,
    };
    my $start  = clock_gettime( CLOCK_MONOTONIC );
    my $select = IO::Select->new;

    my @cached_sets;
    my @servers;

    foreach my $server_id ( @$server_ids ) {
        if ( $server_id =~ /^\d+$/ ) {
            # Z39.50 server
            # HashRefInflator is used so that the information will survive into the fork
            my $server = $schema->resultset('Z3950server')->find(
                { id => $server_id },
                { result_class => 'DBIx::Class::ResultClass::HashRefInflator' },
            );

            # find() yields undef for an unknown id. Report it instead of
            # letting the assignment below autovivify a server with no host.
            if ( !$server ) {
                $self->{on_error}->(
                    { id => $server_id, host => $server_id, name => $server_id },
                    "Unknown server: $server_id",
                );
                next;
            }

            $server->{type} = 'z3950';

            push @servers, $server;
        } elsif ( $server_id =~ /(\w+)(?::(\w+))?/ ) {
            # Special server
            push @servers, {
                type  => $1,
                extra => $2,
                id    => $server_id,
                host  => $server_id,
                name  => $server_id,
            };
        }
    }

    # Remembers which server each worker pipe belongs to (keyed by fileno), so
    # that a worker that dies without answering can still be reported.
    my %server_for;

    foreach my $server ( @servers ) {
        if ( $cache ) {
            my $set = $cache->get_from_cache( 'z3950-resultset-' . $self->resultset . '-' . $server->{id} );
            if ( ref($set) eq 'HASH' ) {
                $set->{server} = $server;
                push @cached_sets, $set;
                next;
            }
        }

        my $readfh = $self->_start_worker( $server, $query );

        # No handle means the worker could not be started; _start_worker has
        # already called on_error.
        next unless $readfh;

        $server_for{ fileno $readfh } = $server;
        $select->add( $readfh );
    }

    # Handle these while the servers are searching
    foreach my $set ( @cached_sets ) {
        $self->_handle_hits( $stats, $set );
    }

    while ( $select->count ) {
        foreach my $readfh ( $select->can_read() ) {
            my $server = delete $server_for{ fileno $readfh };

            # fd_retrieve may die or return undef when the worker closed the
            # pipe without sending a complete answer.
            my $result     = eval { fd_retrieve( $readfh ) };
            my $read_error = $@;

            $select->remove( $readfh );
            close $readfh;
            wait;    # reap a finished worker

            if ( ref $result ne 'HASH' ) {
                $self->{on_error}->( $server, $read_error || 'No results received from search worker' );
                next;
            }

            if ( $result->{error} ) {
                $self->{on_error}->( $result->{server}, $result->{error} );
                next;
            }

            $self->_handle_hits( $stats, $result );

            if ( $cache ) {
                $cache->set_in_cache( 'z3950-resultset-' . $self->resultset . '-' . $result->{server}->{id}, {
                    hits        => $result->{hits},
                    num_fetched => $result->{num_fetched},
                    num_hits    => $result->{num_hits},
                }, { expiry => $resultset_expiry } );
            }
        }
    }

    $stats->{time} = clock_gettime( CLOCK_MONOTONIC ) - $start;

    return $stats;
}
# Fork a worker process that searches a single server.
#
# $server - server description hashref (type is 'z3950', 'koha' or 'batch')
# $query  - query string; passed to search_pqf for z3950/koha servers and
#           reduced to plain search terms for batch servers.
#
# In the parent: returns the read end of a pipe on which the worker will send
# one Storable-serialised hashref, either
#     { hits, num_fetched, num_hits, server }   or   { error, server }.
# If the pipe or the fork cannot be created, on_error is called and nothing
# is returned.
#
# The child never returns from this sub: it sends its answer and exits.
sub _start_worker {
    my ( $self, $server, $query ) = @_;

    my ( $readfh, $writefh );
    if ( !pipe( $readfh, $writefh ) ) {
        $self->{on_error}->( $server, "Failed to create pipe: $!" );
        return;
    }

    # Accessing the cache or Koha database after the fork is risky, so get any resources we need
    # here.
    my $marcflavour = C4::Context->preference('marcflavour');

    my $pid = fork;

    if ( !defined $pid ) {
        # Error
        $self->{on_error}->( $server, 'Failed to fork' );
        return;
    }

    if ( $pid ) {
        # Parent process
        close $writefh;

        return $readfh;
    }

    # Child process from here on.
    close $readfh;
    my $connection;

    # Everything that can throw runs inside this eval, so that the child
    # always sends an answer down the pipe and then exits, instead of dying
    # with the pipe still empty.
    my $response = eval {
        my ( $num_hits, $num_fetched, $hits, $results );

        if ( $server->{type} eq 'z3950' ) {
            my $zoptions = ZOOM::Options->new();
            $zoptions->option( 'elementSetName', 'F' );
            $zoptions->option( 'databaseName', $server->{db} );
            $zoptions->option( 'user', $server->{userid} ) if $server->{userid};
            $zoptions->option( 'password', $server->{password} ) if $server->{password};
            $zoptions->option( 'preferredRecordSyntax', $server->{syntax} );
            $zoptions->option( 'timeout', $server->{timeout} ) if $server->{timeout};

            $connection = ZOOM::Connection->create($zoptions);

            $connection->connect( $server->{host}, $server->{port} );
            $results = $connection->search_pqf( $query );    # Starts the search
        } elsif ( $server->{type} eq 'koha' ) {
            $connection = C4::Context->Zconn( $server->{extra} );
            $results = $connection->search_pqf( $query );    # Starts the search
        } elsif ( $server->{type} eq 'batch' ) {
            $server->{encoding} = 'utf-8';
        }

        if ( $server->{type} eq 'batch' ) {
            # TODO: actually handle PQF
            # Strip the PQF operators/attributes and quotes, leaving bare terms.
            $query =~ s/@\w+ (?:\d+=\d+ )?//g;
            $query =~ s/"//g;

            my $schema = Koha::Database->new->schema;

            # Each term becomes a LIKE '%term%' condition.
            my $match_condition = [ map +{ -like => '%' . $_ . '%' }, split( /\s+/, $query ) ];
            $hits = [ $schema->resultset('ImportRecord')->search(
                {
                    import_batch_id => $server->{extra},
                    -or => [
                        { 'import_biblios.title'  => $match_condition },
                        { 'import_biblios.author' => $match_condition },
                        { 'import_biblios.isbn'   => $match_condition },
                        { 'import_biblios.issn'   => $match_condition },
                    ],
                },
                {
                    join => [ qw( import_biblios ) ],
                    rows => $self->{fetch},
                }
            )->get_column( 'marc' )->all ];

            $num_hits = $num_fetched = scalar @$hits;
        } else {
            $num_hits = $results->size;

            # Never ask for more records than remain after the offset:
            # requesting past the end of the result set is an out-of-range
            # present request.
            my $remaining = $num_hits - $self->{offset};
            $remaining = 0 if $remaining < 0;
            $num_fetched = $remaining < $self->{fetch} ? $remaining : $self->{fetch};

            $hits = $num_fetched > 0
                ? [ map { $_->raw() } @{ $results->records( $self->{offset}, $num_fetched, 1 ) } ]
                : [];
        }

        # No records plus a pending connection exception means the search failed.
        if ( !@$hits && $connection && $connection->exception() ) {
            return {
                error  => $connection->exception(),
                server => $server,
            };
        }

        if ( $server->{type} eq 'koha' ) {
            $hits = [ map { C4::Search::new_record_from_zebra( $server->{extra}, $_ ) } @$hits ];
        } else {
            $hits = [ map { $self->_import_record( $_, $marcflavour, $server->{encoding} ? $server->{encoding} : "iso-5426" ) } @$hits ];
        }

        return {
            hits        => $hits,
            num_fetched => $num_fetched,
            num_hits    => $num_hits,
            server      => $server,
        };
    };
    my $failure = $@;

    if ( !$response ) {
        # Prefer the connection's own exception when there is one. Always
        # send a true error value, because the parent treats an answer
        # without one as a successful search.
        my $exception = eval { $connection ? $connection->exception() : undef };
        $response = {
            error  => ( $exception // $failure ) || 'Search worker failed',
            server => $server,
        };
    }

    eval { store_fd( $response, $writefh ); 1 }
        or warn "Could not send search results to parent process: $@";
    close $writefh;

    exit;
}
# Convert a raw record to a UTF-8 MARC record.
#
# Passes $raw, $marcflavour and $encoding to MarcToUTF8Record and returns only
# the first value it gives back; the remaining (charset-related) return values
# are ignored.
sub _import_record {
    my ( $self, $raw, $marcflavour, $encoding ) = @_;

    my @converted = MarcToUTF8Record( $raw, $marcflavour, $encoding );

    return $converted[0];
}
# Fold one server's result set into the running statistics and record each of
# its hits.
#
# $stats - statistics hashref being built by search(); the per-server
#          num_hits / num_fetched entries are set and the totals incremented.
# $set   - hashref with server, num_hits, num_fetched and hits (arrayref).
sub _handle_hits {
    my ( $self, $stats, $set ) = @_;

    my $server    = $set->{server};
    my $server_id = $server->{id};

    $stats->{num_hits}->{$server_id}    = $set->{num_hits};
    $stats->{num_fetched}->{$server_id} = $set->{num_fetched};

    $stats->{total_hits}    += $set->{num_hits};
    $stats->{total_fetched} += $set->{num_fetched};

    # Hits are numbered from the searcher's current offset.
    my $position = 0;
    foreach my $hit ( @{ $set->{hits} } ) {
        $self->handle_hit( $self->{offset} + $position, $server, $hit );
        $position++;
    }
}
# Sort the collected results in place by one metadata field.
#
# $key       - metadata key to sort on (e.g. 'title')
# $direction - multiplier applied to the string comparison: 1 for ascending,
#              -1 for descending
#
# Each hit gets a sort_key: the metadata value with every non-word character
# removed. Hits whose sort key is empty always go to the end, whatever the
# direction, and keep their existing relative order.
#
# Returns the sorted arrayref, which is also stored in $self->{results}.
sub sort {
    my ( $self, $key, $direction ) = @_;

    foreach my $hit ( @{ $self->{results} } ) {
        # Defined-or rather than ||, so that a value of "0" is kept as a key.
        ( $hit->{sort_key} = $hit->{metadata}->{$key} // '' ) =~ s/\W//g;
    }

    my @sorted = sort {
        my $a_empty = length( $a->{sort_key} ) ? 0 : 1;
        my $b_empty = length( $b->{sort_key} ) ? 0 : 1;

        # If either key is empty, order by emptiness only: empty (1) sorts
        # after non-empty (0), and two empty keys compare equal so that a
        # stable sort leaves them in place.
        ( $a_empty || $b_empty )
            ? $a_empty <=> $b_empty
            : $direction * ( $a->{sort_key} cmp $b->{sort_key} );
    } @{ $self->{results} };

    return $self->{results} = \@sorted;
}
# Return a slice of the collected results.
#
# $offset - index of the first result wanted
# $length - maximum number of results wanted
#
# Positions in the requested range that hold no (true) entry are skipped, so
# fewer than $length results may come back.
sub results {
    my ( $self, $offset, $length ) = @_;

    my $last = $offset + $length - 1;

    return grep { $_ } map { $self->{results}->[$_] } $offset .. $last;
}