Bug 23204: Move code in a unit tested sub
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
blob31ab4708640a3babf2e3880b3c4aa8fe7e63f0ec
1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
22 use C4::Context;
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
45 use Scalar::Util qw(looks_like_number);
# Accessors generated through the Class::Accessor base class: "index" is
# read-only, "sort_fields" is read/write (a hand-written sort_fields() in
# this file wraps the generated one).
__PACKAGE__->mk_ro_accessors('index');
__PACKAGE__->mk_accessors('sort_fields');

# Constants to refer to the standard index names
Readonly our $BIBLIOS_INDEX     => 'biblios';
Readonly our $AUTHORITIES_INDEX => 'authorities';
54 =head1 NAME
56 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58 =head1 ACCESSORS
60 =over 4
62 =item index
64 The name of the index to use, generally 'biblios' or 'authorities'.
66 =back
68 =head1 FUNCTIONS
70 =cut
sub new {
    my ( $class, @args ) = @_;

    # Object construction itself is delegated to the base class.
    my $self = $class->SUPER::new(@args);

    # An instance without an index name cannot address any index, so refuse
    # to hand one out.
    if ( !$self->index ) {
        Koha::Exceptions::MissingParameter->throw('No index name provided');
    }

    return $self;
}
80 =head2 get_elasticsearch
82 my $elasticsearch_client = $self->get_elasticsearch();
84 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
85 instance level and will be reused if method is called multiple times.
87 =cut
sub get_elasticsearch {
    my ($self) = @_;

    # Reuse the client built by an earlier call on this instance, if any.
    return $self->{elasticsearch} if defined $self->{elasticsearch};

    # First call: build the client from the connection parameters and keep
    # it on the instance for later calls.
    my $params = $self->get_elasticsearch_params();
    $self->{elasticsearch} = Search::Elasticsearch->new($params);

    return $self->{elasticsearch};
}
98 =head2 get_elasticsearch_params
100 my $params = $self->get_elasticsearch_params();
102 This provides a hashref that contains the parameters for connecting to the
103 Elasticsearch servers, in the form:
106 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107 'index_name' => 'koha_instance_index',
110 This is configured by the following in the C<config> block in koha-conf.xml:
112 <elasticsearch>
113 <server>127.0.0.1:9200</server>
114 <server>anotherserver:9200</server>
115 <index_name>koha_instance</index_name>
116 </elasticsearch>
118 =cut
sub get_elasticsearch_params {
    my ($self) = @_;

    my $configured = C4::Context->config('elasticsearch');
    unless ($configured) {
        die "No 'elasticsearch' block is defined in koha-conf.xml.\n";
    }

    # Work on a (shallow) copy so that the configuration hash itself is not
    # modified.
    my %params = %{$configured};

    # A single <server> entry arrives as a plain scalar, several of them as
    # an array reference. Either way the result is stored as 'nodes' (the
    # name used by newer Search::Elasticsearch) and 'server' is dropped.
    my $servers = delete $params{server};
    if ( ref($servers) eq 'ARRAY' ) {
        $params{nodes} = $servers;
    }
    elsif ($servers) {
        $params{nodes} = [$servers];
    }
    else {
        die "No elasticsearch servers were specified in koha-conf.xml.\n";
    }

    unless ( $params{index_name} ) {
        die "No elasticsearch index_name was specified in koha-conf.xml.\n";
    }

    # The configured name is only a namespace: append the name of this
    # particular index to it.
    $params{index_name} .= '_' . $self->index;

    # key_prefix is always forced; the other two only get a default when the
    # configuration does not set them.
    $params{key_prefix} = 'es_';
    $params{cxn_pool}        //= 'Static';
    $params{request_timeout} //= 60;

    return \%params;
}
155 =head2 get_elasticsearch_settings
157 my $settings = $self->get_elasticsearch_settings();
159 This provides the settings provided to Elasticsearch when an index is created.
160 These can do things like define tokenization methods.
162 A hashref containing the settings is returned.
164 =cut
sub get_elasticsearch_settings {
    my ($self) = @_;

    # The parsed settings are kept in a state variable, so the YAML file is
    # only loaded on the first call.
    state $cached_settings;
    return $cached_settings if defined $cached_settings;

    # An explicitly configured file wins; otherwise fall back to the file
    # shipped under intranetdir.
    my $yaml_path = C4::Context->config('elasticsearch_index_config')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
    $cached_settings = LoadFile($yaml_path);

    return $cached_settings;
}
180 =head2 get_elasticsearch_mappings
182 my $mappings = $self->get_elasticsearch_mappings();
184 This provides the mappings that get passed to Elasticsearch when an index is
185 created.
187 =cut
sub get_elasticsearch_mappings {
    my ($self) = @_;

    # Both caches are keyed by index name and kept in state variables so the
    # mappings are only built once per index.
    state %all_mappings;
    state %sort_fields;

    my $index = $self->index;

    unless ( defined $all_mappings{$index} ) {
        $sort_fields{$index} = {};

        # Clone the general mapping to break ties with the original hash
        my $mappings = {
            data => clone(_get_elasticsearch_field_config('general', ''))
        };
        my $marcflavour = lc C4::Context->preference('marcflavour');

        # Koha field type => type name looked up in the field config.
        # Every type not listed here is treated as 'text'.
        # TODO be aware of date formats, but this requires pre-parsing
        # as ES will simply reject anything with an invalid date.
        my %es_type_for = (
            boolean => 'boolean',
            number  => 'integer',
            sum     => 'integer',
            isbn    => 'stdno',
            stdno   => 'stdno',
        );

        $self->_foreach_mapping(
            sub {
                my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;

                # Only mappings for the configured MARC flavour are relevant.
                return if $marc_type ne $marcflavour;

                my $es_type    = $es_type_for{$type} // 'text';
                my $properties = $mappings->{data}{properties} //= {};

                $properties->{$name} = _get_elasticsearch_field_config('search', $es_type)
                    if $search;
                $properties->{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type)
                    if $facet;
                $properties->{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type)
                    if $suggestible;

                # Sort is a bit special as it can be true, false, undef.
                # "undef" means to do the default thing, which is make it
                # sortable, so only an explicit false value disables it.
                if ( !defined $sort || $sort ) {
                    $properties->{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
                    $sort_fields{$index}{$name} = 1;
                }
            }
        );

        $all_mappings{$index} = $mappings;
    }

    # Publish the sortable field names on the instance on every call, not
    # only when the cache was just filled.
    $self->sort_fields(\%{$sort_fields{$index}});

    return $all_mappings{$index};
}
247 =head2 raw_elasticsearch_mappings
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc|normarc
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
254 =cut
sub raw_elasticsearch_mappings {
    my ( $marc_type ) = @_;

    my $schema        = Koha::Database->new()->schema();
    my $search_fields = Koha::SearchFields->search();

    # Result layout: { index_name => { field_name => { label, type,
    # facet_order, mappings => [ ... ] } } }
    my $mappings = {};

    while ( my $search_field = $search_fields->next ) {
        my $field_name = $search_field->name;

        my $marc_to_fields = $schema->resultset('SearchMarcToField')->search( { search_field_id => $search_field->id } );

        while ( my $marc_to_field = $marc_to_fields->next ) {
            my $marc_map = Koha::SearchMarcMaps->find( $marc_to_field->search_marc_map_id );

            # When a MARC type was requested, skip mappings of other types.
            next if $marc_type && $marc_map->marc_type ne $marc_type;

            my $entry = $mappings->{ $marc_map->index_name }{$field_name} //= {};

            # The field level attributes are (re)written for every mapping row.
            $entry->{label}       = $search_field->label;
            $entry->{type}        = $search_field->type;
            $entry->{facet_order} = $search_field->facet_order;

            push @{ $entry->{mappings} },
                {
                    facet       => $marc_to_field->facet || '',
                    marc_type   => $marc_map->marc_type,
                    marc_field  => $marc_map->marc_field,
                    sort        => $marc_to_field->sort,
                    suggestible => $marc_to_field->suggestible || ''
                };
        }
    }

    return $mappings;
}
292 =head2 _get_elasticsearch_field_config
294 Get the Elasticsearch field config for the given purpose and data type.
296 $mapping = _get_elasticsearch_field_config('search', 'text');
298 =cut
sub _get_elasticsearch_field_config {
    my ( $purpose, $type ) = @_;

    # The field config file is parsed once and then kept in a state variable.
    state $field_config;
    unless ( defined $field_config ) {
        my $yaml_path = C4::Context->config('elasticsearch_field_config')
            || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
        $field_config = LoadFile($yaml_path);
    }

    my $purpose_config = $field_config->{$purpose};
    unless ( defined $purpose_config ) {
        die "Field purpose $purpose not defined in field config";
    }

    # An empty type asks for the whole section of that purpose.
    return $purpose_config if $type eq '';

    # Otherwise prefer the entry of that exact type, then the 'default'
    # entry, and return nothing when neither is defined.
    return $purpose_config->{$type}     if defined $purpose_config->{$type};
    return $purpose_config->{'default'} if defined $purpose_config->{'default'};
    return;
}
327 =head2 _load_elasticsearch_mappings
329 Load Elasticsearch mappings in the format of mappings.yaml.
331 $indexes = _load_elasticsearch_mappings();
333 =cut
sub _load_elasticsearch_mappings {
    # An explicitly configured mappings file wins; otherwise use the
    # mappings.yaml found under intranetdir.
    my $yaml_path = C4::Context->config('elasticsearch_index_mappings')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';

    return LoadFile($yaml_path);
}
# Replace the search fields and MARC mappings stored in the database with the
# ones loaded by _load_elasticsearch_mappings().
sub reset_elasticsearch_mappings {
    my ( $self ) = @_;
    my $indexes = $self->_load_elasticsearch_mappings();

    # Start from a clean slate: everything currently stored is deleted.
    Koha::SearchMarcMaps->delete;
    Koha::SearchFields->delete;

    for my $index_name ( keys %{$indexes} ) {
        my $fields = $indexes->{$index_name};

        for my $field_name ( keys %{$fields} ) {
            my $data = $fields->{$field_name};

            # Copy over only the attributes that are present for this field.
            my %sf_params;
            for my $attribute (qw/ type label weight staff_client opac facet_order /) {
                $sf_params{$attribute} = $data->{$attribute} if exists $data->{$attribute};
            }

            # Set default values
            $sf_params{staff_client} //= 1;
            $sf_params{opac}         //= 1;

            $sf_params{name} = $field_name;

            my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );

            for my $mapping ( @{ $data->{mappings} } ) {
                my $marc_field = Koha::SearchMarcMaps->find_or_create({
                    index_name => $index_name,
                    marc_type  => $mapping->{marc_type},
                    marc_field => $mapping->{marc_field}
                });

                # 'sort' is passed through unchanged (it may be undef), while
                # 'search' defaults to 1 when it is not given.
                $search_field->add_to_search_marc_maps($marc_field, {
                    facet       => $mapping->{facet} || 0,
                    suggestible => $mapping->{suggestible} || 0,
                    sort        => $mapping->{sort},
                    search      => $mapping->{search} // 1
                });
            }
        }
    }
}
# Wrapper around the sort_fields accessor generated by Class::Accessor: when
# no value has been stored yet, the mappings are built so that the value gets
# generated.
sub sort_fields {
    my ( $self, @new_value ) = @_;

    # Setter: store the value and return nothing.
    if (@new_value) {
        $self->_sort_fields_accessor(@new_value);
        return;
    }

    # Getter: hand back the stored value when there is one.
    my $current = $self->_sort_fields_accessor();
    return $current if $current;

    # Nothing stored yet. Building the mappings populates the accessor as a
    # side effect, so read it again afterwards.
    $self->get_elasticsearch_mappings();
    return $self->_sort_fields_accessor();
}
395 =head2 _process_mappings($mappings, $data, $record_document, $meta)
397 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
399 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
400 Since we group all mappings by MARC field targets C<$mappings> will contain
401 all targets for C<$data> and thus we need to fetch the MARC field only once.
402 C<$mappings> will be applied to C<$record_document> and new field values added.
403 The method has no return value.
405 =over 4
407 =item C<$mappings>
409 Arrayref of mappings containing arrayrefs in the format
410 [C<$target>, C<$options>] where C<$target> is the name of the target field and
411 C<$options> is a hashref containing processing directives for this particular
412 mapping.
414 =item C<$data>
416 The source data from a MARC record field.
418 =item C<$record_document>
420 Hashref representing the Elasticsearch document on which mappings should be
421 applied.
423 =item C<$meta>
425 A hashref containing metadata useful for enforcing per mapping rules. For
426 example for providing extra context for mapping options, or treating mapping
427 targets differently depending on type (sort, search, facet etc). Combining
428 this metadata with the mapping options and metadata allows us to mutate the
429 data per mapping, or even replace it with other data retrieved from the
430 metadata context.
432 Current properties are:
434 C<altscript>: A boolean value indicating whether an alternate script presentation is being
435 processed.
437 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
438 'subfield' or 'subfields_group'.
440 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
442 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
443 is 'subfields_group'.
445 C<field>: The C<MARC::Field> object from which C<$data> was retrieved (not set when C<data_source> is 'leader').
447 =back
449 =cut
sub _process_mappings {
    my ($_self, $mappings, $data, $record_document, $meta) = @_;

    MAPPING:
    foreach my $mapping (@{$mappings}) {
        my ($target, $options) = @{$mapping};

        # Don't process sort fields for alternate scripts
        next MAPPING if $target =~ /__sort$/ && $meta->{altscript};

        # Make sure the target exists as an array, even if a later step
        # fails to produce a value.
        $record_document->{$target} //= [];

        # Every target works on its own copy of the (scalar) data, since the
        # options of each mapping may transform it differently.
        my $value = $data;

        # substr => [start, length]: cut out a slice, or use the empty
        # string when the data is too short to reach the start position.
        if (defined $options->{substr}) {
            my ($start, $length) = @{$options->{substr}};
            if (length($data) > $start) {
                $value = substr $data, $start, $length;
            }
            else {
                $value = '';
            }
        }

        # value_callbacks: run the value through every callback in order.
        if (defined $options->{value_callbacks}) {
            foreach my $callback (@{$options->{value_callbacks}}) {
                $value = $callback->($value);
            }
        }

        # property: wrap the value in a single key hash.
        if (defined $options->{property}) {
            $value = { $options->{property} => $value };
        }

        # nonfiling_characters_indicator: the given indicator of the field
        # holds the number of leading characters to strip from the value.
        if (defined $options->{nonfiling_characters_indicator}) {
            my $skip = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
            $skip = looks_like_number($skip) ? int($skip) : 0;
            $value = substr $value, $skip if $skip;
        }

        push @{$record_document->{$target}}, $value;
    }
}
490 =head2 marc_records_to_documents($marc_records)
492 my $record_documents = $self->marc_records_to_documents($marc_records);
494 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
496 Returns array of hash references, representing Elasticsearch documents,
497 acceptable as body payload in C<Search::Elasticsearch> requests.
499 =over 4
501 =item C<$marc_records>
503 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
505 =back
507 =cut
sub marc_records_to_documents {
    my ($self, $records) = @_;

    # The mapping rules are grouped by MARC tag, so every field of a record
    # only has to be visited once.
    my $rules = $self->_get_marc_mapping_rules();
    my $control_fields_rules = $rules->{control_fields};
    my $data_fields_rules = $rules->{data_fields};
    my $marcflavour = lc C4::Context->preference('marcflavour');
    my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';

    my @record_documents;

    foreach my $record (@{$records}) {
        my $record_document = {};

        # Leader mappings
        my $mappings = $rules->{leader};
        if ($mappings) {
            $self->_process_mappings($mappings, $record->leader(), $record_document, {
                    altscript => 0,
                    data_source => 'leader'
                }
            );
        }

        foreach my $field ($record->fields()) {
            if ($field->is_control_field()) {
                my $mappings = $control_fields_rules->{$field->tag()};
                if ($mappings) {
                    $self->_process_mappings($mappings, $field->data(), $record_document, {
                            altscript => 0,
                            data_source => 'control_field',
                            field => $field
                        }
                    );
                }
            }
            else {
                my $tag = $field->tag();
                # Handle alternate scripts in MARC 21: an 880 field is
                # processed with the rules of the tag named in its $6.
                my $altscript = 0;
                if ($marcflavour eq 'marc21' && $tag eq '880') {
                    my $sub6 = $field->subfield('6');
                    # An 880 without $6 is processed as a plain 880; the
                    # defined check avoids matching against undef.
                    if (defined $sub6 && $sub6 =~ /^(...)-\d+/) {
                        $tag = $1;
                        $altscript = 1;
                    }
                }

                my $data_field_rules = $data_fields_rules->{$tag};
                if ($data_field_rules) {
                    my $subfields_mappings = $data_field_rules->{subfields};
                    my $wildcard_mappings = $subfields_mappings->{'*'};
                    foreach my $subfield ($field->subfields()) {
                        my ($code, $data) = @{$subfield};

                        # Mappings of this particular subfield code plus the
                        # mappings that apply to every subfield of the tag.
                        my $mappings = $subfields_mappings->{$code} // [];
                        if ($wildcard_mappings) {
                            $mappings = [@{$mappings}, @{$wildcard_mappings}];
                        }
                        if (@{$mappings}) {
                            $self->_process_mappings($mappings, $data, $record_document, {
                                    altscript => $altscript,
                                    data_source => 'subfield',
                                    code => $code,
                                    field => $field
                                }
                            );
                        }

                        # Look at every mapping and compare the target name
                        # exactly, the same way it is done for joined
                        # subfields below. Targets that merely contain
                        # "match-heading" must not trigger this.
                        if ( grep { $_->[0] eq 'match-heading' } @{$mappings} ){
                            # Used by the authority linker the match-heading field requires a specific syntax
                            # that is specified in C4/Heading
                            my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
                            next unless $heading;
                            push @{$record_document->{'match-heading'}}, $heading->search_form;
                        }
                    }

                    my $subfields_join_mappings = $data_field_rules->{subfields_join};
                    if ($subfields_join_mappings) {
                        foreach my $subfields_group (keys %{$subfields_join_mappings}) {
                            # Map each subfield to values, remove empty values, join with space
                            my $data = join(
                                ' ',
                                grep(
                                    $_,
                                    map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
                                )
                            );
                            if ($data) {
                                $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
                                        altscript => $altscript,
                                        data_source => 'subfields_group',
                                        codes => $subfields_group,
                                        field => $field
                                    }
                                );
                            }
                            if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
                                # Used by the authority linker the match-heading field requires a specific syntax
                                # that is specified in C4/Heading
                                my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
                                next unless $heading;
                                push @{$record_document->{'match-heading'}}, $heading->search_form;
                            }
                        }
                    }
                }
            }
        }

        # Fill in default values for fields that received no value at all.
        foreach my $field (keys %{$rules->{defaults}}) {
            unless (defined $record_document->{$field}) {
                $record_document->{$field} = $rules->{defaults}->{$field};
            }
        }

        # Fields of type "sum" are collapsed into the sum of their values.
        foreach my $field (@{$rules->{sum}}) {
            if (defined $record_document->{$field}) {
                # TODO: validate numeric? filter?
                # TODO: Or should only accept fields without nested values?
                # TODO: Quick and dirty, improve if needed
                $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
            }
        }

        # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
        foreach my $field (@{$rules->{isbn}}) {
            if (defined $record_document->{$field}) {
                my @isbns = ();
                foreach my $input_isbn (@{$record_document->{$field}}) {
                    my $isbn = Business::ISBN->new($input_isbn);
                    if (defined $isbn && $isbn->is_valid) {
                        my $isbn13 = $isbn->as_isbn13->as_string;
                        push @isbns, $isbn13;
                        $isbn13 =~ s/\-//g;
                        push @isbns, $isbn13;

                        my $isbn10 = $isbn->as_isbn10;
                        if ($isbn10) {
                            $isbn10 = $isbn10->as_string;
                            push @isbns, $isbn10;
                            $isbn10 =~ s/\-//g;
                            push @isbns, $isbn10;
                        }
                    } else {
                        # Values that are not valid ISBNs are indexed as they are.
                        push @isbns, $input_isbn;
                    }
                }
                $record_document->{$field} = \@isbns;
            }
        }

        # Remove duplicate values and collapse sort fields
        foreach my $field (keys %{$record_document}) {
            if (ref($record_document->{$field}) eq 'ARRAY') {
                @{$record_document->{$field}} = do {
                    my %seen;
                    # Hash values carrying an 'input' key are compared by
                    # that key's value.
                    grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
                };
                if ($field =~ /__sort$/) {
                    # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
                    $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
                }
            }
        }

        # TODO: Perhaps should check if $records_document non empty, but really should never be the case
        $record->encoding('UTF-8');
        if ($use_array) {
            $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
            $record_document->{'marc_format'} = 'ARRAY';
        } else {
            my @warnings;
            {
                # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999).
                # The handler is confined to this bare block so that the carp calls below are not swallowed.
                local $SIG{__WARN__} = sub {
                    push @warnings, $_[0];
                };
                $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
            }
            if (@warnings) {
                # Suppress warnings if record length exceeded
                unless (substr($record->leader(), 0, 5) eq '99999') {
                    foreach my $warning (@warnings) {
                        carp $warning;
                    }
                }
                # Any warning makes the ISO 2709 form suspect, so store MARCXML instead.
                $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
                $record_document->{'marc_format'} = 'MARCXML';
            }
            else {
                $record_document->{'marc_format'} = 'base64ISO2709';
            }
        }
        push @record_documents, $record_document;
    }
    return \@record_documents;
}
700 =head2 _marc_to_array($record)
702 my $data = $self->_marc_to_array($record)
704 Convert a MARC::Record to a hashref (C<leader> plus an array of C<fields>) modeled after MARC-in-JSON
705 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
707 =over 4
709 =item C<$record>
711 A MARC::Record object
713 =back
715 =cut
sub _marc_to_array {
    my ($self, $record) = @_;

    # Result layout: { leader => '...', fields => [ { tag => ... }, ... ] }
    my $data = {
        leader => $record->leader(),
        fields => []
    };
    for my $field ($record->fields()) {
        my $tag = $field->tag();
        if ($field->is_control_field()) {
            # Control fields only carry data: { tag => data }
            push @{$data->{fields}}, {$tag => $field->data()};
        } else {
            # Start from an empty array so that a field without subfields is
            # represented as "subfields => []" rather than as undef.
            my $subfields = [];
            foreach my $subfield ($field->subfields()) {
                my ($code, $contents) = @{$subfield};
                # One single key hash per subfield keeps order and repeats.
                push @{$subfields}, {$code => $contents};
            }
            push @{$data->{fields}}, {
                $tag => {
                    ind1 => $field->indicator(1),
                    ind2 => $field->indicator(2),
                    subfields => $subfields
                }
            };
        }
    }
    return $data;
}
746 =head2 _array_to_marc($data)
748 my $record = _array_to_marc($data)
750 Convert an array modeled after MARC-in-JSON to a MARC::Record
752 =over 4
754 =item C<$data>
756 An array modeled after MARC-in-JSON
757 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
759 =back
761 =cut
sub _array_to_marc {
    my ($self, $data) = @_;

    my $record = MARC::Record->new();

    $record->leader($data->{leader});
    for my $entry (@{$data->{fields}}) {
        # Every entry is a single key hash: { tag => content }. The content
        # is kept in its own variable, because assigning to the loop variable
        # would overwrite the element of the caller's array.
        my $tag     = (keys %{$entry})[0];
        my $content = $entry->{$tag};

        my $marc_field;
        if (ref($content) eq 'HASH') {
            # Data field: flatten the list of { code => value } hashes into
            # the (code, value, ...) list passed to MARC::Field->new.
            # The "// []" avoids creating a subfields key in the input.
            my @subfields;
            foreach my $subfield (@{ $content->{subfields} // [] }) {
                my $code = (keys %{$subfield})[0];
                push @subfields, $code, $subfield->{$code};
            }
            $marc_field = MARC::Field->new($tag, $content->{ind1}, $content->{ind2}, @subfields);
        } else {
            # Control field: the content is the field data itself.
            $marc_field = MARC::Field->new($tag, $content);
        }
        $record->append_fields($marc_field);
    }

    return $record;
}
790 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
792 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
794 Get mappings, an internal data structure later used by
795 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
796 data for a MARC mapping.
798 The returned C<$mappings> is not to be confused with mappings provided by
799 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
800 provided by C<_foreach_mapping> and expands it to this internal data structure.
801 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
802 is then applied to each MARC target (leader, control field data, subfield or
803 joined subfields) and integrated into the mapping rules data structure used in
804 C<marc_records_to_documents> to transform MARC records into Elasticsearch
805 documents.
807 =over 4
809 =item C<$facet>
811 Boolean indicating whether to create a facet field for this mapping.
813 =item C<$suggestible>
815 Boolean indicating whether to create a suggestion field for this mapping.
817 =item C<$sort>
819 Boolean indicating whether to create a sort field for this mapping.
821 =item C<$search>
823 Boolean indicating whether to create a search field for this mapping.
825 =item C<$target_name>
827 Elasticsearch document target field name.
829 =item C<$target_type>
831 Elasticsearch document target field type.
833 =item C<$range>
835 An optional range as a string in the format "<START>-<END>" or "<START>",
836 where "<START>" and "<END>" are integers specifying a range that will be used
837 for extracting a substring from MARC data as Elasticsearch field target value.
839 The first character position is "0", and the range is inclusive,
840 so "0-2" means the first three characters of MARC data.
842 If only "<START>" is provided only one character at position "<START>" will
843 be extracted.
845 =back
847 =cut
sub _field_mappings {
    my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;

    # Options shared by every mapping generated for this target.
    my %base_options;

    # A range "<START>-<END>" (inclusive) or "<START>" (one character) is
    # turned into the [start, length] pair stored under 'substr'.
    # TODO: use value_callback instead?
    if (defined $range) {
        my ($first, $last) = map { int } split /-/, $range, 2;
        $base_options{substr} = [ $first, ( defined $last ? $last - $first + 1 : 1 ) ];
    }

    # TODO: Should probably have per type value callback/hook
    # but hard code for now
    if ($target_type eq 'boolean') {
        $base_options{value_callbacks} = [
            sub {
                my ($value) = @_;
                # Trim whitespace at both ends
                $value =~ s/^\s+|\s+$//g;
                return $value ? 'true' : 'false';
            }
        ];
    }

    my @mappings;

    # The plain search field uses the base options directly.
    push @mappings, [ $target_name, \%base_options ] if $search;

    # Suffixed fields, in this fixed order. Sort is created unless it was
    # explicitly switched off (undef counts as "on").
    my @wanted = (
        [ facet      => $facet ],
        [ suggestion => $suggestible ],
        [ sort       => ( !defined $sort || $sort ) ],
    );
    foreach my $suffix ( map { $_->[0] } grep { $_->[1] } @wanted ) {
        # Important! Every suffixed mapping gets its own shallow copy of the
        # options, otherwise all mappings would share one hashref.
        my %options = %base_options;

        # TODO: Hack, fix later in less hideous manner
        $options{property} = 'input' if $suffix eq 'suggestion';

        push @mappings, [ "${target_name}__$suffix", \%options ];
    }

    return @mappings;
}
904 =head2 _get_marc_mapping_rules
906 my $mapping_rules = $self->_get_marc_mapping_rules()
908 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
910 Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
911 each call to C<MARC::Record>->field) we create an optimized structure of mapping
912 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
914 We can then iterate through all MARC fields for each record and apply all relevant
915 rules once per field instead of retrieving fields multiple times for each mapping rule
916 which is terribly slow.
918 =cut
# TODO: This structure can be used for processing multiple MARC::Records so is currently
# rebuilt for each batch. Since it is cacheable it could also be stored in an in
# memory cache which it is currently not. The performance gain of caching
# would probably be marginal, but to do this could be a further improvement.

