10seconds_install: Update copyright year.
[parallel.git] / src / parcat
blob721da3d4786c201238d49cd26a71a94953b1f392
1 #!/usr/bin/perl
3 use Symbol qw(gensym);
4 use IPC::Open3;
5 use POSIX qw(:errno_h);
6 use IO::Select;
7 use strict;
8 use threads;
9 use threads::shared;
10 use Thread::Queue;
13 my $opened :shared;
14 my $q = Thread::Queue->new();
15 my $okq = Thread::Queue->new();
16 my @producers;
18 if(not @ARGV) {
19 if(-t *STDIN) {
20 print "Usage:\n";
21 print " parcat file(s)\n";
22 print " cat argfile | parcat\n";
23 } else {
24 # Read arguments from stdin
25 chomp(@ARGV = <STDIN>);
29 my $files_to_open = 0;
30 # Default: fd = stdout
31 my $fd = 1;
32 for (@ARGV) {
33 # --rm = remove file when opened
34 /^--rm$/ and do { $opt::rm = 1; next; };
35 # -1 = output to fd 1, -2 = output to fd 2
36 /^-(\d+)$/ and do { $fd = $1; next; };
37 push @producers, threads->create('producer', $_, $fd);
38 $files_to_open++;
41 sub producer {
42 # Open a file/fifo, set non blocking, enqueue fileno of the file handle
43 my $file = shift;
44 my $output_fd = shift;
45 open(my $fh, "<", $file) || do {
46 print STDERR "parcat: Cannot open $file\n";
47 exit(1);
49 # Remove file when it has been opened
50 if($opt::rm) {
51 unlink $file;
53 set_fh_non_blocking($fh);
54 $opened++;
55 # Pass the fileno to parent
56 $q->enqueue(fileno($fh),$output_fd);
57 # Get an OK that the $fh is opened and we can release the $fh
58 while(1) {
59 my $ok = $okq->dequeue();
60 if($ok == fileno($fh)) { last; }
61 # Not ours - very unlikely to happen
62 $okq->enqueue($ok);
64 return;
67 my $s = IO::Select->new();
68 my %buffer;
70 sub add_file {
71 my $infd = shift;
72 my $outfd = shift;
73 open(my $infh, "<&=", $infd) || die;
74 open(my $outfh, ">&=", $outfd) || die;
75 $s->add($infh);
76 # Tell the producer now opened here and can be released
77 $okq->enqueue($infd);
78 # Initialize the buffer
79 @{$buffer{$infh}{$outfd}} = ();
80 $Global::fh{$outfd} = $outfh;
83 sub add_files {
84 # Non-blocking dequeue
85 my ($infd,$outfd);
86 do {
87 ($infd,$outfd) = $q->dequeue_nb(2);
88 if(defined($outfd)) {
89 add_file($infd,$outfd);
91 } while(defined($outfd));
94 sub add_files_block {
95 # Blocking dequeue
96 my ($infd,$outfd) = $q->dequeue(2);
97 add_file($infd,$outfd);
101 my $fd;
102 my (@ready,$infh,$rv,$buf);
103 do {
104 # Wait until at least one file is opened
105 add_files_block();
106 while($q->pending or keys %buffer) {
107 add_files();
108 while(keys %buffer) {
109 @ready = $s->can_read(0.01);
110 if(not @ready) {
111 add_files();
113 for $infh (@ready) {
114 # There is only one key, namely the output file descriptor
115 for my $outfd (keys %{$buffer{$infh}}) {
116 $rv = sysread($infh, $buf, 65536);
117 if (!$rv) {
118 if($! == EAGAIN) {
119 # Would block: Nothing read
120 next;
121 } else {
122 # Nothing read, but would not block:
123 # This file is done
124 $s->remove($infh);
125 for(@{$buffer{$infh}{$outfd}}) {
126 syswrite($Global::fh{$outfd},$_);
128 delete $buffer{$infh};
129 # Closing the $infh causes it to block
130 # close $infh;
131 add_files();
132 next;
135 # Something read.
136 # Find \n or \r for full line
137 my $i = (rindex($buf,"\n")+1);
138 if($i) {
139 # Print full line
140 for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) {
141 syswrite($Global::fh{$outfd},$_);
143 # @buffer = remaining half line
144 $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)];
145 } else {
146 # Something read, but not a full line
147 push @{$buffer{$infh}{$outfd}}, $buf;
149 redo;
154 } while($opened < $files_to_open);
157 for (@producers) {
158 $_->join();
161 sub set_fh_non_blocking {
162 # Set filehandle as non-blocking
163 # Inputs:
164 # $fh = filehandle to be blocking
165 # Returns:
166 # N/A
167 my $fh = shift;
168 $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
169 my $flags;
170 fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
171 $flags |= &O_NONBLOCK; # Add non-blocking to the flags
172 fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle