Bug 24584: Rewrite optional/parameters to YAML
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
blob85574cefc0609296c6442b77b346ad89b329dc7f
1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
22 use C4::Context;
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
46 __PACKAGE__->mk_ro_accessors(qw( index ));
47 __PACKAGE__->mk_accessors(qw( sort_fields ));
49 # Constants to refer to the standard index names
50 Readonly our $BIBLIOS_INDEX => 'biblios';
51 Readonly our $AUTHORITIES_INDEX => 'authorities';
53 =head1 NAME
55 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
57 =head1 ACCESSORS
59 =over 4
61 =item index
63 The name of the index to use, generally 'biblios' or 'authorities'.
65 =back
67 =head1 FUNCTIONS
69 =cut
# Constructor. Delegates to the parent class constructor and then insists
# that the resulting object knows which index it operates on.
#
# Throws Koha::Exceptions::MissingParameter when no index name was given.
sub new {
    my ( $class, @args ) = @_;

    my $self = $class->SUPER::new(@args);

    # An instance without an index name cannot address anything.
    if ( !$self->index ) {
        Koha::Exceptions::MissingParameter->throw('No index name provided');
    }

    return $self;
}
79 =head2 get_elasticsearch
81 my $elasticsearch_client = $self->get_elasticsearch();
83 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
84 instance level and will be reused if method is called multiple times.
86 =cut
sub get_elasticsearch {
    my ($self) = @_;

    # Build the client on first use only; later calls reuse the instance
    # stored on this object.
    $self->{elasticsearch} //= do {
        my $params = $self->get_elasticsearch_params();
        Search::Elasticsearch->new($params);
    };

    return $self->{elasticsearch};
}
97 =head2 get_elasticsearch_params
99 my $params = $self->get_elasticsearch_params();
101 This provides a hashref that contains the parameters for connecting to the
102 Elasticsearch servers, in the form:
105 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
106 'index_name' => 'koha_instance_index',
109 This is configured by the following in the C<config> block in koha-conf.xml:
111 <elasticsearch>
112 <server>127.0.0.1:9200</server>
113 <server>anotherserver:9200</server>
114 <index_name>koha_instance</index_name>
115 </elasticsearch>
117 =cut
sub get_elasticsearch_params {
    my ($self) = @_;

    my $configured = C4::Context->config('elasticsearch')
        or die "No 'elasticsearch' block is defined in koha-conf.xml.\n";

    # Work on a shallow copy so the configuration held by C4::Context is
    # left untouched.
    my %params = %{$configured};

    # The config parser yields an arrayref when several <server> entries
    # exist and a plain scalar when there is only one. Either way the value
    # is stored under 'nodes', the key newer Search::Elasticsearch uses.
    my $servers = delete $params{server};
    if ( ref($servers) eq 'ARRAY' ) {
        $params{nodes} = $servers;
    }
    elsif ($servers) {
        $params{nodes} = [$servers];
    }
    else {
        die "No elasticsearch servers were specified in koha-conf.xml.\n";
    }

    if ( !$params{index_name} ) {
        die "No elasticsearch index_name was specified in koha-conf.xml.\n";
    }

    # The configured name is a namespace; the concrete index name is that
    # namespace suffixed with this object's index.
    $params{index_name} .= '_' . $self->index;

    $params{key_prefix} = 'es_';
    $params{cxn_pool}        //= 'Static';
    $params{request_timeout} //= 60;

    return \%params;
}
154 =head2 get_elasticsearch_settings
156 my $settings = $self->get_elasticsearch_settings();
158 This provides the settings provided to Elasticsearch when an index is created.
159 These can do things like define tokenization methods.
161 A hashref containing the settings is returned.
163 =cut
sub get_elasticsearch_settings {
    my ($self) = @_;

    # The parsed YAML is kept for the lifetime of the process so the file
    # is read at most once.
    state $cached_settings;
    return $cached_settings if defined $cached_settings;

    # An explicit path in koha-conf.xml wins; otherwise fall back to the
    # file shipped under intranetdir.
    my $path = C4::Context->config('elasticsearch_index_config')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';

    $cached_settings = LoadFile($path);

    return $cached_settings;
}
179 =head2 get_elasticsearch_mappings
181 my $mappings = $self->get_elasticsearch_mappings();
183 This provides the mappings that get passed to Elasticsearch when an index is
184 created.
186 =cut
sub get_elasticsearch_mappings {
    my ($self) = @_;

    # Per-index caches shared by all instances in this process.
    state %all_mappings;
    state %sort_fields;

    my $index = $self->index;

    unless ( defined $all_mappings{$index} ) {
        $sort_fields{$index} = {};

        # Clone the general mapping so the cached field config is never
        # modified through the structure built here.
        my $mappings = {
            data => clone(_get_elasticsearch_field_config('general', ''))
        };
        my $properties_owner = $mappings->{data};

        # Koha field type => type name used to look up the field config.
        # Anything not listed is treated as 'text'.
        # TODO be aware of date formats, but this requires pre-parsing
        # as ES will simply reject anything with an invalid date.
        my %es_type_for = (
            boolean => 'boolean',
            number  => 'integer',
            sum     => 'integer',
            isbn    => 'stdno',
            stdno   => 'stdno',
        );

        my $marcflavour = lc C4::Context->preference('marcflavour');
        $self->_foreach_mapping(
            sub {
                my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;

                # Only mappings for the active MARC flavour are relevant.
                return if $marc_type ne $marcflavour;

                my $es_type = $es_type_for{$type} // 'text';

                if ($search) {
                    $properties_owner->{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
                }
                if ($facet) {
                    $properties_owner->{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
                }
                if ($suggestible) {
                    $properties_owner->{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
                }

                # Sort can be true, false or undef. Undef means "do the
                # default thing", which is to make the field sortable, so
                # only an explicit false value disables the sort field.
                unless ( defined $sort && !$sort ) {
                    $properties_owner->{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
                    $sort_fields{$index}{$name} = 1;
                }
            }
        );
        $all_mappings{$index} = $mappings;
    }

    # Publish the sortable field names on the instance on every call.
    $self->sort_fields( $sort_fields{$index} );

    return $all_mappings{$index};
}
246 =head2 _get_elasticsearch_field_config
248 Get the Elasticsearch field config for the given purpose and data type.
250 $mapping = _get_elasticsearch_field_config('search', 'text');
252 =cut
sub _get_elasticsearch_field_config {
    my ( $purpose, $type ) = @_;

    # The parsed field configuration is loaded once per process.
    state $settings;
    if ( !defined $settings ) {
        my $path = C4::Context->config('elasticsearch_field_config')
            || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
        $settings = LoadFile($path);
    }

    my $purpose_config = $settings->{$purpose};
    die "Field purpose $purpose not defined in field config"
        unless defined $purpose_config;

    # An empty type asks for the whole configuration of the purpose.
    return $purpose_config if $type eq '';

    # Prefer the type-specific entry, then the purpose's 'default' entry.
    for my $candidate ( $type, 'default' ) {
        return $purpose_config->{$candidate}
            if defined $purpose_config->{$candidate};
    }

    # Nothing configured for this purpose/type combination.
    return;
}
281 =head2 _load_elasticsearch_mappings
283 Load Elasticsearch mappings in the format of mappings.yaml.
285 $indexes = _load_elasticsearch_mappings();
287 =cut
sub _load_elasticsearch_mappings {
    # Use the mappings file named in koha-conf.xml when there is one,
    # otherwise the mappings.yaml shipped under intranetdir.
    my $path = C4::Context->config('elasticsearch_index_mappings')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';

    return LoadFile($path);
}
# Replace the search field and MARC mapping rows in the database with the
# contents of the mappings YAML file (see _load_elasticsearch_mappings).
# All existing Koha::SearchMarcMaps and Koha::SearchFields rows are deleted
# first.
sub reset_elasticsearch_mappings {
    my ($self) = @_;

    my $indexes = $self->_load_elasticsearch_mappings();

    Koha::SearchMarcMaps->delete;
    Koha::SearchFields->delete;

    # Attributes copied verbatim from the YAML entry when present.
    my @copied_attributes = qw/ type label weight staff_client opac facet_order /;

    for my $index_name ( keys %{$indexes} ) {
        my $fields = $indexes->{$index_name};

        for my $field_name ( keys %{$fields} ) {
            my $data = $fields->{$field_name};

            my %sf_params;
            for my $attribute (@copied_attributes) {
                next unless exists $data->{$attribute};
                $sf_params{$attribute} = $data->{$attribute};
            }

            # Fields are visible in both interfaces unless the file says
            # otherwise.
            $sf_params{staff_client} //= 1;
            $sf_params{opac}         //= 1;

            $sf_params{name} = $field_name;

            my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );

            my $mappings = $data->{mappings};
            for my $mapping ( @{$mappings} ) {
                my $marc_field = Koha::SearchMarcMaps->find_or_create(
                    {
                        index_name => $index_name,
                        marc_type  => $mapping->{marc_type},
                        marc_field => $mapping->{marc_field}
                    }
                );

                # 'sort' is passed through as-is (it may be undef), while
                # 'search' defaults to enabled.
                $search_field->add_to_search_marc_maps(
                    $marc_field,
                    {
                        facet       => $mapping->{facet}       || 0,
                        suggestible => $mapping->{suggestible} || 0,
                        sort        => $mapping->{sort},
                        search      => $mapping->{search} // 1
                    }
                );
            }
        }
    }

    return;
}
# Replacement for the sort_fields accessor generated by Class::Accessor.
# With an argument it stores the value and returns nothing. Without one it
# returns the stored value, building it on demand when nothing is stored yet.
sub sort_fields {
    my ( $self, @new_value ) = @_;

    if (@new_value) {
        $self->_sort_fields_accessor(@new_value);
        return;
    }

    my $current = $self->_sort_fields_accessor();
    if ( !$current ) {
        # Building the mappings stores the sort fields on the object as a
        # side effect, so read the accessor again afterwards.
        $self->get_elasticsearch_mappings();
        return $self->_sort_fields_accessor();
    }

    return $current;
}
349 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
351 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
353 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
354 Since we group all mappings by MARC field targets C<$mappings> will contain
355 all targets for C<$data> and thus we need to fetch the MARC field only once.
356 C<$mappings> will be applied to C<$record_document> and new field values added.
357 The method has no return value.
359 =over 4
361 =item C<$mappings>
363 Arrayref of mappings containing arrayrefs in the format
364 [C<$target>, C<$options>] where C<$target> is the name of the target field and
365 C<$options> is a hashref containing processing directives for this particular
366 mapping.
368 =item C<$data>
370 The source data from a MARC record field.
372 =item C<$record_document>
374 Hashref representing the Elasticsearch document on which mappings should be
375 applied.
377 =item C<$altscript>
379 A boolean value indicating whether an alternate script presentation is being
380 processed.
382 =back
384 =cut
sub _process_mappings {
    my ($_self, $mappings, $data, $record_document, $altscript) = @_;

    MAPPING:
    for my $entry ( @{$mappings} ) {
        my ( $target, $options ) = @{$entry};

        # Alternate script values never contribute to sort fields.
        next MAPPING if $altscript && $target =~ /__sort$/;

        # Every target gets its own copy of the value, because the options
        # of one target must not affect what another target receives.
        my $value = $data;
        $record_document->{$target} //= [];

        if ( defined $options->{substr} ) {
            my ( $start, $length ) = @{ $options->{substr} };

            # A start position at or past the end of the data yields an
            # empty string.
            if ( length($data) > $start ) {
                $value = substr $data, $start, $length;
            }
            else {
                $value = '';
            }
        }

        if ( defined $options->{value_callbacks} ) {
            # Feed the value through the callbacks in order.
            for my $callback ( @{ $options->{value_callbacks} } ) {
                $value = $callback->($value);
            }
        }

        if ( defined $options->{property} ) {
            # Wrap the value in a single-key hash, e.g. { input => ... }.
            $value = { $options->{property} => $value };
        }

        push @{ $record_document->{$target} }, $value;
    }

    return;
}
418 =head2 marc_records_to_documents($marc_records)
420 my $record_documents = $self->marc_records_to_documents($marc_records);
422 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
424 Returns array of hash references, representing Elasticsearch documents,
425 acceptable as body payload in C<Search::Elasticsearch> requests.
427 =over 4
429 =item C<$marc_documents>
431 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
433 =back
435 =cut
# Convert MARC records into Elasticsearch documents using the mapping rules
# built by _get_marc_mapping_rules.
#
# $records - arrayref of record objects; the code calls leader(), fields(),
#            encoding(), as_usmarc() and as_xml_record() on each of them.
#
# Returns an arrayref of hashrefs, one document per record.
sub marc_records_to_documents {
    my ($self, $records) = @_;
    my $rules = $self->_get_marc_mapping_rules();
    my $control_fields_rules = $rules->{control_fields};
    my $data_fields_rules = $rules->{data_fields};
    my $marcflavour = lc C4::Context->preference('marcflavour');
    my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';

    my @record_documents;

    foreach my $record (@{$records}) {
        my $record_document = {};
        my $leader_mappings = $rules->{leader};
        if ($leader_mappings) {
            $self->_process_mappings($leader_mappings, $record->leader(), $record_document, 0);
        }
        foreach my $field ($record->fields()) {
            if ($field->is_control_field()) {
                my $mappings = $control_fields_rules->{$field->tag()};
                if ($mappings) {
                    $self->_process_mappings($mappings, $field->data(), $record_document, 0);
                }
            }
            else {
                my $tag = $field->tag();
                # Handle alternate scripts in MARC 21
                my $altscript = 0;
                if ($marcflavour eq 'marc21' && $tag eq '880') {
                    my $sub6 = $field->subfield('6');
                    # An 880 without $6 cannot be linked to a tag. The
                    # defined check avoids matching against undef.
                    if (defined $sub6 && $sub6 =~ /^(...)-\d+/) {
                        $tag = $1;
                        $altscript = 1;
                    }
                }

                my $data_field_rules = $data_fields_rules->{$tag};
                if ($data_field_rules) {
                    my $subfields_mappings = $data_field_rules->{subfields};
                    my $wildcard_mappings = $subfields_mappings->{'*'};
                    foreach my $subfield ($field->subfields()) {
                        my ($code, $data) = @{$subfield};
                        my $mappings = $subfields_mappings->{$code} // [];
                        if ($wildcard_mappings) {
                            $mappings = [@{$mappings}, @{$wildcard_mappings}];
                        }
                        if (@{$mappings}) {
                            $self->_process_mappings($mappings, $data, $record_document, $altscript);
                        }
                        # NOTE: only the first mapping is inspected here, and
                        # by pattern rather than exact name (unchanged).
                        if ( defined $mappings->[0] && grep { /match-heading/ } @{ $mappings->[0] } ){
                            # Used by the authority linker the match-heading field requires a specific syntax
                            # that is specified in C4/Heading
                            my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
                            next unless $heading;
                            push @{$record_document->{'match-heading'}}, $heading->search_form;
                        }
                    }

                    my $subfields_join_mappings = $data_field_rules->{subfields_join};
                    if ($subfields_join_mappings) {
                        foreach my $subfields_group (keys %{$subfields_join_mappings}) {
                            # Map each subfield to values, remove empty values, join with space
                            my $data = join(
                                ' ',
                                grep(
                                    $_,
                                    map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
                                )
                            );
                            if ($data) {
                                $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
                            }
                            if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
                                # Used by the authority linker the match-heading field requires a specific syntax
                                # that is specified in C4/Heading
                                my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
                                next unless $heading;
                                push @{$record_document->{'match-heading'}}, $heading->search_form;
                            }
                        }
                    }
                }
            }
        }
        # Fill in default values for fields that received no value at all.
        foreach my $field (keys %{$rules->{defaults}}) {
            unless (defined $record_document->{$field}) {
                $record_document->{$field} = $rules->{defaults}->{$field};
            }
        }
        foreach my $field (@{$rules->{sum}}) {
            # The rules may list the same field more than once. After the
            # first pass the value is a plain number, so only arrays of
            # collected values are summed.
            if (ref($record_document->{$field}) eq 'ARRAY') {
                # TODO: validate numeric? filter?
                # TODO: Or should only accept fields without nested values?
                # TODO: Quick and dirty, improve if needed
                $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
            }
        }
        # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
        foreach my $field (@{$rules->{isbn}}) {
            if (defined $record_document->{$field}) {
                my @isbns = ();
                foreach my $input_isbn (@{$record_document->{$field}}) {
                    my $isbn = Business::ISBN->new($input_isbn);
                    if (defined $isbn && $isbn->is_valid) {
                        my $isbn13 = $isbn->as_isbn13->as_string;
                        push @isbns, $isbn13;
                        $isbn13 =~ s/\-//g;
                        push @isbns, $isbn13;

                        my $isbn10 = $isbn->as_isbn10;
                        if ($isbn10) {
                            $isbn10 = $isbn10->as_string;
                            push @isbns, $isbn10;
                            $isbn10 =~ s/\-//g;
                            push @isbns, $isbn10;
                        }
                    } else {
                        # Keep values that do not parse as a valid ISBN.
                        push @isbns, $input_isbn;
                    }
                }
                $record_document->{$field} = \@isbns;
            }
        }

        # Remove duplicate values and collapse sort fields
        foreach my $field (keys %{$record_document}) {
            if (ref($record_document->{$field}) eq 'ARRAY') {
                @{$record_document->{$field}} = do {
                    my %seen;
                    # Hash values with an 'input' key are compared by that key.
                    grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
                };
                if ($field =~ /__sort$/) {
                    # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
                    $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
                }
            }
        }

        # TODO: Perhaps should check if $records_document non empty, but really should never be the case
        $record->encoding('UTF-8');
        if ($use_array) {
            $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
            $record_document->{'marc_format'} = 'ARRAY';
        } else {
            my @warnings;
            {
                # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
                local $SIG{__WARN__} = sub {
                    push @warnings, $_[0];
                };
                $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
            }
            if (@warnings) {
                # Suppress warnings if record length exceeded
                unless (substr($record->leader(), 0, 5) eq '99999') {
                    foreach my $warning (@warnings) {
                        carp $warning;
                    }
                }
                # Any warning during serialisation switches the stored
                # format to MARCXML.
                $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
                $record_document->{'marc_format'} = 'MARCXML';
            }
            else {
                $record_document->{'marc_format'} = 'base64ISO2709';
            }
        }
        push @record_documents, $record_document;
    }
    return \@record_documents;
}
607 =head2 _marc_to_array($record)
609 my $data = $self->_marc_to_array($record)
611 Convert a MARC::Record to an array modeled after MARC-in-JSON
612 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
614 =over 4
616 =item C<$record>
618 A MARC::Record object
620 =back
622 =cut
# Convert a record object into a MARC-in-JSON style structure.
#
# $record - object providing leader() and fields(); each field provides
#           tag(), is_control_field(), data(), indicator() and subfields().
#
# Returns a hashref { leader => ..., fields => [ ... ] }. Control fields
# become { TAG => data }. Data fields become
# { TAG => { ind1 => ..., ind2 => ..., subfields => [ { code => value }, ... ] } }.
sub _marc_to_array {
    my ($self, $record) = @_;

    # Method results are taken into scalars first so that a call returning
    # an empty list cannot shift the keys of the hashes built below.
    my $leader = $record->leader();
    my $data = {
        leader => $leader,
        fields => []
    };
    for my $field ($record->fields()) {
        my $tag = $field->tag();
        if ($field->is_control_field()) {
            my $contents = $field->data();
            push @{$data->{fields}}, {$tag => $contents};
        } else {
            # Start from an empty array so a field without subfields still
            # produces an array rather than undef.
            my $subfields = [];
            foreach my $subfield ($field->subfields()) {
                my ($code, $contents) = @{$subfield};
                push @{$subfields}, {$code => $contents};
            }
            my $ind1 = $field->indicator(1);
            my $ind2 = $field->indicator(2);
            push @{$data->{fields}}, {
                $tag => {
                    ind1 => $ind1,
                    ind2 => $ind2,
                    subfields => $subfields
                }
            };
        }
    }
    return $data;
}
653 =head2 _array_to_marc($data)
655 my $record = _array_to_marc($data)
657 Convert an array modeled after MARC-in-JSON to a MARC::Record
659 =over 4
661 =item C<$data>
663 An array modeled after MARC-in-JSON
664 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
666 =back
668 =cut
# Build a MARC::Record from a MARC-in-JSON style structure, i.e. the
# inverse of _marc_to_array.
#
# $data - hashref { leader => ..., fields => [ { TAG => ... }, ... ] }.
#         A hashref value under TAG is treated as a data field with
#         ind1/ind2/subfields; anything else is treated as control field
#         data. The structure is only read, never modified.
#
# Returns the new MARC::Record.
sub _array_to_marc {
    my ($self, $data) = @_;

    my $record = MARC::Record->new();

    $record->leader($data->{leader});
    for my $entry (@{$data->{fields}}) {
        # Each entry is a single-key hash: { TAG => content }.
        my ($tag) = keys %{$entry};

        # Use a separate variable for the content: the loop variable is an
        # alias into the caller's array, so assigning to it would overwrite
        # the caller's data.
        my $content = $entry->{$tag};

        my $marc_field;
        if (ref($content) eq 'HASH') {
            # Flatten [ { code => value }, ... ] into (code, value, ...).
            my @subfields;
            foreach my $subfield (@{ $content->{subfields} // [] }) {
                my ($code) = keys %{$subfield};
                push @subfields, $code, $subfield->{$code};
            }
            $marc_field = MARC::Field->new($tag, $content->{ind1}, $content->{ind2}, @subfields);
        } else {
            $marc_field = MARC::Field->new($tag, $content);
        }
        $record->append_fields($marc_field);
    }
    return $record;
}
697 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
699 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
701 Get mappings, an internal data structure later used by
702 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
703 data for a MARC mapping.
705 The returned C<$mappings> is not to be confused with mappings provided by
706 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
707 provided by C<_foreach_mapping> and expands it to this internal data structure.
708 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
709 is then applied to each MARC target (leader, control field data, subfield or
710 joined subfields) and integrated into the mapping rules data structure used in
711 C<marc_records_to_documents> to transform MARC records into Elasticsearch
712 documents.
714 =over 4
716 =item C<$facet>
718 Boolean indicating whether to create a facet field for this mapping.
720 =item C<$suggestible>
722 Boolean indicating whether to create a suggestion field for this mapping.
724 =item C<$sort>
726 Boolean indicating whether to create a sort field for this mapping.
728 =item C<$search>
730 Boolean indicating whether to create a search field for this mapping.
732 =item C<$target_name>
734 Elasticsearch document target field name.
736 =item C<$target_type>
738 Elasticsearch document target field type.
740 =item C<$range>
742 An optional range as a string in the format "<START>-<END>" or "<START>",
743 where "<START>" and "<END>" are integers specifying a range that will be used
744 for extracting a substring from MARC data as Elasticsearch field target value.
746 The first character position is "0", and the range is inclusive,
747 so "0-2" means the first three characters of MARC data.
749 If only "<START>" is provided only one character at position "<START>" will
750 be extracted.
752 =back
754 =cut
sub _field_mappings {
    my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;

    # Options shared by every mapping produced for this target.
    my %shared;

    if ( defined $range ) {
        # "<START>-<END>" is inclusive on both ends; a lone "<START>"
        # selects exactly one character.
        # TODO: use value_callback instead?
        my ( $from, $to ) = map { int } split /-/, $range, 2;
        $shared{substr} = [ $from, ( defined $to ? $to - $from + 1 : 1 ) ];
    }

    # TODO: Should probably have per type value callback/hook
    # but hard code for now
    if ( $target_type eq 'boolean' ) {
        $shared{value_callbacks} = [
            sub {
                my ($value) = @_;
                # Trim whitespace at both ends
                $value =~ s/^\s+|\s+$//g;
                return $value ? 'true' : 'false';
            }
        ];
    }

    my $default_options = \%shared;
    my @mappings;

    # The plain search field uses the bare target name.
    push @mappings, [ $target_name, $default_options ] if $search;

    # Derived fields, in a fixed order. A sort field is wanted unless
    # sorting was explicitly switched off (undef counts as "on").
    my @wanted = (
        [ facet      => $facet ],
        [ suggestion => $suggestible ],
        [ sort       => ( !defined $sort || $sort ) ],
    );

    for my $pair (@wanted) {
        my ( $suffix, $enabled ) = @{$pair};
        next unless $enabled;

        # TODO: Hack, fix later in less hideous manner
        # Suggestion values get their own copy of the options with the
        # 'input' property added; the others share the default options.
        my $options =
            $suffix eq 'suggestion'
            ? { %{$default_options}, property => 'input' }
            : $default_options;

        push @mappings, [ "${target_name}__$suffix", $options ];
    }

    return @mappings;
}
809 =head2 _get_marc_mapping_rules
811 my $mapping_rules = $self->_get_marc_mapping_rules()
813 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
815 Since field retrieval is slow in C<MARC::Records> (all fields are iterated through for
816 each call to C<MARC::Record>->field) we create an optimized structure of mapping
817 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
819 We can then iterate through all MARC fields for each record and apply all relevant
820 rules once per fields instead of retrieving fields multiple times for each mapping rule
821 which is terribly slow.
823 =cut
825 # TODO: This structure can be used for processing multiple MARC::Records so is currently
826 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
827 # memory cache which it is currently not. The performance gain of caching
828 # would probably be marginal, but to do this could be a further improvement.
# Build the rule structure used by marc_records_to_documents from the
# mappings supplied by _foreach_mapping for the active MARC flavour.
#
# Returns a hashref with the keys:
#   leader         - arrayref of mappings applied to the record leader
#   control_fields - { TAG => [ mappings ] }
#   data_fields    - { TAG => { subfields      => { CODE  => [ mappings ] },
#                               subfields_join => { GROUP => [ mappings ] } } }
#   sum            - names of document fields whose values are summed
#   isbn           - names of document fields expanded into ISBN variants
#   defaults       - { NAME => default value }
#
# Throws Koha::Exceptions::Elasticsearch::MARCFieldExprParseError for a
# MARC field expression that cannot be parsed.
sub _get_marc_mapping_rules {
    my ($self) = @_;
    my $marcflavour = lc C4::Context->preference('marcflavour');
    my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
    my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
    my $rules = {
        'leader' => [],
        'control_fields' => {},
        'data_fields' => {},
        'sum' => [],
        'isbn' => [],
        'defaults' => {}
    };

    $self->_foreach_mapping(sub {
        my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
        return if $marc_type ne $marcflavour;

        if ($type eq 'sum') {
            push @{$rules->{sum}}, $name;
            # A sort field is generated when $sort is true OR undef (see
            # _field_mappings), so it must be summed in both cases too.
            push @{$rules->{sum}}, $name."__sort" if !defined $sort || $sort;
        }
        elsif ($type eq 'isbn') {
            push @{$rules->{isbn}}, $name;
        }
        elsif ($type eq 'boolean') {
            # boolean gets special handling, if value doesn't exist for a field,
            # it is set to false
            $rules->{defaults}->{$name} = 'false';
        }

        if ($marc_field =~ $field_spec_regexp) {
            # Copy the captures right away so later regex operations cannot
            # disturb them.
            my ($field_tag, $subfield_spec, $range) = ($1, $2, $3);

            my @subfields;
            my @subfield_groups;
            # Parse and separate subfields from subfield groups
            if (defined $subfield_spec) {
                my $subfield_group = '';
                my $open_group = 0;

                foreach my $token (split //, $subfield_spec) {
                    if ($token eq "(") {
                        if ($open_group) {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched opening parenthesis for $marc_field"
                            );
                        }
                        else {
                            $open_group = 1;
                        }
                    }
                    elsif ($token eq ")") {
                        if ($open_group) {
                            if ($subfield_group) {
                                push @subfield_groups, $subfield_group;
                                $subfield_group = '';
                            }
                            $open_group = 0;
                        }
                        else {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched closing parenthesis for $marc_field"
                            );
                        }
                    }
                    elsif ($open_group) {
                        $subfield_group .= $token;
                    }
                    else {
                        push @subfields, $token;
                    }
                }

                # A group still open at the end of the expression would
                # otherwise be dropped without any indication.
                if ($open_group) {
                    Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                        "Unmatched opening parenthesis for $marc_field"
                    );
                }
            }
            else {
                # No subfield list: the mapping applies to every subfield.
                push @subfields, '*';
            }

            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
            if ($field_tag < 10) {
                # Tags below 010 are control fields and have no subfields.
                $rules->{control_fields}->{$field_tag} //= [];
                push @{$rules->{control_fields}->{$field_tag}}, @mappings;
            }
            else {
                $rules->{data_fields}->{$field_tag} //= {};
                foreach my $subfield (@subfields) {
                    $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
                }
                foreach my $subfield_group (@subfield_groups) {
                    $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
                }
            }
        }
        elsif ($marc_field =~ $leader_regexp) {
            my $range = $1;
            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
            push @{$rules->{leader}}, @mappings;
        }
        else {
            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                "Invalid MARC field expression: $marc_field"
            );
        }
    });
    return $rules;
}
940 =head2 _foreach_mapping
942 $self->_foreach_mapping(
943 sub {
944 my ( $name, $type, $facet, $suggestible, $sort, $search,
945 $marc_type, $marc_field )
946 = @_;
947 return unless $marc_type eq 'marc21';
948 print "Data comes from: " . $marc_field . "\n";
952 This allows you to apply a function to each entry in the elasticsearch mappings
953 table, in order to build the mappings for whatever is needed.
955 In the provided function, the fields are:
957 =over 4
959 =item C<$name>
961 The field name for elasticsearch (corresponds to the 'mapping' column in the
962 database).
964 =item C<$type>
966 The type for this value, e.g. 'string'.
968 =item C<$facet>
970 True if this value should be facetised. This only really makes sense if the
971 field is understood by the facet processing code anyway.
973 =item C<$sort>
975 True if this is a field that a) needs special sort handling, and b) if it
976 should be sorted on. False if a) but not b). Undef if not a). This allows,
977 for example, author to be sorted on but not everything marked with "author"
978 to be included in that sort.
980 =item C<$marc_type>
982 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
983 'unimarc', 'normarc'.
985 =item C<$marc_field>
987 A string that describes the MARC field that contains the data to extract.
988 These are of a form suited to Catmandu's MARC fixers.
990 =back
992 =cut
sub _foreach_mapping {
    my ( $self, $sub ) = @_;

    # Extra columns pulled in from the joined tables, in the order in which
    # they are handed to the callback after name and type.
    my @link_columns  = qw( facet suggestible sort search );
    my @map_columns   = qw( marc_type marc_field );
    my @extra_columns = ( @link_columns, @map_columns );

    # TODO use a caching framework here
    my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
        {
            'search_marc_map.index_name' => $self->index,
        },
        {
            join      => { search_marc_to_fields => 'search_marc_map' },
            '+select' => [
                ( map { "search_marc_to_fields.$_" } @link_columns ),
                ( map { "search_marc_map.$_" } @map_columns ),
            ],
            '+as' => [@extra_columns],
        }
    );

    while ( my $row = $search_fields->next ) {
        $sub->(
            # Force lower case on indexed field names for case insensitive
            # field name searches
            lc( $row->name ),
            $row->type,
            ( map { $row->get_column($_) } @extra_columns ),
        );
    }

    return;
}
1038 =head2 process_error
1040 die process_error($@);
1042 This parses an Elasticsearch error message and produces a human-readable
1043 result from it. This result is probably missing all the useful information
1044 that you might want in diagnosing an issue, so the warning is also logged.
1046 Note that currently the resulting message is not internationalised. This
1047 will happen eventually by some method or other.
1049 =cut
sub process_error {
    my ($self, $msg) = @_;

    # simple logging: the raw message goes to the warning channel
    warn $msg;

    # This is super-primitive: the only thing recognised is a parse failure.
    my $is_parse_failure = $msg =~ /ParseException/;

    return $is_parse_failure
        ? "Unable to understand your search query, please rephrase and try again.\n"
        : "Unable to perform your search. Please try again.\n";
}
1062 =head2 _read_configuration
1064 my $conf = _read_configuration();
1066 Reads the I<configuration file> and returns a hash structure with the
1067 configuration information. It raises an exception if mandatory entries
1068 are missing.
1070 The hashref structure has the following form:
1073 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1074 'index_name' => 'koha_instance',
1077 This is configured by the following in the C<config> block in koha-conf.xml:
1079 <elasticsearch>
1080 <server>127.0.0.1:9200</server>
1081 <server>anotherserver:9200</server>
1082 <index_name>koha_instance</index_name>
1083 </elasticsearch>
1085 =cut
sub _read_configuration {
    my $conf = C4::Context->config('elasticsearch');

    if ( !defined $conf ) {
        Koha::Exceptions::Config::MissingEntry->throw(
            "Missing 'elasticsearch' block in config file");
    }

    # A single <server> entry arrives as a scalar, several as an arrayref.
    my $nodes = $conf && $conf->{server};
    if ( !$nodes ) {
        Koha::Exceptions::Config::MissingEntry->throw(
            "Missing 'server' entry in config file for elasticsearch");
    }

    my $configuration;
    $configuration->{nodes} = ref($nodes) eq 'ARRAY' ? $nodes : [$nodes];

    if ( !defined $conf->{index_name} ) {
        Koha::Exceptions::Config::MissingEntry->throw(
            "Missing 'index_name' entry in config file for elasticsearch");
    }

    # The index name is returned exactly as configured.
    $configuration->{index_name} = $conf->{index_name};

    return $configuration;
}
1121 =head2 get_facetable_fields
1123 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1125 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1127 =cut
sub get_facetable_fields {
    my ($self) = @_;

    # These should correspond to the ES field names, as opposed to the CCL
    # things that zebra uses.
    my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );

    # Both lookups differ only in the facet_order condition.
    my $fetch = sub {
        my ($facet_order_condition) = @_;
        return Koha::SearchFields->search(
            { name => { -in => \@search_field_names }, facet_order => $facet_order_condition },
            { order_by => ['facet_order'] }
        );
    };

    # Fields with a facet order come first, followed by those without one.
    my @faceted_fields     = $fetch->( { '!=' => undef } );
    my @not_faceted_fields = $fetch->(undef);

    # This could certainly be improved
    return ( @faceted_fields, @not_faceted_fields );
}
1147 __END__
1149 =head1 AUTHOR
1151 =over 4
1153 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1155 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1157 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1159 =back
1161 =cut