2 * Copyright 2008-2010 LinkedIn, Inc
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
17 package voldemort
.client
;
19 import static voldemort
.cluster
.failuredetector
.FailureDetectorUtils
.create
;
22 import java
.util
.Collection
;
23 import java
.util
.List
;
24 import java
.util
.concurrent
.Callable
;
25 import java
.util
.concurrent
.TimeUnit
;
27 import voldemort
.VoldemortException
;
28 import voldemort
.client
.protocol
.RequestFormatType
;
29 import voldemort
.cluster
.Node
;
30 import voldemort
.cluster
.failuredetector
.ClientStoreVerifier
;
31 import voldemort
.cluster
.failuredetector
.FailureDetector
;
32 import voldemort
.cluster
.failuredetector
.FailureDetectorConfig
;
33 import voldemort
.cluster
.failuredetector
.FailureDetectorListener
;
34 import voldemort
.server
.RequestRoutingType
;
35 import voldemort
.store
.Store
;
36 import voldemort
.store
.metadata
.MetadataStore
;
37 import voldemort
.store
.socket
.SocketDestination
;
38 import voldemort
.store
.socket
.SocketStoreFactory
;
39 import voldemort
.store
.socket
.clientrequest
.ClientRequestExecutorPool
;
40 import voldemort
.utils
.ByteArray
;
41 import voldemort
.versioning
.InconsistencyResolver
;
42 import voldemort
.versioning
.Versioned
;
45 * A StoreClientFactory abstracts away the connection pooling, threading, and
46 * bootstrapping mechanism. It can be used to create any number of
47 * {@link voldemort.client.StoreClient StoreClient} instances for different
52 public class SocketStoreClientFactory
extends AbstractStoreClientFactory
{
54 public static final String URL_SCHEME
= "tcp";
56 private final SocketStoreFactory storeFactory
;
57 private FailureDetectorListener failureDetectorListener
;
58 private final RequestRoutingType requestRoutingType
;
60 public SocketStoreClientFactory(ClientConfig config
) {
62 this.requestRoutingType
= RequestRoutingType
.getRequestRoutingType(RoutingTier
.SERVER
.equals(config
.getRoutingTier()),
65 this.storeFactory
= new ClientRequestExecutorPool(config
.getSelectors(),
66 config
.getMaxConnectionsPerNode(),
67 config
.getConnectionTimeout(TimeUnit
.MILLISECONDS
),
68 config
.getSocketTimeout(TimeUnit
.MILLISECONDS
),
69 config
.getSocketBufferSize(),
70 config
.getSocketKeepAlive());
71 if(config
.isJmxEnabled()) {
72 ((ClientRequestExecutorPool
) storeFactory
).registerJmx();
77 public <K
, V
> StoreClient
<K
, V
> getStoreClient(final String storeName
,
78 final InconsistencyResolver
<Versioned
<V
>> resolver
) {
79 if(getConfig().isLazyEnabled())
80 return new LazyStoreClient
<K
, V
>(new Callable
<StoreClient
<K
, V
>>() {
82 public StoreClient
<K
, V
> call() throws Exception
{
83 return getParentStoreClient(storeName
, resolver
);
87 return getParentStoreClient(storeName
, resolver
);
90 private <K
, V
> StoreClient
<K
, V
> getParentStoreClient(String storeName
,
91 InconsistencyResolver
<Versioned
<V
>> resolver
) {
92 return super.getStoreClient(storeName
, resolver
);
96 protected List
<Versioned
<String
>> getRemoteMetadata(String key
, URI url
) {
98 return super.getRemoteMetadata(key
, url
);
99 } catch(VoldemortException e
) {
100 // Fix SNA-4227: When an error occurs during bootstrap, close the
102 SocketDestination destination
= new SocketDestination(url
.getHost(),
104 getRequestFormatType());
105 storeFactory
.close(destination
);
106 throw new VoldemortException(e
);
111 protected Store
<ByteArray
, byte[], byte[]> getStore(String storeName
,
114 RequestFormatType type
) {
115 return storeFactory
.create(storeName
, host
, port
, type
, requestRoutingType
);
119 protected FailureDetector
initFailureDetector(final ClientConfig config
,
120 final Collection
<Node
> nodes
) {
121 failureDetectorListener
= new FailureDetectorListener() {
123 public void nodeAvailable(Node node
) {
127 public void nodeUnavailable(Node node
) {
128 if(logger
.isInfoEnabled())
129 logger
.info(node
+ " has been marked as unavailable, destroying socket pool");
131 // Kill the socket pool for this node...
132 SocketDestination destination
= new SocketDestination(node
.getHost(),
133 node
.getSocketPort(),
134 config
.getRequestFormatType());
135 storeFactory
.close(destination
);
140 ClientStoreVerifier storeVerifier
= new ClientStoreVerifier() {
143 protected Store
<ByteArray
, byte[], byte[]> getStoreInternal(Node node
) {
144 return SocketStoreClientFactory
.this.getStore(MetadataStore
.METADATA_STORE_NAME
,
146 node
.getSocketPort(),
147 config
.getRequestFormatType());
152 FailureDetectorConfig failureDetectorConfig
= new FailureDetectorConfig(config
).setNodes(nodes
)
153 .setStoreVerifier(storeVerifier
);
155 return create(failureDetectorConfig
, true, failureDetectorListener
);
159 protected int getPort(Node node
) {
160 return node
.getSocketPort();
164 protected void validateUrl(URI url
) {
165 if(!URL_SCHEME
.equals(url
.getScheme()))
166 throw new IllegalArgumentException("Illegal scheme in bootstrap URL for SocketStoreClientFactory:"
170 + url
.getScheme() + "'.");
174 public void close() {
175 this.storeFactory
.close();
176 if(failureDetector
!= null)
177 this.failureDetector
.removeFailureDetectorListener(failureDetectorListener
);