Merge branch 'master' into subfolders-8.3
[pyTivo.git] / Cheetah / Utils / memcache.py
blob03ba03267ecc88960a0a1daed3f95bc4fb4923fc
1 #!/usr/bin/env python
"""
client module for memcached (memory cache daemon)

Overview
========

See U{the MemCached homepage<http://www.danga.com/memcached>} for more about memcached.

Usage summary
=============

This should give you a feel for how this module operates::

    import memcache
    mc = memcache.Client(['127.0.0.1:11211'], debug=0)

    mc.set("some_key", "Some value")
    value = mc.get("some_key")

    mc.set("another_key", 3)
    mc.delete("another_key")

    mc.set("key", "1")   # note that the value used for incr/decr must be the string form of an integer.
    mc.incr("key")
    mc.decr("key")

The standard way to use memcache with a database is like this::

    key = derive_key(obj)
    obj = mc.get(key)
    if not obj:
        obj = backend_api.get(...)
        mc.set(key, obj)

    # we now have obj, and future passes through this code
    # will use the object from the cache.

Detailed Documentation
======================

More detailed documentation is available in the L{Client} class.
"""
46 import sys
47 import socket
48 import time
49 import types
50 try:
51 import cPickle as pickle
52 except ImportError:
53 import pickle
# Module metadata.
__author__ = "Evan Martin <martine@danga.com>"
__version__ = "1.2_tummy5"
__copyright__ = "Copyright (C) 2003 Danga Interactive"
__license__ = "Python"
class _Error(Exception):
    """Internal exception for protocol-level failures (e.g. a short read)."""
