Bug 23204: (follow-up) Also sort by marc field
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
blobce762a00ce4c595a7d03a23475ebce23e78474a5
1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
22 use C4::Context;
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
45 use Scalar::Util qw(looks_like_number);
47 __PACKAGE__->mk_ro_accessors(qw( index ));
48 __PACKAGE__->mk_accessors(qw( sort_fields ));
50 # Constants to refer to the standard index names
51 Readonly our $BIBLIOS_INDEX => 'biblios';
52 Readonly our $AUTHORITIES_INDEX => 'authorities';
54 =head1 NAME
56 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58 =head1 ACCESSORS
60 =over 4
62 =item index
64 The name of the index to use, generally 'biblios' or 'authorities'.
66 =back
68 =head1 FUNCTIONS
70 =cut
72 sub new {
73 my $class = shift @_;
74 my $self = $class->SUPER::new(@_);
75 # Check for a valid index
76 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
77 return $self;
80 =head2 get_elasticsearch
82 my $elasticsearch_client = $self->get_elasticsearch();
84 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
85 instance level and will be reused if method is called multiple times.
87 =cut
89 sub get_elasticsearch {
90 my $self = shift @_;
91 unless (defined $self->{elasticsearch}) {
92 my $conf = $self->get_elasticsearch_params();
93 $self->{elasticsearch} = Search::Elasticsearch->new($conf);
95 return $self->{elasticsearch};
98 =head2 get_elasticsearch_params
100 my $params = $self->get_elasticsearch_params();
102 This provides a hashref that contains the parameters for connecting to the
103 ElasicSearch servers, in the form:
106 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107 'index_name' => 'koha_instance_index',
110 This is configured by the following in the C<config> block in koha-conf.xml:
112 <elasticsearch>
113 <server>127.0.0.1:9200</server>
114 <server>anotherserver:9200</server>
115 <index_name>koha_instance</index_name>
116 </elasticsearch>
118 =cut
120 sub get_elasticsearch_params {
121 my ($self) = @_;
123 # Copy the hash so that we're not modifying the original
124 my $conf = C4::Context->config('elasticsearch');
125 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
126 my $es = { %{ $conf } };
128 # Helpfully, the multiple server lines end up in an array for us anyway
129 # if there are multiple ones, but not if there's only one.
130 my $server = $es->{server};
131 delete $es->{server};
132 if ( ref($server) eq 'ARRAY' ) {
134 # store it called 'nodes' (which is used by newer Search::Elasticsearch)
135 $es->{nodes} = $server;
137 elsif ($server) {
138 $es->{nodes} = [$server];
140 else {
141 die "No elasticsearch servers were specified in koha-conf.xml.\n";
143 die "No elasticsearch index_name was specified in koha-conf.xml.\n"
144 if ( !$es->{index_name} );
145 # Append the name of this particular index to our namespace
146 $es->{index_name} .= '_' . $self->index;
148 $es->{key_prefix} = 'es_';
149 $es->{cxn_pool} //= 'Static';
150 $es->{request_timeout} //= 60;
152 return $es;
155 =head2 get_elasticsearch_settings
157 my $settings = $self->get_elasticsearch_settings();
159 This provides the settings provided to Elasticsearch when an index is created.
160 These can do things like define tokenization methods.
162 A hashref containing the settings is returned.
164 =cut
166 sub get_elasticsearch_settings {
167 my ($self) = @_;
169 # Use state to speed up repeated calls
170 state $settings = undef;
171 if (!defined $settings) {
172 my $config_file = C4::Context->config('elasticsearch_index_config');
173 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
174 $settings = LoadFile( $config_file );
177 return $settings;
180 =head2 get_elasticsearch_mappings
182 my $mappings = $self->get_elasticsearch_mappings();
184 This provides the mappings that get passed to Elasticsearch when an index is
185 created.
187 =cut
189 sub get_elasticsearch_mappings {
190 my ($self) = @_;
192 # Use state to speed up repeated calls
193 state %all_mappings;
194 state %sort_fields;
196 if (!defined $all_mappings{$self->index}) {
197 $sort_fields{$self->index} = {};
198 # Clone the general mapping to break ties with the original hash
199 my $mappings = {
200 data => clone(_get_elasticsearch_field_config('general', ''))
202 my $marcflavour = lc C4::Context->preference('marcflavour');
203 $self->_foreach_mapping(
204 sub {
205 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
206 return if $marc_type ne $marcflavour;
207 # TODO if this gets any sort of complexity to it, it should
208 # be broken out into its own function.
210 # TODO be aware of date formats, but this requires pre-parsing
211 # as ES will simply reject anything with an invalid date.
212 my $es_type = 'text';
213 if ($type eq 'boolean') {
214 $es_type = 'boolean';
215 } elsif ($type eq 'number' || $type eq 'sum') {
216 $es_type = 'integer';
217 } elsif ($type eq 'isbn' || $type eq 'stdno') {
218 $es_type = 'stdno';
221 if ($search) {
222 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
225 if ($facet) {
226 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
228 if ($suggestible) {
229 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
231 # Sort is a bit special as it can be true, false, undef.
232 # We care about "true" or "undef",
233 # "undef" means to do the default thing, which is make it sortable.
234 if (!defined $sort || $sort) {
235 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
236 $sort_fields{$self->index}{$name} = 1;
240 $all_mappings{$self->index} = $mappings;
242 $self->sort_fields(\%{$sort_fields{$self->index}});
244 return $all_mappings{$self->index};
247 =head2 raw_elasticsearch_mappings
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc|normarc
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
254 =cut
256 sub raw_elasticsearch_mappings {
257 my ( $marc_type ) = @_;
259 my $schema = Koha::Database->new()->schema();
261 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
263 my $mappings = {};
264 while ( my $search_field = $search_fields->next ) {
266 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
267 { search_field_id => $search_field->id },
269 join => 'search_marc_map',
270 order_by => { -asc => 'search_marc_map.marc_field' }
274 while ( my $marc_to_field = $marc_to_fields->next ) {
276 my $marc_map = $marc_to_field->search_marc_map;
278 next if $marc_type && $marc_map->marc_type ne $marc_type;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order;
283 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight || undef;
285 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
287 facet => $marc_to_field->facet || '',
288 marc_type => $marc_map->marc_type,
289 marc_field => $marc_map->marc_field,
290 sort => $marc_to_field->sort,
291 suggestible => $marc_to_field->suggestible || ''
297 return $mappings;
300 =head2 _get_elasticsearch_field_config
302 Get the Elasticsearch field config for the given purpose and data type.
304 $mapping = _get_elasticsearch_field_config('search', 'text');
306 =cut
308 sub _get_elasticsearch_field_config {
310 my ( $purpose, $type ) = @_;
312 # Use state to speed up repeated calls
313 state $settings = undef;
314 if (!defined $settings) {
315 my $config_file = C4::Context->config('elasticsearch_field_config');
316 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
317 $settings = LoadFile( $config_file );
320 if (!defined $settings->{$purpose}) {
321 die "Field purpose $purpose not defined in field config";
323 if ($type eq '') {
324 return $settings->{$purpose};
326 if (defined $settings->{$purpose}{$type}) {
327 return $settings->{$purpose}{$type};
329 if (defined $settings->{$purpose}{'default'}) {
330 return $settings->{$purpose}{'default'};
332 return;
335 =head2 _load_elasticsearch_mappings
337 Load Elasticsearch mappings in the format of mappings.yaml.
339 $indexes = _load_elasticsearch_mappings();
341 =cut
343 sub _load_elasticsearch_mappings {
344 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
345 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
346 return LoadFile( $mappings_yaml );
349 sub reset_elasticsearch_mappings {
350 my ( $self ) = @_;
351 my $indexes = $self->_load_elasticsearch_mappings();
353 Koha::SearchMarcMaps->delete;
354 Koha::SearchFields->delete;
356 while ( my ( $index_name, $fields ) = each %$indexes ) {
357 while ( my ( $field_name, $data ) = each %$fields ) {
359 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
361 # Set default values
362 $sf_params{staff_client} //= 1;
363 $sf_params{opac} //= 1;
365 $sf_params{name} = $field_name;
367 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
369 my $mappings = $data->{mappings};
370 for my $mapping ( @$mappings ) {
371 my $marc_field = Koha::SearchMarcMaps->find_or_create({
372 index_name => $index_name,
373 marc_type => $mapping->{marc_type},
374 marc_field => $mapping->{marc_field}
376 $search_field->add_to_search_marc_maps($marc_field, {
377 facet => $mapping->{facet} || 0,
378 suggestible => $mapping->{suggestible} || 0,
379 sort => $mapping->{sort},
380 search => $mapping->{search} // 1
387 # This overrides the accessor provided by Class::Accessor so that if
388 # sort_fields isn't set, then it'll generate it.
389 sub sort_fields {
390 my $self = shift;
391 if (@_) {
392 $self->_sort_fields_accessor(@_);
393 return;
395 my $val = $self->_sort_fields_accessor();
396 return $val if $val;
398 # This will populate the accessor as a side effect
399 $self->get_elasticsearch_mappings();
400 return $self->_sort_fields_accessor();
403 =head2 _process_mappings($mappings, $data, $record_document, $meta)
405 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
407 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
408 Since we group all mappings by MARC field targets C<$mappings> will contain
409 all targets for C<$data> and thus we need to fetch the MARC field only once.
410 C<$mappings> will be applied to C<$record_document> and new field values added.
411 The method has no return value.
413 =over 4
415 =item C<$mappings>
417 Arrayref of mappings containing arrayrefs in the format
418 [C<$target>, C<$options>] where C<$target> is the name of the target field and
419 C<$options> is a hashref containing processing directives for this particular
420 mapping.
422 =item C<$data>
424 The source data from a MARC record field.
426 =item C<$record_document>
428 Hashref representing the Elasticsearch document on which mappings should be
429 applied.
431 =item C<$meta>
433 A hashref containing metadata useful for enforcing per mapping rules. For
434 example for providing extra context for mapping options, or treating mapping
435 targets differently depending on type (sort, search, facet etc). Combining
436 this metadata with the mapping options and metadata allows us to mutate the
437 data per mapping, or even replace it with other data retrieved from the
438 metadata context.
440 Current properties are:
442 C<altscript>: A boolean value indicating whether an alternate script presentation is being
443 processed.
445 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
446 'subfield' or 'subfields_group'.
448 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
450 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
451 is 'subfields_group'.
453 C<field>: The original C<MARC::Record> object.
455 =back
457 =cut
459 sub _process_mappings {
460 my ($_self, $mappings, $data, $record_document, $meta) = @_;
461 foreach my $mapping (@{$mappings}) {
462 my ($target, $options) = @{$mapping};
464 # Don't process sort fields for alternate scripts
465 my $sort = $target =~ /__sort$/;
466 if ($sort && $meta->{altscript}) {
467 next;
470 # Copy (scalar) data since can have multiple targets
471 # with differing options for (possibly) mutating data
472 # so need a different copy for each
473 my $_data = $data;
474 $record_document->{$target} //= [];
475 if (defined $options->{substr}) {
476 my ($start, $length) = @{$options->{substr}};
477 $_data = length($data) > $start ? substr $data, $start, $length : '';
479 if (defined $options->{value_callbacks}) {
480 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
482 if (defined $options->{property}) {
483 $_data = {
484 $options->{property} => $_data
487 if (defined $options->{nonfiling_characters_indicator}) {
488 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
489 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
490 if ($nonfiling_chars) {
491 $_data = substr $_data, $nonfiling_chars;
494 push @{$record_document->{$target}}, $_data;
498 =head2 marc_records_to_documents($marc_records)
500 my $record_documents = $self->marc_records_to_documents($marc_records);
502 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
504 Returns array of hash references, representing Elasticsearch documents,
505 acceptable as body payload in C<Search::Elasticsearch> requests.
507 =over 4
509 =item C<$marc_documents>
511 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
513 =back
515 =cut
517 sub marc_records_to_documents {
518 my ($self, $records) = @_;
519 my $rules = $self->_get_marc_mapping_rules();
520 my $control_fields_rules = $rules->{control_fields};
521 my $data_fields_rules = $rules->{data_fields};
522 my $marcflavour = lc C4::Context->preference('marcflavour');
523 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
525 my @record_documents;
527 foreach my $record (@{$records}) {
528 my $record_document = {};
529 my $mappings = $rules->{leader};
530 if ($mappings) {
531 $self->_process_mappings($mappings, $record->leader(), $record_document, {
532 altscript => 0,
533 data_source => 'leader'
537 foreach my $field ($record->fields()) {
538 if ($field->is_control_field()) {
539 my $mappings = $control_fields_rules->{$field->tag()};
540 if ($mappings) {
541 $self->_process_mappings($mappings, $field->data(), $record_document, {
542 altscript => 0,
543 data_source => 'control_field',
544 field => $field
549 else {
550 my $tag = $field->tag();
551 # Handle alternate scripts in MARC 21
552 my $altscript = 0;
553 if ($marcflavour eq 'marc21' && $tag eq '880') {
554 my $sub6 = $field->subfield('6');
555 if ($sub6 =~ /^(...)-\d+/) {
556 $tag = $1;
557 $altscript = 1;
561 my $data_field_rules = $data_fields_rules->{$tag};
562 if ($data_field_rules) {
563 my $subfields_mappings = $data_field_rules->{subfields};
564 my $wildcard_mappings = $subfields_mappings->{'*'};
565 foreach my $subfield ($field->subfields()) {
566 my ($code, $data) = @{$subfield};
567 my $mappings = $subfields_mappings->{$code} // [];
568 if ($wildcard_mappings) {
569 $mappings = [@{$mappings}, @{$wildcard_mappings}];
571 if (@{$mappings}) {
572 $self->_process_mappings($mappings, $data, $record_document, {
573 altscript => $altscript,
574 data_source => 'subfield',
575 code => $code,
576 field => $field
580 if ( defined @{$mappings}[0] && grep /match-heading/, @{@{$mappings}[0]} ){
581 # Used by the authority linker the match-heading field requires a specific syntax
582 # that is specified in C4/Heading
583 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
584 next unless $heading;
585 push @{$record_document->{'match-heading'}}, $heading->search_form;
589 my $subfields_join_mappings = $data_field_rules->{subfields_join};
590 if ($subfields_join_mappings) {
591 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
592 # Map each subfield to values, remove empty values, join with space
593 my $data = join(
594 ' ',
595 grep(
597 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
600 if ($data) {
601 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
602 altscript => $altscript,
603 data_source => 'subfields_group',
604 codes => $subfields_group,
605 field => $field
609 if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
610 # Used by the authority linker the match-heading field requires a specific syntax
611 # that is specified in C4/Heading
612 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
613 next unless $heading;
614 push @{$record_document->{'match-heading'}}, $heading->search_form;
621 foreach my $field (keys %{$rules->{defaults}}) {
622 unless (defined $record_document->{$field}) {
623 $record_document->{$field} = $rules->{defaults}->{$field};
626 foreach my $field (@{$rules->{sum}}) {
627 if (defined $record_document->{$field}) {
628 # TODO: validate numeric? filter?
629 # TODO: Or should only accept fields without nested values?
630 # TODO: Quick and dirty, improve if needed
631 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
634 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
635 foreach my $field (@{$rules->{isbn}}) {
636 if (defined $record_document->{$field}) {
637 my @isbns = ();
638 foreach my $input_isbn (@{$record_document->{$field}}) {
639 my $isbn = Business::ISBN->new($input_isbn);
640 if (defined $isbn && $isbn->is_valid) {
641 my $isbn13 = $isbn->as_isbn13->as_string;
642 push @isbns, $isbn13;
643 $isbn13 =~ s/\-//g;
644 push @isbns, $isbn13;
646 my $isbn10 = $isbn->as_isbn10;
647 if ($isbn10) {
648 $isbn10 = $isbn10->as_string;
649 push @isbns, $isbn10;
650 $isbn10 =~ s/\-//g;
651 push @isbns, $isbn10;
653 } else {
654 push @isbns, $input_isbn;
657 $record_document->{$field} = \@isbns;
661 # Remove duplicate values and collapse sort fields
662 foreach my $field (keys %{$record_document}) {
663 if (ref($record_document->{$field}) eq 'ARRAY') {
664 @{$record_document->{$field}} = do {
665 my %seen;
666 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
668 if ($field =~ /__sort$/) {
669 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
670 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
675 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
676 $record->encoding('UTF-8');
677 if ($use_array) {
678 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
679 $record_document->{'marc_format'} = 'ARRAY';
680 } else {
681 my @warnings;
683 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
684 local $SIG{__WARN__} = sub {
685 push @warnings, $_[0];
687 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
689 if (@warnings) {
690 # Suppress warnings if record length exceeded
691 unless (substr($record->leader(), 0, 5) eq '99999') {
692 foreach my $warning (@warnings) {
693 carp $warning;
696 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
697 $record_document->{'marc_format'} = 'MARCXML';
699 else {
700 $record_document->{'marc_format'} = 'base64ISO2709';
703 push @record_documents, $record_document;
705 return \@record_documents;
708 =head2 _marc_to_array($record)
710 my @fields = _marc_to_array($record)
712 Convert a MARC::Record to an array modeled after MARC-in-JSON
713 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
715 =over 4
717 =item C<$record>
719 A MARC::Record object
721 =back
723 =cut
725 sub _marc_to_array {
726 my ($self, $record) = @_;
728 my $data = {
729 leader => $record->leader(),
730 fields => []
732 for my $field ($record->fields()) {
733 my $tag = $field->tag();
734 if ($field->is_control_field()) {
735 push @{$data->{fields}}, {$tag => $field->data()};
736 } else {
737 my $subfields = ();
738 foreach my $subfield ($field->subfields()) {
739 my ($code, $contents) = @{$subfield};
740 push @{$subfields}, {$code => $contents};
742 push @{$data->{fields}}, {
743 $tag => {
744 ind1 => $field->indicator(1),
745 ind2 => $field->indicator(2),
746 subfields => $subfields
751 return $data;
754 =head2 _array_to_marc($data)
756 my $record = _array_to_marc($data)
758 Convert an array modeled after MARC-in-JSON to a MARC::Record
760 =over 4
762 =item C<$data>
764 An array modeled after MARC-in-JSON
765 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
767 =back
769 =cut
771 sub _array_to_marc {
772 my ($self, $data) = @_;
774 my $record = MARC::Record->new();
776 $record->leader($data->{leader});
777 for my $field (@{$data->{fields}}) {
778 my $tag = (keys %{$field})[0];
779 $field = $field->{$tag};
780 my $marc_field;
781 if (ref($field) eq 'HASH') {
782 my @subfields;
783 foreach my $subfield (@{$field->{subfields}}) {
784 my $code = (keys %{$subfield})[0];
785 push @subfields, $code;
786 push @subfields, $subfield->{$code};
788 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
789 } else {
790 $marc_field = MARC::Field->new($tag, $field)
792 $record->append_fields($marc_field);
795 return $record;
798 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
800 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
802 Get mappings, an internal data structure later used by
803 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
804 data for a MARC mapping.
806 The returned C<$mappings> is not to to be confused with mappings provided by
807 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
808 provided by C<_foreach_mapping> and expands it to this internal data structure.
809 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
810 is then applied to each MARC target (leader, control field data, subfield or
811 joined subfields) and integrated into the mapping rules data structure used in
812 C<marc_records_to_documents> to transform MARC records into Elasticsearch
813 documents.
815 =over 4
817 =item C<$facet>
819 Boolean indicating whether to create a facet field for this mapping.
821 =item C<$suggestible>
823 Boolean indicating whether to create a suggestion field for this mapping.
825 =item C<$sort>
827 Boolean indicating whether to create a sort field for this mapping.
829 =item C<$search>
831 Boolean indicating whether to create a search field for this mapping.
833 =item C<$target_name>
835 Elasticsearch document target field name.
837 =item C<$target_type>
839 Elasticsearch document target field type.
841 =item C<$range>
843 An optional range as a string in the format "<START>-<END>" or "<START>",
844 where "<START>" and "<END>" are integers specifying a range that will be used
845 for extracting a substring from MARC data as Elasticsearch field target value.
847 The first character position is "0", and the range is inclusive,
848 so "0-2" means the first three characters of MARC data.
850 If only "<START>" is provided only one character at position "<START>" will
851 be extracted.
853 =back
855 =cut
857 sub _field_mappings {
858 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
859 my %mapping_defaults = ();
860 my @mappings;
862 my $substr_args = undef;
863 if (defined $range) {
864 # TODO: use value_callback instead?
865 my ($start, $end) = map(int, split /-/, $range, 2);
866 $substr_args = [$start];
867 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
869 my $default_options = {};
870 if ($substr_args) {
871 $default_options->{substr} = $substr_args;
874 # TODO: Should probably have per type value callback/hook
875 # but hard code for now
876 if ($target_type eq 'boolean') {
877 $default_options->{value_callbacks} //= [];
878 push @{$default_options->{value_callbacks}}, sub {
879 my ($value) = @_;
880 # Trim whitespace at both ends
881 $value =~ s/^\s+|\s+$//g;
882 return $value ? 'true' : 'false';
886 if ($search) {
887 my $mapping = [$target_name, $default_options];
888 push @mappings, $mapping;
891 my @suffixes = ();
892 push @suffixes, 'facet' if $facet;
893 push @suffixes, 'suggestion' if $suggestible;
894 push @suffixes, 'sort' if !defined $sort || $sort;
896 foreach my $suffix (@suffixes) {
897 my $mapping = ["${target_name}__$suffix"];
898 # TODO: Hack, fix later in less hideous manner
899 if ($suffix eq 'suggestion') {
900 push @{$mapping}, {%{$default_options}, property => 'input'};
902 else {
903 # Important! Make shallow clone, or we end up with the same hashref
904 # shared by all mappings
905 push @{$mapping}, {%{$default_options}};
907 push @mappings, $mapping;
909 return @mappings;
912 =head2 _get_marc_mapping_rules
914 my $mapping_rules = $self->_get_marc_mapping_rules()
916 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
918 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
919 each call to C<MARC::Record>->field) we create an optimized structure of mapping
920 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
922 We can then iterate through all MARC fields for each record and apply all relevant
923 rules once per fields instead of retreiving fields multiple times for each mapping rule
924 which is terribly slow.
926 =cut
928 # TODO: This structure can be used for processing multiple MARC::Records so is currently
929 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
930 # memory cache which it is currently not. The performance gain of caching
931 # would probably be marginal, but to do this could be a further improvement.
933 sub _get_marc_mapping_rules {
934 my ($self) = @_;
935 my $marcflavour = lc C4::Context->preference('marcflavour');
936 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
937 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
938 my $rules = {
939 'leader' => [],
940 'control_fields' => {},
941 'data_fields' => {},
942 'sum' => [],
943 'isbn' => [],
944 'defaults' => {}
947 $self->_foreach_mapping(sub {
948 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
949 return if $marc_type ne $marcflavour;
951 if ($type eq 'sum') {
952 push @{$rules->{sum}}, $name;
953 push @{$rules->{sum}}, $name."__sort" if $sort;
955 elsif ($type eq 'isbn') {
956 push @{$rules->{isbn}}, $name;
958 elsif ($type eq 'boolean') {
959 # boolean gets special handling, if value doesn't exist for a field,
960 # it is set to false
961 $rules->{defaults}->{$name} = 'false';
964 if ($marc_field =~ $field_spec_regexp) {
965 my $field_tag = $1;
967 my @subfields;
968 my @subfield_groups;
969 # Parse and separate subfields form subfield groups
970 if (defined $2) {
971 my $subfield_group = '';
972 my $open_group = 0;
974 foreach my $token (split //, $2) {
975 if ($token eq "(") {
976 if ($open_group) {
977 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
978 "Unmatched opening parenthesis for $marc_field"
981 else {
982 $open_group = 1;
985 elsif ($token eq ")") {
986 if ($open_group) {
987 if ($subfield_group) {
988 push @subfield_groups, $subfield_group;
989 $subfield_group = '';
991 $open_group = 0;
993 else {
994 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
995 "Unmatched closing parenthesis for $marc_field"
999 elsif ($open_group) {
1000 $subfield_group .= $token;
1002 else {
1003 push @subfields, $token;
1007 else {
1008 push @subfields, '*';
1011 my $range = defined $3 ? $3 : undef;
1012 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1013 if ($field_tag < 10) {
1014 $rules->{control_fields}->{$field_tag} //= [];
1015 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1017 else {
1018 $rules->{data_fields}->{$field_tag} //= {};
1019 foreach my $subfield (@subfields) {
1020 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1021 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1023 foreach my $subfield_group (@subfield_groups) {
1024 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1025 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1029 elsif ($marc_field =~ $leader_regexp) {
1030 my $range = defined $1 ? $1 : undef;
1031 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1032 push @{$rules->{leader}}, @mappings;
1034 else {
1035 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1036 "Invalid MARC field expression: $marc_field"
1041 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1042 if ($marcflavour eq 'marc21') {
1043 # Nonfiling characters processing for sort fields
1044 my %title_fields;
1045 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1046 # Format is: nonfiling characters indicator => field names list
1047 %title_fields = (
1048 1 => [130, 630, 730, 740],
1049 2 => [222, 240, 242, 243, 245, 440, 830]
1052 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1053 %title_fields = (
1054 1 => [730],
1055 2 => [130, 430, 530]
1058 foreach my $indicator (keys %title_fields) {
1059 foreach my $field_tag (@{$title_fields{$indicator}}) {
1060 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1061 foreach my $mapping (@{$mappings}) {
1062 if ($mapping->[0] =~ /__sort$/) {
1063 # Mark this as to be processed for nonfiling characters indicator
1064 # later on in _process_mappings
1065 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1072 return $rules;
1075 =head2 _foreach_mapping
1077 $self->_foreach_mapping(
1078 sub {
1079 my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1080 $marc_field )
1081 = @_;
1082 return unless $marc_type eq 'marc21';
1083 print "Data comes from: " . $marc_field . "\n";
1087 This allows you to apply a function to each entry in the elasticsearch mappings
1088 table, in order to build the mappings for whatever is needed.
1090 In the provided function, the files are:
1092 =over 4
1094 =item C<$name>
1096 The field name for elasticsearch (corresponds to the 'mapping' column in the
1097 database.
1099 =item C<$type>
1101 The type for this value, e.g. 'string'.
1103 =item C<$facet>
1105 True if this value should be facetised. This only really makes sense if the
1106 field is understood by the facet processing code anyway.
1108 =item C<$sort>
1110 True if this is a field that a) needs special sort handling, and b) if it
1111 should be sorted on. False if a) but not b). Undef if not a). This allows,
1112 for example, author to be sorted on but not everything marked with "author"
1113 to be included in that sort.
1115 =item C<$marc_type>
1117 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1118 'unimarc', 'normarc'.
1120 =item C<$marc_field>
1122 A string that describes the MARC field that contains the data to extract.
1123 These are of a form suited to Catmandu's MARC fixers.
1125 =back
1127 =cut
1129 sub _foreach_mapping {
1130 my ( $self, $sub ) = @_;
1132 # TODO use a caching framework here
1133 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1135 'search_marc_map.index_name' => $self->index,
1137 { join => { search_marc_to_fields => 'search_marc_map' },
1138 '+select' => [
1139 'search_marc_to_fields.facet',
1140 'search_marc_to_fields.suggestible',
1141 'search_marc_to_fields.sort',
1142 'search_marc_to_fields.search',
1143 'search_marc_map.marc_type',
1144 'search_marc_map.marc_field',
1146 '+as' => [
1147 'facet',
1148 'suggestible',
1149 'sort',
1150 'search',
1151 'marc_type',
1152 'marc_field',
1157 while ( my $search_field = $search_fields->next ) {
1158 $sub->(
1159 # Force lower case on indexed field names for case insensitive
1160 # field name searches
1161 lc($search_field->name),
1162 $search_field->type,
1163 $search_field->get_column('facet'),
1164 $search_field->get_column('suggestible'),
1165 $search_field->get_column('sort'),
1166 $search_field->get_column('search'),
1167 $search_field->get_column('marc_type'),
1168 $search_field->get_column('marc_field'),
1173 =head2 process_error
1175 die process_error($@);
1177 This parses an Elasticsearch error message and produces a human-readable
1178 result from it. This result is probably missing all the useful information
1179 that you might want in diagnosing an issue, so the warning is also logged.
1181 Note that currently the resulting message is not internationalised. This
1182 will happen eventually by some method or other.
1184 =cut
1186 sub process_error {
1187 my ($self, $msg) = @_;
1189 warn $msg; # simple logging
1191 # This is super-primitive
1192 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1194 return "Unable to perform your search. Please try again.\n";
1197 =head2 _read_configuration
1199 my $conf = _read_configuration();
1201 Reads the I<configuration file> and returns a hash structure with the
1202 configuration information. It raises an exception if mandatory entries
1203 are missing.
1205 The hashref structure has the following form:
1208 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1209 'index_name' => 'koha_instance',
1212 This is configured by the following in the C<config> block in koha-conf.xml:
1214 <elasticsearch>
1215 <server>127.0.0.1:9200</server>
1216 <server>anotherserver:9200</server>
1217 <index_name>koha_instance</index_name>
1218 </elasticsearch>
1220 =cut
1222 sub _read_configuration {
1224 my $configuration;
1226 my $conf = C4::Context->config('elasticsearch');
1227 Koha::Exceptions::Config::MissingEntry->throw(
1228 "Missing 'elasticsearch' block in config file")
1229 unless defined $conf;
1231 if ( $conf && $conf->{server} ) {
1232 my $nodes = $conf->{server};
1233 if ( ref($nodes) eq 'ARRAY' ) {
1234 $configuration->{nodes} = $nodes;
1236 else {
1237 $configuration->{nodes} = [$nodes];
1240 else {
1241 Koha::Exceptions::Config::MissingEntry->throw(
1242 "Missing 'server' entry in config file for elasticsearch");
1245 if ( defined $conf->{index_name} ) {
1246 $configuration->{index_name} = $conf->{index_name};
1248 else {
1249 Koha::Exceptions::Config::MissingEntry->throw(
1250 "Missing 'index_name' entry in config file for elasticsearch");
1253 return $configuration;
1256 =head2 get_facetable_fields
1258 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1260 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1262 =cut
1264 sub get_facetable_fields {
1265 my ($self) = @_;
1267 # These should correspond to the ES field names, as opposed to the CCL
1268 # things that zebra uses.
1269 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1270 my @faceted_fields = Koha::SearchFields->search(
1271 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1273 my @not_faceted_fields = Koha::SearchFields->search(
1274 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1276 # This could certainly be improved
1277 return ( @faceted_fields, @not_faceted_fields );
1282 __END__
1284 =head1 AUTHOR
1286 =over 4
1288 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1290 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1292 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1294 =back
1296 =cut