4 * Copyright (c) 2006- Facebook
5 * Distributed under the Thrift Software License
7 * See accompanying file LICENSE or visit the Thrift site at:
8 * http://developers.facebook.com/thrift/
10 * @package thrift.transport
11 * @author Mark Slee <mcslee@facebook.com>
14 /** Inherits from Socket */
15 include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
18 * This library makes use of APC cache to make hosts as down in a web
19 * environment. If you are running from the CLI or on a system without APC
20 * installed, then these null functions will step in and act like cache
23 if (!function_exists('apc_fetch')) {
24 function apc_fetch($key) { return FALSE; }
25 function apc_store($key, $var, $ttl=0) { return FALSE; }
29 * Sockets implementation of the TTransport interface that allows connection
30 * to a pool of servers.
32 * @package thrift.transport
33 * @author Mark Slee <mcslee@facebook.com>
35 class TSocketPool
extends TSocket
{
38 * Remote servers. Array of associative arrays with 'host' and 'port' keys
40 private $servers_ = array();
43 * How many times to retry each host in connect
47 private $numRetries_ = 1;
50 * Retry interval in seconds, how long to not try a host if it has been
55 private $retryInterval_ = 60;
58 * Max consecutive failures before marking a host down.
62 private $maxConsecutiveFailures_ = 1;
65 * Try hosts in order? or Randomized?
69 private $randomize_ = TRUE;
72 * Always try last host, even if marked down?
76 private $alwaysTryLast_ = TRUE;
79 * Socket pool constructor
81 * @param array $hosts List of remote hostnames
82 * @param mixed $ports Array of remote ports, or a single common port
83 * @param bool $persist Whether to use a persistent socket
84 * @param mixed $debugHandler Function for error logging
86 public function __construct($hosts=array('localhost'),
90 parent
::__construct(null, 0, $persist, $debugHandler);
92 if (!is_array($ports)) {
95 foreach ($hosts as $key => $val) {
100 foreach ($hosts as $key => $host) {
101 $this->servers_
[]= array('host' => $host,
102 'port' => $ports[$key]);
107 * Add a server to the pool
109 * This function does not prevent you from adding a duplicate server entry.
111 * @param string $host hostname or IP
112 * @param int $port port
114 public function addServer($host, $port) {
115 $this->servers_
[] = array('host' => $host, 'port' => $port);
119 * Sets how many time to keep retrying a host in the connect function.
121 * @param int $numRetries
123 public function setNumRetries($numRetries) {
124 $this->numRetries_
= $numRetries;
128 * Sets how long to wait until retrying a host if it was marked down
130 * @param int $numRetries
132 public function setRetryInterval($retryInterval) {
133 $this->retryInterval_
= $retryInterval;
137 * Sets how many time to keep retrying a host before marking it as down.
139 * @param int $numRetries
141 public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
142 $this->maxConsecutiveFailures_
= $maxConsecutiveFailures;
146 * Turns randomization in connect order on or off.
148 * @param bool $randomize
150 public function setRandomize($randomize) {
151 $this->randomize_
= $randomize;
155 * Whether to always try the last server.
157 * @param bool $alwaysTryLast
159 public function setAlwaysTryLast($alwaysTryLast) {
160 $this->alwaysTryLast_
= $alwaysTryLast;
165 * Connects the socket by iterating through all the servers in the pool
166 * and trying to find one that works.
168 public function open() {
169 // Check if we want order randomization
170 if ($this->randomize_
) {
171 shuffle($this->servers_
);
174 // Count servers to identify the "last" one
175 $numServers = count($this->servers_
);
177 for ($i = 0; $i < $numServers; ++
$i) {
179 // This extracts the $host and $port variables
180 extract($this->servers_
[$i]);
182 // Check APC cache for a record of this server being down
183 $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
185 // Cache miss? Assume it's OK
186 $lastFailtime = apc_fetch($failtimeKey);
187 if ($lastFailtime === FALSE) {
191 $retryIntervalPassed = FALSE;
193 // Cache hit...make sure enough the retry interval has elapsed
194 if ($lastFailtime > 0) {
195 $elapsed = time() - $lastFailtime;
196 if ($elapsed > $this->retryInterval_
) {
197 $retryIntervalPassed = TRUE;
199 call_user_func($this->debugHandler_
,
200 'TSocketPool: retryInterval '.
201 '('.$this->retryInterval_
.') '.
202 'has passed for host '.$host.':'.$port);
207 // Only connect if not in the middle of a fail interval, OR if this
208 // is the LAST server we are trying, just hammer away on it
209 $isLastServer = FALSE;
210 if ($this->alwaysTryLast_
) {
211 $isLastServer = ($i == ($numServers - 1));
214 if (($lastFailtime === 0) ||
216 ($lastFailtime > 0 && $retryIntervalPassed)) {
218 // Set underlying TSocket params to this one
219 $this->host_
= $host;
220 $this->port_
= $port;
222 // Try up to numRetries_ connections per server
223 for ($attempt = 0; $attempt < $this->numRetries_
; $attempt++
) {
225 // Use the underlying TSocket open function
228 // Only clear the failure counts if required to do so
229 if ($lastFailtime > 0) {
230 apc_store($failtimeKey, 0);
233 // Successful connection, return now
236 } catch (TException
$tx) {
241 // Mark failure of this host in the cache
242 $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';
244 // Ignore cache misses
245 $consecfails = apc_fetch($consecfailsKey);
246 if ($consecfails === FALSE) {
253 // Log and cache this failure
254 if ($consecfails >= $this->maxConsecutiveFailures_
) {
256 call_user_func($this->debugHandler_
,
257 'TSocketPool: marking '.$host.':'.$port.
258 ' as down for '.$this->retryInterval_
.' secs '.
259 'after '.$consecfails.' failed attempts.');
261 // Store the failure time
262 apc_store($failtimeKey, time());
264 // Clear the count of consecutive failures
265 apc_store($consecfailsKey, 0);
267 apc_store($consecfailsKey, $consecfails);
272 // Holy shit we failed them all. The system is totally ill!
273 $error = 'TSocketPool: All hosts in pool are down. ';
275 foreach ($this->servers_
as $server) {
276 $hosts []= $server['host'].':'.$server['port'];
278 $hostlist = implode(',', $hosts);
279 $error .= '('.$hostlist.')';
281 call_user_func($this->debugHandler_
, $error);
283 throw new TException($error);