class Client:
    """
    Object representing a pool of memcache servers.

    See L{memcache} for an overview.

    In all cases where a key is used, the key can be either:
        1. A simple hashable type (string, integer, etc.).
        2. A tuple of C{(hashvalue, key)}.  This is useful if you want to avoid
        making this module calculate a hash value.  You may prefer, for
        example, to keep all of a given user's objects on the same memcache
        server, so you could use the user's unique id as the hash value.

    @group Setup: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog
    @group Insertion: set, add, replace
    @group Retrieval: get, get_multi
    @group Integers: incr, decr
    @group Removal: delete
    @sort: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog,\
           set, add, replace, get, get_multi, incr, decr, delete
    """

    # Pickling is switched off in this copy of the module: values that are
    # neither strings nor integers are stored as their str() form.
    _usePickle = False
    _FLAG_PICKLE = 1<<0
    _FLAG_INTEGER = 1<<1
    _FLAG_LONG = 1<<2

    _SERVER_RETRIES = 10  # how many times to try finding a free server.

    def __init__(self, servers, debug=0):
        """
        Create a new Client object with the given list of servers.

        @param servers: C{servers} is passed to L{set_servers}.
        @param debug: whether to display error messages when a server can't be
        contacted.
        """
        self.debug = debug
        self.stats = {}
        self.set_servers(servers)

    def set_servers(self, servers):
        """
        Set the pool of servers used by this client.

        @param servers: an array of servers.
        Servers can be passed in two forms:
            1. Strings of the form C{"host:port"}, which implies a default weight of 1.
            2. Tuples of the form C{("host:port", weight)}, where C{weight} is
            an integer weight value.
        """
        self.servers = [_Host(s, self.debuglog) for s in servers]
        self._init_buckets()

    def get_stats(self):
        '''Get statistics from each of the servers.

        @return: A list of tuples ( server_identifier, stats_dictionary ).
            The dictionary contains a number of name/value pairs specifying
            the name of the status field and the string value associated with
            it.  The values are not converted from strings.
        '''
        data = []
        for s in self.servers:
            if not s.connect():
                continue
            name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
            s.send_cmd('stats')
            serverData = {}
            data.append((name, serverData))
            readline = s.readline
            while 1:
                line = readline()
                if not line or line.strip() == 'END':
                    break
                stats = line.split(' ', 2)
                if len(stats) < 3:
                    # Not a "STAT <name> <value>" line (e.g. "ERROR"); stop
                    # reading from this server instead of raising IndexError.
                    self.debuglog("unexpected stats response '%s'" % line)
                    break
                serverData[stats[1]] = stats[2]

        return data

    def flush_all(self):
        'Expire all data currently in the memcache servers.'
        for s in self.servers:
            if not s.connect():
                continue
            s.send_cmd('flush_all')
            s.expect("OK")

    def debuglog(self, str):
        """Write C{str} to stderr when debugging is enabled."""
        if self.debug:
            sys.stderr.write("MemCached: %s\n" % (str,))

    def _statlog(self, func):
        """Count one more call of the operation named C{func} in C{self.stats}."""
        self.stats[func] = self.stats.get(func, 0) + 1

    def _error_reason(self, err):
        """
        Return a short description of exception C{err}.

        For the usual C{(errno, message)} socket error this is the message;
        errors carrying a single argument (e.g. a timeout) yield that argument
        instead of failing with an IndexError.
        """
        args = getattr(err, 'args', None)
        if args:
            return args[-1]
        return str(err)

    def forget_dead_hosts(self):
        """
        Reset every host in the pool to an "alive" state.
        """
        for s in self.servers:
            # The host objects keep this timestamp in 'deaduntil'.
            s.deaduntil = 0

    def _init_buckets(self):
        """Rebuild the bucket list; each server appears C{weight} times."""
        self.buckets = []
        for server in self.servers:
            for i in range(server.weight):
                self.buckets.append(server)

    def _get_server(self, key):
        """
        Pick a connected server for C{key}.

        @return: C{(server, key)} where C{key} has any C{(hashvalue, key)}
        wrapper removed, or C{(None, None)} if no server could be reached.
        """
        if isinstance(key, tuple):
            serverhash = key[0]
            key = key[1]
        else:
            serverhash = hash(key)

        if not self.buckets:
            # No servers configured: the modulo below would divide by zero.
            return None, None

        for i in range(Client._SERVER_RETRIES):
            server = self.buckets[serverhash % len(self.buckets)]
            if server.connect():
                return server, key
            # Rehash to try a different bucket on the next attempt.
            serverhash = hash(str(serverhash) + str(i))
        return None, None

    def disconnect_all(self):
        """Close the socket of every server in the pool."""
        for s in self.servers:
            s.close_socket()

    def delete(self, key, time=0):
        '''Deletes a key from the memcache.

        @return: Nonzero on success (the server answered C{DELETED} or
        C{NOT_FOUND}); zero if no server was available, the connection failed
        or the server sent any other response.
        @rtype: int
        '''
        server, key = self._get_server(key)
        if not server:
            return 0
        self._statlog('delete')
        if time is not None:
            cmd = "delete %s %d" % (key, time)
        else:
            cmd = "delete %s" % key

        try:
            server.send_cmd(cmd)
            line = server.expect("DELETED")
        except socket.error:
            server.mark_dead(self._error_reason(sys.exc_info()[1]))
            return 0
        if line in ("DELETED", "NOT_FOUND"):
            return 1
        return 0

    def incr(self, key, delta=1):
        """
        Sends a command to the server to atomically increment the value for C{key} by
        C{delta}, or by 1 if C{delta} is unspecified.  Returns None if C{key} doesn't
        exist on server, otherwise it returns the new value after incrementing.

        Note that the value for C{key} must already exist in the memcache, and it
        must be the string representation of an integer.

        >>> mc.set("counter", "20")  # returns 1, indicating success
        1
        >>> mc.incr("counter")
        21
        >>> mc.incr("counter")
        22

        Overflow on server is not checked.  Be aware of values approaching
        2**32.  See L{decr}.

        @param delta: Integer amount to increment by (should be zero or greater).
        @return: New value after incrementing.
        @rtype: int
        """
        return self._incrdecr("incr", key, delta)

    def decr(self, key, delta=1):
        """
        Like L{incr}, but decrements.  Unlike L{incr}, underflow is checked and
        new values are capped at 0.  If server value is 1, a decrement of 2
        returns 0, not -1.

        @param delta: Integer amount to decrement by (should be zero or greater).
        @return: New value after decrementing.
        @rtype: int
        """
        return self._incrdecr("decr", key, delta)

    def _incrdecr(self, cmd, key, delta):
        """
        Shared implementation of L{incr} and L{decr}.

        @return: The new integer value, or None when no server is available,
        the connection fails, or the reply is not a number (e.g. C{NOT_FOUND}).
        """
        server, key = self._get_server(key)
        if not server:
            # None rather than 0, because 0 is a legitimate counter value.
            return None
        self._statlog(cmd)
        cmd = "%s %s %d" % (cmd, key, delta)
        try:
            server.send_cmd(cmd)
            line = server.readline()
        except socket.error:
            server.mark_dead(self._error_reason(sys.exc_info()[1]))
            return None
        try:
            return int(line)
        except ValueError:
            # "NOT_FOUND", an error response, or an empty line.
            return None

    def add(self, key, val, time=0):
        '''
        Add new key with value.

        Like L{set}, but only stores in memcache if the key doesn\'t already exist.

        @return: Nonzero on success.
        @rtype: int
        '''
        return self._set("add", key, val, time)

    def replace(self, key, val, time=0):
        '''Replace existing key with value.

        Like L{set}, but only stores in memcache if the key already exists.
        The opposite of L{add}.

        @return: Nonzero on success.
        @rtype: int
        '''
        return self._set("replace", key, val, time)

    def set(self, key, val, time=0):
        '''Unconditionally sets a key to a given value in the memcache.

        The C{key} can optionally be an tuple, with the first element being the
        hash value, if you want to avoid making this module calculate a hash value.
        You may prefer, for example, to keep all of a given user's objects on the
        same memcache server, so you could use the user's unique id as the hash
        value.

        @return: Nonzero on success.
        @rtype: int
        '''
        return self._set("set", key, val, time)

    def _set(self, cmd, key, val, time):
        """
        Shared implementation of L{set}, L{add} and L{replace}.

        @return: 1 if the server answered C{STORED}; 0 if no server was
        available, the connection failed, or the server declined the value.
        """
        server, key = self._get_server(key)
        if not server:
            return 0

        self._statlog(cmd)

        flags = 0
        if isinstance(val, types.StringTypes):
            pass
        elif isinstance(val, int):
            flags |= Client._FLAG_INTEGER
            val = "%d" % val
        elif isinstance(val, long):
            flags |= Client._FLAG_LONG
            val = "%d" % val
        elif self._usePickle:
            flags |= Client._FLAG_PICKLE
            val = pickle.dumps(val, 2)
        else:
            # The value is sent via %s below, so convert it first: the byte
            # count in the command must describe the text actually sent.
            val = str(val)

        fullcmd = "%s %s %d %d %d\r\n%s" % (cmd, key, flags, time, len(val), val)
        try:
            server.send_cmd(fullcmd)
            line = server.expect("STORED")
        except socket.error:
            server.mark_dead(self._error_reason(sys.exc_info()[1]))
            return 0
        if line != "STORED":
            # e.g. NOT_STORED from add/replace.
            return 0
        return 1

    def get(self, key):
        '''Retrieves a key from the memcache.

        @return: The value or None.
        '''
        server, key = self._get_server(key)
        if not server:
            return None

        self._statlog('get')

        try:
            server.send_cmd("get %s" % key)
            rkey, flags, rlen = self._expectvalue(server)
            if not rkey:
                return None
            value = self._recv_value(server, flags, rlen)
            server.expect("END")
        except (_Error, socket.error):
            server.mark_dead(self._error_reason(sys.exc_info()[1]))
            return None
        return value

    def get_multi(self, keys):
        '''
        Retrieves multiple keys from the memcache doing just one query.

        >>> success = mc.set("foo", "bar")
        >>> success = mc.set("baz", 42)
        >>> mc.get_multi(["foo", "baz", "foobar"]) == {"foo": "bar", "baz": 42}
        True

        This method is recommended over regular L{get} as it lowers the number of
        total packets flying around your network, reducing total latency, since
        your app doesn\'t have to wait for each round-trip of L{get} before sending
        the next one.

        @param keys: An array of keys.
        @return:  A dictionary of key/value pairs that were available.
        '''

        self._statlog('get_multi')

        server_keys = {}

        # build up a list for each server of all the keys we want.
        for key in keys:
            server, key = self._get_server(key)
            if not server:
                continue
            server_keys.setdefault(server, []).append(key)

        # send out all requests on each server before reading anything
        dead_servers = []
        for server in server_keys.keys():
            try:
                server.send_cmd("get %s" % " ".join(server_keys[server]))
            except socket.error:
                server.mark_dead(self._error_reason(sys.exc_info()[1]))
                dead_servers.append(server)

        # if any servers died on the way, don't expect them to respond.
        for server in dead_servers:
            del server_keys[server]

        retvals = {}
        for server in server_keys.keys():
            try:
                line = server.readline()
                while line and line != 'END':
                    rkey, flags, rlen = self._expectvalue(server, line)
                    #  Bo Yang reports that this can sometimes be None
                    if rkey is not None:
                        val = self._recv_value(server, flags, rlen)
                        retvals[rkey] = val
                    line = server.readline()
            except (_Error, socket.error):
                server.mark_dead(self._error_reason(sys.exc_info()[1]))
        return retvals

    def _expectvalue(self, server, line=None):
        """
        Parse a C{VALUE <key> <flags> <bytes>} response header.

        @param line: an already-read response line; read from C{server} if empty.
        @return: C{(key, flags, length)}, or C{(None, None, None)} if the line
        is not a C{VALUE} header.
        """
        if not line:
            line = server.readline()

        if line[:5] == 'VALUE':
            resp, rkey, flags, length = line.split()
            return (rkey, int(flags), int(length))
        return (None, None, None)

    def _recv_value(self, server, flags, rlen):
        """
        Read a value body of C{rlen} bytes from C{server} and decode it
        according to C{flags}.

        @return: The decoded value; None if the flags are not understood or
        unpickling fails.
        @raise _Error: if the number of bytes received is not as expected.
        """
        rlen += 2  # include \r\n
        buf = server.recv(rlen)
        if len(buf) != rlen:
            raise _Error("received %d bytes when expecting %d" % (len(buf), rlen))

        buf = buf[:-2]  # strip \r\n

        # Default result, so unknown flags yield None instead of leaving the
        # name unbound.
        val = None
        if flags == 0:
            val = buf
        elif flags & Client._FLAG_INTEGER:
            val = int(buf)
        elif flags & Client._FLAG_LONG:
            val = long(buf)
        elif self._usePickle and flags & Client._FLAG_PICKLE:
            # NOTE(review): unpickling data read from the cache server is only
            # safe if every writer to that server is trusted.
            try:
                val = pickle.loads(buf)
            except Exception:
                self.debuglog('Pickle error...\n')
                val = None
        else:
            self.debuglog("unknown flags on get: %x\n" % flags)

        return val
