Change how checking for missing values works.
[pspp.git] / src / language / data-io / combine-files.c
blob8322f5c03307f7e82597bcc5975a90621c0c7186
/* PSPP - a program for statistical analysis.
   Copyright (C) 1997-9, 2000, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Free Software Foundation, Inc.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
#include <config.h>

#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "data/any-reader.h"
#include "data/case-matcher.h"
#include "data/case.h"
#include "data/casereader.h"
#include "data/casewriter.h"
#include "data/dataset.h"
#include "data/dictionary.h"
#include "data/format.h"
#include "data/subcase.h"
#include "data/variable.h"
#include "language/command.h"
#include "language/data-io/file-handle.h"
#include "language/data-io/trim.h"
#include "language/lexer/lexer.h"
#include "language/lexer/variable-parser.h"
#include "language/stats/sort-criteria.h"
#include "libpspp/assertion.h"
#include "libpspp/i18n.h"
#include "libpspp/message.h"
#include "libpspp/string-array.h"
#include "libpspp/taint.h"
#include "math/sort.h"

#include "gl/xalloc.h"

#include "gettext.h"
/* Shorthand for marking and translating user-visible message strings. */
#define _(msgid) gettext (msgid)
/* Which file-combining command is being executed. */
enum comb_command_type
  {
    COMB_ADD,                   /* Passed by cmd_add_files(). */
    COMB_MATCH,                 /* Passed by cmd_match_files(). */
    COMB_UPDATE                 /* Passed by cmd_update(). */
  };
/* File types. */
enum comb_file_type
  {
    COMB_FILE,                  /* Specified on FILE= subcommand. */
    COMB_TABLE                  /* Specified on TABLE= subcommand. */
  };
63 /* One FILE or TABLE subcommand. */
64 struct comb_file
66 /* Basics. */
67 enum comb_file_type type; /* COMB_FILE or COMB_TABLE. */
69 /* Variables. */
70 struct subcase by_vars; /* BY variables in this input file. */
71 struct subcase src, dst; /* Data to copy to output; where to put it. */
72 const struct missing_values **mv; /* Each variable's missing values. */
74 /* Input files. */
75 struct file_handle *handle; /* Input file handle. */
76 struct dictionary *dict; /* Input file dictionary. */
77 struct casereader *reader; /* Input data source. */
78 struct ccase *data; /* The current input case. */
79 bool is_minimal; /* Does 'data' have minimum BY values across
80 all input files? */
81 bool is_sorted; /* Is file presorted on the BY variables? */
83 /* IN subcommand. */
84 char *in_name;
85 struct variable *in_var;
88 struct comb_proc
90 struct comb_file *files; /* All the files being merged. */
91 size_t n_files; /* Number of files. */
93 struct dictionary *dict; /* Dictionary of output file. */
94 struct subcase by_vars; /* BY variables in the output. */
95 struct casewriter *output; /* Destination for output. */
97 struct case_matcher *matcher;
99 /* FIRST, LAST.
100 Only if "first" or "last" is nonnull are the remaining
101 members used. */
102 struct variable *first; /* Variable specified on FIRST (if any). */
103 struct variable *last; /* Variable specified on LAST (if any). */
104 struct ccase *buffered_case; /* Case ready for output except that we don't
105 know the value for the LAST var yet. */
106 union value *prev_BY; /* Values of BY vars in buffered_case. */
/* Command parsing and resource management. */
static int combine_files (enum comb_command_type, struct lexer *,
                          struct dataset *);
static void free_comb_proc (struct comb_proc *);

static void close_all_comb_files (struct comb_proc *);
static bool merge_dictionary (struct dictionary *const, struct comb_file *);

/* Per-command execution. */
static void execute_update (struct comb_proc *);
static void execute_match_files (struct comb_proc *);
static void execute_add_files (struct comb_proc *);

/* Flag variables and output. */
static bool create_flag_var (const char *subcommand_name, const char *var_name,
                             struct dictionary *, struct variable **);
