Bug 26922: Regression tests
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
blob4a2d75d03decd5413ceb1969601d561ae8251f3f
1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
22 use C4::Context;
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc;
33 use Carp;
34 use Clone qw(clone);
35 use JSON;
36 use Modern::Perl;
37 use Readonly;
38 use Search::Elasticsearch;
39 use Try::Tiny;
40 use YAML::Syck;
42 use List::Util qw( sum0 reduce all );
43 use MARC::File::XML;
44 use MIME::Base64;
45 use Encode qw(encode);
46 use Business::ISBN;
47 use Scalar::Util qw(looks_like_number);
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
56 =head1 NAME
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
60 =head1 ACCESSORS
62 =over 4
64 =item index
66 The name of the index to use, generally 'biblios' or 'authorities'.
68 =item index_name
70 The Elasticsearch index name with Koha instance prefix.
72 =back
75 =head1 FUNCTIONS
77 =cut
sub new {
    my ( $class, @args ) = @_;
    my $params = $args[0];

    # Constructor. Takes a hashref that must contain a true 'index' value;
    # throws Koha::Exceptions::MissingParameter otherwise.
    if ( !$params->{index} ) {
        Koha::Exceptions::MissingParameter->throw('No index name provided');
    }

    # The full index name is the configured index_name, an underscore and the
    # requested index. It is stored back into the caller's params hashref
    # before the parent constructor sees it.
    my $config = _read_configuration();
    $params->{index_name} = $config->{index_name} . '_' . $params->{index};

    my $object = $class->SUPER::new(@args);
    return $object;
}
92 =head2 get_elasticsearch
94 my $elasticsearch_client = $self->get_elasticsearch();
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
97 instance level and will be reused if method is called multiple times.
99 =cut
sub get_elasticsearch {
    my ($self) = @_;

    # Build the client lazily on first use and keep it on the instance so
    # later calls get the very same object back.
    $self->{elasticsearch} //= Search::Elasticsearch->new(
        $self->get_elasticsearch_params()
    );

    return $self->{elasticsearch};
}
111 =head2 get_elasticsearch_params
113 my $params = $self->get_elasticsearch_params();
115 This provides a hashref that contains the parameters for connecting to the
116 Elasticsearch servers, in the form:
119 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120 'index_name' => 'koha_instance_index',
123 This is configured by the following in the C<config> block in koha-conf.xml:
125 <elasticsearch>
126 <server>127.0.0.1:9200</server>
127 <server>anotherserver:9200</server>
128 <index_name>koha_instance</index_name>
129 </elasticsearch>
131 =cut
sub get_elasticsearch_params {
    my ($self) = @_;

    # Returns the configuration produced by _read_configuration().
    #
    # A Koha::Exceptions::Config::MissingEntry error (or a subclass of it) is
    # re-raised with croak() using the exception message. Any other failure
    # is reported with carp() instead of being dropped silently; in that
    # case undef is returned, as before.
    my $conf;
    try {
        $conf = _read_configuration();
    }
    catch {
        my $error = $_;
        if ( Scalar::Util::blessed($error)
            && $error->isa('Koha::Exceptions::Config::MissingEntry') )
        {
            croak( $error->message );
        }
        carp("Unable to read Elasticsearch configuration: $error");
    };

    return $conf;
}
148 =head2 get_elasticsearch_settings
150 my $settings = $self->get_elasticsearch_settings();
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
155 A hashref containing the settings is returned.
157 =cut
sub get_elasticsearch_settings {
    my ($self) = @_;

    # The parsed YAML is kept in a state variable, so the file is only read
    # on the first call in each process.
    state $cached_settings = undef;
    return $cached_settings if defined $cached_settings;

    # Prefer the configured path, otherwise fall back to the file shipped
    # under intranetdir.
    my $yaml_path = C4::Context->config('elasticsearch_index_config')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
    $cached_settings = LoadFile($yaml_path);

    return $cached_settings;
}
173 =head2 get_elasticsearch_mappings
175 my $mappings = $self->get_elasticsearch_mappings();
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
180 =cut
sub get_elasticsearch_mappings {
    my ($self) = @_;

    # Per-index caches shared by all instances, filled on first use
    state %all_mappings;
    state %sort_fields;

    my $index = $self->index;

    unless ( defined $all_mappings{$index} ) {
        $sort_fields{$index} = {};

        # Clone the general mapping to break ties with the original hash
        my $mappings = {
            data => clone( _get_elasticsearch_field_config( 'general', '' ) )
        };
        my $marcflavour = lc C4::Context->preference('marcflavour');

        # Koha search field type => type key used in the field config.
        # Anything not listed is indexed as 'text'.
        # TODO be aware of date formats, but this requires pre-parsing
        # as ES will simply reject anything with an invalid date.
        my %es_type_for = (
            boolean => 'boolean',
            number  => 'integer',
            sum     => 'integer',
            isbn    => 'stdno',
            stdno   => 'stdno',
            year    => 'year',
        );

        $self->_foreach_mapping(
            sub {
                my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
                return if $marc_type ne $marcflavour;

                my $es_type = $es_type_for{$type} // 'text';

                # Sort is a bit special as it can be true, false, undef.
                # "undef" means to do the default thing, which is make it
                # sortable.
                my $sortable = ( !defined($sort) || $sort );

                # [ enabled?, property name, field config purpose ]
                my @wanted = (
                    [ $search,      $name,                  'search' ],
                    [ $facet,       $name . '__facet',      'facet' ],
                    [ $suggestible, $name . '__suggestion', 'suggestible' ],
                    [ $sortable,    $name . '__sort',       'sort' ],
                );
                foreach my $entry (@wanted) {
                    my ( $enabled, $property, $purpose ) = @{$entry};
                    next unless $enabled;
                    $mappings->{data}{properties}{$property} = _get_elasticsearch_field_config( $purpose, $es_type );
                }

                $sort_fields{$index}{$name} = 1 if $sortable;
            }
        );

        if ( $self->index eq 'authorities' ) {
            $mappings->{data}{properties}{'match-heading'} = _get_elasticsearch_field_config( 'search', 'text' );
        }
        $all_mappings{$index} = $mappings;
    }

    # Expose the sortable field names through the sort_fields accessor
    $self->sort_fields( \%{ $sort_fields{$index} } );
    return $all_mappings{$index};
}
242 =head2 raw_elasticsearch_mappings
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
249 =cut
sub raw_elasticsearch_mappings {
    my ($marc_type) = @_;

    # Builds { index_name => { search_field_name => { ...attributes...,
    # mappings => [ ... ] } } } from the search field tables. When
    # $marc_type is true, only MARC maps of that type are included.
    my $schema        = Koha::Database->new()->schema();
    my $search_fields = Koha::SearchFields->search( {}, { order_by => { -asc => 'name' } } );

    # Attributes that are only copied when they have a value
    my @optional_attributes = qw( facet_order weight opac staff_client );

    my $mappings = {};
    while ( my $search_field = $search_fields->next ) {

        my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
            { search_field_id => $search_field->id },
            {
                join     => 'search_marc_map',
                order_by => { -asc => [ 'search_marc_map.marc_type', 'search_marc_map.marc_field' ] }
            }
        );

        while ( my $marc_to_field = $marc_to_fields->next ) {
            my $marc_map = $marc_to_field->search_marc_map;

            next if $marc_type && $marc_map->marc_type ne $marc_type;

            my $field_entry = $mappings->{ $marc_map->index_name }{ $search_field->name } //= {};

            $field_entry->{label}     = $search_field->label;
            $field_entry->{type}      = $search_field->type;
            $field_entry->{mandatory} = $search_field->mandatory;

            foreach my $attribute (@optional_attributes) {
                my $value = $search_field->$attribute;
                $field_entry->{$attribute} = $value if defined $value;
            }

            push @{ $field_entry->{mappings} },
                {
                    facet       => $marc_to_field->facet || '',
                    marc_type   => $marc_map->marc_type,
                    marc_field  => $marc_map->marc_field,
                    sort        => $marc_to_field->sort,
                    suggestible => $marc_to_field->suggestible || ''
                };
        }
    }

    return $mappings;
}
298 =head2 _get_elasticsearch_field_config
300 Get the Elasticsearch field config for the given purpose and data type.
302 $mapping = _get_elasticsearch_field_config('search', 'text');
304 =cut
sub _get_elasticsearch_field_config {
    my ( $purpose, $type ) = @_;

    # The field config YAML is parsed once per process and then reused
    state $settings = undef;
    unless ( defined $settings ) {
        my $config_file = C4::Context->config('elasticsearch_field_config')
            || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
        $settings = LoadFile($config_file);
    }

    my $purpose_config = $settings->{$purpose};
    die "Field purpose $purpose not defined in field config"
        unless defined $purpose_config;

    # An empty type asks for the whole section of this purpose
    return $purpose_config if $type eq '';

    # Otherwise prefer the type specific entry, then the 'default' entry
    foreach my $key ( $type, 'default' ) {
        my $candidate = $purpose_config->{$key};
        return $candidate if defined $candidate;
    }

    return;
}
333 =head2 _load_elasticsearch_mappings
335 Load Elasticsearch mappings in the format of mappings.yaml.
337 $indexes = _load_elasticsearch_mappings();
339 =cut
sub _load_elasticsearch_mappings {

    # Use the configured mappings file when there is one, otherwise the
    # mappings.yaml found under intranetdir.
    my $yaml_path = C4::Context->config('elasticsearch_index_mappings')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';

    return LoadFile($yaml_path);
}
# Replaces all search fields and MARC maps with the content of the mappings
# file: existing rows are deleted first, then one search field per entry and
# one MARC map per mapping are (re)created and linked together.
sub reset_elasticsearch_mappings {
    my ($self) = @_;
    my $indexes = $self->_load_elasticsearch_mappings();

    Koha::SearchMarcMaps->delete;
    Koha::SearchFields->delete;

    # Attributes copied from the file onto the search field when present
    my @copied_attributes = qw/ type label weight staff_client opac facet_order mandatory/;

    foreach my $index_name ( keys %{$indexes} ) {
        my $fields = $indexes->{$index_name};

        foreach my $field_name ( keys %{$fields} ) {
            my $data = $fields->{$field_name};

            my %sf_params;
            foreach my $attribute (@copied_attributes) {
                next unless exists $data->{$attribute};
                $sf_params{$attribute} = $data->{$attribute};
            }

            # Set default values
            $sf_params{staff_client} //= 1;
            $sf_params{opac}         //= 1;

            $sf_params{name} = $field_name;

            my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );

            my $field_mappings = $data->{mappings};
            foreach my $field_mapping ( @{$field_mappings} ) {
                my $marc_map = Koha::SearchMarcMaps->find_or_create(
                    {
                        index_name => $index_name,
                        marc_type  => $field_mapping->{marc_type},
                        marc_field => $field_mapping->{marc_field}
                    }
                );
                $search_field->add_to_search_marc_maps(
                    $marc_map,
                    {
                        facet       => $field_mapping->{facet}       || 0,
                        suggestible => $field_mapping->{suggestible} || 0,
                        sort        => $field_mapping->{sort},
                        search      => $field_mapping->{search} // 1
                    }
                );
            }
        }
    }

    # FIXME return the mappings?
    $self->clear_search_fields_cache();
}
# This overrides the accessor provided by Class::Accessor so that if
# sort_fields isn't set, then it'll generate it.
sub sort_fields {
    my ( $self, @new_value ) = @_;

    # Setter form: store what was passed and return nothing
    if (@new_value) {
        $self->_sort_fields_accessor(@new_value);
        return;
    }

    my $current = $self->_sort_fields_accessor();
    unless ($current) {

        # Building the mappings populates the accessor as a side effect
        $self->get_elasticsearch_mappings();
        return $self->_sort_fields_accessor();
    }

    return $current;
}
405 =head2 _process_mappings($mappings, $data, $record_document, $meta)
407 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
409 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
410 Since we group all mappings by MARC field targets C<$mappings> will contain
411 all targets for C<$data> and thus we need to fetch the MARC field only once.
412 C<$mappings> will be applied to C<$record_document> and new field values added.
413 The method has no return value.
415 =over 4
417 =item C<$mappings>
419 Arrayref of mappings containing arrayrefs in the format
420 [C<$target>, C<$options>] where C<$target> is the name of the target field and
421 C<$options> is a hashref containing processing directives for this particular
422 mapping.
424 =item C<$data>
426 The source data from a MARC record field.
428 =item C<$record_document>
430 Hashref representing the Elasticsearch document on which mappings should be
431 applied.
433 =item C<$meta>
435 A hashref containing metadata useful for enforcing per mapping rules. For
436 example for providing extra context for mapping options, or treating mapping
437 targets differently depending on type (sort, search, facet etc). Combining
438 this metadata with the mapping options and metadata allows us to mutate the
439 data per mapping, or even replace it with other data retrieved from the
440 metadata context.
442 Current properties are:
444 C<altscript>: A boolean value indicating whether an alternate script presentation is being
445 processed.
447 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
448 'subfield' or 'subfields_group'.
450 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
452 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
453 is 'subfields_group'.
455 C<field>: The original C<MARC::Field> object from which C<$data> was retrieved.
457 =back
459 =cut
sub _process_mappings {
    my ($_self, $mappings, $data, $record_document, $meta) = @_;

    # Applies every [$target, $options] pair in $mappings to $data and
    # appends the resulting values to $record_document->{$target}.
    # Options handled here: substr, value_callbacks, property and
    # nonfiling_characters_indicator. Nothing is returned.
    MAPPING:
    foreach my $mapping (@{$mappings}) {
        my ($target, $options) = @{$mapping};

        # Don't process sort fields for alternate scripts
        next MAPPING if $target =~ /__sort$/ && $meta->{altscript};

        # Copy (scalar) data since can have multiple targets
        # with differing options for (possibly) mutating data
        # so need a different copy for each
        my $data_copy = $data;
        if (defined $options->{substr}) {
            my ($start, $length) = @{$options->{substr}};
            $data_copy = length($data) > $start ? substr($data_copy, $start, $length) : '';
        }

        # Add data to values array for callbacks processing
        my $values = [$data_copy];

        # Value callbacks take subfield data (or values from previous
        # callbacks) as argument, and return a possibly different list of
        # values. Note that the returned list may also be empty.
        if (defined $options->{value_callbacks}) {
            foreach my $callback (@{$options->{value_callbacks}}) {
                # Each value is passed to the current callback, whose
                # returned list is flattened into the new list of values.
                $values = [ map { $callback->($_) } @{$values} ];
            }
        }

        # Skip mapping if all values have been removed
        next MAPPING unless @{$values};

        if (defined $options->{property}) {
            # Wrap every non-empty value as { property => value }. Empty and
            # undefined values are dropped here; a plain "0" is a real value
            # and is wrapped like any other instead of leaking through
            # unwrapped.
            my $property = $options->{property};
            $values = [
                map  { +{ $property => $_ } }
                grep { defined $_ && length $_ } @{$values}
            ];
        }

        if (defined $options->{nonfiling_characters_indicator} && @{$values}) {
            my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
            $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;

            # Nonfiling chars does not make sense for multiple values
            # Only apply on first element, and only when it is a plain
            # string. An indicator pointing at or past the end of the string
            # leaves nothing to keep, so the value becomes empty (and is
            # filtered out below) rather than undef with a substr warning.
            my $first = $values->[0];
            if (defined $first && !ref $first && $nonfiling_chars > 0) {
                $values->[0] = $nonfiling_chars < length($first)
                    ? substr($first, $nonfiling_chars)
                    : '';
            }
        }

        # Drop empty values (undef is dropped too, without a warning)
        $values = [ grep { defined $_ && !/^$/ } @{$values} ];

        $record_document->{$target} //= [];
        push @{$record_document->{$target}}, @{$values};
    }
}
518 =head2 marc_records_to_documents($marc_records)
520 my $record_documents = $self->marc_records_to_documents($marc_records);
522 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
524 Returns array of hash references, representing Elasticsearch documents,
525 acceptable as body payload in C<Search::Elasticsearch> requests.
527 =over 4
529 =item C<$marc_records>
531 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
533 =back
535 =cut
sub marc_records_to_documents {
    my ($self, $records) = @_;

    # Converts each MARC record in the $records arrayref into a hashref
    # (the Elasticsearch document) by applying the rules produced by
    # _get_marc_mapping_rules(), then adds the serialized record itself.
    # Returns an arrayref with one document per record.
    my $rules = $self->_get_marc_mapping_rules();
    my $control_fields_rules = $rules->{control_fields};
    my $data_fields_rules = $rules->{data_fields};
    my $marcflavour = lc C4::Context->preference('marcflavour');
    # An unset preference means "not ARRAY"; avoid comparing undef
    my $use_array = ( C4::Context->preference('ElasticsearchMARCFormat') // '' ) eq 'ARRAY';

    my @record_documents;

    # Authority type code => tag of the heading field for that type
    my %auth_match_headings;
    if( $self->index eq 'authorities' ){
        my @auth_types = Koha::Authority::Types->search();
        %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
    }

    foreach my $record (@{$records}) {
        my $record_document = {};

        if ( $self->index eq 'authorities' ){
            my $authtypecode = GuessAuthTypeCode( $record );
            if( $authtypecode ){
                my $field = $record->field( $auth_match_headings{ $authtypecode } );
                my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
                push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
            } else {
                # The record may lack a 001 field; do not die while warning
                my $control_number = $record->field('001');
                warn "Cannot determine authority type for record: "
                    . ( $control_number ? $control_number->as_string : '(no 001 field)' );
            }
        }

        my $mappings = $rules->{leader};
        if ($mappings) {
            $self->_process_mappings($mappings, $record->leader(), $record_document, {
                    altscript => 0,
                    data_source => 'leader'
                }
            );
        }
        foreach my $field ($record->fields()) {
            if ($field->is_control_field()) {
                my $mappings = $control_fields_rules->{$field->tag()};
                if ($mappings) {
                    $self->_process_mappings($mappings, $field->data(), $record_document, {
                            altscript => 0,
                            data_source => 'control_field',
                            field => $field
                        }
                    );
                }
            }
            else {
                my $tag = $field->tag();
                # Handle alternate scripts in MARC 21
                my $altscript = 0;
                if ($marcflavour eq 'marc21' && $tag eq '880') {
                    # $6 links the 880 to the field it is an alternate
                    # script of; an 880 without $6 is processed as a plain
                    # 880 field.
                    my $sub6 = $field->subfield('6');
                    if (defined $sub6 && $sub6 =~ /^(...)-\d+/) {
                        $tag = $1;
                        $altscript = 1;
                    }
                }

                my $data_field_rules = $data_fields_rules->{$tag};
                if ($data_field_rules) {
                    my $subfields_mappings = $data_field_rules->{subfields};
                    my $wildcard_mappings = $subfields_mappings->{'*'};
                    foreach my $subfield ($field->subfields()) {
                        my ($code, $data) = @{$subfield};
                        my $mappings = $subfields_mappings->{$code} // [];
                        if ($wildcard_mappings) {
                            $mappings = [@{$mappings}, @{$wildcard_mappings}];
                        }
                        if (@{$mappings}) {
                            $self->_process_mappings($mappings, $data, $record_document, {
                                    altscript => $altscript,
                                    data_source => 'subfield',
                                    code => $code,
                                    field => $field
                                }
                            );
                        }
                    }

                    my $subfields_join_mappings = $data_field_rules->{subfields_join};
                    if ($subfields_join_mappings) {
                        foreach my $subfields_group (keys %{$subfields_join_mappings}) {
                            my $data_field = $field->clone; #copy field to preserve for alt scripts
                            $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
                            my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
                            if ($data) {
                                $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
                                        altscript => $altscript,
                                        data_source => 'subfields_group',
                                        codes => $subfields_group,
                                        field => $field
                                    }
                                );
                            }
                        }
                    }
                }
            }
        }
        foreach my $field (keys %{$rules->{defaults}}) {
            unless (defined $record_document->{$field}) {
                $record_document->{$field} = $rules->{defaults}->{$field};
            }
        }
        foreach my $field (@{$rules->{sum}}) {
            # Only sum a list of values. The rules may name the same field
            # more than once, in which case it is already a plain number on
            # the second pass and must not be dereferenced again.
            if (ref($record_document->{$field}) eq 'ARRAY') {
                # TODO: validate numeric? filter?
                # TODO: Or should only accept fields without nested values?
                # TODO: Quick and dirty, improve if needed
                $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
            }
        }
        # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
        foreach my $field (@{$rules->{isbn}}) {
            if (defined $record_document->{$field}) {
                my @isbns = ();
                foreach my $input_isbn (@{$record_document->{$field}}) {
                    my $isbn = Business::ISBN->new($input_isbn);
                    if (defined $isbn && $isbn->is_valid) {
                        my $isbn13 = $isbn->as_isbn13->as_string;
                        push @isbns, $isbn13;
                        $isbn13 =~ s/\-//g;
                        push @isbns, $isbn13;

                        my $isbn10 = $isbn->as_isbn10;
                        if ($isbn10) {
                            $isbn10 = $isbn10->as_string;
                            push @isbns, $isbn10;
                            $isbn10 =~ s/\-//g;
                            push @isbns, $isbn10;
                        }
                    } else {
                        # Keep values that are not valid ISBNs unchanged
                        push @isbns, $input_isbn;
                    }
                }
                $record_document->{$field} = \@isbns;
            }
        }

        # Remove duplicate values and collapse sort fields
        foreach my $field (keys %{$record_document}) {
            if (ref($record_document->{$field}) eq 'ARRAY') {
                @{$record_document->{$field}} = do {
                    my %seen;
                    # { input => ... } hashes are compared by their input value
                    grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
                };
                if ($field =~ /__sort$/) {
                    # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
                    $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
                }
            }
        }

        # TODO: Perhaps should check if $records_document non empty, but really should never be the case
        $record->encoding('UTF-8');
        if ($use_array) {
            $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
            $record_document->{'marc_format'} = 'ARRAY';
        } else {
            my @warnings;
            {
                # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
                local $SIG{__WARN__} = sub {
                    push @warnings, $_[0];
                };
                $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
            }
            if (@warnings) {
                # Suppress warnings if record length exceeded
                unless (substr($record->leader(), 0, 5) eq '99999') {
                    foreach my $warning (@warnings) {
                        carp $warning;
                    }
                }
                # Any warning makes us fall back to MARCXML
                $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
                $record_document->{'marc_format'} = 'MARCXML';
            }
            else {
                $record_document->{'marc_format'} = 'base64ISO2709';
            }
        }
        push @record_documents, $record_document;
    }
    return \@record_documents;
}
727 =head2 _marc_to_array($record)
729 my $data = $self->_marc_to_array($record)
731 Convert a MARC::Record to a data structure (a hashref with C<leader> and C<fields> keys) modeled after MARC-in-JSON
732 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
734 =over 4
736 =item C<$record>
738 A MARC::Record object
740 =back
742 =cut
sub _marc_to_array {
    my ($self, $record) = @_;

    # Returns a hashref { leader => ..., fields => [ ... ] }. Control fields
    # become { tag => data }; data fields become
    # { tag => { ind1 => ..., ind2 => ..., subfields => [ { code => value }, ... ] } }.
    my $data = {
        leader => $record->leader(),
        fields => []
    };
    for my $field ($record->fields()) {
        my $tag = $field->tag();
        if ($field->is_control_field()) {
            push @{$data->{fields}}, {$tag => $field->data()};
        } else {
            # Always an arrayref, so a field without subfields is
            # represented as an empty list rather than undef
            my $subfields = [];
            foreach my $subfield ($field->subfields()) {
                my ($code, $contents) = @{$subfield};
                push @{$subfields}, {$code => $contents};
            }
            push @{$data->{fields}}, {
                $tag => {
                    ind1 => $field->indicator(1),
                    ind2 => $field->indicator(2),
                    subfields => $subfields
                }
            };
        }
    }
    return $data;
}
773 =head2 _array_to_marc($data)
775 my $record = _array_to_marc($data)
777 Convert an array modeled after MARC-in-JSON to a MARC::Record
779 =over 4
781 =item C<$data>
783 An array modeled after MARC-in-JSON
784 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
786 =back
788 =cut
sub _array_to_marc {
    my ($self, $data) = @_;

    # Rebuilds a MARC::Record from a { leader => ..., fields => [ ... ] }
    # structure in which every field is a single-key hash { tag => content }.
    # A hashref content is a data field (ind1, ind2, subfields), anything
    # else is the data of a control field. $data itself is left untouched.
    my $record = MARC::Record->new();

    $record->leader($data->{leader});
    for my $field (@{$data->{fields}}) {
        my $tag = (keys %{$field})[0];

        # Use a separate variable: $field aliases the element of
        # $data->{fields}, so assigning to it would overwrite the caller's
        # data structure.
        my $content = $field->{$tag};
        my $marc_field;
        if (ref($content) eq 'HASH') {
            my @subfields;
            foreach my $subfield (@{$content->{subfields}}) {
                my $code = (keys %{$subfield})[0];
                push @subfields, $code;
                push @subfields, $subfield->{$code};
            }
            $marc_field = MARC::Field->new($tag, $content->{ind1}, $content->{ind2}, @subfields);
        } else {
            $marc_field = MARC::Field->new($tag, $content)
        }
        $record->append_fields($marc_field);
    }
;
    return $record;
}
817 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
819 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
821 Get mappings, an internal data structure later used by
822 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
823 data for a MARC mapping.
825 The returned C<$mappings> is not to be confused with mappings provided by
826 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
827 provided by C<_foreach_mapping> and expands it to this internal data structure.
828 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
829 is then applied to each MARC target (leader, control field data, subfield or
830 joined subfields) and integrated into the mapping rules data structure used in
831 C<marc_records_to_documents> to transform MARC records into Elasticsearch
832 documents.
834 =over 4
836 =item C<$facet>
838 Boolean indicating whether to create a facet field for this mapping.
840 =item C<$suggestible>
842 Boolean indicating whether to create a suggestion field for this mapping.
844 =item C<$sort>
846 Boolean indicating whether to create a sort field for this mapping.
848 =item C<$search>
850 Boolean indicating whether to create a search field for this mapping.
852 =item C<$target_name>
854 Elasticsearch document target field name.
856 =item C<$target_type>
858 Elasticsearch document target field type.
860 =item C<$range>
862 An optional range as a string in the format "<START>-<END>" or "<START>",
863 where "<START>" and "<END>" are integers specifying a range that will be used
864 for extracting a substring from MARC data as Elasticsearch field target value.
866 The first character position is "0", and the range is inclusive,
867 so "0-2" means the first three characters of MARC data.
869 If only "<START>" is provided only one character at position "<START>" will
870 be extracted.
872 =back
874 =cut
sub _field_mappings {
    my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;

    # Expands one mapping into a list of [$target, $options] pairs: the
    # search target (when $search is true) plus one target per enabled
    # suffix (facet, suggestion, sort). A sort target is also produced when
    # $sort is undefined.
    my @mappings;

    my $default_options = {};
    if (defined $range) {
        # TODO: use value_callback instead?
        # "<START>-<END>" is inclusive; a lone "<START>" selects one character
        my ($start, $end) = map(int, split /-/, $range, 2);
        $default_options->{substr} = [ $start, ( defined $end ? $end - $start + 1 : 1 ) ];
    }

    # TODO: Should probably have per type value callback/hook
    # but hard code for now
    if ($target_type eq 'boolean') {
        $default_options->{value_callbacks} //= [];
        push @{$default_options->{value_callbacks}}, sub {
            my ($value) = @_;
            # Trim whitespace at both ends
            $value =~ s/^\s+|\s+$//g;
            return $value ? 'true' : 'false';
        };
    }
    elsif ($target_type eq 'year') {
        $default_options->{value_callbacks} //= [];
        # Only accept years containing digits and "u"
        push @{$default_options->{value_callbacks}}, sub {
            my ($value) = @_;
            # Replace "u" with "0" for sorting
            return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
        };
    }

    if ($search) {
        push @mappings, [$target_name, $default_options];
    }

    my @suffixes = ();
    push @suffixes, 'facet' if $facet;
    push @suffixes, 'suggestion' if $suggestible;
    push @suffixes, 'sort' if !defined $sort || $sort;

    foreach my $suffix (@suffixes) {
        # Important! Make shallow clone, or we end up with the same hashref
        # shared by all mappings
        my $options = { %{$default_options} };

        # TODO: Hack, fix later in less hideous manner
        $options->{property} = 'input' if $suffix eq 'suggestion';

        push @mappings, [ "${target_name}__$suffix", $options ];
    }
    return @mappings;
}
940 =head2 _get_marc_mapping_rules
942 my $mapping_rules = $self->_get_marc_mapping_rules()
944 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
946 Since field retrieval is slow in C<MARC::Records> (all fields are iterated through for
947 each call to C<MARC::Record>->field) we create an optimized structure of mapping
948 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
950 We can then iterate through all MARC fields for each record and apply all relevant
951 rules once per field instead of retrieving fields multiple times for each mapping rule
952 which is terribly slow.
954 =cut
956 # TODO: This structure can be used for processing multiple MARC::Records so is currently
957 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
958 # memory cache which it is currently not. The performance gain of caching
959 # would probably be marginal, but to do this could be a further improvement.
sub _get_marc_mapping_rules {
    my ($self) = @_;

    # Returns a hashref of rules with these keys:
    #   leader         - arrayref of mappings applied to the leader
    #   control_fields - tag => arrayref of mappings
    #   data_fields    - tag => { subfields      => { code  => mappings },
    #                             subfields_join => { codes => mappings } }
    #   sum, isbn      - arrayrefs of target field names needing post-processing
    #   defaults       - target field name => value used when nothing was indexed
    # Throws Koha::Exceptions::Elasticsearch::MARCFieldExprParseError for a
    # MARC field expression that cannot be parsed.
    my $marcflavour = lc C4::Context->preference('marcflavour');
    my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
    my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
    my $rules = {
        'leader' => [],
        'control_fields' => {},
        'data_fields' => {},
        'sum' => [],
        'isbn' => [],
        'defaults' => {}
    };

    $self->_foreach_mapping(sub {
        my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
        return if $marc_type ne $marcflavour;

        if ($type eq 'sum') {
            push @{$rules->{sum}}, $name;
            # An undefined $sort means "sortable by default", so the sort
            # field exists in that case as well and has to be summed too.
            push @{$rules->{sum}}, $name."__sort" if !defined $sort || $sort;
        }
        elsif ($type eq 'isbn') {
            push @{$rules->{isbn}}, $name;
        }
        elsif ($type eq 'boolean') {
            # boolean gets special handling, if value doesn't exist for a field,
            # it is set to false
            $rules->{defaults}->{$name} = 'false';
        }

        if ($marc_field =~ $field_spec_regexp) {
            # Copy the captures right away; later regex operations must not
            # be able to change what we read from them.
            my ($field_tag, $subfield_spec, $range) = ($1, $2, $3);

            my @subfields;
            my @subfield_groups;
            # Parse and separate subfields from subfield groups
            if (defined $subfield_spec) {
                my $subfield_group = '';
                my $open_group = 0;

                foreach my $token (split //, $subfield_spec) {
                    if ($token eq "(") {
                        if ($open_group) {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched opening parenthesis for $marc_field"
                            );
                        }
                        else {
                            $open_group = 1;
                        }
                    }
                    elsif ($token eq ")") {
                        if ($open_group) {
                            if ($subfield_group) {
                                push @subfield_groups, $subfield_group;
                                $subfield_group = '';
                            }
                            $open_group = 0;
                        }
                        else {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched closing parenthesis for $marc_field"
                            );
                        }
                    }
                    elsif ($open_group) {
                        $subfield_group .= $token;
                    }
                    else {
                        push @subfields, $token;
                    }
                }
            }
            else {
                # No subfields given: the mapping applies to every subfield
                push @subfields, '*';
            }

            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
            if ($field_tag < 10) {
                $rules->{control_fields}->{$field_tag} //= [];
                push @{$rules->{control_fields}->{$field_tag}}, @mappings;
            }
            else {
                $rules->{data_fields}->{$field_tag} //= {};
                foreach my $subfield (@subfields) {
                    $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
                }
                foreach my $subfield_group (@subfield_groups) {
                    $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
                }
            }
        }
        elsif ($marc_field =~ $leader_regexp) {
            my $range = $1;
            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
            push @{$rules->{leader}}, @mappings;
        }
        else {
            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                "Invalid MARC field expression: $marc_field"
            );
        }
    });

    # Marc-flavour specific rule tweaks, could/should also provide hook for this
    if ($marcflavour eq 'marc21') {
        # Nonfiling characters processing for sort fields
        my %title_fields;
        # Compare against the index name constants declared by this package
        if ($self->index eq $BIBLIOS_INDEX) {
            # Format is: nonfiling characters indicator => field names list
            %title_fields = (
                1 => [130, 630, 730, 740],
                2 => [222, 240, 242, 243, 245, 440, 830]
            );
        }
        elsif ($self->index eq $AUTHORITIES_INDEX) {
            %title_fields = (
                1 => [730],
                2 => [130, 430, 530]
            );
        }
        foreach my $indicator (keys %title_fields) {
            foreach my $field_tag (@{$title_fields{$indicator}}) {
                # Look the rules up step by step so that tags without any
                # rule do not get an empty entry created for them
                my $field_rules = $rules->{data_fields}->{$field_tag};
                next unless $field_rules && $field_rules->{subfields};
                my $mappings = $field_rules->{subfields}->{a} // [];
                foreach my $mapping (@{$mappings}) {
                    if ($mapping->[0] =~ /__sort$/) {
                        # Mark this as to be processed for nonfiling characters indicator
                        # later on in _process_mappings
                        $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
                    }
                }
            }
        }
    }

    return $rules;
}
=head2 _foreach_mapping

    $self->_foreach_mapping(
        sub {
            my ( $name, $type, $facet, $suggestible, $sort, $search,
                $marc_type, $marc_field )
                = @_;
            return unless $marc_type eq 'marc21';
            print "Data comes from: " . $marc_field . "\n";
        }
    );

