Released as 20240522 ('Tbilisi')
[parallel.git] / src / parcat
blob b4956b0704e7e63bc4aed8edfe55dd8985b2793a
1 #!/usr/bin/perl
3 # Copyright (C) 2016-2024 Ole Tange, http://ole.tange.dk and Free
4 # Software Foundation, Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, see <https://www.gnu.org/licenses/>
18 # or write to the Free Software Foundation, Inc., 51 Franklin St,
19 # Fifth Floor, Boston, MA 02110-1301 USA
# SPDX-FileCopyrightText: 2021-2024 Ole Tange, http://ole.tange.dk and Free Software Foundation, Inc.
22 # SPDX-License-Identifier: GPL-3.0-or-later
24 use Symbol qw(gensym);
25 use IPC::Open3;
26 use POSIX qw(:errno_h);
27 use IO::Select;
28 use strict;
29 use threads;
30 use threads::shared;
31 use Thread::Queue;
# Number of input files the producer threads have opened so far.
# Shared between the producer threads and the main thread.
my $opened :shared;
# Producers enqueue pairs (input fileno, output fd) here for the main thread.
my $q = Thread::Queue->new();
# The main thread enqueues an input fileno here once it has its own handle
# on it, so the producer owning that fileno may return.
my $okq = Thread::Queue->new();
# One thread object per input file; joined at the end of the script.
my @producers;

if(not @ARGV) {
    if(-t *STDIN) {
        # No arguments and nothing piped in: show usage.
        print "Usage:\n";
        print " parcat file(s)\n";
        print " cat argfile | parcat\n";
        # Stop here: with no files no producer is ever started, so the
        # blocking dequeue further down would wait forever.
        exit(1);
    } else {
        # Read arguments from stdin, one per line
        chomp(@ARGV = <STDIN>);
    }
}
# Count of input files for which a producer thread was started.
my $files_to_open = 0;
# Output file descriptor for the files that follow; defaults to stdout.
my $fd = 1;
for my $arg (@ARGV) {
    if($arg =~ /^--rm$/) {
        # --rm = remove file when opened
        $opt::rm = 1;
    } elsif($arg =~ /^-(\d+)$/) {
        # -1 = output to fd 1, -2 = output to fd 2
        $fd = $1;
    } else {
        # Anything else is an input file: start a thread that opens it
        push @producers, threads->create('producer', $arg, $fd);
        $files_to_open++;
    }
}
sub producer {
    # Open a file/fifo, set non blocking, enqueue fileno of the file handle
    # Runs in its own thread (started via threads->create).
    # Inputs:
    #   $file = name of the file/fifo to open for reading
    #   $output_fd = file descriptor number the content should be written to
    # Returns:
    #   N/A
    my $file = shift;
    my $output_fd = shift;
    open(my $fh, "<", $file) || do {
        print STDERR "parcat: Cannot open $file\n";
        exit(1);
    };
    # Remove file when it has been opened
    if($opt::rm) {
        unlink $file;
    }
    set_fh_non_blocking($fh);
    {
        # ++ on a shared scalar is not atomic: take the lock so two
        # producers cannot lose an increment (the main loop compares
        # $opened with the number of files to decide when to stop).
        lock($opened);
        $opened++;
    }
    # Pass the fileno to parent
    $q->enqueue(fileno($fh),$output_fd);
    # Get an OK that the $fh is opened and we can release the $fh
    while(1) {
        my $ok = $okq->dequeue();
        if($ok == fileno($fh)) { last; }
        # Not ours - very unlikely to happen. Put it back for its owner.
        $okq->enqueue($ok);
    }
    return;
}
# Set of input handles currently being read by the main loop.
my $s = IO::Select->new();
# $buffer{$infh}{$outfd} = list of pending partial-line chunks read from
# $infh that are to be written to $outfd.
my %buffer;