static void output_case (struct comb_proc *, struct ccase *, union value *by);
static void output_buffered_case (struct comb_proc *);
126 cmd_add_files (struct lexer *lexer, struct dataset *ds)
128 return combine_files (COMB_ADD, lexer, ds);
132 cmd_match_files (struct lexer *lexer, struct dataset *ds)
134 return combine_files (COMB_MATCH, lexer, ds);
138 cmd_update (struct lexer *lexer, struct dataset *ds)
140 return combine_files (COMB_UPDATE, lexer, ds);
143 static int
144 combine_files (enum comb_command_type command,
145 struct lexer *lexer, struct dataset *ds)
147 struct comb_proc proc;
149 bool saw_by = false;
150 bool saw_sort = false;
151 struct casereader *active_file = NULL;
153 char *first_name = NULL;
154 char *last_name = NULL;
156 struct taint *taint = NULL;
158 size_t n_tables = 0;
159 size_t allocated_files = 0;
161 size_t i;
163 proc.files = NULL;
164 proc.n_files = 0;
165 proc.dict = dict_create (get_default_encoding ());
166 proc.output = NULL;
167 proc.matcher = NULL;
168 subcase_init_empty (&proc.by_vars);
169 proc.first = NULL;
170 proc.last = NULL;
171 proc.buffered_case = NULL;
172 proc.prev_BY = NULL;
174 dict_set_case_limit (proc.dict, dict_get_case_limit (dataset_dict (ds)));
176 lex_match (lexer, T_SLASH);
177 for (;;)
179 struct comb_file *file;
180 enum comb_file_type type;
182 if (lex_match_id (lexer, "FILE"))
183 type = COMB_FILE;
184 else if (command == COMB_MATCH && lex_match_id (lexer, "TABLE"))
186 type = COMB_TABLE;
187 n_tables++;
189 else
190 break;
191 lex_match (lexer, T_EQUALS);
193 if (proc.n_files >= allocated_files)
194 proc.files = x2nrealloc (proc.files, &allocated_files,
195 sizeof *proc.files);
196 file = &proc.files[proc.n_files++];
197 file->type = type;
198 subcase_init_empty (&file->by_vars);
199 subcase_init_empty (&file->src);
200 subcase_init_empty (&file->dst);
201 file->mv = NULL;
202 file->handle = NULL;
203 file->dict = NULL;
204 file->reader = NULL;
205 file->data = NULL;
206 file->is_sorted = true;
207 file->in_name = NULL;
208 file->in_var = NULL;
210 if (lex_match (lexer, T_ASTERISK))
212 if (!dataset_has_source (ds))
214 msg (SE, _("Cannot specify the active dataset since none "
215 "has been defined."));
216 goto error;
219 if (proc_make_temporary_transformations_permanent (ds))
220 msg (SE, _("This command may not be used after TEMPORARY when "
221 "the active dataset is an input source. "
222 "Temporary transformations will be made permanent."));
224 file->dict = dict_clone (dataset_dict (ds));
226 else
228 file->handle = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
229 if (file->handle == NULL)
230 goto error;
232 file->reader = any_reader_open_and_decode (file->handle, NULL,
233 &file->dict, NULL);
234 if (file->reader == NULL)
235 goto error;
238 while (lex_match (lexer, T_SLASH))
239 if (lex_match_id (lexer, "RENAME"))
241 if (!parse_dict_rename (lexer, file->dict, false))
242 goto error;
244 else if (lex_match_id (lexer, "IN"))
246 lex_match (lexer, T_EQUALS);
247 if (lex_token (lexer) != T_ID)
249 lex_error (lexer, NULL);
250 goto error;
253 if (file->in_name)
255 msg (SE, _("Multiple IN subcommands for a single FILE or "
256 "TABLE."));
257 goto error;
259 file->in_name = xstrdup (lex_tokcstr (lexer));
260 lex_get (lexer);
262 else if (lex_match_id (lexer, "SORT"))
264 file->is_sorted = false;
265 saw_sort = true;
268 if (!merge_dictionary (proc.dict, file))
269 goto error;
272 while (lex_token (lexer) != T_ENDCMD)
274 if (lex_match (lexer, T_BY))
276 const struct variable **by_vars;
277 size_t i;
278 bool ok;
280 if (saw_by)
282 lex_sbc_only_once ("BY");
283 goto error;
285 saw_by = true;
287 lex_match (lexer, T_EQUALS);
288 if (!parse_sort_criteria (lexer, proc.dict, &proc.by_vars,
289 &by_vars, NULL))
290 goto error;
292 ok = true;
293 for (i = 0; i < proc.n_files; i++)
295 struct comb_file *file = &proc.files[i];
296 size_t j;
298 for (j = 0; j < subcase_get_n_fields (&proc.by_vars); j++)
300 const char *name = var_get_name (by_vars[j]);
301 struct variable *var = dict_lookup_var (file->dict, name);
302 if (var != NULL)
303 subcase_add_var (&file->by_vars, var,
304 subcase_get_direction (&proc.by_vars, j));
305 else
307 if (file->handle != NULL)
308 msg (SE, _("File %s lacks BY variable %s."),
309 fh_get_name (file->handle), name);
310 else
311 msg (SE, _("Active dataset lacks BY variable %s."),
312 name);
313 ok = false;
316 assert (!ok || subcase_conformable (&file->by_vars,
317 &proc.files[0].by_vars));
319 free (by_vars);
321 if (!ok)
322 goto error;
324 else if (command != COMB_UPDATE && lex_match_id (lexer, "FIRST"))
326 if (first_name != NULL)
328 lex_sbc_only_once ("FIRST");
329 goto error;
332 lex_match (lexer, T_EQUALS);
333 if (!lex_force_id (lexer))
334 goto error;
335 first_name = xstrdup (lex_tokcstr (lexer));
336 lex_get (lexer);
338 else if (command != COMB_UPDATE && lex_match_id (lexer, "LAST"))
340 if (last_name != NULL)
342 lex_sbc_only_once ("LAST");
343 goto error;
346 lex_match (lexer, T_EQUALS);
347 if (!lex_force_id (lexer))
348 goto error;
349 last_name = xstrdup (lex_tokcstr (lexer));
350 lex_get (lexer);
352 else if (lex_match_id (lexer, "MAP"))
354 /* FIXME. */
356 else if (lex_match_id (lexer, "DROP"))
358 if (!parse_dict_drop (lexer, proc.dict))
359 goto error;
361 else if (lex_match_id (lexer, "KEEP"))
363 if (!parse_dict_keep (lexer, proc.dict))
364 goto error;
366 else
368 lex_error (lexer, NULL);
369 goto error;
372 if (!lex_match (lexer, T_SLASH) && lex_token (lexer) != T_ENDCMD)
374 lex_end_of_command (lexer);
375 goto error;
379 if (!saw_by)
381 if (command == COMB_UPDATE)
383 lex_sbc_missing ("BY");
384 goto error;
386 if (n_tables)
388 msg (SE, _("BY is required when %s is specified."), "TABLE");
389 goto error;
391 if (saw_sort)
393 msg (SE, _("BY is required when %s is specified."), "SORT");
394 goto error;
398 /* Add IN, FIRST, and LAST variables to master dictionary. */
399 for (i = 0; i < proc.n_files; i++)
401 struct comb_file *file = &proc.files[i];
402 if (!create_flag_var ("IN", file->in_name, proc.dict, &file->in_var))
403 goto error;
405 if (!create_flag_var ("FIRST", first_name, proc.dict, &proc.first)
406 || !create_flag_var ("LAST", last_name, proc.dict, &proc.last))
407 goto error;
409 dict_delete_scratch_vars (proc.dict);
410 dict_compact_values (proc.dict);
412 /* Set up mapping from each file's variables to master
413 variables. */
414 for (i = 0; i < proc.n_files; i++)
416 struct comb_file *file = &proc.files[i];
417 size_t src_n_vars = dict_get_n_vars (file->dict);
418 size_t j;
420 file->mv = xnmalloc (src_n_vars, sizeof *file->mv);
421 for (j = 0; j < src_n_vars; j++)
423 struct variable *src_var = dict_get_var (file->dict, j);
424 struct variable *dst_var = dict_lookup_var (proc.dict,
425 var_get_name (src_var));
426 if (dst_var != NULL)
428 size_t n = subcase_get_n_fields (&file->src);
429 file->mv[n] = var_get_missing_values (src_var);
430 subcase_add_var (&file->src, src_var, SC_ASCEND);
431 subcase_add_var (&file->dst, dst_var, SC_ASCEND);
436 proc.output = autopaging_writer_create (dict_get_proto (proc.dict));
437 taint = taint_clone (casewriter_get_taint (proc.output));
439 /* Set up case matcher. */
440 proc.matcher = case_matcher_create ();
441 for (i = 0; i < proc.n_files; i++)
443 struct comb_file *file = &proc.files[i];
444 if (file->reader == NULL)
446 if (active_file == NULL)
448 proc_discard_output (ds);
449 file->reader = active_file = proc_open_filtering (ds, false);
451 else
452 file->reader = casereader_clone (active_file);
454 if (!file->is_sorted)
455 file->reader = sort_execute (file->reader, &file->by_vars);
456 taint_propagate (casereader_get_taint (file->reader), taint);
457 file->data = casereader_read (file->reader);
458 if (file->type == COMB_FILE)
459 case_matcher_add_input (proc.matcher, &file->by_vars,
460 &file->data, &file->is_minimal);
463 if (command == COMB_ADD)
464 execute_add_files (&proc);
465 else if (command == COMB_MATCH)
466 execute_match_files (&proc);
467 else if (command == COMB_UPDATE)
468 execute_update (&proc);
469 else
470 NOT_REACHED ();
472 case_matcher_destroy (proc.matcher);
473 proc.matcher = NULL;
474 close_all_comb_files (&proc);
475 if (active_file != NULL)
476 proc_commit (ds);
478 dataset_set_dict (ds, proc.dict);
479 dataset_set_source (ds, casewriter_make_reader (proc.output));
480 proc.dict = NULL;
481 proc.output = NULL;
483 free_comb_proc (&proc);
485 free (first_name);
486 free (last_name);
488 return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
490 error:
491 if (active_file != NULL)
492 proc_commit (ds);
493 free_comb_proc (&proc);
494 taint_destroy (taint);
495 free (first_name);
496 free (last_name);
497 return CMD_CASCADING_FAILURE;
500 /* Merge the dictionary for file F into master dictionary M. */
501 static bool
502 merge_dictionary (struct dictionary *const m, struct comb_file *f)
504 struct dictionary *d = f->dict;
505 const struct string_array *d_docs, *m_docs;
506 int i;
508 if (dict_get_label (m) == NULL)
509 dict_set_label (m, dict_get_label (d));
511 d_docs = dict_get_documents (d);
512 m_docs = dict_get_documents (m);
515 /* FIXME: If the input files have different encodings, then
516 the result is undefined.
517 The correct thing to do would be to convert to an encoding
518 which can cope with all the input files (eg UTF-8).
520 if (0 != strcmp (dict_get_encoding (f->dict), dict_get_encoding (m)))
521 msg (MW, _("Combining files with incompatible encodings. String data may "
522 "not be represented correctly."));
524 if (d_docs != NULL)
526 if (m_docs == NULL)
527 dict_set_documents (m, d_docs);
528 else
530 struct string_array new_docs;
531 size_t i;
533 new_docs.n = m_docs->n + d_docs->n;
534 new_docs.strings = xmalloc (new_docs.n * sizeof *new_docs.strings);
535 for (i = 0; i < m_docs->n; i++)
536 new_docs.strings[i] = m_docs->strings[i];
537 for (i = 0; i < d_docs->n; i++)
538 new_docs.strings[m_docs->n + i] = d_docs->strings[i];
540 dict_set_documents (m, &new_docs);
542 free (new_docs.strings);
546 for (i = 0; i < dict_get_n_vars (d); i++)
548 struct variable *dv = dict_get_var (d, i);
549 struct variable *mv = dict_lookup_var (m, var_get_name (dv));
551 if (dict_class_from_id (var_get_name (dv)) == DC_SCRATCH)
552 continue;
554 if (mv != NULL)
556 if (var_get_width (mv) != var_get_width (dv))
558 const char *var_name = var_get_name (dv);
559 struct string s = DS_EMPTY_INITIALIZER;
560 const char *file_name;
562 file_name = f->handle ? fh_get_name (f->handle) : "*";
563 ds_put_format (&s,
564 _("Variable %s in file %s has different "
565 "type or width from the same variable in "
566 "earlier file."),
567 var_name, file_name);
568 ds_put_cstr (&s, " ");
569 if (var_is_numeric (dv))
570 ds_put_format (&s, _("In file %s, %s is numeric."),
571 file_name, var_name);
572 else
573 ds_put_format (&s, _("In file %s, %s is a string variable "
574 "with width %d."),
575 file_name, var_name, var_get_width (dv));
576 ds_put_cstr (&s, " ");
577 if (var_is_numeric (mv))
578 ds_put_format (&s, _("In an earlier file, %s was numeric."),
579 var_name);
580 else
581 ds_put_format (&s, _("In an earlier file, %s was a string "
582 "variable with width %d."),
583 var_name, var_get_width (mv));
584 msg (SE, "%s", ds_cstr (&s));
585 ds_destroy (&s);
586 return false;
589 if (var_has_value_labels (dv) && !var_has_value_labels (mv))
590 var_set_value_labels (mv, var_get_value_labels (dv));
591 if (var_has_missing_values (dv) && !var_has_missing_values (mv))
592 var_set_missing_values (mv, var_get_missing_values (dv));
593 if (var_get_label (dv) && !var_get_label (mv))
594 var_set_label (mv, var_get_label (dv));
596 else
597 mv = dict_clone_var_assert (m, dv);
600 return true;
603 /* If VAR_NAME is non-NULL, attempts to create a
604 variable named VAR_NAME, with format F1.0, in DICT, and stores
605 a pointer to the variable in *VAR. Returns true if
606 successful, false if the variable name is a duplicate (in
607 which case a message saying that the variable specified on the
608 given SUBCOMMAND is a duplicate is emitted).
610 Does nothing and returns true if VAR_NAME is null. */
611 static bool
612 create_flag_var (const char *subcommand, const char *var_name,
613 struct dictionary *dict, struct variable **var)
615 if (var_name != NULL)
617 struct fmt_spec format = fmt_for_output (FMT_F, 1, 0);
618 *var = dict_create_var (dict, var_name, 0);
619 if (*var == NULL)
621 msg (SE, _("Variable name %s specified on %s subcommand "
622 "duplicates an existing variable name."),
623 subcommand, var_name);
624 return false;
626 var_set_both_formats (*var, &format);
628 else
629 *var = NULL;
630 return true;
633 /* Closes all the files in PROC and frees their associated data. */
634 static void
635 close_all_comb_files (struct comb_proc *proc)
637 size_t i;
639 for (i = 0; i < proc->n_files; i++)
641 struct comb_file *file = &proc->files[i];
642 subcase_destroy (&file->by_vars);
643 subcase_destroy (&file->src);
644 subcase_destroy (&file->dst);
645 free (file->mv);
646 fh_unref (file->handle);
647 dict_unref (file->dict);
648 casereader_destroy (file->reader);
649 case_unref (file->data);
650 free (file->in_name);
652 free (proc->files);
653 proc->files = NULL;
654 proc->n_files = 0;
657 /* Frees all the data for the procedure. */
658 static void
659 free_comb_proc (struct comb_proc *proc)
661 close_all_comb_files (proc);
662 dict_unref (proc->dict);
663 casewriter_destroy (proc->output);
664 case_matcher_destroy (proc->matcher);
665 if (proc->prev_BY)
667 caseproto_destroy_values (subcase_get_proto (&proc->by_vars),
668 proc->prev_BY);
669 free (proc->prev_BY);
671 subcase_destroy (&proc->by_vars);
672 case_unref (proc->buffered_case);
675 static bool scan_table (struct comb_file *, union value by[]);
676 static struct ccase *create_output_case (const struct comb_proc *);
677 static void apply_case (const struct comb_file *, struct ccase *);
678 static void apply_nonmissing_case (const struct comb_file *, struct ccase *);
679 static void advance_file (struct comb_file *, union value by[]);
680 static void output_case (struct comb_proc *, struct ccase *, union value by[]);
681 static void output_buffered_case (struct comb_proc *);
683 /* Executes the ADD FILES command. */
684 static void
685 execute_add_files (struct comb_proc *proc)
687 union value *by;
689 while (case_matcher_match (proc->matcher, &by))
691 size_t i;
693 for (i = 0; i < proc->n_files; i++)
695 struct comb_file *file = &proc->files[i];
696 while (file->is_minimal)
698 struct ccase *output = create_output_case (proc);
699 apply_case (file, output);
700 advance_file (file, by);
701 output_case (proc, output, by);
705 output_buffered_case (proc);
708 /* Executes the MATCH FILES command. */
709 static void
710 execute_match_files (struct comb_proc *proc)
712 union value *by;
714 while (case_matcher_match (proc->matcher, &by))
716 struct ccase *output;
717 size_t i;
719 output = create_output_case (proc);
720 for (i = proc->n_files; i-- > 0;)
722 struct comb_file *file = &proc->files[i];
723 if (file->type == COMB_FILE)
725 if (file->is_minimal)
727 apply_case (file, output);
728 advance_file (file, NULL);
731 else
733 if (scan_table (file, by))
734 apply_case (file, output);
737 output_case (proc, output, by);
739 output_buffered_case (proc);
742 /* Executes the UPDATE command. */
743 static void
744 execute_update (struct comb_proc *proc)
746 union value *by;
747 size_t n_duplicates = 0;
749 while (case_matcher_match (proc->matcher, &by))
751 struct comb_file *first, *file;
752 struct ccase *output;
754 /* Find first nonnull case in array and make an output case
755 from it. */
756 output = create_output_case (proc);
757 for (first = &proc->files[0]; ; first++)
758 if (first->is_minimal)
759 break;
760 apply_case (first, output);
761 advance_file (first, by);
763 /* Read additional cases and update the output case from
764 them. (Don't update the output case from any duplicate
765 cases in the master file.) */
766 for (file = first + (first == proc->files);
767 file < &proc->files[proc->n_files]; file++)
769 while (file->is_minimal)
771 apply_nonmissing_case (file, output);
772 advance_file (file, by);
775 casewriter_write (proc->output, output);
777 /* Write duplicate cases in the master file directly to the
778 output. */
779 if (first == proc->files && first->is_minimal)
781 n_duplicates++;
782 while (first->is_minimal)
784 output = create_output_case (proc);
785 apply_case (first, output);
786 advance_file (first, by);
787 casewriter_write (proc->output, output);
792 if (n_duplicates)
793 msg (SW, _("Encountered %zu sets of duplicate cases in the master file."),
794 n_duplicates);
797 /* Reads FILE, which must be of type COMB_TABLE, until it
798 encounters a case with BY or greater for its BY variables.
799 Returns true if a case with exactly BY for its BY variables
800 was found, otherwise false. */
801 static bool
802 scan_table (struct comb_file *file, union value by[])
804 while (file->data != NULL)
806 int cmp = subcase_compare_3way_xc (&file->by_vars, by, file->data);
807 if (cmp > 0)
809 case_unref (file->data);
810 file->data = casereader_read (file->reader);
812 else
813 return cmp == 0;
815 return false;
818 /* Creates and returns an output case for PROC, initializing each
819 of its values to system-missing or blanks, except that the
820 values of IN variables are set to 0. */
821 static struct ccase *
822 create_output_case (const struct comb_proc *proc)
824 size_t n_vars = dict_get_n_vars (proc->dict);
825 struct ccase *output;
826 size_t i;
828 output = case_create (dict_get_proto (proc->dict));
829 for (i = 0; i < n_vars; i++)
831 struct variable *v = dict_get_var (proc->dict, i);
832 value_set_missing (case_data_rw (output, v), var_get_width (v));
834 for (i = 0; i < proc->n_files; i++)
836 struct comb_file *file = &proc->files[i];
837 if (file->in_var != NULL)
838 *case_num_rw (output, file->in_var) = false;
840 return output;
843 static void
844 mark_file_used (const struct comb_file *file, struct ccase *output)
846 if (file->in_var != NULL)
847 *case_num_rw (output, file->in_var) = true;
850 /* Copies the data from FILE's case into output case OUTPUT.
851 If FILE has an IN variable, then it is set to 1 in OUTPUT. */
852 static void
853 apply_case (const struct comb_file *file, struct ccase *output)
855 subcase_copy (&file->src, file->data, &file->dst, output);
856 mark_file_used (file, output);
859 /* Copies the data from FILE's case into output case OUTPUT,
860 skipping values that are missing or all spaces.
862 If FILE has an IN variable, then it is set to 1 in OUTPUT. */
863 static void
864 apply_nonmissing_case (const struct comb_file *file, struct ccase *output)
866 size_t i;
868 for (i = 0; i < subcase_get_n_fields (&file->src); i++)
870 const struct subcase_field *src_field = &file->src.fields[i];
871 const struct subcase_field *dst_field = &file->dst.fields[i];
872 const union value *src_value
873 = case_data_idx (file->data, src_field->case_index);
874 int width = src_field->width;
876 if (!mv_is_value_missing (file->mv[i], src_value)
877 && !(width > 0 && value_is_spaces (src_value, width)))
878 value_copy (case_data_rw_idx (output, dst_field->case_index),
879 src_value, width);
881 mark_file_used (file, output);
884 /* Advances FILE to its next case. If BY is nonnull, then FILE's is_minimal
885 member is updated based on whether the new case's BY values still match
886 those in BY. */
887 static void
888 advance_file (struct comb_file *file, union value by[])
890 case_unref (file->data);
891 file->data = casereader_read (file->reader);
892 if (by)
893 file->is_minimal = (file->data != NULL
894 && subcase_equal_cx (&file->by_vars, file->data, by));
897 /* Writes OUTPUT, whose BY values has been extracted into BY, to
898 PROC's output file, first initializing any FIRST or LAST
899 variables in OUTPUT to the correct values. */
900 static void
901 output_case (struct comb_proc *proc, struct ccase *output, union value by[])
903 if (proc->first == NULL && proc->last == NULL)
904 casewriter_write (proc->output, output);
905 else
907 /* It's harder with LAST, because we can't know whether
908 this case is the last in a group until we've prepared
909 the *next* case also. Thus, we buffer the previous
910 output case until the next one is ready. */
911 bool new_BY;
912 if (proc->prev_BY != NULL)
914 new_BY = !subcase_equal_xx (&proc->by_vars, proc->prev_BY, by);
915 if (proc->last != NULL)
916 *case_num_rw (proc->buffered_case, proc->last) = new_BY;
917 casewriter_write (proc->output, proc->buffered_case);
919 else
920 new_BY = true;
922 proc->buffered_case = output;
923 if (proc->first != NULL)
924 *case_num_rw (proc->buffered_case, proc->first) = new_BY;
926 if (new_BY)
928 size_t n_values = subcase_get_n_fields (&proc->by_vars);
929 const struct caseproto *proto = subcase_get_proto (&proc->by_vars);
930 if (proc->prev_BY == NULL)
932 proc->prev_BY = xmalloc (n_values * sizeof *proc->prev_BY);
933 caseproto_init_values (proto, proc->prev_BY);
935 caseproto_copy (subcase_get_proto (&proc->by_vars), 0, n_values,
936 proc->prev_BY, by);
941 /* Writes a trailing buffered case to the output, if FIRST or
942 LAST is in use. */
943 static void
944 output_buffered_case (struct comb_proc *proc)
946 if (proc->prev_BY != NULL)
948 if (proc->last != NULL)
949 *case_num_rw (proc->buffered_case, proc->last) = 1.0;
950 casewriter_write (proc->output, proc->buffered_case);
951 proc->buffered_case = NULL;