class _Host:
    """
    One memcached server: its address, weight, socket and dead/alive state.
    """
    _DEAD_RETRY = 30  # number of seconds before retrying a dead server.

    def __init__(self, host, debugfunc=None):
        """
        @param host: either C{"host:port"} / C{"host"} (weight 1, default port
        11211) or a tuple C{("host:port", weight)}.
        @param debugfunc: callable taking one message string; defaults to a
        no-op.
        """
        if isinstance(host, tuple):
            # Unpack both fields at once: the weight is the tuple's second
            # element, not the second character of the host string.
            host, self.weight = host[0], host[1]
        else:
            self.weight = 1

        if host.find(":") > 0:
            self.ip, self.port = host.split(":")
            self.port = int(self.port)
        else:
            self.ip, self.port = host, 11211

        if not debugfunc:
            debugfunc = lambda x: x
        self.debuglog = debugfunc

        self.deaduntil = 0
        self.socket = None

    def _error_reason(self, err):
        """
        Return a short description of exception C{err}: the message of an
        C{(errno, message)} error, or the sole argument of a one-argument one.
        """
        args = getattr(err, 'args', None)
        if args:
            return args[-1]
        return str(err)

    def _check_dead(self):
        """Return 1 while the host is still marked dead, else clear the mark and return 0."""
        if self.deaduntil and self.deaduntil > time.time():
            return 1
        self.deaduntil = 0
        return 0

    def connect(self):
        """Return 1 if a socket to the host is (or could be) opened, else 0."""
        if self._get_socket():
            return 1
        return 0

    def mark_dead(self, reason):
        """Log C{reason}, close the socket and skip this host for C{_DEAD_RETRY} seconds."""
        self.debuglog("MemCache: %s: %s.  Marking dead." % (self, reason))
        self.deaduntil = time.time() + _Host._DEAD_RETRY
        self.close_socket()

    def _get_socket(self):
        """Return the connected socket, opening it if needed; None if the host is dead or unreachable."""
        if self._check_dead():
            return None
        if self.socket:
            return self.socket
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Python 2.3-ism: s.settimeout(1)
        try:
            s.connect((self.ip, self.port))
        except socket.error:
            reason = self._error_reason(sys.exc_info()[1])
            s.close()  # do not leak the descriptor of the failed attempt
            self.mark_dead("connect: %s" % reason)
            return None
        self.socket = s
        return s

    def close_socket(self):
        """Close and forget the socket, if any."""
        if self.socket:
            self.socket.close()
            self.socket = None

    def send_cmd(self, cmd):
        """Send C{cmd} followed by CRLF."""
        if len(cmd) > 100:
            # Avoid copying a large command just to append the terminator.
            self.socket.sendall(cmd)
            self.socket.sendall('\r\n')
        else:
            self.socket.sendall(cmd + '\r\n')

    def readline(self):
        """
        Read one CRLF-terminated line and return it without the terminator.

        If the connection closes first, the host is marked dead and whatever
        was read so far (possibly an empty string) is returned.
        """
        chunks = []
        recv = self.socket.recv
        while 1:
            data = recv(1)
            if not data:
                self.mark_dead('Connection closed while reading from %s'
                        % repr(self))
                break
            if data == '\n' and chunks and chunks[-1] == '\r':
                return ''.join(chunks[:-1])
            chunks.append(data)
        return ''.join(chunks)

    def expect(self, text):
        """Read a line, log it if it differs from C{text}, and return it."""
        line = self.readline()
        if line != text:
            self.debuglog("while expecting '%s', got unexpected response '%s'" % (text, line))
        return line

    def recv(self, rlen):
        """
        Read exactly C{rlen} bytes.

        @raise socket.error: if the peer closes the connection before C{rlen}
        bytes have arrived (an empty read would otherwise loop forever).
        """
        parts = []
        received = 0
        recv = self.socket.recv
        while received < rlen:
            chunk = recv(rlen - received)
            if not chunk:
                raise socket.error(
                    "connection closed after %d of %d bytes" % (received, rlen))
            parts.append(chunk)
            received += len(chunk)
        return ''.join(parts)

    def __str__(self):
        d = ''
        if self.deaduntil:
            d = " (dead until %d)" % self.deaduntil
        return "%s:%d%s" % (self.ip, self.port, d)