sub _get_marc_mapping_rules {
    my ($self) = @_;
    my $marcflavour = lc C4::Context->preference('marcflavour');

    # "<TAG>[<SUBFIELDS AND (GROUPS)>][_/<START>[-<END>]]", e.g. "245a",
    # "100(ab)c" or "008_/35-37".
    my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
    # "leader[_/<START>[-<END>]]"
    my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;

    my $rules = {
        'leader' => [],
        'control_fields' => {},
        'data_fields' => {},
        'sum' => [],
        'isbn' => [],
        'defaults' => {}
    };

    $self->_foreach_mapping(sub {
        my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;

        # Only mappings for the configured MARC flavour are relevant.
        return if $marc_type ne $marcflavour;

        if ($type eq 'sum') {
            push @{$rules->{sum}}, $name;
            push @{$rules->{sum}}, $name."__sort" if $sort;
        }
        elsif ($type eq 'isbn') {
            push @{$rules->{isbn}}, $name;
        }
        elsif ($type eq 'boolean') {
            # boolean gets special handling, if value doesn't exist for a field,
            # it is set to false
            $rules->{defaults}->{$name} = 'false';
        }

        if ($marc_field =~ $field_spec_regexp) {
            # Copy the captures right away, before anything else can match.
            my ($field_tag, $subfields_spec, $range) = ($1, $2, $3);

            my @subfields;
            my @subfield_groups;
            # Parse and separate subfields from subfield groups
            if (defined $subfields_spec) {
                my $subfield_group = '';
                my $open_group = 0;

                # NOTE(review): a group that is still open when the
                # expression ends is dropped without an error.
                foreach my $token (split //, $subfields_spec) {
                    if ($token eq "(") {
                        if ($open_group) {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched opening parenthesis for $marc_field"
                            );
                        }
                        else {
                            $open_group = 1;
                        }
                    }
                    elsif ($token eq ")") {
                        if ($open_group) {
                            if ($subfield_group) {
                                push @subfield_groups, $subfield_group;
                                $subfield_group = '';
                            }
                            $open_group = 0;
                        }
                        else {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched closing parenthesis for $marc_field"
                            );
                        }
                    }
                    elsif ($open_group) {
                        $subfield_group .= $token;
                    }
                    else {
                        push @subfields, $token;
                    }
                }
            }
            else {
                # No subfields given: the mapping applies to every subfield.
                push @subfields, '*';
            }

            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
            if ($field_tag < 10) {
                # Tags below 010 are control fields.
                $rules->{control_fields}->{$field_tag} //= [];
                push @{$rules->{control_fields}->{$field_tag}}, @mappings;
            }
            else {
                $rules->{data_fields}->{$field_tag} //= {};
                foreach my $subfield (@subfields) {
                    $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
                }
                foreach my $subfield_group (@subfield_groups) {
                    $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
                }
            }
        }
        elsif ($marc_field =~ $leader_regexp) {
            my $range = $1;
            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
            push @{$rules->{leader}}, @mappings;
        }
        else {
            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                "Invalid MARC field expression: $marc_field"
            );
        }
    });

    # Marc-flavour specific rule tweaks, could/should also provide hook for this
    if ($marcflavour eq 'marc21') {
        # Nonfiling characters processing for sort fields
        my %title_fields;

        # Compare against the index name constants declared in this package,
        # so the check does not depend on another module having been loaded.
        if ($self->index eq $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX) {
            # Format is: nonfiling characters indicator => field names list
            %title_fields = (
                1 => [130, 630, 730, 740],
                2 => [222, 240, 242, 243, 245, 440, 830]
            );
        }
        elsif ($self->index eq $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX) {
            %title_fields = (
                1 => [730],
                2 => [130, 430, 530]
            );
        }
        foreach my $indicator (keys %title_fields) {
            foreach my $field_tag (@{$title_fields{$indicator}}) {
                # Look the rules up step by step so that no empty entries
                # are created for tags that have no mappings.
                my $tag_rules = $rules->{data_fields}->{$field_tag};
                next unless $tag_rules && $tag_rules->{subfields};
                my $mappings = $tag_rules->{subfields}->{a} // [];
                foreach my $mapping (@{$mappings}) {
                    if ($mapping->[0] =~ /__sort$/) {
                        # Mark this as to be processed for nonfiling characters indicator
                        # later on in _process_mappings
                        $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
                    }
                }
            }
        }
    }

    return $rules;
}
1067 =head2 _foreach_mapping
1069 $self->_foreach_mapping(
1070 sub {
1071 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1072 $marc_field )
1073 = @_;
1074 return unless $marc_type eq 'marc21';
1075 print "Data comes from: " . $marc_field . "\n";
1079 This allows you to apply a function to each entry in the elasticsearch mappings
1080 table, in order to build the mappings for whatever is needed.
1082 In the provided function, the files are:
1084 =over 4
1086 =item C<$name>
1088 The field name for elasticsearch (corresponds to the 'mapping' column in the
1089 database.
1091 =item C<$type>
1093 The type for this value, e.g. 'string'.
1095 =item C<$facet>
1097 True if this value should be facetised. This only really makes sense if the
1098 field is understood by the facet processing code anyway.
1100 =item C<$sort>
1102 True if this is a field that a) needs special sort handling, and b) if it
1103 should be sorted on. False if a) but not b). Undef if not a). This allows,
1104 for example, author to be sorted on but not everything marked with "author"
1105 to be included in that sort.
1107 =item C<$marc_type>
1109 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1110 'unimarc', 'normarc'.
1112 =item C<$marc_field>
1114 A string that describes the MARC field that contains the data to extract.
1115 These are of a form suited to Catmandu's MARC fixers.
1117 =back
1119 =cut
1121 sub _foreach_mapping {
1122 my ( $self, $sub ) = @_;
1124 # TODO use a caching framework here
1125 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1127 'search_marc_map.index_name' => $self->index,
1129 { join => { search_marc_to_fields => 'search_marc_map' },
1130 '+select' => [
1131 'search_marc_to_fields.facet',
1132 'search_marc_to_fields.suggestible',
1133 'search_marc_to_fields.sort',
1134 'search_marc_to_fields.search',
1135 'search_marc_map.marc_type',
1136 'search_marc_map.marc_field',
1138 '+as' => [
1139 'facet',
1140 'suggestible',
1141 'sort',
1142 'search',
1143 'marc_type',
1144 'marc_field',
1149 while ( my $search_field = $search_fields->next ) {
1150 $sub->(
1151 # Force lower case on indexed field names for case insensitive
1152 # field name searches
1153 lc($search_field->name),
1154 $search_field->type,
1155 $search_field->get_column('facet'),
1156 $search_field->get_column('suggestible'),
1157 $search_field->get_column('sort'),
1158 $search_field->get_column('search'),
1159 $search_field->get_column('marc_type'),
1160 $search_field->get_column('marc_field'),
=head2 process_error

    die $self->process_error($@);

This parses an Elasticsearch error message and produces a human-readable
result from it. This result is probably missing all the useful information
that you might want in diagnosing an issue, so the warning is also logged.

The original message is passed to C<warn> before the user-facing text is
returned. A message containing C<ParseException> yields a "please rephrase"
response; anything else (including an undefined message) yields a generic
"please try again" response.

Note that currently the resulting message is not internationalised. This
will happen eventually by some method or other.

=cut

sub process_error {
    my ( $self, $msg ) = @_;

    # Normalise a missing message so the match below does not emit an
    # additional "uninitialized value" warning; warn() reports an empty
    # message exactly as it reports an undefined one.
    $msg //= '';

    warn $msg;    # simple logging

    # This is super-primitive
    my $is_parse_error = $msg =~ /ParseException/;

    return $is_parse_error
      ? "Unable to understand your search query, please rephrase and try again.\n"
      : "Unable to perform your search. Please try again.\n";
}

=head2 _read_configuration

    my $conf = _read_configuration();

Reads the I<configuration file> and returns a hash structure with the
configuration information. It raises an exception if mandatory entries
are missing.

The hashref structure has the following form:

    {
        'nodes'      => ['127.0.0.1:9200', 'anotherserver:9200'],
        'index_name' => 'koha_instance',
    }

C<nodes> is always an array reference, even when a single server is
configured.

This is configured by the following in the C<config> block in koha-conf.xml:

    <elasticsearch>
        <server>127.0.0.1:9200</server>
        <server>anotherserver:9200</server>
        <index_name>koha_instance</index_name>
    </elasticsearch>

Throws C<Koha::Exceptions::Config::MissingEntry> when the C<elasticsearch>
block, its C<server> entry or its C<index_name> entry is missing.

=cut

sub _read_configuration {

    my $es_conf = C4::Context->config('elasticsearch');
    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing 'elasticsearch' block in config file")
      unless defined $es_conf;

    # A defined-but-false block (e.g. an empty string) has no entries at
    # all, so it is reported as a missing 'server' entry.
    my $servers = $es_conf ? $es_conf->{server} : undef;
    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing 'server' entry in config file for elasticsearch")
      unless $servers;

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing 'index_name' entry in config file for elasticsearch")
      unless defined $es_conf->{index_name};

    return {
        # One <server> entry arrives as a plain scalar, several as an
        # array reference; callers always get an array reference.
        nodes      => ( ref($servers) eq 'ARRAY' ? $servers : [$servers] ),
        index_name => $es_conf->{index_name},
    };
}

=head2 get_facetable_fields

    my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();

Returns the list of Koha::SearchFields that can be used as facets in the ES
configuration. Only a fixed set of field names is considered. Fields that
have a C<facet_order> come first, sorted by that order, followed by the
fields of the same set that have no C<facet_order>.

=cut

sub get_facetable_fields {
    my ($self) = @_;

    # These should correspond to the ES field names, as opposed to the CCL
    # things that zebra uses.
    my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );

    # Both lookups differ only in the facet_order condition. The closure is
    # called in list context so that search() keeps returning a list.
    my $fields_with_facet_order = sub {
        my ($facet_order_condition) = @_;
        return Koha::SearchFields->search(
            { name => { -in => \@search_field_names }, facet_order => $facet_order_condition },
            { order_by => ['facet_order'] }
        );
    };

    my @faceted_fields     = $fields_with_facet_order->( { '!=' => undef } );
    my @not_faceted_fields = $fields_with_facet_order->(undef);

    # This could certainly be improved
    return ( @faceted_fields, @not_faceted_fields );
}

1274 __END__
1276 =head1 AUTHOR
1278 =over 4
1280 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1282 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1284 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1286 =back
1288 =cut