Calls the supplied function once for every search field / MARC mapping pair
configured for this object's index (C<< $self->index >>), so that callers can
build whatever mapping structure they need.

The function receives the following arguments, in this order:

=over 4

=item C<$name>

The name of the search field, forced to lower case so that field name
searches are case insensitive.

=item C<$type>

The type of the search field, e.g. 'string'.

=item C<$facet>

The C<facet> column of the field-to-MARC mapping. True if this value should
be facetised. This only really makes sense if the field is understood by the
facet processing code anyway.

=item C<$suggestible>

The C<suggestible> column of the field-to-MARC mapping.

=item C<$sort>

The C<sort> column of the field-to-MARC mapping. True if this is a field that
a) needs special sort handling, and b) should be sorted on. False if a) but
not b). Undef if not a). This allows, for example, author to be sorted on
without everything marked with "author" being included in that sort.

=item C<$search>

The C<search> column of the field-to-MARC mapping.

=item C<$marc_type>

A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
'unimarc', 'normarc'.

=item C<$marc_field>

A string that describes the MARC field that contains the data to extract.

=back

=cut

sub _foreach_mapping {
    my ( $self, $sub ) = @_;

    # Columns fetched from the joined tables, each paired with the alias it
    # is exposed under on the result row. Keeping the pairs together keeps
    # '+select', '+as' and the callback arguments in step with each other.
    my @joined_columns = (
        [ 'search_marc_to_fields.facet'       => 'facet' ],
        [ 'search_marc_to_fields.suggestible' => 'suggestible' ],
        [ 'search_marc_to_fields.sort'        => 'sort' ],
        [ 'search_marc_to_fields.search'      => 'search' ],
        [ 'search_marc_map.marc_type'         => 'marc_type' ],
        [ 'search_marc_map.marc_field'        => 'marc_field' ],
    );
    my @aliases = map { $_->[1] } @joined_columns;

    # TODO use a caching framework here
    my $rows = Koha::Database->schema->resultset('SearchField')->search(
        { 'search_marc_map.index_name' => $self->index },
        {
            join      => { search_marc_to_fields => 'search_marc_map' },
            '+select' => [ map { $_->[0] } @joined_columns ],
            '+as'     => [@aliases],
        }
    );

    while ( my $row = $rows->next ) {
        $sub->(
            # Indexed field names are lower-cased so that field name
            # searches are case insensitive
            lc( $row->name ),
            $row->type,
            map { $row->get_column($_) } @aliases,
        );
    }
}

