various fixes and enhancements
[AnyEvent-HTTPD.git] / lib / BS / HTTPD / TCPConnection.pm
blob1477a2299ff9166bb12d23498d0453a11dcae293
1 package BS::HTTPD::TCPConnection;
2 use feature ':5.10';
3 use strict;
4 no warnings;
6 #use Compress::Zlib; # No need for compression yet
8 use AnyEvent;
9 use Fcntl;
10 use POSIX;
11 use IO::Socket::INET;
12 use Socket qw/IPPROTO_TCP TCP_NODELAY/;
13 use BS::Event;
14 our @ISA = qw/BS::Event/;
16 =head1 NAME
18 BS::HTTPD::TCPConnection - This class handles basic TCP input/output
20 =head1 DESCRIPTION
22 This class is a helper class for L<BS::HTTPD::HTTPConnection>.
24 It has no public interface yet.
26 =head1 COPYRIGHT & LICENSE
28 Copyright 2008 Robin Redeker, all rights reserved.
30 This program is free software; you can redistribute it and/or modify it
31 under the same terms as Perl itself.
33 =cut
# Constructor.
#
# Arguments: a flat list of key/value pairs which become the fields of the
# new object (this file reads e.g. 'socket', 'host' and 'port').
#
# If a 'socket' is passed in, it is switched to binary, non-blocking mode.
# A 'before_connect' callback is registered which records the local and the
# remote endpoint of the socket in the object.  Finally the init hook is
# called.
#
# Returns the new object.
sub new {
   my $this  = shift;
   my $class = ref ($this) || $this;
   my $self  = { @_ };
   bless $self, $class;

   if (exists $self->{socket}) {
      binmode $self->{socket};
      _set_noblock ($self->{socket});
   }

   $self->reg_cb (
      before_connect => sub {
         my ($self) = @_;
         my $sock = $self->{socket};

         $self->{local_port} = $sock->sockport;
         $self->{local_host} = $sock->sockhost;

         # The remote endpoint has to be queried with peerport/peerhost;
         # sockport/sockhost would only repeat the local endpoint.
         $self->{peer_port}  = $sock->peerport;
         $self->{peer_host}  = $sock->peerhost;

         # NOTE(review): presumably removes this callback again so it runs
         # only once -- confirm against BS::Event.
         $self->unreg_me;
      }
   );

   $self->init;
   return $self;
}
# Initialisation hook, called by new() once the object has been set up.
# Does nothing here and returns nothing.
sub init {
   return;
}
# Puts a filehandle into non-blocking mode.
#
#   $s - the filehandle (or socket) to modify
#
# The currently set file status flags are kept, O_NONBLOCK is added to them.
# Dies if the flags cannot be read or written.
# Returns the (true) result of the F_SETFL fcntl call.
sub _set_noblock {
   my ($s) = @_;

   # For F_GETFL the flag word is the *return value* of fcntl, the third
   # argument is not written to.  A flag word of 0 comes back as
   # "0 but true", so "or die" only triggers on a real failure.
   my $flags = fcntl ($s, F_GETFL, 0)
      or die "Couldn't get flags for HANDLE : $!\n";

   fcntl ($s, F_SETFL, $flags | O_NONBLOCK)
      or die "Couldn't set flags for HANDLE: $!\n";
}
# Tells whether the object currently holds a socket.
# Returns a true value if $self->{socket} is set, a false value otherwise.
sub is_connected {
   my $self = shift;
   return !!$self->{socket};
}
# Starts a non-blocking TCP connect.
#
#   $host, $port - the peer to connect to; each defaults to the value
#                  stored in the object ($self->{host} / $self->{port}).
#
# Does nothing if the object already has a socket.  Dies if the socket
# cannot be created.  Otherwise the socket, host and port are stored, both
# buffers are dropped and a write watcher ($self->{cw}) is installed.
#
# When the socket becomes writable, SO_ERROR is checked: on error a
# 'connect_error' event is emitted and the object is cleaned up, otherwise
# the reader and writer are started and a 'connect' event is emitted.  The
# watcher is removed afterwards in both cases.
sub connect {
   my ($self, $host, $port) = @_;

   $host //= $self->{host};
   $port //= $self->{port};

   return if $self->{socket};

   my $sock = IO::Socket::INET->new (
      PeerAddr => $host,
      PeerPort => $port,
      Proto    => 'tcp',
      Blocking => 0,
   );
   $sock or die "Couldn't connect to $host:$port: $!";

   @{$self}{qw/socket host port/} = ($sock, $host, $port);

   binmode $sock;
   setsockopt ($sock, IPPROTO_TCP, TCP_NODELAY, 1);

   delete @{$self}{qw/write_buffer read_buffer/};

   my $on_writable = sub {
      # The pending socket error goes into $!, so the event receives
      # both the error number and its message.
      $! = $sock->sockopt (SO_ERROR);

      if ($!) {
         $self->event (connect_error => $!);
         $self->cleanup;
      } else {
         _set_noblock ($self->{socket});
         $self->start_reader;
         $self->start_writer;

         $self->event ('connect');
      }

      delete $self->{cw};
   };

   $self->{cw} = AnyEvent->io (poll => 'w', fh => $sock, cb => $on_writable);
}
# Drops all per-connection state: the three I/O watchers, both buffers and
# the (de)compression fields.  Then the socket is closed (errors while
# closing, e.g. because there is no socket, are ignored) and removed from
# the object as well.
#
# Returns the value that was stored in $self->{socket}.
sub cleanup {
   my ($self) = @_;

   delete @{$self}{qw/cw r w write_buffer read_buffer compress uncompress/};

   eval { $self->{socket}->close };

   delete $self->{socket};
}
# Announces the end of the connection and releases its resources.
#
#   $reason - text passed along with the 'disconnect' event; a false value
#             is replaced by a generic message.
#
# Emits the 'disconnect' event first and calls cleanup afterwards, whose
# return value is returned.
sub disconnect {
   my ($self, $reason) = @_;

   $reason ||= "Disconnect without reason.";
   $self->event (disconnect => $reason);

   return $self->cleanup;
}
# Accessor: returns the data currently stored in $self->{read_buffer}.
sub read_buffer {
   my ($self) = @_;
   return $self->{read_buffer};
}
# Installs the read watcher ($self->{r}) on the socket.
#
# Whenever the socket is readable, up to 4096 bytes are read and appended
# to $self->{read_buffer}; handle_data is then called with a reference to
# that buffer.  End of file and read errors lead to a disconnect.
#
# Transient conditions (EAGAIN, EWOULDBLOCK, and EINTR for a read that was
# interrupted by a signal) are not errors: the callback just returns and
# waits for the next readiness notification.
sub start_reader {
   my ($self) = @_;

   my ($host, $port, $sock) = ($self->{host}, $self->{port}, $self->{socket});

   $self->{r} =
      AnyEvent->io (poll => 'r', fh => $sock, cb => sub {
         my $read_len = sysread $sock, my $data, 4096;

         unless (defined $read_len) {
            return if $! == EAGAIN() || $! == EWOULDBLOCK() || $! == EINTR();

            $self->disconnect (
               "Error while reading from bummskraut server '$host:$port': $!"
            );
            return;
         }

         if ($read_len == 0) {
            $self->disconnect ("EOF from bummskraut_server '$host:$port'");
            return;
         }

         $self->{read_buffer} .= $data;

         # Incoming byte statistics are only kept while an uncompress
         # object is set.
         $self->{compress_stat}->{in_comp} += length $data if $self->{uncompress};

         $self->handle_data (\$self->{read_buffer});
      });
}
# Installs the write watcher ($self->{w}) if there is something to send.
#
# Nothing happens unless the read watcher exists ($self->{r}) and
# $self->{write_buffer} is non-empty, or if a write watcher is already
# active.
#
# Whenever the socket is writable the buffer is written out and the part
# that was accepted is removed from it.  Once the buffer is empty the
# watcher is removed; if close-on-completion was requested
# ($self->{buffer_empty_close}) the connection is disconnected as well.
# Write errors lead to a disconnect, except for the transient conditions
# EAGAIN, EWOULDBLOCK and EINTR, which are simply retried on the next
# readiness notification.
sub start_writer {
   my ($self) = @_;
   return unless $self->{r};
   return unless length ($self->{write_buffer}) > 0;

   unless ($self->{w}) {
      $self->{w} =
         AnyEvent->io (poll => 'w', fh => $self->{socket}, cb => sub {
            my $data = $self->{write_buffer};

            # Nothing to send: drop the watcher instead of being woken up
            # again and again for a permanently writable socket.
            unless (defined ($data) && $data ne '') {
               delete $self->{w};
               return;
            }

            my $len = syswrite $self->{socket}, $data;

            unless (defined $len) {
               return if $! == EAGAIN() || $! == EWOULDBLOCK() || $! == EINTR();

               $self->disconnect (
                  "Error when writing data on $self->{host}:$self->{port}: $!"
               );
               return;
            }

            # Remove the written part *before* a possible disconnect, so
            # the buffer is not re-created on an object that has already
            # been cleaned up.
            substr ($self->{write_buffer}, 0, $len, '');

            if ($self->{write_buffer} eq '') {
               if ($self->{buffer_empty_close}) {
                  $self->disconnect ("simple request finished");
               }
               delete $self->{w};
            }
         });
   }
}
# Hands received data on to the event handlers.
#
#   $buf - reference to the buffer holding the received data
#
# Emits a 'data' event with $buf as argument and returns whatever the
# event call returns.
sub handle_data {
   my $self  = shift;
   my ($buf) = @_;

   # Decompression is not needed yet, the code is kept for later use:
#   if ($self->{uncompress}) {
#      my ($out, $status) = $self->{uncompress}->inflate ($$buf);
#      defined $out or die "Couldn't uncompress, error!";
#      $self->{uncompress_buffer} .= $out;
#      $self->{compress_stat}->{in_uncomp} += length $out if $self->{uncompress};
#      $buf = \$self->{uncompress_buffer};
#   }

   return $self->event (data => $buf);
}
216 # TODO: no need for compression yet
217 #sub enable_compression {
218 # my ($self) = @_;
220 # my ($d, $status) = deflateInit ();
221 # $self->{compress} = $d;
222 # my ($i, $status_i) = inflateInit ();
223 # $self->{uncompress} = $i;
225 # $self->{read_buffer} = '';
226 # $self->{write_buffer} = '';
228 # $self->start_compres_statistics_timer;
# Arms a timer ($self->{compres_stat_timer}) which fires after 10 seconds.
#
# When it fires, a line with the compression statistics found in
# $self->{compress_stat} is printed via warn, but only if the uncompressed
# byte counters changed since the last run.  The counters are remembered
# and the timer is armed again.
#
# Counters which are still zero (or unset) yield a ratio of 0 in the report
# instead of a fatal division by zero.
sub start_compres_statistics_timer {
   my ($self) = @_;

   # Division that yields 0 for a zero or unset denominator.
   my $ratio = sub {
      my ($num, $den) = @_;
      return $den ? $num / $den : 0;
   };

   $self->{compres_stat_timer} = AnyEvent->timer (after => 10, cb => sub {
      my $s = $self->{compress_stat};

      if (   $s->{in_uncomp}  != $self->{last_in_uncomp}
          || $s->{out_uncomp} != $self->{last_out_uncomp}) {

         # Bytes saved by compression in each direction.
         my $in_saved  = $s->{in_uncomp}  - $s->{in_comp};
         my $out_saved = $s->{out_uncomp} - $s->{out_comp};

         warn
            (sprintf "IN: %d/%d %.2f%% OUT: %d/%d %.2f%% POUT: (%d pkts) %.1f bpp PIN: (%d pkts) %.1f bpp\n",
               $s->{in_uncomp},   $s->{in_comp},  $ratio->(100, $s->{in_uncomp})  * $in_saved,
               $s->{out_uncomp},  $s->{out_comp}, $ratio->(100, $s->{out_uncomp}) * $out_saved,
               $s->{out_packets}, $ratio->($out_saved, $s->{out_packets}),
               $s->{in_packets},  $ratio->($in_saved,  $s->{in_packets}));
      }

      $self->{last_in_uncomp}  = $s->{in_uncomp};
      $self->{last_out_uncomp} = $s->{out_uncomp};

      $self->start_compres_statistics_timer;
   });
}
# Queues data for sending.
#
#   $data - the bytes to send
#
# The data is appended to $self->{write_buffer} and start_writer is called,
# whose return value is returned.
sub write_data {
   my $self = shift;
   my $data = shift;

   # Compression is not needed yet, the code is kept for later use:
#   if ($self->{compress}) {
#      my $pkt_len = length $data;
#      $self->{compress_stat}->{out_uncomp} += $pkt_len;
#      my $out = $self->{compress}->deflate ($data);
#      defined $out or die "Couldn't deflate, error!";
#      my $inl = length $data;
#      $data = $out;
#      my $fout = $self->{compress}->flush (Z_SYNC_FLUSH);
#      defined $fout or die "Couldn't flush deflate, error!";
#      $data .= $fout;
#      $self->{compress_stat}->{out_comp} += length $data;
#      $self->{compress_stat}->{out_packets}++;
#      open OUTCOMPR, ">/tmp/infl.tmp";
#      print OUTCOMPR $data;
#      close OUTCOMPR;
#   }

   $self->{write_buffer} .= $data;

   return $self->start_writer;
}
# Requests that the connection is closed as soon as the write buffer has
# been sent completely (the flag is evaluated by the write watcher).
# Returns 1.
sub set_close_on_write_completion {
   my $self = shift;

   $self->{buffer_empty_close} = 1;

   return 1;
}