r1281@opsdev009 (orig r69409): mcslee | 2007-11-13 02:19:08 -0800
[amiethrift.git] / lib / php / src / transport / TSocketPool.php
blobbd28db65cd709492ba67dd720255447fd33b9f66
1 <?php
3 /**
4 * Copyright (c) 2006- Facebook
5 * Distributed under the Thrift Software License
7 * See accompanying file LICENSE or visit the Thrift site at:
8 * http://developers.facebook.com/thrift/
10 * @package thrift.transport
11 * @author Mark Slee <mcslee@facebook.com>
14 /** Inherits from TSocket */
15 include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
/**
 * This library makes use of the APC cache to mark hosts as down in a web
 * environment. The pool only ever calls apc_fetch() and apc_store(), so the
 * shims below make sure both names exist:
 *
 *  - if the legacy APC functions are present, nothing is defined here;
 *  - otherwise, if the APCu equivalents are present, calls are forwarded
 *    to them so that down-host tracking keeps working;
 *  - otherwise (e.g. running from the CLI or on a system without any cache
 *    installed) null functions step in and act like cache misses.
 *
 * Each function is guarded on its own so that a runtime providing only one
 * of the two names cannot trigger a redeclaration error or leave the other
 * one undefined.
 */
if (!function_exists('apc_fetch')) {
  if (function_exists('apcu_fetch')) {
    function apc_fetch($key) { return apcu_fetch($key); }
  } else {
    // No cache available: every lookup is a miss
    function apc_fetch($key) { return FALSE; }
  }
}

if (!function_exists('apc_store')) {
  if (function_exists('apcu_store')) {
    function apc_store($key, $var, $ttl=0) {
      return apcu_store($key, $var, $ttl);
    }
  } else {
    // No cache available: report that nothing was stored
    function apc_store($key, $var, $ttl=0) { return FALSE; }
  }
}
/**
 * Sockets implementation of the TTransport interface that allows connection
 * to a pool of servers.
 *
 * Hosts that fail to connect are recorded through apc_fetch()/apc_store()
 * and skipped by open() until the retry interval has elapsed.
 *
 * @package thrift.transport
 * @author Mark Slee <mcslee@facebook.com>
 */
class TSocketPool extends TSocket {

  /**
   * Remote servers. Array of associative arrays with 'host' and 'port' keys
   *
   * @var array
   */
  private $servers_ = array();

  /**
   * How many times to retry each host in connect
   *
   * @var int
   */
  private $numRetries_ = 1;

  /**
   * Retry interval in seconds, how long to not try a host if it has been
   * marked as down.
   *
   * @var int
   */
  private $retryInterval_ = 60;

  /**
   * Max consecutive failures before marking a host down.
   *
   * @var int
   */
  private $maxConsecutiveFailures_ = 1;

  /**
   * Try hosts in order? or Randomized?
   *
   * @var bool
   */
  private $randomize_ = TRUE;

  /**
   * Always try last host, even if marked down?
   *
   * @var bool
   */
  private $alwaysTryLast_ = TRUE;

  /**
   * Socket pool constructor
   *
   * @param array $hosts        List of remote hostnames
   * @param mixed $ports        Array of remote ports (looked up with the same
   *                            keys as $hosts), or a single common port
   * @param bool  $persist      Whether to use a persistent socket
   * @param mixed $debugHandler Function for error logging
   */
  public function __construct($hosts=array('localhost'),
                              $ports=array(9090),
                              $persist=FALSE,
                              $debugHandler=null) {
    parent::__construct(null, 0, $persist, $debugHandler);

    // A non-array $ports is one port shared by every host
    $commonPort = !is_array($ports);

    foreach ($hosts as $key => $host) {
      $this->servers_[] = array(
        'host' => $host,
        'port' => $commonPort ? $ports : $ports[$key],
      );
    }
  }

  /**
   * Add a server to the pool
   *
   * This function does not prevent you from adding a duplicate server entry.
   *
   * @param string $host hostname or IP
   * @param int    $port port
   */
  public function addServer($host, $port) {
    $this->servers_[] = array('host' => $host, 'port' => $port);
  }

  /**
   * Sets how many times to keep retrying a host in the connect function.
   *
   * @param int $numRetries
   */
  public function setNumRetries($numRetries) {
    $this->numRetries_ = $numRetries;
  }

  /**
   * Sets how long to wait until retrying a host if it was marked down
   *
   * @param int $retryInterval Interval in seconds
   */
  public function setRetryInterval($retryInterval) {
    $this->retryInterval_ = $retryInterval;
  }