=head2 process_error

    die $self->process_error($@);

Turns an Elasticsearch error message into a short, human-readable message
suitable for showing to the user. The returned text deliberately leaves out
the details you would want when diagnosing a problem, so the original message
is also emitted as a warning.

Note that currently the resulting message is not internationalised. This
will happen eventually by some method or other.

=cut

sub process_error {
    my ( $self, $msg ) = @_;

    # Simple logging: keep the untouched message available in the logs
    warn $msg;

    # This is super-primitive: only parse failures get their own wording
    return $msg =~ /ParseException/
        ? "Unable to understand your search query, please rephrase and try again.\n"
        : "Unable to perform your search. Please try again.\n";
}

=head2 _read_configuration

    my $conf = _read_configuration();

Reads the C<elasticsearch> entry of the I<configuration file> and returns a
hashref with the connection settings. A
C<Koha::Exceptions::Config::MissingEntry> exception is thrown when the
C<elasticsearch> entry itself, its C<server> entry or its C<index_name> entry
is missing.

The returned hashref has the following form:

    {
        'nodes'      => ['127.0.0.1:9200', 'anotherserver:9200'],
        'index_name' => 'koha_instance',
        'cxn_pool'   => 'Static',
    }

C<nodes> is always an arrayref, even when a single server is configured.
C<cxn_pool> defaults to 'Static' when not configured. A C<trace_to> key is
added only when C<trace_to> is configured.