sub add_file {
    # Start reading from an input file descriptor opened by a producer
    # Inputs:
    #   $infd = fileno of the input (as enqueued by a producer)
    #   $outfd = file descriptor number to write the content to
    # Returns:
    #   N/A
    my $infd = shift;
    my $outfd = shift;
    open(my $infh, "<&=", $infd)
        or die "parcat: Cannot open input fd $infd: $!\n";
    # One write handle per output fd is enough: reuse it if an earlier
    # input already goes to the same fd.
    if(not $Global::fh{$outfd}) {
        open(my $outfh, ">&=", $outfd)
            or die "parcat: Cannot open output fd $outfd: $!\n";
        $Global::fh{$outfd} = $outfh;
    }
    $s->add($infh);
    # Tell the producer now opened here and can be released
    $okq->enqueue($infd);
    # Initialize the buffer
    $buffer{$infh}{$outfd} = [];
    return;
}
sub add_files {
    # Drain the queue without blocking: register every (input fd,
    # output fd) pair that is currently available and stop as soon as
    # no complete pair can be dequeued.
    while(1) {
        my ($in, $out) = $q->dequeue_nb(2);
        defined($out) or last;
        add_file($in, $out);
    }
}
sub add_files_block {
    # Wait until a producer has enqueued an (input fd, output fd) pair,
    # then register that pair for reading.
    my @pair = $q->dequeue(2);
    add_file($pair[0], $pair[1]);
}
# Main loop: copy data from all opened inputs to their output fd, one
# full line at a time, so lines from different inputs are never mixed
# inside a line.
my (@ready,$infh,$rv,$buf);
# With no input files no producer will ever enqueue anything, so the
# blocking dequeue below would wait forever: skip the loop entirely.
if($files_to_open) {
    do {
        # Wait until at least one file is opened
        add_files_block();
        while($q->pending or keys %buffer) {
            add_files();
            while(keys %buffer) {
                @ready = $s->can_read(0.01);
                if(not @ready) {
                    # Nothing readable right now: pick up newly opened files
                    add_files();
                }
                for $infh (@ready) {
                    # There is only one key, namely the output file descriptor
                    for my $outfd (keys %{$buffer{$infh}}) {
                        $rv = sysread($infh, $buf, 65536);
                        if(not defined $rv and $! == EAGAIN) {
                            # Would block: Nothing read.
                            # $! is only meaningful when sysread failed
                            # (returned undef); at EOF (0) it may still hold
                            # an EAGAIN left over from an earlier read.
                            next;
                        }
                        if(not $rv) {
                            # EOF (0) or a read error other than EAGAIN:
                            # This file is done. Flush the pending half line.
                            $s->remove($infh);
                            for(@{$buffer{$infh}{$outfd}}) {
                                syswrite($Global::fh{$outfd},$_);
                            }
                            delete $buffer{$infh};
                            # Closing the $infh causes it to block
                            # close $infh;
                            add_files();
                            next;
                        }
                        # Something read.
                        # Find the last \n: everything up to it is full lines
                        my $i = (rindex($buf,"\n")+1);
                        if($i) {
                            # Print full line(s): earlier partial chunks first
                            for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) {
                                syswrite($Global::fh{$outfd},$_);
                            }
                            # @buffer = remaining half line
                            $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)];
                        } else {
                            # Something read, but not a full line
                            push @{$buffer{$infh}{$outfd}}, $buf;
                        }
                        # Keep reading this input until it would block or ends
                        redo;
                    }
                }
            }
        }
    } while($opened < $files_to_open);
}
# Wait for every producer thread to finish before the script ends.
foreach my $thread (@producers) {
    $thread->join();
}
sub set_fh_non_blocking {
    # Set filehandle as non-blocking
    # Inputs:
    #   $fh = filehandle to be made non-blocking
    # Returns:
    #   N/A
    # Dies with the OS error if the flags cannot be read or written.
    my $fh = shift;
    # Load Fcntl on first use; the constants are therefore only known at
    # run time and are called with explicit parentheses.
    $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
    # fcntl returns the current flags as its result ("0 but true" when
    # none are set); it does not store them in the third argument.
    my $flags = fcntl($fh, F_GETFL(), 0) || die $!;
    # Add non-blocking to the existing flags instead of replacing them
    fcntl($fh, F_SETFL(), $flags | O_NONBLOCK()) || die $!;
    return;
}