Added per-socket monitoring for the client executor pool. This monitoring reveals stats...
[voldemort/jeffpc.git] / src / java / voldemort / client / SocketStoreClientFactory.java
blob8da1bd71355035db5dd142e0f6ae8cadb8a92b03
1 /*
2 * Copyright 2008-2010 LinkedIn, Inc
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
6 * the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
17 package voldemort.client;
19 import static voldemort.cluster.failuredetector.FailureDetectorUtils.create;
21 import java.net.URI;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.TimeUnit;
27 import voldemort.VoldemortException;
28 import voldemort.client.protocol.RequestFormatType;
29 import voldemort.cluster.Node;
30 import voldemort.cluster.failuredetector.ClientStoreVerifier;
31 import voldemort.cluster.failuredetector.FailureDetector;
32 import voldemort.cluster.failuredetector.FailureDetectorConfig;
33 import voldemort.cluster.failuredetector.FailureDetectorListener;
34 import voldemort.server.RequestRoutingType;
35 import voldemort.store.Store;
36 import voldemort.store.metadata.MetadataStore;
37 import voldemort.store.socket.SocketDestination;
38 import voldemort.store.socket.SocketStoreFactory;
39 import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
40 import voldemort.utils.ByteArray;
41 import voldemort.versioning.InconsistencyResolver;
42 import voldemort.versioning.Versioned;
44 /**
45 * A StoreClientFactory abstracts away the connection pooling, threading, and
46 * bootstrapping mechanism. It can be used to create any number of
47 * {@link voldemort.client.StoreClient StoreClient} instances for different
48 * stores.
52 public class SocketStoreClientFactory extends AbstractStoreClientFactory {
54 public static final String URL_SCHEME = "tcp";
56 private final SocketStoreFactory storeFactory;
57 private FailureDetectorListener failureDetectorListener;
58 private final RequestRoutingType requestRoutingType;
60 public SocketStoreClientFactory(ClientConfig config) {
61 super(config);
62 this.requestRoutingType = RequestRoutingType.getRequestRoutingType(RoutingTier.SERVER.equals(config.getRoutingTier()),
63 false);
65 this.storeFactory = new ClientRequestExecutorPool(config.getSelectors(),
66 config.getMaxConnectionsPerNode(),
67 config.getConnectionTimeout(TimeUnit.MILLISECONDS),
68 config.getSocketTimeout(TimeUnit.MILLISECONDS),
69 config.getSocketBufferSize(),
70 config.getSocketKeepAlive());
71 if(config.isJmxEnabled()) {
72 ((ClientRequestExecutorPool) storeFactory).registerJmx();
76 @Override
77 public <K, V> StoreClient<K, V> getStoreClient(final String storeName,
78 final InconsistencyResolver<Versioned<V>> resolver) {
79 if(getConfig().isLazyEnabled())
80 return new LazyStoreClient<K, V>(new Callable<StoreClient<K, V>>() {
82 public StoreClient<K, V> call() throws Exception {
83 return getParentStoreClient(storeName, resolver);
85 });
87 return getParentStoreClient(storeName, resolver);
90 private <K, V> StoreClient<K, V> getParentStoreClient(String storeName,
91 InconsistencyResolver<Versioned<V>> resolver) {
92 return super.getStoreClient(storeName, resolver);
95 @Override
96 protected List<Versioned<String>> getRemoteMetadata(String key, URI url) {
97 try {
98 return super.getRemoteMetadata(key, url);
99 } catch(VoldemortException e) {
100 // Fix SNA-4227: When an error occurs during bootstrap, close the
101 // socket
102 SocketDestination destination = new SocketDestination(url.getHost(),
103 url.getPort(),
104 getRequestFormatType());
105 storeFactory.close(destination);
106 throw new VoldemortException(e);
110 @Override
111 protected Store<ByteArray, byte[], byte[]> getStore(String storeName,
112 String host,
113 int port,
114 RequestFormatType type) {
115 return storeFactory.create(storeName, host, port, type, requestRoutingType);
118 @Override
119 protected FailureDetector initFailureDetector(final ClientConfig config,
120 final Collection<Node> nodes) {
121 failureDetectorListener = new FailureDetectorListener() {
123 public void nodeAvailable(Node node) {
127 public void nodeUnavailable(Node node) {
128 if(logger.isInfoEnabled())
129 logger.info(node + " has been marked as unavailable, destroying socket pool");
131 // Kill the socket pool for this node...
132 SocketDestination destination = new SocketDestination(node.getHost(),
133 node.getSocketPort(),
134 config.getRequestFormatType());
135 storeFactory.close(destination);
140 ClientStoreVerifier storeVerifier = new ClientStoreVerifier() {
142 @Override
143 protected Store<ByteArray, byte[], byte[]> getStoreInternal(Node node) {
144 return SocketStoreClientFactory.this.getStore(MetadataStore.METADATA_STORE_NAME,
145 node.getHost(),
146 node.getSocketPort(),
147 config.getRequestFormatType());
152 FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig(config).setNodes(nodes)
153 .setStoreVerifier(storeVerifier);
155 return create(failureDetectorConfig, true, failureDetectorListener);
158 @Override
159 protected int getPort(Node node) {
160 return node.getSocketPort();
163 @Override
164 protected void validateUrl(URI url) {
165 if(!URL_SCHEME.equals(url.getScheme()))
166 throw new IllegalArgumentException("Illegal scheme in bootstrap URL for SocketStoreClientFactory:"
167 + " expected '"
168 + URL_SCHEME
169 + "' but found '"
170 + url.getScheme() + "'.");
173 @Override
174 public void close() {
175 this.storeFactory.close();
176 if(failureDetector != null)
177 this.failureDetector.removeFailureDetectorListener(failureDetectorListener);
179 super.close();