This is configured by the following in the C<config> block in koha-conf.xml:

    <elasticsearch>
        <server>127.0.0.1:9200</server>
        <server>anotherserver:9200</server>
        <index_name>koha_instance</index_name>
    </elasticsearch>

=cut

sub _read_configuration {

    my $conf = C4::Context->config('elasticsearch');

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing <elasticsearch> entry in koha-conf.xml"
    ) unless defined $conf;

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing <elasticsearch>/<server> entry in koha-conf.xml"
    ) unless $conf && $conf->{server};

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing <elasticsearch>/<index_name> entry in koha-conf.xml"
    ) unless defined $conf->{index_name};

    # A single <server> entry arrives as a plain scalar, several as an
    # arrayref; callers always get an arrayref
    my $servers  = $conf->{server};
    my %settings = (
        nodes      => ref($servers) eq 'ARRAY' ? $servers : [$servers],
        index_name => $conf->{index_name},
        cxn_pool   => $conf->{cxn_pool} // 'Static',
    );

    # trace_to is optional and only passed along when configured
    $settings{trace_to} = $conf->{trace_to} if defined $conf->{trace_to};

    return \%settings;
}

=head2 get_facetable_fields

    my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();

Returns the Koha::SearchFields that can be used as facets in the ES
configuration: first the ones that have a C<facet_order>, sorted by it,
followed by the ones that have none.