def _doctest():
    """Run the module's doctests with C{mc} bound to a client for 127.0.0.1:11211."""
    import doctest
    import memcache
    client = Client(["127.0.0.1:11211"], debug=1)
    return doctest.testmod(memcache, globs={"mc": client})
558 if __name__ == "__main__":
559 print "Testing docstrings..."
560 _doctest()
561 print "Running tests:"
562 print
563 #servers = ["127.0.0.1:11211", "127.0.0.1:11212"]
564 servers = ["127.0.0.1:11211"]
565 mc = Client(servers, debug=1)
567 def to_s(val):
568 if not isinstance(val, types.StringTypes):
569 return "%s (%s)" % (val, type(val))
570 return "%s" % val
571 def test_setget(key, val):
572 print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)),
573 mc.set(key, val)
574 newval = mc.get(key)
575 if newval == val:
576 print "OK"
577 return 1
578 else:
579 print "FAIL"
580 return 0
582 class FooStruct:
583 def __init__(self):
584 self.bar = "baz"
585 def __str__(self):
586 return "A FooStruct"
587 def __eq__(self, other):
588 if isinstance(other, FooStruct):
589 return self.bar == other.bar
590 return 0
592 test_setget("a_string", "some random string")
593 test_setget("an_integer", 42)
594 if test_setget("long", long(1<<30)):
595 print "Testing delete ...",
596 if mc.delete("long"):
597 print "OK"
598 else:
599 print "FAIL"
600 print "Testing get_multi ...",
601 print mc.get_multi(["a_string", "an_integer"])
603 print "Testing get(unknown value) ...",
604 print to_s(mc.get("unknown_value"))
606 f = FooStruct()
607 test_setget("foostruct", f)
609 print "Testing incr ...",
610 x = mc.incr("an_integer", 1)
611 if x == 43:
612 print "OK"
613 else:
614 print "FAIL"
616 print "Testing decr ...",
617 x = mc.decr("an_integer", 1)
618 if x == 42:
619 print "OK"
620 else:
621 print "FAIL"
625 # vim: ts=4 sw=4 et :