connection/poolable: defer expiry of timed out connections
[MogileFS-Server.git] / lib / MogileFS / Connection / Poolable.pm
blob86b99d488a94a751a1531b363b4593c50c36203b
1 # private base class for poolable HTTP/Mogstored sidechannel connections
2 # This is currently only used by HTTP, but is intended for Mogstored
3 # connections, too.
4 package MogileFS::Connection::Poolable;
5 use strict;
6 use warnings;
7 use Danga::Socket;
8 use base qw(Danga::Socket);
9 use fields (
10 'mfs_pool', # owner of the connection (MogileFS::ConnectionPool)
11 'mfs_hostport', # [ ip, port ]
12 'mfs_expire', # Danga::Socket::Timer object
13 'mfs_expire_cb', # Danga::Socket::Timer callback
14 'mfs_requests', # number of requests made on this object
15 'mfs_err', # used to propagate an error to start()
16 'mfs_writeq', # arrayref if connecting, undef otherwise
18 use Socket qw(SO_KEEPALIVE);
19 use Time::HiRes;
21 # subclasses (MogileFS::Connection::{HTTP,Mogstored}) must call this sub
22 sub new {
23 my ($self, $sock, $ip, $port) = @_;
24 $self->SUPER::new($sock); # Danga::Socket->new
26 # connection may not be established, yet
27 # so Danga::Socket->peer_addr_string can't be used here
28 $self->{mfs_hostport} = [ $ip, $port ];
29 $self->{mfs_requests} = 0;
31 # newly-created socket, we buffer writes until event_write is triggered
32 $self->{mfs_writeq} = [];
34 return $self;
37 # used by ConnectionPool for tracking per-hostport connection counts
38 sub key { join(':', @{$_[0]->{mfs_hostport}}); }
40 # backwards compatibility
41 sub host_port { $_[0]->key; }
43 sub ip_port { @{$_[0]->{mfs_hostport}}; }
45 sub fd { fileno($_[0]->sock); }
47 # marks a connection as idle, call this before putting it in a connection
48 # pool for eventual reuse.
49 sub mark_idle {
50 my ($self) = @_;
52 $self->watch_read(0);
54 # set the keepalive flag the first time we're idle
55 $self->sock->sockopt(SO_KEEPALIVE, 1) if $self->{mfs_requests} == 0;
57 $self->{mfs_requests}++;
60 sub write {
61 my ($self, $arg) = @_;
62 my $writeq = $self->{mfs_writeq};
64 if (ref($writeq) eq "ARRAY") {
65 # if we're still connecting, we must buffer explicitly for *BSD
66 # and not attempt a real write() until event_write is triggered
67 push @$writeq, $arg;
68 $self->watch_write(1); # enable event_write triggering
69 0; # match Danga::Socket::write return value
70 } else {
71 $self->SUPER::write($arg);
75 # Danga::Socket will trigger this when a socket is writable
76 sub event_write {
77 my ($self) = @_;
79 # we may have buffered writes in mfs_writeq during non-blocking connect(),
80 # this is needed on *BSD but unnecessary (but harmless) on Linux.
81 my $writeq = delete $self->{mfs_writeq};
82 if ($writeq) {
83 $self->watch_write(0); # ->write will re-enable if needed
84 foreach my $queued (@$writeq) {
85 $self->write($queued);
87 } else {
88 $self->SUPER::event_write();
92 # the request running on this connection is retryable if this socket
93 # has ever been marked idle. The connection pool can never be 100%
94 # reliable for detecting dead sockets, and all HTTP requests made by
95 # MogileFS are idempotent.
96 sub retryable {
97 my ($self, $reason) = @_;
98 return ($reason !~ /timeout/ && $self->{mfs_requests} > 0);
101 # Sets (or updates) the timeout of the connection
102 # timeout_key is "node_timeout" or "conn_timeout"
103 # clears the current timeout if timeout_key is undef
104 sub set_timeout {
105 my ($self, $timeout_key) = @_;
106 my $mfs_pool = $self->{mfs_pool};
108 $self->SetPostLoopCallback(undef);
109 if ($timeout_key) {
110 my $timeout;
112 if ($timeout_key =~ /\A[a-z_]+\z/) {
113 $timeout = MogileFS->config($timeout_key) || 2;
114 } else {
115 $timeout = $timeout_key;
116 $timeout_key = "timeout";
119 my $t0 = Time::HiRes::time();
120 $self->{mfs_expire} = $t0 + $timeout;
121 $self->{mfs_expire_cb} = sub {
122 my ($now) = @_;
123 my $elapsed = $now - $t0;
125 # for HTTP, this will fake an HTTP error response like LWP does
126 $self->err("$timeout_key: $timeout (elapsed: $elapsed)");
128 $mfs_pool->register_timeout($self, $timeout) if $mfs_pool;
129 } else {
130 $self->{mfs_expire} = $self->{mfs_expire_cb} = undef;
131 $mfs_pool->register_timeout($self, undef) if $mfs_pool;
135 # returns the expiry time of the connection
136 sub expiry { $_[0]->{mfs_expire} }
138 # runs expiry callback and returns true if time is up,
139 # returns false if there is time remaining
140 sub expired {
141 my ($self, $now) = @_;
142 my $expire = $self->{mfs_expire} or return 0;
143 $now ||= Time::HiRes::time();
145 if ($now >= $expire) {
146 my $expire_cb = delete $self->{mfs_expire_cb};
147 if ($expire_cb && $self->sock) {
148 $self->SetPostLoopCallback(sub { $expire_cb->($now); 1 });
150 return 1;
152 return 0;
155 # may be overriden in subclass, called only on errors
156 # The HTTP version of this will fake an HTTP response for LWP compatibility
157 sub err {
158 my ($self, $close_reason) = @_;
160 $self->inflight_expire; # ensure we don't call new_err on eventual close()
162 if ($close_reason =~ /\A:event_(?:hup|err)\z/) {
163 # there's a chance this can be invoked while inflight,
164 # conn_drop will handle this case appropriately
165 $self->{mfs_pool}->conn_drop($self, $close_reason) if $self->{mfs_pool};
166 } else {
167 $self->close($close_reason);
171 # sets the pool this connection belongs to, only call from ConnectionPool
172 sub set_pool {
173 my ($self, $pool) = @_;
175 $self->{mfs_pool} = $pool;
178 # closes a connection, and may reschedule the inflight callback if
179 # close_reason is ":retry"
180 sub close {
181 my ($self, $close_reason) = @_;
183 delete $self->{mfs_expire_cb}; # avoid circular ref
185 my $mfs_pool = delete $self->{mfs_pool}; # avoid circular ref
186 my $inflight_cb;
188 if ($mfs_pool) {
189 $mfs_pool->schedule_queued;
190 $inflight_cb = $mfs_pool->conn_close_prepare($self, $close_reason);
192 $self->SUPER::close($close_reason); # Danga::Socket->close
194 if ($inflight_cb && $close_reason) {
195 if ($close_reason eq ":retry") {
196 my ($ip, $port) = $self->ip_port;
198 $mfs_pool->enqueue($ip, $port, $inflight_cb);
199 } else {
200 # Danga::Socket-scheduled write()s which fail with ECONNREFUSED,
201 # EPIPE, or "write_error" after an initial (non-blocking)
202 # connect()
203 $mfs_pool->on_next_tick(sub {
204 ref($self)->new_err($close_reason || "error", $inflight_cb);
210 # Marks a connection as no-longer inflight. Calling this prevents retries.
211 sub inflight_expire {
212 my ($self) = @_;
213 my $mfs_pool = $self->{mfs_pool};
214 die "BUG: expiring without MogileFS::ConnectionPool\n" unless $mfs_pool;
215 $mfs_pool->inflight_cb_expire($self);
218 # Danga::Socket callbacks
219 sub event_hup { $_[0]->err(':event_hup'); }
220 sub event_err { $_[0]->err(':event_err'); }
222 # called when we couldn't create a socket, but need to create an object
223 # anyways for errors (creating fake, LWP-style error responses)
224 sub new_err {
225 my ($class, $err, $start_cb) = @_;
226 my $self = fields::new($class);
227 $self->{mfs_err} = $err;
228 # on socket errors
229 $start_cb->($self);
232 # returns this connection back to its associated pool.
233 # Returns false if not successful (pool is full)
234 sub persist {
235 my ($self) = @_;
236 my $mfs_pool = $self->{mfs_pool};
238 return $mfs_pool ? $mfs_pool->conn_persist($self) : 0;