=cut

sub get_facetable_fields {
    my ($self) = @_;

    # These are the ES field names, not the CCL names that zebra uses.
    my @candidate_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );

    # Same query both times; only the condition on facet_order differs
    my $fields_where_facet_order = sub {
        my ($facet_order_condition) = @_;
        return Koha::SearchFields->search(
            { name => { -in => \@candidate_names }, facet_order => $facet_order_condition },
            { order_by => ['facet_order'] }
        );
    };

    my @ordered_fields   = $fields_where_facet_order->( { '!=' => undef } );
    my @unordered_fields = $fields_where_facet_order->(undef);

    # This could certainly be improved
    return ( @ordered_fields, @unordered_fields );
}

=head2 clear_search_fields_cache

    Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();

Removes the cached ES search field lists for the staff client and the OPAC,
for both the biblios and the authorities index.

=cut

sub clear_search_fields_cache {

    my @cache_keys = qw(
        elasticsearch_search_fields_staff_client_biblios
        elasticsearch_search_fields_opac_biblios
        elasticsearch_search_fields_staff_client_authorities
        elasticsearch_search_fields_opac_authorities
    );

    my $cache = Koha::Caches->get_instance();

    # The last key is cleared separately so that the sub still evaluates to
    # the result of the final clear_from_cache call
    my $last_key = pop @cache_keys;
    $cache->clear_from_cache($_) for @cache_keys;
    return $cache->clear_from_cache($last_key);
}

1335 __END__
1337 =head1 AUTHOR
1339 =over 4
1341 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1343 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1345 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1347 =back
1349 =cut