Bug 24807: (follow-up) Add support for spaces as unknown characters
[koha.git] / Koha / SearchEngine / Elasticsearch.pm
blob1df589c07c878de046c4448538661d368c2e69a5
1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
22 use C4::Context;
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc;
33 use Carp;
34 use Clone qw(clone);
35 use JSON;
36 use Modern::Perl;
37 use Readonly;
38 use Search::Elasticsearch;
39 use Try::Tiny;
40 use YAML::Syck;
42 use List::Util qw( sum0 reduce all );
43 use MARC::File::XML;
44 use MIME::Base64;
45 use Encode qw(encode);
46 use Business::ISBN;
47 use Scalar::Util qw(looks_like_number);
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
56 =head1 NAME
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
60 =head1 ACCESSORS
62 =over 4
64 =item index
66 The name of the index to use, generally 'biblios' or 'authorities'.
68 =item index_name
70 The Elasticsearch index name with Koha instance prefix.
72 =back
75 =head1 FUNCTIONS
77 =cut
79 sub new {
80 my $class = shift @_;
81 my ($params) = @_;
83 # Check for a valid index
84 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85 my $config = _read_configuration();
86 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
88 my $self = $class->SUPER::new(@_);
89 return $self;
92 =head2 get_elasticsearch
94 my $elasticsearch_client = $self->get_elasticsearch();
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
99 =cut
101 sub get_elasticsearch {
102 my $self = shift @_;
103 unless (defined $self->{elasticsearch}) {
104 $self->{elasticsearch} = Search::Elasticsearch->new(
105 $self->get_elasticsearch_params()
108 return $self->{elasticsearch};
111 =head2 get_elasticsearch_params
113 my $params = $self->get_elasticsearch_params();
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
119 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120 'index_name' => 'koha_instance_index',
123 This is configured by the following in the C<config> block in koha-conf.xml:
125 <elasticsearch>
126 <server>127.0.0.1:9200</server>
127 <server>anotherserver:9200</server>
128 <index_name>koha_instance</index_name>
129 </elasticsearch>
131 =cut
133 sub get_elasticsearch_params {
134 my ($self) = @_;
136 my $conf;
137 try {
138 $conf = _read_configuration();
139 } catch {
140 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
141 croak($_->message);
145 return $conf
148 =head2 get_elasticsearch_settings
150 my $settings = $self->get_elasticsearch_settings();
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
155 A hashref containing the settings is returned.
157 =cut
159 sub get_elasticsearch_settings {
160 my ($self) = @_;
162 # Use state to speed up repeated calls
163 state $settings = undef;
164 if (!defined $settings) {
165 my $config_file = C4::Context->config('elasticsearch_index_config');
166 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167 $settings = LoadFile( $config_file );
170 return $settings;
173 =head2 get_elasticsearch_mappings
175 my $mappings = $self->get_elasticsearch_mappings();
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
180 =cut
182 sub get_elasticsearch_mappings {
183 my ($self) = @_;
185 # Use state to speed up repeated calls
186 state %all_mappings;
187 state %sort_fields;
189 if (!defined $all_mappings{$self->index}) {
190 $sort_fields{$self->index} = {};
191 # Clone the general mapping to break ties with the original hash
192 my $mappings = {
193 data => clone(_get_elasticsearch_field_config('general', ''))
195 my $marcflavour = lc C4::Context->preference('marcflavour');
196 $self->_foreach_mapping(
197 sub {
198 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199 return if $marc_type ne $marcflavour;
200 # TODO if this gets any sort of complexity to it, it should
201 # be broken out into its own function.
203 # TODO be aware of date formats, but this requires pre-parsing
204 # as ES will simply reject anything with an invalid date.
205 my $es_type = 'text';
206 if ($type eq 'boolean') {
207 $es_type = 'boolean';
208 } elsif ($type eq 'number' || $type eq 'sum') {
209 $es_type = 'integer';
210 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211 $es_type = 'stdno';
212 } elsif ($type eq 'year') {
213 $es_type = 'year';
216 if ($search) {
217 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
220 if ($facet) {
221 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
223 if ($suggestible) {
224 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
226 # Sort is a bit special as it can be true, false, undef.
227 # We care about "true" or "undef",
228 # "undef" means to do the default thing, which is make it sortable.
229 if (!defined $sort || $sort) {
230 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231 $sort_fields{$self->index}{$name} = 1;
235 $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236 $all_mappings{$self->index} = $mappings;
238 $self->sort_fields(\%{$sort_fields{$self->index}});
239 return $all_mappings{$self->index};
242 =head2 raw_elasticsearch_mappings
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
249 =cut
251 sub raw_elasticsearch_mappings {
252 my ( $marc_type ) = @_;
254 my $schema = Koha::Database->new()->schema();
256 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
258 my $mappings = {};
259 while ( my $search_field = $search_fields->next ) {
261 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262 { search_field_id => $search_field->id },
264 join => 'search_marc_map',
265 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
269 while ( my $marc_to_field = $marc_to_fields->next ) {
271 my $marc_map = $marc_to_field->search_marc_map;
273 next if $marc_type && $marc_map->marc_type ne $marc_type;
275 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
278 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
279 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
282 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
284 facet => $marc_to_field->facet || '',
285 marc_type => $marc_map->marc_type,
286 marc_field => $marc_map->marc_field,
287 sort => $marc_to_field->sort,
288 suggestible => $marc_to_field->suggestible || ''
294 return $mappings;
297 =head2 _get_elasticsearch_field_config
299 Get the Elasticsearch field config for the given purpose and data type.
301 $mapping = _get_elasticsearch_field_config('search', 'text');
303 =cut
305 sub _get_elasticsearch_field_config {
307 my ( $purpose, $type ) = @_;
309 # Use state to speed up repeated calls
310 state $settings = undef;
311 if (!defined $settings) {
312 my $config_file = C4::Context->config('elasticsearch_field_config');
313 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
314 $settings = LoadFile( $config_file );
317 if (!defined $settings->{$purpose}) {
318 die "Field purpose $purpose not defined in field config";
320 if ($type eq '') {
321 return $settings->{$purpose};
323 if (defined $settings->{$purpose}{$type}) {
324 return $settings->{$purpose}{$type};
326 if (defined $settings->{$purpose}{'default'}) {
327 return $settings->{$purpose}{'default'};
329 return;
332 =head2 _load_elasticsearch_mappings
334 Load Elasticsearch mappings in the format of mappings.yaml.
336 $indexes = _load_elasticsearch_mappings();
338 =cut
340 sub _load_elasticsearch_mappings {
341 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
342 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
343 return LoadFile( $mappings_yaml );
346 sub reset_elasticsearch_mappings {
347 my ( $self ) = @_;
348 my $indexes = $self->_load_elasticsearch_mappings();
350 Koha::SearchMarcMaps->delete;
351 Koha::SearchFields->delete;
353 while ( my ( $index_name, $fields ) = each %$indexes ) {
354 while ( my ( $field_name, $data ) = each %$fields ) {
356 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
358 # Set default values
359 $sf_params{staff_client} //= 1;
360 $sf_params{opac} //= 1;
362 $sf_params{name} = $field_name;
364 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
366 my $mappings = $data->{mappings};
367 for my $mapping ( @$mappings ) {
368 my $marc_field = Koha::SearchMarcMaps->find_or_create({
369 index_name => $index_name,
370 marc_type => $mapping->{marc_type},
371 marc_field => $mapping->{marc_field}
373 $search_field->add_to_search_marc_maps($marc_field, {
374 facet => $mapping->{facet} || 0,
375 suggestible => $mapping->{suggestible} || 0,
376 sort => $mapping->{sort},
377 search => $mapping->{search} // 1
383 $self->clear_search_fields_cache();
385 # FIXME return the mappings?
388 # This overrides the accessor provided by Class::Accessor so that if
389 # sort_fields isn't set, then it'll generate it.
390 sub sort_fields {
391 my $self = shift;
392 if (@_) {
393 $self->_sort_fields_accessor(@_);
394 return;
396 my $val = $self->_sort_fields_accessor();
397 return $val if $val;
399 # This will populate the accessor as a side effect
400 $self->get_elasticsearch_mappings();
401 return $self->_sort_fields_accessor();
404 =head2 _process_mappings($mappings, $data, $record_document, $meta)
406 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
408 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
409 Since we group all mappings by MARC field targets C<$mappings> will contain
410 all targets for C<$data> and thus we need to fetch the MARC field only once.
411 C<$mappings> will be applied to C<$record_document> and new field values added.
412 The method has no return value.
414 =over 4
416 =item C<$mappings>
418 Arrayref of mappings containing arrayrefs in the format
419 [C<$target>, C<$options>] where C<$target> is the name of the target field and
420 C<$options> is a hashref containing processing directives for this particular
421 mapping.
423 =item C<$data>
425 The source data from a MARC record field.
427 =item C<$record_document>
429 Hashref representing the Elasticsearch document on which mappings should be
430 applied.
432 =item C<$meta>
434 A hashref containing metadata useful for enforcing per mapping rules. For
435 example for providing extra context for mapping options, or treating mapping
436 targets differently depending on type (sort, search, facet etc). Combining
437 this metadata with the mapping options and metadata allows us to mutate the
438 data per mapping, or even replace it with other data retrieved from the
439 metadata context.
441 Current properties are:
443 C<altscript>: A boolean value indicating whether an alternate script presentation is being
444 processed.
446 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
447 'subfield' or 'subfields_group'.
449 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
451 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
452 is 'subfields_group'.
454 C<field>: The original C<MARC::Record> object.
456 =back
458 =cut
460 sub _process_mappings {
461 my ($_self, $mappings, $data, $record_document, $meta) = @_;
462 foreach my $mapping (@{$mappings}) {
463 my ($target, $options) = @{$mapping};
465 # Don't process sort fields for alternate scripts
466 my $sort = $target =~ /__sort$/;
467 if ($sort && $meta->{altscript}) {
468 next;
471 # Copy (scalar) data since can have multiple targets
472 # with differing options for (possibly) mutating data
473 # so need a different copy for each
474 my $data_copy = $data;
475 if (defined $options->{substr}) {
476 my ($start, $length) = @{$options->{substr}};
477 $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
480 # Add data to values array for callbacks processing
481 my $values = [$data_copy];
483 # Value callbacks takes subfield data (or values from previous
484 # callbacks) as argument, and returns a possibly different list of values.
485 # Note that the returned list may also be empty.
486 if (defined $options->{value_callbacks}) {
487 foreach my $callback (@{$options->{value_callbacks}}) {
488 # Pass each value to current callback which returns a list
489 # (scalar is fine too) resulting either in a list or
490 # a list of lists that will be flattened by perl.
491 # The next callback will receive the possibly expanded list of values.
492 $values = [ map { $callback->($_) } @{$values} ];
496 # Skip mapping if all values has been removed
497 next unless @{$values};
499 if (defined $options->{property}) {
500 $values = [ map { { $options->{property} => $_ } } @{$values} ];
502 if (defined $options->{nonfiling_characters_indicator}) {
503 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
504 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
505 # Nonfiling chars does not make sense for multiple values
506 # Only apply on first element
507 $values->[0] = substr $values->[0], $nonfiling_chars;
510 $record_document->{$target} //= [];
511 push @{$record_document->{$target}}, @{$values};
515 =head2 marc_records_to_documents($marc_records)
517 my $record_documents = $self->marc_records_to_documents($marc_records);
519 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
521 Returns array of hash references, representing Elasticsearch documents,
522 acceptable as body payload in C<Search::Elasticsearch> requests.
524 =over 4
526 =item C<$marc_documents>
528 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
530 =back
532 =cut
534 sub marc_records_to_documents {
535 my ($self, $records) = @_;
536 my $rules = $self->_get_marc_mapping_rules();
537 my $control_fields_rules = $rules->{control_fields};
538 my $data_fields_rules = $rules->{data_fields};
539 my $marcflavour = lc C4::Context->preference('marcflavour');
540 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
542 my @record_documents;
544 my %auth_match_headings;
545 if( $self->index eq 'authorities' ){
546 my @auth_types = Koha::Authority::Types->search();
547 %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
550 foreach my $record (@{$records}) {
551 my $record_document = {};
553 if ( $self->index eq 'authorities' ){
554 my $authtypecode = GuessAuthTypeCode( $record );
555 if( $authtypecode ){
556 my $field = $record->field( $auth_match_headings{ $authtypecode } );
557 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
558 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
559 } else {
560 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
564 my $mappings = $rules->{leader};
565 if ($mappings) {
566 $self->_process_mappings($mappings, $record->leader(), $record_document, {
567 altscript => 0,
568 data_source => 'leader'
572 foreach my $field ($record->fields()) {
573 if ($field->is_control_field()) {
574 my $mappings = $control_fields_rules->{$field->tag()};
575 if ($mappings) {
576 $self->_process_mappings($mappings, $field->data(), $record_document, {
577 altscript => 0,
578 data_source => 'control_field',
579 field => $field
584 else {
585 my $tag = $field->tag();
586 # Handle alternate scripts in MARC 21
587 my $altscript = 0;
588 if ($marcflavour eq 'marc21' && $tag eq '880') {
589 my $sub6 = $field->subfield('6');
590 if ($sub6 =~ /^(...)-\d+/) {
591 $tag = $1;
592 $altscript = 1;
596 my $data_field_rules = $data_fields_rules->{$tag};
597 if ($data_field_rules) {
598 my $subfields_mappings = $data_field_rules->{subfields};
599 my $wildcard_mappings = $subfields_mappings->{'*'};
600 foreach my $subfield ($field->subfields()) {
601 my ($code, $data) = @{$subfield};
602 my $mappings = $subfields_mappings->{$code} // [];
603 if ($wildcard_mappings) {
604 $mappings = [@{$mappings}, @{$wildcard_mappings}];
606 if (@{$mappings}) {
607 $self->_process_mappings($mappings, $data, $record_document, {
608 altscript => $altscript,
609 data_source => 'subfield',
610 code => $code,
611 field => $field
617 my $subfields_join_mappings = $data_field_rules->{subfields_join};
618 if ($subfields_join_mappings) {
619 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
620 # Map each subfield to values, remove empty values, join with space
621 my $data = join(
622 ' ',
623 grep(
625 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
628 if ($data) {
629 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
630 altscript => $altscript,
631 data_source => 'subfields_group',
632 codes => $subfields_group,
633 field => $field
642 foreach my $field (keys %{$rules->{defaults}}) {
643 unless (defined $record_document->{$field}) {
644 $record_document->{$field} = $rules->{defaults}->{$field};
647 foreach my $field (@{$rules->{sum}}) {
648 if (defined $record_document->{$field}) {
649 # TODO: validate numeric? filter?
650 # TODO: Or should only accept fields without nested values?
651 # TODO: Quick and dirty, improve if needed
652 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
655 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
656 foreach my $field (@{$rules->{isbn}}) {
657 if (defined $record_document->{$field}) {
658 my @isbns = ();
659 foreach my $input_isbn (@{$record_document->{$field}}) {
660 my $isbn = Business::ISBN->new($input_isbn);
661 if (defined $isbn && $isbn->is_valid) {
662 my $isbn13 = $isbn->as_isbn13->as_string;
663 push @isbns, $isbn13;
664 $isbn13 =~ s/\-//g;
665 push @isbns, $isbn13;
667 my $isbn10 = $isbn->as_isbn10;
668 if ($isbn10) {
669 $isbn10 = $isbn10->as_string;
670 push @isbns, $isbn10;
671 $isbn10 =~ s/\-//g;
672 push @isbns, $isbn10;
674 } else {
675 push @isbns, $input_isbn;
678 $record_document->{$field} = \@isbns;
682 # Remove duplicate values and collapse sort fields
683 foreach my $field (keys %{$record_document}) {
684 if (ref($record_document->{$field}) eq 'ARRAY') {
685 @{$record_document->{$field}} = do {
686 my %seen;
687 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
689 if ($field =~ /__sort$/) {
690 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
691 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
696 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
697 $record->encoding('UTF-8');
698 if ($use_array) {
699 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
700 $record_document->{'marc_format'} = 'ARRAY';
701 } else {
702 my @warnings;
704 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
705 local $SIG{__WARN__} = sub {
706 push @warnings, $_[0];
708 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
710 if (@warnings) {
711 # Suppress warnings if record length exceeded
712 unless (substr($record->leader(), 0, 5) eq '99999') {
713 foreach my $warning (@warnings) {
714 carp $warning;
717 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
718 $record_document->{'marc_format'} = 'MARCXML';
720 else {
721 $record_document->{'marc_format'} = 'base64ISO2709';
724 push @record_documents, $record_document;
726 return \@record_documents;
729 =head2 _marc_to_array($record)
731 my @fields = _marc_to_array($record)
733 Convert a MARC::Record to an array modeled after MARC-in-JSON
734 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
736 =over 4
738 =item C<$record>
740 A MARC::Record object
742 =back
744 =cut
746 sub _marc_to_array {
747 my ($self, $record) = @_;
749 my $data = {
750 leader => $record->leader(),
751 fields => []
753 for my $field ($record->fields()) {
754 my $tag = $field->tag();
755 if ($field->is_control_field()) {
756 push @{$data->{fields}}, {$tag => $field->data()};
757 } else {
758 my $subfields = ();
759 foreach my $subfield ($field->subfields()) {
760 my ($code, $contents) = @{$subfield};
761 push @{$subfields}, {$code => $contents};
763 push @{$data->{fields}}, {
764 $tag => {
765 ind1 => $field->indicator(1),
766 ind2 => $field->indicator(2),
767 subfields => $subfields
772 return $data;
775 =head2 _array_to_marc($data)
777 my $record = _array_to_marc($data)
779 Convert an array modeled after MARC-in-JSON to a MARC::Record
781 =over 4
783 =item C<$data>
785 An array modeled after MARC-in-JSON
786 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
788 =back
790 =cut
792 sub _array_to_marc {
793 my ($self, $data) = @_;
795 my $record = MARC::Record->new();
797 $record->leader($data->{leader});
798 for my $field (@{$data->{fields}}) {
799 my $tag = (keys %{$field})[0];
800 $field = $field->{$tag};
801 my $marc_field;
802 if (ref($field) eq 'HASH') {
803 my @subfields;
804 foreach my $subfield (@{$field->{subfields}}) {
805 my $code = (keys %{$subfield})[0];
806 push @subfields, $code;
807 push @subfields, $subfield->{$code};
809 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
810 } else {
811 $marc_field = MARC::Field->new($tag, $field)
813 $record->append_fields($marc_field);
816 return $record;
819 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
821 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
823 Get mappings, an internal data structure later used by
824 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
825 data for a MARC mapping.
827 The returned C<$mappings> is not to to be confused with mappings provided by
828 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
829 provided by C<_foreach_mapping> and expands it to this internal data structure.
830 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
831 is then applied to each MARC target (leader, control field data, subfield or
832 joined subfields) and integrated into the mapping rules data structure used in
833 C<marc_records_to_documents> to transform MARC records into Elasticsearch
834 documents.
836 =over 4
838 =item C<$facet>
840 Boolean indicating whether to create a facet field for this mapping.
842 =item C<$suggestible>
844 Boolean indicating whether to create a suggestion field for this mapping.
846 =item C<$sort>
848 Boolean indicating whether to create a sort field for this mapping.
850 =item C<$search>
852 Boolean indicating whether to create a search field for this mapping.
854 =item C<$target_name>
856 Elasticsearch document target field name.
858 =item C<$target_type>
860 Elasticsearch document target field type.
862 =item C<$range>
864 An optional range as a string in the format "<START>-<END>" or "<START>",
865 where "<START>" and "<END>" are integers specifying a range that will be used
866 for extracting a substring from MARC data as Elasticsearch field target value.
868 The first character position is "0", and the range is inclusive,
869 so "0-2" means the first three characters of MARC data.
871 If only "<START>" is provided only one character at position "<START>" will
872 be extracted.
874 =back
876 =cut
878 sub _field_mappings {
879 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
880 my %mapping_defaults = ();
881 my @mappings;
883 my $substr_args = undef;
884 if (defined $range) {
885 # TODO: use value_callback instead?
886 my ($start, $end) = map(int, split /-/, $range, 2);
887 $substr_args = [$start];
888 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
890 my $default_options = {};
891 if ($substr_args) {
892 $default_options->{substr} = $substr_args;
895 # TODO: Should probably have per type value callback/hook
896 # but hard code for now
897 if ($target_type eq 'boolean') {
898 $default_options->{value_callbacks} //= [];
899 push @{$default_options->{value_callbacks}}, sub {
900 my ($value) = @_;
901 # Trim whitespace at both ends
902 $value =~ s/^\s+|\s+$//g;
903 return $value ? 'true' : 'false';
906 elsif ($target_type eq 'year') {
907 $default_options->{value_callbacks} //= [];
908 # Only accept years containing digits and "u"
909 push @{$default_options->{value_callbacks}}, sub {
910 my ($value) = @_;
911 # Replace "u" with "0" for sorting
912 return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
916 if ($search) {
917 my $mapping = [$target_name, $default_options];
918 push @mappings, $mapping;
921 my @suffixes = ();
922 push @suffixes, 'facet' if $facet;
923 push @suffixes, 'suggestion' if $suggestible;
924 push @suffixes, 'sort' if !defined $sort || $sort;
926 foreach my $suffix (@suffixes) {
927 my $mapping = ["${target_name}__$suffix"];
928 # TODO: Hack, fix later in less hideous manner
929 if ($suffix eq 'suggestion') {
930 push @{$mapping}, {%{$default_options}, property => 'input'};
932 else {
933 # Important! Make shallow clone, or we end up with the same hashref
934 # shared by all mappings
935 push @{$mapping}, {%{$default_options}};
937 push @mappings, $mapping;
939 return @mappings;
942 =head2 _get_marc_mapping_rules
944 my $mapping_rules = $self->_get_marc_mapping_rules()
946 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
948 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
949 each call to C<MARC::Record>->field) we create an optimized structure of mapping
950 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
952 We can then iterate through all MARC fields for each record and apply all relevant
953 rules once per fields instead of retreiving fields multiple times for each mapping rule
954 which is terribly slow.
956 =cut
958 # TODO: This structure can be used for processing multiple MARC::Records so is currently
959 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
960 # memory cache which it is currently not. The performance gain of caching
961 # would probably be marginal, but to do this could be a further improvement.
963 sub _get_marc_mapping_rules {
964 my ($self) = @_;
965 my $marcflavour = lc C4::Context->preference('marcflavour');
966 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
967 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
968 my $rules = {
969 'leader' => [],
970 'control_fields' => {},
971 'data_fields' => {},
972 'sum' => [],
973 'isbn' => [],
974 'defaults' => {}
977 $self->_foreach_mapping(sub {
978 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
979 return if $marc_type ne $marcflavour;
981 if ($type eq 'sum') {
982 push @{$rules->{sum}}, $name;
983 push @{$rules->{sum}}, $name."__sort" if $sort;
985 elsif ($type eq 'isbn') {
986 push @{$rules->{isbn}}, $name;
988 elsif ($type eq 'boolean') {
989 # boolean gets special handling, if value doesn't exist for a field,
990 # it is set to false
991 $rules->{defaults}->{$name} = 'false';
994 if ($marc_field =~ $field_spec_regexp) {
995 my $field_tag = $1;
997 my @subfields;
998 my @subfield_groups;
999 # Parse and separate subfields form subfield groups
1000 if (defined $2) {
1001 my $subfield_group = '';
1002 my $open_group = 0;
1004 foreach my $token (split //, $2) {
1005 if ($token eq "(") {
1006 if ($open_group) {
1007 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1008 "Unmatched opening parenthesis for $marc_field"
1011 else {
1012 $open_group = 1;
1015 elsif ($token eq ")") {
1016 if ($open_group) {
1017 if ($subfield_group) {
1018 push @subfield_groups, $subfield_group;
1019 $subfield_group = '';
1021 $open_group = 0;
1023 else {
1024 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1025 "Unmatched closing parenthesis for $marc_field"
1029 elsif ($open_group) {
1030 $subfield_group .= $token;
1032 else {
1033 push @subfields, $token;
1037 else {
1038 push @subfields, '*';
1041 my $range = defined $3 ? $3 : undef;
1042 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1043 if ($field_tag < 10) {
1044 $rules->{control_fields}->{$field_tag} //= [];
1045 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1047 else {
1048 $rules->{data_fields}->{$field_tag} //= {};
1049 foreach my $subfield (@subfields) {
1050 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1051 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1053 foreach my $subfield_group (@subfield_groups) {
1054 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1055 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1059 elsif ($marc_field =~ $leader_regexp) {
1060 my $range = defined $1 ? $1 : undef;
1061 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1062 push @{$rules->{leader}}, @mappings;
1064 else {
1065 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1066 "Invalid MARC field expression: $marc_field"
1071 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1072 if ($marcflavour eq 'marc21') {
1073 # Nonfiling characters processing for sort fields
1074 my %title_fields;
1075 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1076 # Format is: nonfiling characters indicator => field names list
1077 %title_fields = (
1078 1 => [130, 630, 730, 740],
1079 2 => [222, 240, 242, 243, 245, 440, 830]
1082 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1083 %title_fields = (
1084 1 => [730],
1085 2 => [130, 430, 530]
1088 foreach my $indicator (keys %title_fields) {
1089 foreach my $field_tag (@{$title_fields{$indicator}}) {
1090 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1091 foreach my $mapping (@{$mappings}) {
1092 if ($mapping->[0] =~ /__sort$/) {
1093 # Mark this as to be processed for nonfiling characters indicator
1094 # later on in _process_mappings
1095 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1102 return $rules;
1105 =head2 _foreach_mapping
1107 $self->_foreach_mapping(
1108 sub {
1109 my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1110 $marc_field )
1111 = @_;
1112 return unless $marc_type eq 'marc21';
1113 print "Data comes from: " . $marc_field . "\n";
1117 This allows you to apply a function to each entry in the elasticsearch mappings
1118 table, in order to build the mappings for whatever is needed.
1120 In the provided function, the files are:
1122 =over 4
1124 =item C<$name>
1126 The field name for elasticsearch (corresponds to the 'mapping' column in the
1127 database.
1129 =item C<$type>
1131 The type for this value, e.g. 'string'.
1133 =item C<$facet>
1135 True if this value should be facetised. This only really makes sense if the
1136 field is understood by the facet processing code anyway.
1138 =item C<$sort>
1140 True if this is a field that a) needs special sort handling, and b) if it
1141 should be sorted on. False if a) but not b). Undef if not a). This allows,
1142 for example, author to be sorted on but not everything marked with "author"
1143 to be included in that sort.
1145 =item C<$marc_type>
1147 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1148 'unimarc', 'normarc'.
1150 =item C<$marc_field>
1152 A string that describes the MARC field that contains the data to extract.
1154 =back
1156 =cut
1158 sub _foreach_mapping {
1159 my ( $self, $sub ) = @_;
1161 # TODO use a caching framework here
1162 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1164 'search_marc_map.index_name' => $self->index,
1166 { join => { search_marc_to_fields => 'search_marc_map' },
1167 '+select' => [
1168 'search_marc_to_fields.facet',
1169 'search_marc_to_fields.suggestible',
1170 'search_marc_to_fields.sort',
1171 'search_marc_to_fields.search',
1172 'search_marc_map.marc_type',
1173 'search_marc_map.marc_field',
1175 '+as' => [
1176 'facet',
1177 'suggestible',
1178 'sort',
1179 'search',
1180 'marc_type',
1181 'marc_field',
1186 while ( my $search_field = $search_fields->next ) {
1187 $sub->(
1188 # Force lower case on indexed field names for case insensitive
1189 # field name searches
1190 lc($search_field->name),
1191 $search_field->type,
1192 $search_field->get_column('facet'),
1193 $search_field->get_column('suggestible'),
1194 $search_field->get_column('sort'),
1195 $search_field->get_column('search'),
1196 $search_field->get_column('marc_type'),
1197 $search_field->get_column('marc_field'),
1202 =head2 process_error
1204 die process_error($@);
1206 This parses an Elasticsearch error message and produces a human-readable
1207 result from it. This result is probably missing all the useful information
1208 that you might want in diagnosing an issue, so the warning is also logged.
1210 Note that currently the resulting message is not internationalised. This
1211 will happen eventually by some method or other.
1213 =cut
1215 sub process_error {
1216 my ($self, $msg) = @_;
1218 warn $msg; # simple logging
1220 # This is super-primitive
1221 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1223 return "Unable to perform your search. Please try again.\n";
1226 =head2 _read_configuration
1228 my $conf = _read_configuration();
1230 Reads the I<configuration file> and returns a hash structure with the
1231 configuration information. It raises an exception if mandatory entries
1232 are missing.
1234 The hashref structure has the following form:
1237 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1238 'index_name' => 'koha_instance',
1241 This is configured by the following in the C<config> block in koha-conf.xml:
1243 <elasticsearch>
1244 <server>127.0.0.1:9200</server>
1245 <server>anotherserver:9200</server>
1246 <index_name>koha_instance</index_name>
1247 </elasticsearch>
1249 =cut
1251 sub _read_configuration {
1253 my $configuration;
1255 my $conf = C4::Context->config('elasticsearch');
1256 unless ( defined $conf ) {
1257 Koha::Exceptions::Config::MissingEntry->throw(
1258 "Missing <elasticsearch> entry in koha-conf.xml"
1262 if ( $conf && $conf->{server} ) {
1263 my $nodes = $conf->{server};
1264 if ( ref($nodes) eq 'ARRAY' ) {
1265 $configuration->{nodes} = $nodes;
1267 else {
1268 $configuration->{nodes} = [$nodes];
1271 else {
1272 Koha::Exceptions::Config::MissingEntry->throw(
1273 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1277 if ( defined $conf->{index_name} ) {
1278 $configuration->{index_name} = $conf->{index_name};
1280 else {
1281 Koha::Exceptions::Config::MissingEntry->throw(
1282 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1286 $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1288 return $configuration;
1291 =head2 get_facetable_fields
1293 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1295 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1297 =cut
1299 sub get_facetable_fields {
1300 my ($self) = @_;
1302 # These should correspond to the ES field names, as opposed to the CCL
1303 # things that zebra uses.
1304 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1305 my @faceted_fields = Koha::SearchFields->search(
1306 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1308 my @not_faceted_fields = Koha::SearchFields->search(
1309 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1311 # This could certainly be improved
1312 return ( @faceted_fields, @not_faceted_fields );
1315 =head2 clear_search_fields_cache
1317 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1319 Clear cached values for ES search fields
1321 =cut
1323 sub clear_search_fields_cache {
1325 my $cache = Koha::Caches->get_instance();
1326 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1327 $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1328 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1329 $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1335 __END__
1337 =head1 AUTHOR
1339 =over 4
1341 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1343 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1345 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1347 =back
1349 =cut