client module for memcached (memory cache daemon)

See U{the MemCached homepage<http://www.danga.com/memcached>} for more about memcached.

This should give you a feel for how this module operates::

    mc = memcache.Client(['127.0.0.1:11211'], debug=0)

    mc.set("some_key", "Some value")
    value = mc.get("some_key")

    mc.set("another_key", 3)
    mc.delete("another_key")

    mc.set("key", "1")   # note that the key used for incr/decr must be a string.

The standard way to use memcache with a database is like this::

    obj = backend_api.get(...)

    # we now have obj, and future passes through this code
    # will use the object from the cache.

Detailed Documentation
======================

More detailed documentation is available in the L{Client} class.
51 import cPickle
as pickle
# Module metadata.
__author__ = "Evan Martin <martine@danga.com>"
__version__ = "1.2_tummy5"
__copyright__ = "Copyright (C) 2003 Danga Interactive"
__license__ = "Python"
60 class _Error(Exception):
65 Object representing a pool of memcache servers.
67 See L{memcache} for an overview.
69 In all cases where a key is used, the key can be either:
70 1. A simple hashable type (string, integer, etc.).
71 2. A tuple of C{(hashvalue, key)}. This is useful if you want to avoid
72 making this module calculate a hash value. You may prefer, for
73 example, to keep all of a given user's objects on the same memcache
74 server, so you could use the user's unique id as the hash value.
76 @group Setup: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog
77 @group Insertion: set, add, replace
78 @group Retrieval: get, get_multi
79 @group Integers: incr, decr
80 @group Removal: delete
81 @sort: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog,\
82 set, add, replace, get, get_multi, incr, decr, delete
90 _SERVER_RETRIES
= 10 # how many times to try finding a free server.
92 def __init__(self
, servers
, debug
=0):
94 Create a new Client object with the given list of servers.
96 @param servers: C{servers} is passed to L{set_servers}.
97 @param debug: whether to display error messages when a server can't be
100 self
.set_servers(servers
)
104 def set_servers(self
, servers
):
106 Set the pool of servers used by this client.
108 @param servers: an array of servers.
109 Servers can be passed in two forms:
110 1. Strings of the form C{"host:port"}, which implies a default weight of 1.
111 2. Tuples of the form C{("host:port", weight)}, where C{weight} is
112 an integer weight value.
114 self
.servers
= [_Host(s
, self
.debuglog
) for s
in servers
]
118 '''Get statistics from each of the servers.
120 @return: A list of tuples ( server_identifier, stats_dictionary ).
121 The dictionary contains a number of name/value pairs specifying
122 the name of the status field and the string value associated with
123 it. The values are not converted from strings.
126 for s
in self
.servers
:
127 if not s
.connect(): continue
128 name
= '%s:%s (%s)' % ( s
.ip
, s
.port
, s
.weight
)
131 data
.append(( name
, serverData
))
132 readline
= s
.readline
135 if not line
or line
.strip() == 'END': break
136 stats
= line
.split(' ', 2)
137 serverData
[stats
[1]] = stats
[2]
142 'Expire all data currently in the memcache servers.'
143 for s
in self
.servers
:
144 if not s
.connect(): continue
145 s
.send_cmd('flush_all')
148 def debuglog(self
, str):
150 sys
.stderr
.write("MemCached: %s\n" % str)
152 def _statlog(self
, func
):
153 if not self
.stats
.has_key(func
):
156 self
.stats
[func
] += 1
158 def forget_dead_hosts(self
):
160 Reset every host in the pool to an "alive" state.
162 for s
in self
.servers
:
165 def _init_buckets(self
):
167 for server
in self
.servers
:
168 for i
in range(server
.weight
):
169 self
.buckets
.append(server
)
171 def _get_server(self
, key
):
172 if type(key
) == types
.TupleType
:
176 serverhash
= hash(key
)
178 for i
in range(Client
._SERVER
_RETRIES
):
179 server
= self
.buckets
[serverhash
% len(self
.buckets
)]
181 #print "(using server %s)" % server,
183 serverhash
= hash(str(serverhash
) + str(i
))
186 def disconnect_all(self
):
187 for s
in self
.servers
:
190 def delete(self
, key
, time
=0):
191 '''Deletes a key from the memcache.
193 @return: Nonzero on success.
196 server
, key
= self
._get
_server
(key
)
199 self
._statlog
('delete')
201 cmd
= "delete %s %d" % (key
, time
)
203 cmd
= "delete %s" % key
207 server
.expect("DELETED")
208 except socket
.error
, msg
:
209 server
.mark_dead(msg
[1])
213 def incr(self
, key
, delta
=1):
215 Sends a command to the server to atomically increment the value for C{key} by
216 C{delta}, or by 1 if C{delta} is unspecified. Returns None if C{key} doesn't
217 exist on server, otherwise it returns the new value after incrementing.
219 Note that the value for C{key} must already exist in the memcache, and it
220 must be the string representation of an integer.
222 >>> mc.set("counter", "20") # returns 1, indicating success
224 >>> mc.incr("counter")
226 >>> mc.incr("counter")
229 Overflow on server is not checked. Be aware of values approaching
232 @param delta: Integer amount to increment by (should be zero or greater).
233 @return: New value after incrementing.
236 return self
._incrdecr
("incr", key
, delta
)
def decr(self, key, delta=1):
    """
    Atomically decrement the value stored under C{key}.

    Like L{incr}, but decrements.  Unlike L{incr}, underflow is checked and
    new values are capped at 0: decrementing a stored value of 1 by 2
    leaves 0 rather than a negative number.

    @param key: Key whose value is decremented; forwarded unchanged to
    C{_incrdecr}.
    @param delta: Integer amount to decrement by (should be zero or greater).
    @return: New value after decrementing.
    """
    return self._incrdecr("decr", key, delta)
250 def _incrdecr(self
, cmd
, key
, delta
):
251 server
, key
= self
._get
_server
(key
)
255 cmd
= "%s %s %d" % (cmd
, key
, delta
)
258 line
= server
.readline()
260 except socket
.error
, msg
:
261 server
.mark_dead(msg
[1])
def add(self, key, val, time=0):
    '''Add new key with value.

    Like L{set}, but only stores in memcache if the key doesn\'t already exist.

    @param key: Either a simple hashable key or a C{(hashvalue, key)} tuple.
    @param val: Value to store.
    @param time: Forwarded to C{_set}, which sends it to the server as part
    of the storage command; defaults to 0.
    @return: Nonzero on success.
    '''
    return self._set("add", key, val, time)
def replace(self, key, val, time=0):
    '''Replace existing key with value.

    Like L{set}, but only stores in memcache if the key already exists.
    The opposite of L{add}.

    @param key: Either a simple hashable key or a C{(hashvalue, key)} tuple.
    @param val: Value to store.
    @param time: Forwarded to C{_set}, which sends it to the server as part
    of the storage command; defaults to 0.
    @return: Nonzero on success.
    '''
    return self._set("replace", key, val, time)
def set(self, key, val, time=0):
    '''Unconditionally sets a key to a given value in the memcache.

    The C{key} can optionally be a tuple, with the first element being the
    hash value, if you want to avoid making this module calculate a hash value.
    You may prefer, for example, to keep all of a given user's objects on the
    same memcache server, so you could use the user's unique id as the hash
    value.

    @param key: Either a simple hashable key or a C{(hashvalue, key)} tuple.
    @param val: Value to store.
    @param time: Forwarded to C{_set}, which sends it to the server as part
    of the storage command; defaults to 0.
    @return: Nonzero on success.
    '''
    return self._set("set", key, val, time)
298 def _set(self
, cmd
, key
, val
, time
):
299 server
, key
= self
._get
_server
(key
)
306 if isinstance(val
, types
.StringTypes
):
308 elif isinstance(val
, int):
309 flags |
= Client
._FLAG
_INTEGER
311 elif isinstance(val
, long):
312 flags |
= Client
._FLAG
_LONG
314 elif self
._usePickle
:
315 flags |
= Client
._FLAG
_PICKLE
316 val
= pickle
.dumps(val
, 2)
320 fullcmd
= "%s %s %d %d %d\r\n%s" % (cmd
, key
, flags
, time
, len(val
), val
)
322 server
.send_cmd(fullcmd
)
323 server
.expect("STORED")
324 except socket
.error
, msg
:
325 server
.mark_dead(msg
[1])
330 '''Retrieves a key from the memcache.
332 @return: The value or None.
334 server
, key
= self
._get
_server
(key
)
341 server
.send_cmd("get %s" % key
)
342 rkey
, flags
, rlen
, = self
._expectvalue
(server
)
345 value
= self
._recv
_value
(server
, flags
, rlen
)
347 except (_Error
, socket
.error
), msg
:
348 if type(msg
) is types
.TupleType
:
350 server
.mark_dead(msg
)
354 def get_multi(self
, keys
):
356 Retrieves multiple keys from the memcache doing just one query.
358 >>> success = mc.set("foo", "bar")
359 >>> success = mc.set("baz", 42)
360 >>> mc.get_multi(["foo", "baz", "foobar"]) == {"foo": "bar", "baz": 42}
363 This method is recommended over regular L{get} as it lowers the number of
364 total packets flying around your network, reducing total latency, since
365 your app doesn\'t have to wait for each round-trip of L{get} before sending
368 @param keys: An array of keys.
369 @return: A dictionary of key/value pairs that were available.
373 self
._statlog
('get_multi')
377 # build up a list for each server of all the keys we want.
379 server
, key
= self
._get
_server
(key
)
382 if not server_keys
.has_key(server
):
383 server_keys
[server
] = []
384 server_keys
[server
].append(key
)
386 # send out all requests on each server before reading anything
388 for server
in server_keys
.keys():
390 server
.send_cmd("get %s" % " ".join(server_keys
[server
]))
391 except socket
.error
, msg
:
392 server
.mark_dead(msg
[1])
393 dead_servers
.append(server
)
395 # if any servers died on the way, don't expect them to respond.
396 for server
in dead_servers
:
397 del server_keys
[server
]
400 for server
in server_keys
.keys():
402 line
= server
.readline()
403 while line
and line
!= 'END':
404 rkey
, flags
, rlen
= self
._expectvalue
(server
, line
)
405 # Bo Yang reports that this can sometimes be None
407 val
= self
._recv
_value
(server
, flags
, rlen
)
409 line
= server
.readline()
410 except (_Error
, socket
.error
), msg
:
411 server
.mark_dead(msg
)
414 def _expectvalue(self
, server
, line
=None):
416 line
= server
.readline()
418 if line
[:5] == 'VALUE':
419 resp
, rkey
, flags
, len = line
.split()
422 return (rkey
, flags
, rlen
)
424 return (None, None, None)
426 def _recv_value(self
, server
, flags
, rlen
):
427 rlen
+= 2 # include \r\n
428 buf
= server
.recv(rlen
)
430 raise _Error("received %d bytes when expecting %d" % (len(buf
), rlen
))
433 buf
= buf
[:-2] # strip \r\n
437 elif flags
& Client
._FLAG
_INTEGER
:
439 elif flags
& Client
._FLAG
_LONG
:
441 elif self
._usePickle
and flags
& Client
._FLAG
_PICKLE
:
443 val
= pickle
.loads(buf
)
445 self
.debuglog('Pickle error...\n')
448 self
.debuglog("unknown flags on get: %x\n" % flags
)
453 _DEAD_RETRY
= 30 # number of seconds before retrying a dead server.
455 def __init__(self
, host
, debugfunc
=None):
456 if isinstance(host
, types
.TupleType
):
458 self
.weight
= host
[1]
462 if host
.find(":") > 0:
463 self
.ip
, self
.port
= host
.split(":")
464 self
.port
= int(self
.port
)
466 self
.ip
, self
.port
= host
, 11211
469 debugfunc
= lambda x
: x
470 self
.debuglog
= debugfunc
475 def _check_dead(self
):
476 if self
.deaduntil
and self
.deaduntil
> time
.time():
482 if self
._get
_socket
():
486 def mark_dead(self
, reason
):
487 self
.debuglog("MemCache: %s: %s. Marking dead." % (self
, reason
))
488 self
.deaduntil
= time
.time() + _Host
._DEAD
_RETRY
491 def _get_socket(self
):
492 if self
._check
_dead
():
496 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
497 # Python 2.3-ism: s.settimeout(1)
499 s
.connect((self
.ip
, self
.port
))
500 except socket
.error
, msg
:
501 self
.mark_dead("connect: %s" % msg
[1])
506 def close_socket(self
):
511 def send_cmd(self
, cmd
):
513 self
.socket
.sendall(cmd
)
514 self
.socket
.sendall('\r\n')
516 self
.socket
.sendall(cmd
+ '\r\n')
520 recv
= self
.socket
.recv
524 self
.mark_dead('Connection closed while reading from %s'
527 if data
== '\n' and buffers
and buffers
[-1] == '\r':
529 buffers
= buffers
+ data
532 def expect(self
, text
):
533 line
= self
.readline()
535 self
.debuglog("while expecting '%s', got unexpected response '%s'" % (text
, line
))
538 def recv(self
, rlen
):
540 recv
= self
.socket
.recv
541 while len(buf
) < rlen
:
542 buf
= buf
+ recv(rlen
- len(buf
))
548 d
= " (dead until %d)" % self
.deaduntil
549 return "%s:%d%s" % (self
.ip
, self
.port
, d
)
552 import doctest
, memcache
553 servers
= ["127.0.0.1:11211"]
554 mc
= Client(servers
, debug
=1)
556 return doctest
.testmod(memcache
, globs
=globs
)
558 if __name__
== "__main__":
559 print "Testing docstrings..."
561 print "Running tests:"
563 #servers = ["127.0.0.1:11211", "127.0.0.1:11212"]
564 servers
= ["127.0.0.1:11211"]
565 mc
= Client(servers
, debug
=1)
568 if not isinstance(val
, types
.StringTypes
):
569 return "%s (%s)" % (val
, type(val
))
571 def test_setget(key
, val
):
572 print "Testing set/get {'%s': %s} ..." % (to_s(key
), to_s(val
)),
587 def __eq__(self
, other
):
588 if isinstance(other
, FooStruct
):
589 return self
.bar
== other
.bar
592 test_setget("a_string", "some random string")
593 test_setget("an_integer", 42)
594 if test_setget("long", long(1<<30)):
595 print "Testing delete ...",
596 if mc
.delete("long"):
600 print "Testing get_multi ...",
601 print mc
.get_multi(["a_string", "an_integer"])
603 print "Testing get(unknown value) ...",
604 print to_s(mc
.get("unknown_value"))
607 test_setget("foostruct", f
)
609 print "Testing incr ...",
610 x
= mc
.incr("an_integer", 1)
616 print "Testing decr ...",
617 x
= mc
.decr("an_integer", 1)
625 # vim: ts=4 sw=4 et :