1 package BS
::HTTPD
::TCPConnection
;
6 #use Compress::Zlib; # No need for compression yet
12 use Socket qw
/IPPROTO_TCP TCP_NODELAY/;
14 our @ISA = qw
/BS::Event/;
18 BS::HTTPD::TCPConnection - This class handles basic TCP input/output
22 This class is a helper class for L<BS::HTTPD::HTTPConnection>.
24 It has no public interface yet.
26 =head1 COPYRIGHT & LICENSE
28 Copyright 2008 Robin Redeker, all rights reserved.
30 This program is free software; you can redistribute it and/or modify it
31 under the same terms as Perl itself.
37 my $class = ref($this) || $this;
40 if (exists $self->{socket}) {
41 binmode $self->{socket};
42 _set_noblock
($self->{socket});
46 before_connect
=> sub {
# Record both endpoints of the socket on the object.
# The local side comes from sockport/sockhost. The remote side has to come
# from peerport/peerhost: reading sockport/sockhost a second time would only
# store the local address again under the peer_* keys.
$self->{local_port} = $self->{socket}->sockport;
$self->{local_host} = $self->{socket}->sockhost;
$self->{peer_port}  = $self->{socket}->peerport;
$self->{peer_host}  = $self->{socket}->peerhost;
# Reads and then rewrites the file status flags of handle $s; a failure of
# either fcntl call is fatal (die). This appears to be the body of the helper
# called elsewhere in this file as _set_noblock -- presumably O_NONBLOCK is
# OR-ed into $flags between the two calls (that line is not visible here).
# NOTE(review): Perl's fcntl hands back the F_GETFL flag word as its return
# value and does not store it into the third argument, so $flags is not
# refreshed by this first call -- confirm the handle's existing flags are not
# being discarded when F_SETFL is applied.
65 fcntl($s, F_GETFL
, $flags)
66 or die "Couldn't get flags for HANDLE : $!\n";
# Write $flags back to the handle.
68 fcntl($s, F_SETFL
, $flags)
69 or die "Couldn't set flags for HANDLE: $!\n";
74 not not $self->{socket}
# Connect to $host:$port, remember the socket on the object and announce the
# outcome through events ('connect_error' or 'connect').
78 my ($self, $host, $port) = @_;
# Fall back to the host/port stored on the object when the caller omits them.
80 unless (defined $host) { $host = $self->{host
}; }
81 unless (defined $port) { $port = $self->{port
}; }
# Create the TCP socket; failure to construct it is fatal (die).
86 my $sock = IO
::Socket
::INET
->new (
91 ) or die "Couldn't connect to $host:$port: $!";
# Remember the socket and the endpoint it was opened for.
93 $self->{socket} = $sock;
94 $self->{host
} = $host;
95 $self->{port
} = $port;
# Turn on TCP_NODELAY (disables Nagle's algorithm). The return value of
# setsockopt is not checked.
98 setsockopt ($sock, IPPROTO_TCP
, TCP_NODELAY
, 1);
# Discard any buffered data left over from an earlier connection.
100 delete $self->{write_buffer
};
101 delete $self->{read_buffer
};
# Watch the socket for writability; the callback inspects the outcome of
# the connection attempt.
104 AnyEvent
->io (poll
=> 'w', fh
=> $sock, cb
=> sub {
# Copy the socket's pending error (SO_ERROR) into $!; a true value means the
# connection failed and is reported as a 'connect_error' event.
105 if ($! = $sock->sockopt (SO_ERROR
)) {
106 $self->event (connect_error
=> $!);
# Presumably the success path (the branching lines are not visible here):
# switch the socket to non-blocking mode and emit the 'connect' event.
109 _set_noblock
($self->{socket});
113 $self->event ('connect');
124 delete $self->{write_buffer
};
125 delete $self->{read_buffer
};
126 delete $self->{compress
};
127 delete $self->{uncompress
};
129 $self->{socket}->close;
131 delete $self->{socket};
135 my ($self, $reason) = @_;
136 $self->event (disconnect
=> $reason || "Disconnect without reason.");
# Accessor for the pending input: returns whatever is currently stored under
# the object's read_buffer key (undef when nothing has been stored).
sub read_buffer {
    my ($self) = @_;
    return $self->{read_buffer};
}
# Install a read watcher on the connection's socket. Host and port are copied
# into lexicals so the callback can use them in its messages.
145 my ($host, $port, $sock) = ($self->{host
}, $self->{port
}, $self->{socket});
148 AnyEvent
->io (poll
=> 'r', fh
=> $sock, cb
=> sub {
# Read at most 4096 bytes per readiness notification; $l receives sysread's
# return value.
149 my $l = sysread $sock, my $data, 4096;
# End-of-file handling (the condition guarding this call is not visible here).
153 $self->disconnect ("EOF from bummskraut_server '$host:$port'");
# Append the new bytes to the accumulated input.
155 $self->{read_buffer
} .= $data;
# Count raw input bytes only while an 'uncompress' object is set.
156 $self->{compress_stat
}->{in_comp
} += length $data if $self->{uncompress
};
# Pass a reference to the whole buffer on to handle_data.
157 $self->handle_data (\
$self->{read_buffer
});
# EAGAIN is not treated as an error: return and wait for the next notification.
161 return if $! == EAGAIN
();
# Message used for any other read error (the call receiving it is not visible here).
163 "Error while reading from bummskraut server '$host:$port': $!"
# Flush the pending write buffer to the socket through a write watcher.
# Nothing is done unless $self->{r} is set (presumably the read watcher --
# where it is assigned is not visible here) and there is data to send.
171 return unless $self->{r
};
172 return unless length ($self->{write_buffer
}) > 0;
# Only set up a write watcher when $self->{w} is not already set.
174 unless ($self->{w
}) {
176 AnyEvent
->io (poll
=> 'w', fh
=> $self->{socket}, cb
=> sub {
# Work on a copy of the buffer contents as they are at callback time.
177 my $data = $self->{write_buffer
};
179 if (defined ($data) && $data ne '') {
# $len is the number of bytes syswrite accepted, which may be fewer than offered.
180 my $len = syswrite $self->{socket}, $data;
# Everything was written; when close-on-completion was requested, disconnect now.
183 if ($len == length $self->{write_buffer
}) {
184 if ($self->{buffer_empty_close
}) {
185 $self->disconnect ("simple request finished");
# Keep only the bytes that have not been written yet.
190 $self->{write_buffer
} = substr $self->{write_buffer
}, $len;
# EAGAIN is not treated as an error: return and retry on the next notification.
192 return if $! == EAGAIN
();
# Message used for any other write error (the call receiving it is not visible here).
194 "Error when writing data on $self->{host}:$self->{port}: $!"
203 my ($self, $buf) = @_;
205 # if ($self->{uncompress}) {
206 # my ($out, $status) = $self->{uncompress}->inflate ($$buf);
207 # defined $out or die "Couldn't uncompress, error!";
208 # $self->{uncompress_buffer} .= $out;
209 # $self->{compress_stat}->{in_uncomp} += length $out if $self->{uncompress};
210 # $buf = \$self->{uncompress_buffer};
213 $self->event (data
=> $buf);
216 # TODO: no need for compression yet
217 #sub enable_compression {
220 # my ($d, $status) = deflateInit ();
221 # $self->{compress} = $d;
222 # my ($i, $status_i) = inflateInit ();
223 # $self->{uncompress} = $i;
225 # $self->{read_buffer} = '';
226 # $self->{write_buffer} = '';
228 # $self->start_compres_statistics_timer;
# Arms a one-shot AnyEvent timer (10 seconds) that reports the compression
# counters kept in $self->{compress_stat}. The watcher is stored in
# $self->{compres_stat_timer}; the callback re-arms the timer by calling this
# method again.
# In the visible part of this file the only call to this method is inside the
# commented-out enable_compression code.
231 sub start_compres_statistics_timer
{
233 $self->{compres_stat_timer
} = AnyEvent
->timer (after
=> 10, cb
=> sub {
234 my $s = $self->{compress_stat
};
# Only report when the uncompressed in/out counters changed since the last report.
235 if ($s->{in_uncomp
} != $self->{last_in_uncomp
} || $s->{out_uncomp
} != $self->{last_out_uncomp
}) {
# Build the report line: byte counts with the percentage saved per direction,
# then packet counts with the average bytes saved per packet. The consumer of
# the formatted string is on a line not visible here.
# NOTE(review): the expressions divide by in_uncomp, out_uncomp, out_packets
# and in_packets. The guard above only requires ONE of the uncompressed
# counters to have changed, so any of these divisors may still be 0/undef and
# raise "Illegal division by zero" -- confirm before re-enabling compression.
237 (sprintf "IN: %d/%d %.2f%% OUT: %d/%d %.2f%% POUT: (%d pkts) %.1f bpp PIN: (%d pkts) %.1f bpp\n",
238 $s->{in_uncomp
}, $s->{in_comp
}, (100 / $s->{in_uncomp
}) * ($s->{in_uncomp
} - $s->{in_comp
}),
239 $s->{out_uncomp
}, $s->{out_comp
}, (100 / $s->{out_uncomp
}) * ($s->{out_uncomp
} - $s->{out_comp
}),
240 $s->{out_packets
}, ($s->{out_uncomp
} - $s->{out_comp
}) / $s->{out_packets
},
241 $s->{in_packets
}, ($s->{in_uncomp
} - $s->{in_comp
}) / $s->{in_packets
});
# Remember the reported values so an unchanged state is not reported again.
243 $self->{last_in_uncomp
} = $s->{in_uncomp
};
244 $self->{last_out_uncomp
} = $s->{out_uncomp
};
# Schedule the next check.
246 $self->start_compres_statistics_timer;
251 my ($self, $data) = @_;
253 # if ($self->{compress}) {
254 # my $pkt_len = length $data;
255 # $self->{compress_stat}->{out_uncomp} += $pkt_len;
256 # my $out = $self->{compress}->deflate ($data);
257 # defined $out or die "Couldn't deflate, error!";
258 # my $inl = length $data;
260 # my $fout = $self->{compress}->flush (Z_SYNC_FLUSH);
261 # defined $fout or die "Couldn't flush deflate, error!";
263 # $self->{compress_stat}->{out_comp} += length $data;
264 # $self->{compress_stat}->{out_packets}++;
265 # open OUTCOMPR, ">/tmp/infl.tmp";
266 # print OUTCOMPR $data;
270 $self->{write_buffer
} .= $data;
274 sub set_close_on_write_completion
{
276 $self->{buffer_empty_close
} = 1;