  /**
   * Sets how many consecutive failures are tolerated before a host is
   * marked as down.
   *
   * @param int $maxConsecutiveFailures
   */
  public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
    $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
  }

  /**
   * Turns randomization in connect order on or off.
   *
   * @param bool $randomize
   */
  public function setRandomize($randomize) {
    $this->randomize_ = $randomize;
  }

  /**
   * Whether to always try the last server.
   *
   * @param bool $alwaysTryLast
   */
  public function setAlwaysTryLast($alwaysTryLast) {
    $this->alwaysTryLast_ = $alwaysTryLast;
  }

  /**
   * Connects the socket by iterating through all the servers in the pool
   * and trying to find one that works.
   *
   * A server is attempted when it has no recorded failure time, when its
   * retry interval has passed, or when it is the last server in the list
   * and alwaysTryLast_ is set. Each attempted server gets up to numRetries_
   * connection attempts.
   *
   * @throws TException if no server in the pool could be connected to
   */
  public function open() {
    // Check if we want order randomization
    if ($this->randomize_) {
      shuffle($this->servers_);
    }

    // Count servers to identify the "last" one
    $numServers = count($this->servers_);

    for ($i = 0; $i < $numServers; ++$i) {

      // Read the two known keys explicitly rather than extract()-ing the
      // array into the local scope
      $host = $this->servers_[$i]['host'];
      $port = $this->servers_[$i]['port'];

      // Cache keys holding this server's failure bookkeeping
      $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
      $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';

      // Cache miss? Assume it's OK
      $lastFailtime = apc_fetch($failtimeKey);
      if ($lastFailtime === FALSE) {
        $lastFailtime = 0;
      }

      $retryIntervalPassed = FALSE;

      // Cache hit...make sure the retry interval has elapsed
      if ($lastFailtime > 0) {
        $elapsed = time() - $lastFailtime;
        if ($elapsed > $this->retryInterval_) {
          $retryIntervalPassed = TRUE;
          if ($this->debug_) {
            call_user_func($this->debugHandler_,
                           'TSocketPool: retryInterval '.
                           '('.$this->retryInterval_.') '.
                           'has passed for host '.$host.':'.$port);
          }
        }
      }

      // Only connect if not in the middle of a fail interval, OR if this
      // is the LAST server we are trying, just hammer away on it
      $isLastServer = FALSE;
      if ($this->alwaysTryLast_) {
        $isLastServer = ($i == ($numServers - 1));
      }

      $shouldTry = ($lastFailtime === 0) ||
                   ($isLastServer) ||
                   ($lastFailtime > 0 && $retryIntervalPassed);
      if (!$shouldTry) {
        // Still inside the fail interval: skip without counting a failure
        continue;
      }

      // Set underlying TSocket params to this one
      $this->host_ = $host;
      $this->port_ = $port;

      // Try up to numRetries_ connections per server
      for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
        try {
          // Use the underlying TSocket open function
          parent::open();

          // Only clear the failure time if one was recorded
          if ($lastFailtime > 0) {
            apc_store($failtimeKey, 0);
          }

          // A success ends any running streak of failures; without this
          // reset, isolated failures would pile up over time and the count
          // would no longer be "consecutive"
          if (apc_fetch($consecfailsKey) > 0) {
            apc_store($consecfailsKey, 0);
          }

          // Successful connection, return now
          return;

        } catch (TException $tx) {
          // Connection failed; fall through to the next attempt
        }
      }

      // Every attempt failed: mark failure of this host in the cache.
      // Cache misses count as zero previous failures.
      $consecfails = apc_fetch($consecfailsKey);
      if ($consecfails === FALSE) {
        $consecfails = 0;
      }

      // Increment by one
      $consecfails++;

      // Log and cache this failure
      if ($consecfails >= $this->maxConsecutiveFailures_) {
        if ($this->debug_) {
          call_user_func($this->debugHandler_,
                         'TSocketPool: marking '.$host.':'.$port.
                         ' as down for '.$this->retryInterval_.' secs '.
                         'after '.$consecfails.' failed attempts.');
        }
        // Store the failure time
        apc_store($failtimeKey, time());

        // Clear the count of consecutive failures
        apc_store($consecfailsKey, 0);
      } else {
        apc_store($consecfailsKey, $consecfails);
      }
    }

    // No server could be reached: report the whole pool as down
    $error = 'TSocketPool: All hosts in pool are down. ';
    $hosts = array();
    foreach ($this->servers_ as $server) {
      $hosts []= $server['host'].':'.$server['port'];
    }
    $hostlist = implode(',', $hosts);
    $error .= '('.$hostlist.')';
    if ($this->debug_) {
      call_user_func($this->debugHandler_, $error);
    }
    throw new TException($error);
  }
}