Bug 23204: Sort search fields by name
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
blob850a4373d1c3a41d0abfe5c243df07cb83e5b169
1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
22 use C4::Context;
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
45 use Scalar::Util qw(looks_like_number);
# Generate the accessors: "index" is read-only, "sort_fields" is read/write
# (the latter is wrapped by the sort_fields sub defined in this package).
__PACKAGE__->mk_ro_accessors('index');
__PACKAGE__->mk_accessors('sort_fields');

# Constants to refer to the standard index names
Readonly our $BIBLIOS_INDEX     => 'biblios';
Readonly our $AUTHORITIES_INDEX => 'authorities';
54 =head1 NAME
56 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58 =head1 ACCESSORS
60 =over 4
62 =item index
64 The name of the index to use, generally 'biblios' or 'authorities'.
66 =back
68 =head1 FUNCTIONS
70 =cut
# Constructor. Arguments are handed unchanged to the parent class constructor;
# an exception is thrown when the resulting object has no index name.
sub new {
    my ( $class, @args ) = @_;

    my $self = $class->SUPER::new(@args);

    # Check for a valid index
    if ( !$self->index ) {
        Koha::Exceptions::MissingParameter->throw('No index name provided');
    }

    return $self;
}
80 =head2 get_elasticsearch
82 my $elasticsearch_client = $self->get_elasticsearch();
Returns a C<Search::Elasticsearch> client. The client is cached on the C<Koha::SearchEngine::Elasticsearch>
instance and is reused when the method is called multiple times.
87 =cut
# Return the Search::Elasticsearch client for this instance, building it from
# get_elasticsearch_params() on first use and caching it in the object.
sub get_elasticsearch {
    my ($self) = @_;

    # Reuse the client created by an earlier call
    return $self->{elasticsearch} if defined $self->{elasticsearch};

    my $params = $self->get_elasticsearch_params();
    $self->{elasticsearch} = Search::Elasticsearch->new($params);

    return $self->{elasticsearch};
}
98 =head2 get_elasticsearch_params
100 my $params = $self->get_elasticsearch_params();
102 This provides a hashref that contains the parameters for connecting to the
Elasticsearch servers, in the form:
106 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107 'index_name' => 'koha_instance_index',
110 This is configured by the following in the C<config> block in koha-conf.xml:
112 <elasticsearch>
113 <server>127.0.0.1:9200</server>
114 <server>anotherserver:9200</server>
115 <index_name>koha_instance</index_name>
116 </elasticsearch>
118 =cut
# Build the connection parameters hashref from the "elasticsearch" block of
# koha-conf.xml. Dies when the block, the server list or the index name is
# missing.
sub get_elasticsearch_params {
    my ($self) = @_;

    my $conf = C4::Context->config('elasticsearch');
    if ( !$conf ) {
        die "No 'elasticsearch' block is defined in koha-conf.xml.\n";
    }

    # Work on a shallow copy so that the original config is not modified
    my %es = %{$conf};

    # Multiple <server> lines arrive as an array reference, a single one as a
    # plain scalar. Either way store them as 'nodes' (which is used by newer
    # Search::Elasticsearch).
    my $server = delete $es{server};
    if ( ref($server) eq 'ARRAY' ) {
        $es{nodes} = $server;
    }
    elsif ($server) {
        $es{nodes} = [$server];
    }
    else {
        die "No elasticsearch servers were specified in koha-conf.xml.\n";
    }

    unless ( $es{index_name} ) {
        die "No elasticsearch index_name was specified in koha-conf.xml.\n";
    }

    # Append the name of this particular index to our namespace
    $es{index_name} .= '_' . $self->index;

    # Fixed key prefix, plus defaults for values the config may leave out
    $es{key_prefix} = 'es_';
    $es{cxn_pool}        //= 'Static';
    $es{request_timeout} //= 60;

    return \%es;
}
155 =head2 get_elasticsearch_settings
157 my $settings = $self->get_elasticsearch_settings();
159 This provides the settings provided to Elasticsearch when an index is created.
160 These can do things like define tokenization methods.
162 A hashref containing the settings is returned.
164 =cut
# Return the index settings loaded from the YAML file named by the
# "elasticsearch_index_config" config entry, falling back to the file shipped
# under intranetdir. The parsed settings are kept for subsequent calls.
sub get_elasticsearch_settings {
    my ($self) = @_;

    # Use state to speed up repeated calls
    state $settings;
    return $settings if defined $settings;

    my $config_file = C4::Context->config('elasticsearch_index_config')
        || ( C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml' );
    $settings = LoadFile($config_file);

    return $settings;
}
180 =head2 get_elasticsearch_mappings
182 my $mappings = $self->get_elasticsearch_mappings();
184 This provides the mappings that get passed to Elasticsearch when an index is
185 created.
187 =cut
# Build (once per index name) the mappings structure passed to Elasticsearch
# on index creation, and record which fields are sortable. As a side effect
# the sort_fields accessor of this instance is populated on every call.
sub get_elasticsearch_mappings {
    my ($self) = @_;

    # Use state to speed up repeated calls; both caches are keyed by index name
    state %all_mappings;
    state %sort_fields;

    my $index = $self->index;

    unless ( defined $all_mappings{$index} ) {
        $sort_fields{$index} = {};

        # Clone the general mapping to break ties with the original hash
        my $mappings = {
            data => clone( _get_elasticsearch_field_config( 'general', '' ) )
        };
        my $marcflavour = lc C4::Context->preference('marcflavour');

        # Search field type => Elasticsearch field type; anything else is 'text'
        # TODO be aware of date formats, but this requires pre-parsing
        # as ES will simply reject anything with an invalid date.
        my %es_type_for = (
            boolean => 'boolean',
            number  => 'integer',
            sum     => 'integer',
            isbn    => 'stdno',
            stdno   => 'stdno',
        );

        # TODO if this gets any sort of complexity to it, it should
        # be broken out into its own function.
        my $add_field = sub {
            my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
            return if $marc_type ne $marcflavour;

            my $es_type = $es_type_for{$type} // 'text';

            if ($search) {
                $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config( 'search', $es_type );
            }
            if ($facet) {
                $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config( 'facet', $es_type );
            }
            if ($suggestible) {
                $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config( 'suggestible', $es_type );
            }

            # Sort is a bit special as it can be true, false, undef.
            # We care about "true" or "undef",
            # "undef" means to do the default thing, which is make it sortable.
            if ( !defined $sort || $sort ) {
                $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config( 'sort', $es_type );
                $sort_fields{$index}{$name} = 1;
            }
        };
        $self->_foreach_mapping($add_field);

        $all_mappings{$index} = $mappings;
    }

    $self->sort_fields( \%{ $sort_fields{$index} } );

    return $all_mappings{$index};
}
247 =head2 raw_elasticsearch_mappings
249 Return elasticsearch mapping as it is in database.
250 marc_type: marc21|unimarc|normarc
252 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
254 =cut
# Collect the mappings stored in the database into a nested hashref of the
# form { index_name => { field_name => { label, type, facet_order, weight,
# mappings => [ ... ] } } }. When $marc_type is given, only MARC maps of that
# type are included. Search fields are visited in ascending name order.
sub raw_elasticsearch_mappings {
    my ($marc_type) = @_;

    my $schema        = Koha::Database->new()->schema();
    my $search_fields = Koha::SearchFields->search( {}, { order_by => { -asc => 'name' } } );

    my %mappings;
    while ( my $search_field = $search_fields->next ) {
        my $marc_to_fields = $schema->resultset('SearchMarcToField')->search( { search_field_id => $search_field->id } );

        while ( my $marc_to_field = $marc_to_fields->next ) {
            my $marc_map = Koha::SearchMarcMaps->find( $marc_to_field->search_marc_map_id );

            next if $marc_type && $marc_map->marc_type ne $marc_type;

            # One entry per (index, search field); created on first use
            my $entry = $mappings{ $marc_map->index_name }{ $search_field->name } //= {};

            $entry->{label}       = $search_field->label;
            $entry->{type}        = $search_field->type;
            $entry->{facet_order} = $search_field->facet_order;
            $entry->{weight}      = $search_field->weight || undef;

            push @{ $entry->{mappings} }, {
                facet       => $marc_to_field->facet || '',
                marc_type   => $marc_map->marc_type,
                marc_field  => $marc_map->marc_field,
                sort        => $marc_to_field->sort,
                suggestible => $marc_to_field->suggestible || ''
            };
        }
    }

    return \%mappings;
}
293 =head2 _get_elasticsearch_field_config
295 Get the Elasticsearch field config for the given purpose and data type.
297 $mapping = _get_elasticsearch_field_config('search', 'text');
299 =cut
# Look up the field configuration for $purpose and $type in the YAML field
# config (loaded once and kept for later calls). An empty $type returns the
# whole configuration for the purpose; otherwise the type-specific entry is
# returned, then the 'default' entry, then nothing. Dies for an unknown purpose.
sub _get_elasticsearch_field_config {
    my ( $purpose, $type ) = @_;

    # Use state to speed up repeated calls
    state $settings;
    unless ( defined $settings ) {
        my $config_file = C4::Context->config('elasticsearch_field_config')
            || ( C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml' );
        $settings = LoadFile($config_file);
    }

    my $purpose_config = $settings->{$purpose};
    unless ( defined $purpose_config ) {
        die "Field purpose $purpose not defined in field config";
    }

    return $purpose_config if $type eq '';

    # Prefer the type-specific config, then fall back to the default one
    foreach my $key ( $type, 'default' ) {
        return $purpose_config->{$key} if defined $purpose_config->{$key};
    }

    return;
}
328 =head2 _load_elasticsearch_mappings
330 Load Elasticsearch mappings in the format of mappings.yaml.
332 $indexes = _load_elasticsearch_mappings();
334 =cut
# Parse and return the mappings YAML file: the one named by the
# "elasticsearch_index_mappings" config entry when set, otherwise the
# mappings.yaml shipped under intranetdir.
sub _load_elasticsearch_mappings {
    my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
    if ( !$mappings_yaml ) {
        $mappings_yaml = C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
    }
    return LoadFile($mappings_yaml);
}
# Replace the mappings stored in the database with the ones loaded by
# _load_elasticsearch_mappings(): all existing MARC maps and search fields are
# deleted, then each field and its MARC mappings are created again.
sub reset_elasticsearch_mappings {
    my ($self) = @_;
    my $indexes = $self->_load_elasticsearch_mappings();

    Koha::SearchMarcMaps->delete;
    Koha::SearchFields->delete;

    for my $index_name ( keys %{$indexes} ) {
        my $fields = $indexes->{$index_name};

        for my $field_name ( keys %{$fields} ) {
            my $data = $fields->{$field_name};

            # Copy only the search field attributes present in the file
            my %sf_params;
            for my $attribute (qw/ type label weight staff_client opac facet_order /) {
                next unless exists $data->{$attribute};
                $sf_params{$attribute} = $data->{$attribute};
            }

            # Set default values
            $sf_params{staff_client} //= 1;
            $sf_params{opac}         //= 1;

            $sf_params{name} = $field_name;

            my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );

            my $marc_mappings = $data->{mappings};
            for my $mapping ( @{$marc_mappings} ) {
                my $marc_field = Koha::SearchMarcMaps->find_or_create(
                    {
                        index_name => $index_name,
                        marc_type  => $mapping->{marc_type},
                        marc_field => $mapping->{marc_field}
                    }
                );

                # "sort" is passed through as-is (it may be undef);
                # "search" defaults to enabled
                $search_field->add_to_search_marc_maps(
                    $marc_field,
                    {
                        facet       => $mapping->{facet}       || 0,
                        suggestible => $mapping->{suggestible} || 0,
                        sort        => $mapping->{sort},
                        search      => $mapping->{search} // 1
                    }
                );
            }
        }
    }

    return;
}
# This overrides the accessor provided by Class::Accessor so that if
# sort_fields isn't set, then it'll generate it.
# With arguments it acts as a setter and returns nothing; without arguments
# it returns the stored value, building it first when necessary.
sub sort_fields {
    my ( $self, @new_value ) = @_;

    if (@new_value) {
        $self->_sort_fields_accessor(@new_value);
        return;
    }

    my $stored = $self->_sort_fields_accessor();
    unless ($stored) {

        # This will populate the accessor as a side effect
        $self->get_elasticsearch_mappings();
        return $self->_sort_fields_accessor();
    }

    return $stored;
}
396 =head2 _process_mappings($mappings, $data, $record_document, $meta)
398 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
400 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
401 Since we group all mappings by MARC field targets C<$mappings> will contain
402 all targets for C<$data> and thus we need to fetch the MARC field only once.
403 C<$mappings> will be applied to C<$record_document> and new field values added.
404 The method has no return value.
406 =over 4
408 =item C<$mappings>
410 Arrayref of mappings containing arrayrefs in the format
411 [C<$target>, C<$options>] where C<$target> is the name of the target field and
412 C<$options> is a hashref containing processing directives for this particular
413 mapping.
415 =item C<$data>
417 The source data from a MARC record field.
419 =item C<$record_document>
421 Hashref representing the Elasticsearch document on which mappings should be
422 applied.
424 =item C<$meta>
426 A hashref containing metadata useful for enforcing per mapping rules. For
427 example for providing extra context for mapping options, or treating mapping
428 targets differently depending on type (sort, search, facet etc). Combining
429 this metadata with the mapping options and metadata allows us to mutate the
430 data per mapping, or even replace it with other data retrieved from the
431 metadata context.
433 Current properties are:
435 C<altscript>: A boolean value indicating whether an alternate script presentation is being
436 processed.
C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
'subfield' or 'subfields_group'.
C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
443 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
444 is 'subfields_group'.
C<field>: The MARC field object from which C<$data> was retrieved (not set when C<data_source> is 'leader').
448 =back
450 =cut
# Apply every mapping in $mappings to $data and append the resulting value to
# the matching target array in $record_document.
#
#   $mappings        - arrayref of [ $target, $options ] pairs
#   $data            - scalar source data taken from the MARC record
#   $record_document - hashref being built; $record_document->{$target} is
#                      created as an arrayref when missing
#   $meta            - hashref; 'altscript' and 'field' are read here
#
# Supported options, applied in this order: 'substr' ([ $start, $length ]),
# 'value_callbacks' (arrayref of coderefs, chained), 'property' (wrap the
# value in a single-key hashref) and 'nonfiling_characters_indicator'
# (indicator number giving the count of leading characters to strip).
# Returns nothing.
sub _process_mappings {
    my ($_self, $mappings, $data, $record_document, $meta) = @_;

    foreach my $mapping (@{$mappings}) {
        my ($target, $options) = @{$mapping};

        # Don't process sort fields for alternate scripts
        next if $meta->{altscript} && $target =~ /__sort$/;

        # Copy (scalar) data since can have multiple targets
        # with differing options for (possibly) mutating data
        # so need a different copy for each
        my $_data = $data;
        $record_document->{$target} //= [];

        if (defined $options->{substr}) {
            my ($start, $length) = @{$options->{substr}};
            $_data = length($data) > $start ? substr($data, $start, $length) : '';
        }
        if (defined $options->{value_callbacks}) {
            # Feed the value through each callback in turn
            $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
        }
        if (defined $options->{property}) {
            $_data = { $options->{property} => $_data };
        }
        if (defined $options->{nonfiling_characters_indicator}) {
            my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
            $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
            if ($nonfiling_chars > 0) {
                # An indicator larger than the data would make substr run
                # outside the string and yield undef; use an empty string
                # instead, as the 'substr' option above does.
                $_data = length($_data) > $nonfiling_chars
                    ? substr($_data, $nonfiling_chars)
                    : '';
            }
        }
        push @{$record_document->{$target}}, $_data;
    }

    return;
}
491 =head2 marc_records_to_documents($marc_records)
493 my $record_documents = $self->marc_records_to_documents($marc_records);
495 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
497 Returns array of hash references, representing Elasticsearch documents,
498 acceptable as body payload in C<Search::Elasticsearch> requests.
500 =over 4
=item C<$marc_records>
504 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
506 =back
508 =cut
# Convert an arrayref of MARC records into an arrayref of Elasticsearch
# documents (hashrefs), using the rules from _get_marc_mapping_rules().
#
# For each record: the leader, control fields, subfields and joined subfield
# groups are run through _process_mappings(); defaults, sums and ISBN
# variants are then applied; values are de-duplicated and sort fields are
# collapsed to a single string of at most 255 characters; finally the MARC
# record itself is attached in the configured format.
sub marc_records_to_documents {
    my ($self, $records) = @_;
    my $rules = $self->_get_marc_mapping_rules();
    my $control_fields_rules = $rules->{control_fields};
    my $data_fields_rules = $rules->{data_fields};
    my $marcflavour = lc C4::Context->preference('marcflavour');
    # The preference may be unset; treat that the same as "not ARRAY"
    my $use_array = ( C4::Context->preference('ElasticsearchMARCFormat') // '' ) eq 'ARRAY';

    my @record_documents;

    foreach my $record (@{$records}) {
        my $record_document = {};
        my $mappings = $rules->{leader};
        if ($mappings) {
            $self->_process_mappings($mappings, $record->leader(), $record_document, {
                    altscript => 0,
                    data_source => 'leader'
                }
            );
        }
        foreach my $field ($record->fields()) {
            if ($field->is_control_field()) {
                my $mappings = $control_fields_rules->{$field->tag()};
                if ($mappings) {
                    $self->_process_mappings($mappings, $field->data(), $record_document, {
                            altscript => 0,
                            data_source => 'control_field',
                            field => $field
                        }
                    );
                }
            }
            else {
                my $tag = $field->tag();
                # Handle alternate scripts in MARC 21
                my $altscript = 0;
                if ($marcflavour eq 'marc21' && $tag eq '880') {
                    my $sub6 = $field->subfield('6');
                    # An 880 field without $6 is left under its own tag
                    if (defined $sub6 && $sub6 =~ /^(...)-\d+/) {
                        $tag = $1;
                        $altscript = 1;
                    }
                }

                my $data_field_rules = $data_fields_rules->{$tag};
                if ($data_field_rules) {
                    my $subfields_mappings = $data_field_rules->{subfields};
                    my $wildcard_mappings = $subfields_mappings->{'*'};
                    foreach my $subfield ($field->subfields()) {
                        my ($code, $data) = @{$subfield};
                        my $mappings = $subfields_mappings->{$code} // [];
                        if ($wildcard_mappings) {
                            $mappings = [@{$mappings}, @{$wildcard_mappings}];
                        }
                        if (@{$mappings}) {
                            $self->_process_mappings($mappings, $data, $record_document, {
                                    altscript => $altscript,
                                    data_source => 'subfield',
                                    code => $code,
                                    field => $field
                                }
                            );
                        }
                        # Look at every mapping target, not only the first
                        # one, so match-heading is found wherever it sits in
                        # the list (same test as for subfield groups below).
                        if ( grep { $_->[0] eq 'match-heading' } @{$mappings} ){
                            # Used by the authority linker the match-heading field requires a specific syntax
                            # that is specified in C4/Heading
                            my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
                            next unless $heading;
                            push @{$record_document->{'match-heading'}}, $heading->search_form;
                        }
                    }

                    my $subfields_join_mappings = $data_field_rules->{subfields_join};
                    if ($subfields_join_mappings) {
                        foreach my $subfields_group (keys %{$subfields_join_mappings}) {
                            # Map each subfield to values, remove empty values, join with space
                            my $data = join(
                                ' ',
                                grep(
                                    $_,
                                    map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
                                )
                            );
                            if ($data) {
                                $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
                                        altscript => $altscript,
                                        data_source => 'subfields_group',
                                        codes => $subfields_group,
                                        field => $field
                                    }
                                );
                            }
                            if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
                                # Used by the authority linker the match-heading field requires a specific syntax
                                # that is specified in C4/Heading
                                my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
                                next unless $heading;
                                push @{$record_document->{'match-heading'}}, $heading->search_form;
                            }
                        }
                    }
                }
            }
        }
        # Fill in default values for fields that received no data
        foreach my $field (keys %{$rules->{defaults}}) {
            unless (defined $record_document->{$field}) {
                $record_document->{$field} = $rules->{defaults}->{$field};
            }
        }
        foreach my $field (@{$rules->{sum}}) {
            if (defined $record_document->{$field}) {
                # TODO: validate numeric? filter?
                # TODO: Or should only accept fields without nested values?
                # TODO: Quick and dirty, improve if needed
                $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
            }
        }
        # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
        foreach my $field (@{$rules->{isbn}}) {
            if (defined $record_document->{$field}) {
                my @isbns = ();
                foreach my $input_isbn (@{$record_document->{$field}}) {
                    my $isbn = Business::ISBN->new($input_isbn);
                    if (defined $isbn && $isbn->is_valid) {
                        my $isbn13 = $isbn->as_isbn13->as_string;
                        push @isbns, $isbn13;
                        $isbn13 =~ s/\-//g;
                        push @isbns, $isbn13;

                        my $isbn10 = $isbn->as_isbn10;
                        if ($isbn10) {
                            $isbn10 = $isbn10->as_string;
                            push @isbns, $isbn10;
                            $isbn10 =~ s/\-//g;
                            push @isbns, $isbn10;
                        }
                    } else {
                        # Keep values that are not valid ISBNs unchanged
                        push @isbns, $input_isbn;
                    }
                }
                $record_document->{$field} = \@isbns;
            }
        }

        # Remove duplicate values and collapse sort fields
        foreach my $field (keys %{$record_document}) {
            if (ref($record_document->{$field}) eq 'ARRAY') {
                @{$record_document->{$field}} = do {
                    my %seen;
                    grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
                };
                if ($field =~ /__sort$/) {
                    # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
                    $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
                }
            }
        }

        # TODO: Perhaps should check if $records_document non empty, but really should never be the case
        $record->encoding('UTF-8');
        if ($use_array) {
            $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
            $record_document->{'marc_format'} = 'ARRAY';
        } else {
            my @warnings;
            {
                # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
                local $SIG{__WARN__} = sub {
                    push @warnings, $_[0];
                };
                $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
            }
            if (@warnings) {
                # Suppress warnings if record length exceeded
                unless (substr($record->leader(), 0, 5) eq '99999') {
                    foreach my $warning (@warnings) {
                        carp $warning;
                    }
                }
                # Any warning while serialising switches the stored format to MARCXML
                $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
                $record_document->{'marc_format'} = 'MARCXML';
            }
            else {
                $record_document->{'marc_format'} = 'base64ISO2709';
            }
        }
        push @record_documents, $record_document;
    }
    return \@record_documents;
}
701 =head2 _marc_to_array($record)
    my $data = $self->_marc_to_array($record)
705 Convert a MARC::Record to an array modeled after MARC-in-JSON
706 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
708 =over 4
710 =item C<$record>
712 A MARC::Record object
714 =back
716 =cut
# Convert a MARC record object into a plain data structure:
#
#   { leader => $leader, fields => [ { $tag => ... }, ... ] }
#
# A control field becomes { $tag => $data }; any other field becomes
# { $tag => { ind1 => ..., ind2 => ..., subfields => [ { $code => $value }, ... ] } }.
# "subfields" is always an array reference, even when the field has none.
sub _marc_to_array {
    my ($self, $record) = @_;

    my $data = {
        leader => scalar $record->leader(),
        fields => []
    };

    for my $field ($record->fields()) {
        my $tag = $field->tag();

        if ($field->is_control_field()) {
            push @{$data->{fields}}, { $tag => scalar $field->data() };
            next;
        }

        # One single-key hash per subfield, in record order
        my @subfields = map { +{ $_->[0] => $_->[1] } } $field->subfields();

        push @{$data->{fields}}, {
            $tag => {
                ind1      => scalar $field->indicator(1),
                ind2      => scalar $field->indicator(2),
                subfields => \@subfields
            }
        };
    }

    return $data;
}
747 =head2 _array_to_marc($data)
749 my $record = _array_to_marc($data)
751 Convert an array modeled after MARC-in-JSON to a MARC::Record
753 =over 4
755 =item C<$data>
757 An array modeled after MARC-in-JSON
758 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
760 =back
762 =cut
# Build a MARC::Record from the structure produced by _marc_to_array():
# a hashref with a "leader" string and a "fields" arrayref of single-key
# hashes ({ $tag => $data } for control fields, { $tag => { ind1, ind2,
# subfields } } otherwise). The input structure is left untouched.
sub _array_to_marc {
    my ($self, $data) = @_;

    my $record = MARC::Record->new();
    $record->leader($data->{leader});

    for my $entry (@{$data->{fields}}) {
        # Each entry holds exactly one tag. Read it into separate variables:
        # the loop variable aliases the caller's array element, so assigning
        # to it would rewrite the input data.
        my ($tag) = keys %{$entry};
        my $content = $entry->{$tag};

        my $marc_field;
        if (ref($content) eq 'HASH') {
            # Flatten [ { code => value }, ... ] into ( code, value, ... )
            my @subfields;
            foreach my $subfield (@{ $content->{subfields} // [] }) {
                my ($code) = keys %{$subfield};
                push @subfields, $code, $subfield->{$code};
            }
            $marc_field = MARC::Field->new($tag, $content->{ind1}, $content->{ind2}, @subfields);
        }
        else {
            # Control field: the value is the field data itself
            $marc_field = MARC::Field->new($tag, $content);
        }
        $record->append_fields($marc_field);
    }

    return $record;
}
791 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
793 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
795 Get mappings, an internal data structure later used by
796 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
797 data for a MARC mapping.
The returned C<$mappings> is not to be confused with mappings provided by
800 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
801 provided by C<_foreach_mapping> and expands it to this internal data structure.
802 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
803 is then applied to each MARC target (leader, control field data, subfield or
804 joined subfields) and integrated into the mapping rules data structure used in
805 C<marc_records_to_documents> to transform MARC records into Elasticsearch
806 documents.
808 =over 4
810 =item C<$facet>
812 Boolean indicating whether to create a facet field for this mapping.
814 =item C<$suggestible>
816 Boolean indicating whether to create a suggestion field for this mapping.
818 =item C<$sort>
820 Boolean indicating whether to create a sort field for this mapping.
822 =item C<$search>
824 Boolean indicating whether to create a search field for this mapping.
826 =item C<$target_name>
828 Elasticsearch document target field name.
830 =item C<$target_type>
832 Elasticsearch document target field type.
834 =item C<$range>
836 An optional range as a string in the format "<START>-<END>" or "<START>",
837 where "<START>" and "<END>" are integers specifying a range that will be used
838 for extracting a substring from MARC data as Elasticsearch field target value.
840 The first character position is "0", and the range is inclusive,
841 so "0-2" means the first three characters of MARC data.
843 If only "<START>" is provided only one character at position "<START>" will
844 be extracted.
846 =back
848 =cut
# Expand one mapping's properties into the list of [ $target, $options ]
# pairs consumed by _process_mappings: the plain target when $search is true,
# plus "__facet", "__suggestion" and "__sort" variants as requested ($sort
# counts as requested when undefined). Every suffixed mapping receives its
# own copy of the options hash.
sub _field_mappings {
    my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;

    # Options shared by all targets of this mapping
    my %base_options;

    if (defined $range) {
        # TODO: use value_callback instead?
        # "<START>-<END>" is inclusive; a lone "<START>" selects one character
        my ($from, $to) = map { int($_) } split /-/, $range, 2;
        $base_options{substr} = [ $from, ( defined $to ? $to - $from + 1 : 1 ) ];
    }

    # TODO: Should probably have per type value callback/hook
    # but hard code for now
    if ($target_type eq 'boolean') {
        my $to_boolean = sub {
            my ($value) = @_;
            # Trim whitespace at both ends
            $value =~ s/^\s+|\s+$//g;
            return $value ? 'true' : 'false';
        };
        push @{ $base_options{value_callbacks} }, $to_boolean;
    }

    my @mappings;
    push @mappings, [ $target_name, \%base_options ] if $search;

    my @variants = (
        [ 'facet',      $facet ],
        [ 'suggestion', $suggestible ],
        [ 'sort',       ( !defined $sort || $sort ) ],
    );
    foreach my $variant (@variants) {
        my ($suffix, $wanted) = @{$variant};
        next unless $wanted;

        # Important! Make shallow clone, or we end up with the same hashref
        # shared by all mappings
        my %options = %base_options;

        # TODO: Hack, fix later in less hideous manner
        $options{property} = 'input' if $suffix eq 'suggestion';

        push @mappings, [ "${target_name}__$suffix", \%options ];
    }

    return @mappings;
}
905 =head2 _get_marc_mapping_rules
907 my $mapping_rules = $self->_get_marc_mapping_rules()
909 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
each call to C<< MARC::Record->field >>) we create an optimized structure of mapping
rules keyed by MARC field tags holding all the mapping rules for that particular tag.

We can then iterate through all MARC fields for each record and apply all relevant
rules once per field instead of retrieving fields multiple times for each mapping rule,
which is terribly slow.
919 =cut
921 # TODO: This structure can be used for processing multiple MARC::Records so is currently
922 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
923 # memory cache which it is currently not. The performance gain of caching
924 # would probably be marginal, but to do this could be a further improvement.
# Build the rules structure used by marc_records_to_documents from the
# mappings of the current MARC flavour:
#
#   leader         - arrayref of mappings applied to the leader
#   control_fields - { tag => [ mappings ] }
#   data_fields    - { tag => { subfields      => { code  => [ mappings ] },
#                               subfields_join => { codes => [ mappings ] } } }
#   sum            - names of fields whose values are summed
#   isbn           - names of fields expanded into ISBN variants
#   defaults       - { field name => default value }
#
# Throws Koha::Exceptions::Elasticsearch::MARCFieldExprParseError for a MARC
# field expression that cannot be parsed.
sub _get_marc_mapping_rules {
    my ($self) = @_;
    my $marcflavour = lc C4::Context->preference('marcflavour');
    my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
    my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
    my $rules = {
        'leader' => [],
        'control_fields' => {},
        'data_fields' => {},
        'sum' => [],
        'isbn' => [],
        'defaults' => {}
    };

    $self->_foreach_mapping(sub {
        my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
        return if $marc_type ne $marcflavour;

        if ($type eq 'sum') {
            push @{$rules->{sum}}, $name;
            # An undefined $sort means "sortable by default", so the sort
            # field exists in that case too and has to be summed as well.
            push @{$rules->{sum}}, $name."__sort" if !defined $sort || $sort;
        }
        elsif ($type eq 'isbn') {
            push @{$rules->{isbn}}, $name;
        }
        elsif ($type eq 'boolean') {
            # boolean gets special handling, if value doesn't exist for a field,
            # it is set to false
            $rules->{defaults}->{$name} = 'false';
        }

        if ($marc_field =~ $field_spec_regexp) {
            # Copy the captures right away, before anything can reset them
            my ($field_tag, $subfield_spec, $range) = ($1, $2, $3);

            my @subfields;
            my @subfield_groups;
            # Parse and separate subfields from subfield groups
            if (defined $subfield_spec) {
                my $subfield_group = '';
                my $open_group = 0;

                foreach my $token (split //, $subfield_spec) {
                    if ($token eq "(") {
                        if ($open_group) {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched opening parenthesis for $marc_field"
                            );
                        }
                        else {
                            $open_group = 1;
                        }
                    }
                    elsif ($token eq ")") {
                        if ($open_group) {
                            # Compare against the empty string so that a
                            # group consisting of subfield "0" is kept
                            if ($subfield_group ne '') {
                                push @subfield_groups, $subfield_group;
                                $subfield_group = '';
                            }
                            $open_group = 0;
                        }
                        else {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched closing parenthesis for $marc_field"
                            );
                        }
                    }
                    elsif ($open_group) {
                        $subfield_group .= $token;
                    }
                    else {
                        push @subfields, $token;
                    }
                }

                # A group still open at the end of the expression would
                # otherwise be dropped without any notice
                if ($open_group) {
                    Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                        "Unmatched opening parenthesis for $marc_field"
                    );
                }
            }
            else {
                # No subfields given: the mapping applies to all of them
                push @subfields, '*';
            }

            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
            if ($field_tag < 10) {
                $rules->{control_fields}->{$field_tag} //= [];
                push @{$rules->{control_fields}->{$field_tag}}, @mappings;
            }
            else {
                $rules->{data_fields}->{$field_tag} //= {};
                foreach my $subfield (@subfields) {
                    $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
                }
                foreach my $subfield_group (@subfield_groups) {
                    $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
                }
            }
        }
        elsif ($marc_field =~ $leader_regexp) {
            my $range = $1;
            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
            push @{$rules->{leader}}, @mappings;
        }
        else {
            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                "Invalid MARC field expression: $marc_field"
            );
        }
    });

    # Marc-flavour specific rule tweaks, could/should also provide hook for this
    if ($marcflavour eq 'marc21') {
        # Nonfiling characters processing for sort fields
        my %title_fields;
        # The index names are compared against the constants declared in this
        # package, so the check does not rely on another module being loaded.
        if ($self->index eq $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX) {
            # Format is: nonfiling characters indicator => field names list
            %title_fields = (
                1 => [130, 630, 730, 740],
                2 => [222, 240, 242, 243, 245, 440, 830]
            );
        }
        elsif ($self->index eq $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX) {
            %title_fields = (
                1 => [730],
                2 => [130, 430, 530]
            );
        }
        foreach my $indicator (keys %title_fields) {
            foreach my $field_tag (@{$title_fields{$indicator}}) {
                my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
                foreach my $mapping (@{$mappings}) {
                    if ($mapping->[0] =~ /__sort$/) {
                        # Mark this as to be processed for nonfiling characters indicator
                        # later on in _process_mappings
                        $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
                    }
                }
            }
        }
    }

    return $rules;
}
=head2 _foreach_mapping

    $self->_foreach_mapping(
        sub {
            my ( $name, $type, $facet, $suggestible, $sort, $search,
                $marc_type, $marc_field )
              = @_;
            return unless $marc_type eq 'marc21';
            print "Data comes from: " . $marc_field . "\n";
        }
    );

This allows you to apply a function to each entry in the elasticsearch mappings
table, in order to build the mappings for whatever is needed.

In the provided function, the fields are:

=over 4

=item C<$name>

The field name for elasticsearch (corresponds to the 'mapping' column in the
database). It is passed to the function in lower case.

=item C<$type>

The type for this value, e.g. 'string'.

=item C<$facet>

True if this value should be facetised. This only really makes sense if the
field is understood by the facet processing code anyway.

=item C<$suggestible>

The value of the 'suggestible' column for this mapping, presumably true if the
value should be made available for suggestions.

=item C<$sort>

True if this is a field that a) needs special sort handling, and b) if it
should be sorted on. False if a) but not b). Undef if not a). This allows,
for example, author to be sorted on but not everything marked with "author"
to be included in that sort.

=item C<$search>

The value of the 'search' column for this mapping, presumably true if the
value should be searchable.

=item C<$marc_type>

A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
'unimarc', 'normarc'.

=item C<$marc_field>

A string that describes the MARC field that contains the data to extract.
These are of a form suited to Catmandu's MARC fixers.

=back

=cut
sub _foreach_mapping {
    my ( $self, $sub ) = @_;

    # Columns pulled in from the joined tables, as [ source column, alias ]
    # pairs. Their order here is the order in which the values are handed to
    # the callback, right after the field name and type.
    my @joined_columns = (
        [ 'search_marc_to_fields.facet',       'facet' ],
        [ 'search_marc_to_fields.suggestible', 'suggestible' ],
        [ 'search_marc_to_fields.sort',        'sort' ],
        [ 'search_marc_to_fields.search',      'search' ],
        [ 'search_marc_map.marc_type',         'marc_type' ],
        [ 'search_marc_map.marc_field',        'marc_field' ],
    );

    # TODO use a caching framework here
    my $mapping_rows = Koha::Database->schema->resultset('SearchField')->search(
        {
            # Only the mappings that belong to the index this object is for
            'search_marc_map.index_name' => $self->index,
        },
        {
            join      => { search_marc_to_fields => 'search_marc_map' },
            '+select' => [ map { $_->[0] } @joined_columns ],
            '+as'     => [ map { $_->[1] } @joined_columns ],
        }
    );

    # One callback invocation per (search field, MARC mapping) row
    while ( my $row = $mapping_rows->next ) {
        $sub->(
            # Force lower case on indexed field names for case insensitive
            # field name searches
            lc( $row->name ),
            $row->type,
            map { $row->get_column( $_->[1] ) } @joined_columns,
        );
    }
}
=head2 process_error

    die $self->process_error($@);

This parses an Elasticsearch error message and produces a human-readable
result from it. This result is probably missing all the useful information
that you might want in diagnosing an issue, so the original message is also
logged as a warning.

Note that currently the resulting message is not internationalised. This
will happen eventually by some method or other.

=cut
sub process_error {
    my ( $self, $error ) = @_;

    # Simple logging: the text returned below is deliberately generic, so the
    # raw error is sent to the warning stream to keep it available.
    warn $error;

    # This is super-primitive: only query parse failures are told apart from
    # every other kind of error.
    if ( $error =~ /ParseException/ ) {
        return "Unable to understand your search query, please rephrase and try again.\n";
    }

    return "Unable to perform your search. Please try again.\n";
}
=head2 _read_configuration

    my $conf = _read_configuration();

Reads the I<configuration file> and returns a hashref with the
configuration information. It raises an exception if mandatory entries
are missing.

The hashref structure has the following form:

    {
        'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
        'index_name' => 'koha_instance',
    }

This is configured by the following in the C<config> block in koha-conf.xml:

    <elasticsearch>
        <server>127.0.0.1:9200</server>
        <server>anotherserver:9200</server>
        <index_name>koha_instance</index_name>
    </elasticsearch>

=cut
sub _read_configuration {
    my $es_conf = C4::Context->config('elasticsearch');

    # Mandatory entries are checked one after the other; the first one found
    # missing raises a MissingEntry exception.
    if ( !defined $es_conf ) {
        Koha::Exceptions::Config::MissingEntry->throw(
            "Missing 'elasticsearch' block in config file");
    }

    if ( !( $es_conf && $es_conf->{server} ) ) {
        Koha::Exceptions::Config::MissingEntry->throw(
            "Missing 'server' entry in config file for elasticsearch");
    }

    if ( !defined $es_conf->{index_name} ) {
        Koha::Exceptions::Config::MissingEntry->throw(
            "Missing 'index_name' entry in config file for elasticsearch");
    }

    # The 'server' entry is either an array reference or a single value;
    # 'nodes' is always handed back as an array reference.
    my $servers = $es_conf->{server};

    return {
        nodes      => ( ref($servers) eq 'ARRAY' ? $servers : [$servers] ),
        index_name => $es_conf->{index_name},
    };
}
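=head2 get_facetable_fields

    my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();

Returns the list of Koha::SearchFields that can be used as facets in the ES
configuration. Only a fixed set of field names is considered. The fields that
have a facet order set come first, sorted by that order, followed by the
fields that have no facet order.

=cut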
sub get_facetable_fields {
    my ($self) = @_;

    # These should correspond to the ES field names, as opposed to the CCL
    # things that zebra uses.
    my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );

    # Both lookups differ only in the condition put on facet_order, so the
    # query is built in one place.
    my $fields_where_facet_order = sub {
        my ($facet_order_condition) = @_;
        return Koha::SearchFields->search(
            { name => { -in => \@search_field_names }, facet_order => $facet_order_condition },
            { order_by => ['facet_order'] }
        );
    };

    # Fields with a facet order set, then the ones without any
    my @faceted_fields     = $fields_where_facet_order->( { '!=' => undef } );
    my @not_faceted_fields = $fields_where_facet_order->(undef);

    # This could certainly be improved
    return ( @faceted_fields, @not_faceted_fields );
}
1275 __END__
=head1 AUTHOR

=over 4

=item Chris Cormack C<< <chrisc@catalyst.net.nz> >>

=item Robin Sheat C<< <robin@catalyst.net.nz> >>

=item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>

=back

=cut