# pygr/coordinator.py
1 from __future__ import generators
2 import os
3 import time
4 import thread
5 import sys
6 import xmlrpclib
7 import traceback
8 from SimpleXMLRPCServer import SimpleXMLRPCServer
9 import socket
11 import dbfile
12 import logging
14 def get_hostname(host=None):
15 'get FQDN for host, or current host if not specified'
16 if host is None:
17 host=socket.gethostname()
18 try:
19 return socket.gethostbyaddr(host)[0]
20 except socket.herror: # DNS CAN'T RESOLVE HOSTNAME
21 return host # JUST USE HOSTNAME AS REPORTED BY gethostname()
23 def get_server(host, port, logRequests=False):
24 """Start xmlrpc server on requested host:port.
26 Return bound SimpleXMLRPCServer server obj and port it's bound to.
28 Set port=0 to bind to a random port number.
29 """
30 if host is None: # use localhost as default
31 host='localhost'
32 server = SimpleXMLRPCServer((host, port), logRequests=logRequests)
33 port = server.socket.getsockname()[1]
34 logging.info("Running XMLRPC server on port %d..." % port)
35 return server, port
38 class XMLRPCClientObject(object):
39 'provides object proxy for remote object, with methods that mirror its xmlrpc_methods'
40 def __init__(self,server,name,methodDict):
41 self.name=name
42 self.server=server
43 import new
44 class methodcall(object):
45 def __init__(self,name):
46 self.name=name
47 def __call__(self,obj,*args):
48 return obj.server.server.methodCall(obj.name,self.name,args)
49 for methodName in methodDict: # CREATE METHODS TO ACCESS REMOTE OBJECT'S METHODS
50 setattr(self,methodName,new.instancemethod(methodcall(methodName),self,self.__class__))
52 class XMLRPCClient(dict):
53 'interface to XMLRPC server serving multiple named objects'
54 def __init__(self,url):
55 self.server=xmlrpclib.ServerProxy(url)
56 def __getitem__(self,name):
57 'get connection to the named server object'
58 try:
59 return dict.__getitem__(self,name)
60 except KeyError:
61 methodDict=self.server.objectInfo(name) # GET INFO ABOUT REQUESTED OBJECT
62 import types
63 if isinstance(methodDict,types.StringType):
64 raise KeyError(methodDict) # RETURNED VALUE IS ERROR MESSAGE!
65 v=XMLRPCClientObject(self,name,methodDict)
66 self[name]=v # SAVE THIS OBJECT INTO OUR DICTIONARY
67 return v
69 class ConnectionDict(dict):
70 'ensure that multiple requests for the same connection use same ServerProxy'
71 def __call__(self,url,name):
72 try:
73 s=self[url] # REUSE EXISTING CONNECTION TO THE SERVER
74 except KeyError:
75 s=XMLRPCClient(url) # GET NEW CONNECTION TO THE SERVER
76 self[url]=s # CACHE THIS CONNECTION
77 return s[name] # GET THE REQUESTED OBJECT PROXY FROM THE SERVER
79 get_connection=ConnectionDict() # THIS RETURNS SAME ServerProxy FOR SAME url
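# Illustrative sketch (added for clarity, not part of the original module):
# client code typically reaches a named object served by XMLRPCServerBase
# through get_connection(), which caches one XMLRPCClient per URL.  The URL,
# object name and method below are hypothetical and must match whatever the
# remote server actually registered.
def _example_get_connection():
    'call a whitelisted method on a remote named object'
    counter = get_connection('http://localhost:5000', 'counter')
    # every key of the remote object's xmlrpc_methods becomes a proxy method
    return counter.add(2, 3)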
82 def safe_dispatch(self,name,args):
83 """restrict calls to selected methods, and trap all exceptions to
84 keep server alive!"""
85 import datetime
86 if name in self.xmlrpc_methods: # MAKE SURE THIS METHOD IS EXPLICITLY ALLOWED
87 try: # TRAP ALL ERRORS TO PREVENT OUR SERVER FROM DYING
88 print >>sys.stderr,'XMLRPC:',name,args,\
89 datetime.datetime.now().isoformat(' ') # LOG THE REQUEST
90 if self.xmlrpc_methods[name]: # use this as an alias for method
91 m = getattr(self,self.xmlrpc_methods[name])
92 else: # use method name as usual
93 m = getattr(self,name) # GET THE BOUND METHOD
94 val=m(*args) # CALL THE METHOD
95 sys.stderr.flush() # FLUSH ANY OUTPUT TO OUR LOG
96 return val # HAND BACK ITS RETURN VALUE
97 except SystemExit:
98 raise # WE REALLY DO WANT TO EXIT.
99 except: # METHOD RAISED AN EXCEPTION, SO PRINT TRACEBACK TO STDERR
100 traceback.print_exc(self.max_tb,sys.stderr)
101 else:
102 print >>sys.stderr,"safe_dispatch: blocked unregistered method %s" % name
103 return False # THIS RETURN VALUE IS CONFORMABLE BY XMLRPC...
106 class ObjectFromString(list):
107 """convenience class for initialization from string of format:
108 val1,val2,foo=12,bar=39,sshopts=-1 -p 1234
109 Args of format name=val are saved on the object as attributes;
110 otherwise each arg is appended to this list.
111 Argument type conversion is performed automatically if attrtype
112 mapping provided either to constructor or by the class itself.
113 Numeric keys in this mapping are applied to the corresponding
114 list arguments; string keys in this mapping are applied to
115 the corresponding attribute arguments.
116 Both the argument separator and assignment separator can be
117 customized."""
118 _separator=','
119 _eq_separator='='
120 def __init__(self,s,separator=None,eq_separator=None):
121 list.__init__(self)
122 if separator is None:
123 separator=self._separator
124 if eq_separator is None:
125 eq_separator=self._eq_separator
126 args=s.split(separator)
127 i=0 # ARGUMENT INDEX, USED FOR POSITIONAL TYPE CONVERSION
128 for arg in args:
129 try: # PROCESS attr=val ARGUMENT FORMAT
130 k,v=arg.split(eq_separator)
131 try: # SEE IF WE HAVE A TYPE FOR THIS ATTRIBUTE
132 v=self._attrtype[k](v)
133 except (AttributeError,KeyError):
134 pass # IF NO CONVERSION, JUST USE THE ORIGINAL STRING
135 setattr(self,k,v) # SAVE VALUE AS ATTRIBUTE
136 except ValueError: # JUST A SIMPLE ARGUMENT, SO SAVE AS ARG LIST
137 try: # SEE IF WE HAVE A TYPE FOR THIS LIST ITEM
138 arg=self._attrtype[i](arg)
139 except (AttributeError,KeyError):
140 pass # IF NO CONVERSION, JUST USE THE ORIGINAL STRING
141 self.append(arg)
142 i+=1 # ADVANCE OUR ARGUMENT COUNT
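# Illustrative sketch (added for clarity, not part of the original module):
# a subclass supplying _attrtype gets automatic type conversion, e.g.
# _ExampleArgs('3,foo,maxload=2.5') is a list [3, 'foo'] carrying the
# attribute maxload == 2.5; 'foo' stays a string since no converter is given.
class _ExampleArgs(ObjectFromString):
    'hypothetical argument parser: convert arg 0 to int, attr maxload to float'
    _attrtype = {0: int, 'maxload': float}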
145 class FileDict(dict):
146 "read key,value pairs as WS-separated lines, with objclass(value) conversion"
147 def __init__(self,filename,objclass=str):
148 dict.__init__(self)
149 f=file(filename, 'rU') # text file
150 for line in f:
151 key=line.split()[0] # GET THE 1ST ARGUMENT
152 val=line[len(key):].lstrip().rstrip() # GET THE REST, STRIP OUTER WS
153 self[key]=objclass(val) # APPLY THE DESIRED TYPE CONVERSION
154 f.close()
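# Illustrative sketch (added for clarity, not part of the original module):
# the ResourceController below reads NAME.hosts in exactly this format, with
# HostInfo as the value class.  The file name and host names here are made up.
def _example_filedict(path='example.hosts'):
    'write a tiny hosts file and read it back via FileDict/HostInfo'
    f = file(path, 'w')
    f.write('node1.example.com maxload=4\n')
    f.write('node2.example.com maxload=8,sshopts=-p 2222\n')
    f.close()
    hosts = FileDict(path, HostInfo)  # HostInfo is defined further below
    return hosts['node1.example.com'].maxload  # -> 4.0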
156 def try_fork():
157 "standard UNIX technique c/o Jurgen Hermann's Python Cookbook recipe"
158 try:
159 pid=os.fork()
160 if pid>0: # MAKE PARENT EXIT SILENTLY
161 sys.exit(0)
162 except OSError,e:
163 print >>sys.stderr, "fork failed: %d (%s)" %(e.errno,e.strerror)
164 sys.exit(1)
166 def detach_as_demon_process(self):
167 "standard UNIX technique c/o Jurgen Hermann's Python Cookbook recipe"
168 # CREATE AN APPROPRIATE ERRORLOG FILEPATH
169 if not hasattr(self,'errlog') or self.errlog is False:
170 self.errlog = os.path.join(os.getcwd(), self.name + '.log')
171 try_fork() # DISCONNECT FROM PARENT PROCESS
172 #os.chdir("/")
173 os.setsid() # CREATE A NEW SESSION WITH NO CONTROLLING TERMINAL
174 os.umask(0) # IS THIS ABSOLUTELY NECESSARY?
175 try_fork() # DISCONNECT FROM PARENT PROCESS
176 sys.stdout=file(self.errlog,'a') # DEMONIZE BY REDIRECTING ALL OUTPUT TO LOG
177 sys.stderr=sys.stdout
179 def serve_forever(self):
180 'start the service -- this will run forever'
181 import datetime
182 print >>sys.stderr,"START_SERVER:%s %s" %(self.name,
183 datetime.datetime.now().isoformat(' '))
184 sys.stderr.flush()
185 self.server.serve_forever()
188 class CoordinatorInfo(object):
189 """stores information about individual coordinators for the controller
190 and provides interface to Coordinator that protects against possibility of
191 deadlock."""
192 min_startup_time=60.0
193 def __init__(self,name,url,user,priority,resources,job_id=0,immediate=False,
194 demand_ncpu=0):
195 self.name=name
196 self.url=url
197 self.user=user
198 self.priority=priority
199 self.job_id=job_id
200 self.immediate=immediate
201 self.server=xmlrpclib.ServerProxy(url)
202 self.processors={}
203 self.resources=resources
204 self.start_time=time.time()
205 self.demand_ncpu=demand_ncpu # SET TO NON-ZERO IF YOU WANT FIXED #CPUS
206 self.allocated_ncpu=0
207 self.new_cpus=[]
208 self.last_start_proc_time=0.0
210 def __iadd__(self,newproc):
211 "add a processor to this coordinator's list"
212 self.processors[newproc]=time.time()
213 return self
215 def __isub__(self,oldproc):
216 "remove a processor from this coordinator's list"
217 del self.processors[oldproc]
218 return self
220 def update_load(self):
221 """tell this coordinator to use only allocated_ncpu processors,
222 and to launch processors on the list of new_cpus.
223 Simply spawns a thread to do this without danger of deadlock"""
224 import threading
225 t=threading.Thread(target=self.update_load_thread,
226 args=(self.allocated_ncpu,self.new_cpus))
227 self.new_cpus=[] # DISCONNECT FROM OLD LIST TO PREVENT OVERWRITING
228 t.start()
230 def update_load_thread(self,ncpu,new_cpus):
231 """tell this coordinator to use only ncpu processors,
232 and to launch processors on the list of new_cpus.
233 Run this in a separate thread to prevent deadlock."""
234 self.server.set_max_clients(ncpu)
235 if len(new_cpus)>0 and \
236 time.time()-self.last_start_proc_time>self.min_startup_time:
237 self.server.start_processors(new_cpus) # SEND OUR LIST
238 self.last_start_proc_time=time.time()
241 class HostInfo(ObjectFromString):
242 _attrtype={'maxload':float}
244 class XMLRPCServerBase(object):
245 'Base class for creating an XMLRPC server for multiple objects'
246 xmlrpc_methods={'methodCall':0,'objectList':0,'objectInfo':0}
247 max_tb=10
248 _dispatch=safe_dispatch # RESTRICT XMLRPC TO JUST THE METHODS LISTED ABOVE
249 def __init__(self, name, host='', port=5000, logRequests=False,
250 server=None):
251 self.host=host
252 self.name=name
253 if server is not None:
254 self.server = server
255 self.port = port
256 else:
257 self.server,self.port = get_server(host, port, logRequests)
258 self.server.register_instance(self)
259 self.objDict={}
260 def __setitem__(self,name,obj):
261 'add a new object to serve'
262 self.objDict[name]=obj
263 def __delitem__(self,name):
264 del self.objDict[name]
265 def objectList(self):
266 'get list of named objects in this server: [(name,methodDict),...]'
267 return [(name,obj.xmlrpc_methods) for (name,obj) in self.objDict.items()]
268 def objectInfo(self,objname):
269 'get dict of methodnames on the named object'
270 try:
271 return self.objDict[objname].xmlrpc_methods
272 except KeyError:
273 return 'error: server has no object named %s' % objname
274 def methodCall(self,objname,methodname,args):
275 'run the named method on the named object and return its result'
276 try:
277 obj=self.objDict[objname]
278 if methodname in obj.xmlrpc_methods:
279 m=getattr(obj,methodname)
280 else:
281 print >>sys.stderr,\
282 "methodCall: blocked unregistered method %s" % methodname
283 return ''
284 except (KeyError,AttributeError):
285 return '' # RETURN FAILURE CODE
286 return m(*args) # RUN THE OBJECT METHOD
287 def serve_forever(self, demonize=None, daemonize=False, detach=False):
288 'launch the XMLRPC service; never returns unless detach is True (and daemonize False).'
289 if demonize is not None:
290 logging.warning("demonize is a deprecated argument to serve_forever; use 'daemonize' instead!")
291 daemonize = demonize
293 if daemonize:
294 print "Running as a daemon"
295 detach_as_demon_process(self)
296 serve_forever(self)
297 elif not daemonize and not detach:
298 serve_forever(self)
299 else: # detach only: run the server in a background thread of this session
300 print "Running in the background of active session"
301 # Check if we're running interactively, as otherwise the server will
302 # die right after starting. Two checks are needed for this: one for
303 # a truly interactive session and one for the interpreter having
304 # been run with the -i flag (makes the session interactive AFTER the
305 # script has been executed). Unfortunately, the latter only works
306 # with Python 2.6 and up.
307 if not hasattr(sys, 'ps1'):
308 if sys.version_info < (2, 6) or not sys.flags.interactive:
309 print "Warning: Running non-interactively without daemonising means the server will die right after starting. This is probably not what you want."
310 thread.start_new_thread(serve_forever, (self, ))
311 def register(self,url=None,name='index',server=None):
312 'register our server with the designated index server'
313 data=self.registrationData # RAISE ERROR IF NO DATA TO REGISTER...
314 if server is None and url is not None: # USE THE URL TO GET THE INDEX SERVER
315 server=get_connection(url,name)
316 if server is not None:
317 server.registerServer('%s:%d' % (self.host,self.port),data)
318 else: # DEFAULT: SEARCH WORLDBASEPATH TO FIND INDEX SERVER
319 from pygr import worldbase
320 worldbase._mdb.registerServer('%s:%d' % (self.host,self.port),data)
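# Illustrative sketch (added for clarity, not part of the original module):
# serving a trivial object through XMLRPCServerBase.  Only methods listed in
# xmlrpc_methods are reachable via methodCall()/safe_dispatch().  The object
# name, server name and use of port=0 (random free port) are hypothetical.
class _ExampleCounter(object):
    'trivial servable object with a single whitelisted method'
    xmlrpc_methods = {'add': 0}  # 0 means: no alias, dispatch by this name
    def add(self, a, b):
        return a + b

def _example_serve_counter():
    'serve a counter object in a background thread of this session'
    server = XMLRPCServerBase('demo', host='localhost', port=0)
    server['counter'] = _ExampleCounter()
    server.serve_forever(detach=True)  # keeps this interpreter usable
    # a client could now call:
    #   get_connection('http://localhost:%d' % server.port, 'counter').add(2, 3)
    return server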
322 class ResourceController(object):
323 """Centralized controller for getting resources and rules for
324 making them.
325 """
326 xmlrpc_methods={'load_balance':0,'setrule':0,'delrule':0,'report_load':0,
327 'register_coordinator':0,'unregister_coordinator':0,
328 'register_processor':0,'unregister_processor':0,
329 'get_resource':0,'acquire_rule':0,'release_rule':0,
330 'request_cpus':0,'retry_unused_hosts':0,
331 'get_status':0,'setthrottle':0,'del_lock':0,
332 'get_hostinfo':0,'set_hostinfo':0}
333 _dispatch=safe_dispatch # RESTRICT XMLRPC TO JUST THE METHODS LISTED ABOVE
334 max_tb=10
335 def __init__(self,rc='controller',port=5000,overload_margin=0.6,
336 rebalance_frequency=1200,errlog=False,throttle=1.0):
337 self.name=rc
338 self.overload_margin=overload_margin
339 self.rebalance_frequency=rebalance_frequency
340 self.errlog=errlog
341 self.throttle=throttle
342 self.rebalance_time=time.time()
343 self.must_rebalance=False
344 self.host=get_hostname()
345 self.hosts=FileDict(self.name+'.hosts',HostInfo)
346 self.getrules()
347 self.getresources()
348 self.server,self.port = get_server(self.host,port)
349 self.server.register_instance(self)
350 self.coordinators={}
351 self.njob=0
352 self.locks={}
353 self.systemLoad={}
354 hostlist=[host for host in self.hosts]
355 for host in hostlist: # 1ST ASSUME HOST EMPTY, THEN GET LOAD REPORTS
356 hostFQDN=get_hostname(host) # CONVERT ALL HOSTNAMES TO FQDNs
357 if hostFQDN!=host: # USE FQDN FOR ALL SUBSEQUENT REFS!
358 self.hosts[hostFQDN]=self.hosts[host]
359 del self.hosts[host]
360 self.systemLoad[hostFQDN]=0.0
362 __call__=serve_forever
364 def assign_load(self):
365 "calculate the latest balanced loads"
366 maxload=0.
367 total=0.
368 current_job=99999999
369 for c in self.coordinators.values():
370 if c.priority>0.0 and c.job_id<current_job:
371 current_job=c.job_id # FIND 1ST NON-ZERO PRIORITY JOB
372 for c in self.coordinators.values():
373 if c.demand_ncpu: # DEMANDS A FIXED #CPUS, NO LOAD BALANCING
374 c.run=True
375 elif c.job_id==current_job or c.immediate:
376 c.run=True # YES, RUN THIS JOB
377 total+=c.priority
378 else:
379 c.run=False
380 for v in self.hosts.values(): # SUM UP TOTAL CPUS
381 maxload+=v.maxload
382 maxload*=self.throttle # APPLY OUR THROTTLE CONTROL
383 for c in self.coordinators.values(): #REMOVE DEMANDED CPUS
384 if c.demand_ncpu:
385 maxload-=c.demand_ncpu
386 if maxload<0.: # DON'T ALLOW NEGATIVE VALUES
387 maxload=0.
388 if total>0.: # DON'T DIVIDE BY ZERO...
389 maxload /= float(total)
390 for c in self.coordinators.values(): # ALLOCATE SHARE OF TOTAL CPUS...
391 if c.demand_ncpu: # ALLOCATE EXACTLY THE NUMBER REQUESTED
392 c.allocated_ncpu=int(c.demand_ncpu)
393 elif c.run: # COMPUTE BASED ON PRIORITY SHARE
394 c.allocated_ncpu=int(maxload * c.priority)
395 else: # NOT RUNNING
396 c.allocated_ncpu=0
397 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
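# Worked example (added for clarity; all numbers hypothetical): two hosts
# with maxload 4 and 8 and throttle 1.0 give maxload == 12.  If one
# coordinator demands 2 fixed CPUs, 10 CPUs remain for the priority pool;
# two runnable coordinators with priorities 3.0 and 1.0 make total == 4.0,
# so maxload/total == 2.5 and they are allocated int(2.5*3.0) == 7 and
# int(2.5*1.0) == 2 CPUs, while the demanding coordinator gets exactly 2.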
399 def assign_processors(self):
400 "hand out available processors to coordinators in order of need"
401 margin=self.overload_margin-1.0
402 free_cpus=[]
403 nproc={}
404 for c in self.coordinators.values(): # COUNT NUMBER OF PROCS
405 for host,pid in c.processors: # RUNNING ON EACH HOST
406 try:
407 nproc[host]+=1.0 # INCREMENT AN EXISTING COUNT
408 except KeyError:
409 nproc[host]=1.0 # NEW, SO SET INITIAL COUNT
410 for host in self.hosts: # BUILD LIST OF HOST CPUS TO BE ASSIGNED
411 if host not in self.systemLoad: # ADDING A NEW HOST
412 self.systemLoad[host]=0.0 # DEFAULT LOAD: ASSUME HOST EMPTY
413 try: # host MAY NOT BE IN nproc, SO CATCH THAT ERROR
414 if self.systemLoad[host]>nproc[host]:
415 raise KeyError # USE self.systemLoad[host]
416 except KeyError:
417 load=self.systemLoad[host] # MAXIMUM VALUE
418 else:
419 load=nproc[host] # MAXIMUM VALUE
420 if load<self.hosts[host].maxload+margin:
421 free_cpus+=int(self.hosts[host].maxload+self.overload_margin
422 -load)*[host]
423 if len(free_cpus)==0: # WE DON'T HAVE ANY CPUS TO GIVE OUT
424 return False
425 l=[] # BUILD A LIST OF HOW MANY CPUS EACH COORDINATOR NEEDS
426 for c in self.coordinators.values():
427 ncpu=c.allocated_ncpu-len(c.processors)
428 if ncpu>0:
429 l+=ncpu*[c] # ADD c TO l EXACTLY ncpu TIMES
430 import random
431 random.shuffle(l) # REORDER LIST OF COORDINATORS RANDOMLY
432 i=0 # INDEX INTO OUR l LIST
433 while i<len(free_cpus) and i<len(l): # HAND OUT THE FREE CPUS ONE BY ONE
434 l[i].new_cpus.append(free_cpus[i])
435 i+=1
436 return i>0 # RETURN TRUE IF WE HANDED OUT SOME PROCESSORS
438 def load_balance(self):
439 "recalculate load assignments, and assign free cpus"
440 self.rebalance_time=time.time() # RESET OUR FLAGS
441 self.must_rebalance=False
442 self.assign_load() # CALCULATE HOW MANY CPUS EACH COORDINATOR SHOULD GET
443 self.assign_processors() # ASSIGN FREE CPUS TO COORDINATORS THAT NEED THEM
444 for c in self.coordinators.values():
445 c.update_load() # INFORM THE COORDINATOR
446 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
448 def get_hostinfo(self,host,attr):
449 "get a host attribute"
450 return getattr(self.hosts[host],attr)
452 def set_hostinfo(self,host,attr,val):
453 "increase or decrease the maximum load allowed on a given host"
454 try:
455 setattr(self.hosts[host],attr,val)
456 except KeyError:
457 self.hosts[host]=HostInfo('%s=%s' %(attr,str(val)))
458 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
460 def getrules(self):
461 import shelve
462 self.rules=dbfile.shelve_open(self.name+'.rules')
464 def getresources(self):
465 import shelve
466 self.resources=dbfile.shelve_open(self.name+'.rsrc')
468 def setrule(self,rsrc,rule):
469 "save a resource generation rule into our database"
470 self.rules[rsrc]=rule
471 self.rules.close() # THIS IS THE ONLY WAY I KNOW TO FLUSH...
472 self.getrules()
473 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
475 def delrule(self,rsrc):
476 "delete a resource generation rule from our database"
477 try:
478 del self.rules[rsrc]
479 except KeyError:
480 print >>sys.stderr, "Attempt to delete unknown resource rule %s" % rsrc
481 else:
482 self.rules.close() # THIS IS THE ONLY WAY I KNOW TO FLUSH...
483 self.getrules()
484 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
486 def setthrottle(self,throttle):
487 "set the total level of usage of available CPUs, usually 1.0"
488 self.throttle=float(throttle)
489 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
491 def report_load(self,host,pid,load):
492 "save a reported load from one of our processors"
493 self.systemLoad[host]=load
494 # AT A REGULAR INTERVAL WE SHOULD REBALANCE LOAD
495 if self.must_rebalance or \
496 time.time()-self.rebalance_time>self.rebalance_frequency:
497 self.load_balance()
498 if load<self.hosts[host].maxload+self.overload_margin:
499 return True # OK TO CONTINUE
500 else:
501 return False # THIS SYSTEM OVERLOADED, TELL PROCESSOR TO EXIT
503 def register_coordinator(self,name,url,user,priority,resources,immediate,
504 demand_ncpu):
505 "save a coordinator's registration info"
506 try:
507 print >>sys.stderr,'change_priority: %s (%s,%s): %f -> %f' \
508 % (name,user,url,self.coordinators[url].priority,priority)
509 self.coordinators[url].priority=priority
510 self.coordinators[url].immediate=immediate
511 self.coordinators[url].demand_ncpu=demand_ncpu
512 except KeyError:
513 print >>sys.stderr,'register_coordinator: %s (%s,%s): %f' \
514 % (name,user,url,priority)
515 self.coordinators[url]=CoordinatorInfo(name,url,user,priority,
516 resources,self.njob,immediate,
517 demand_ncpu)
518 self.njob+=1 # INCREMENT COUNT OF JOBS WE'VE REGISTERED
519 self.must_rebalance=True # FORCE REBALANCING ON NEXT OPPORTUNITY
520 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
522 def unregister_coordinator(self,name,url,message):
523 "remove a coordinator from our list"
524 try:
525 del self.coordinators[url]
526 print >>sys.stderr,'unregister_coordinator: %s (%s): %s' \
527 % (name,url,message)
528 self.load_balance() # FORCE IT TO REBALANCE THE LOAD TO NEW JOBS...
529 except KeyError:
530 print >>sys.stderr,'unregister_coordinator: %s unknown:%s (%s)' \
531 % (name,url,message)
532 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
534 def request_cpus(self,name,url):
535 "return a list of hosts for this coordinator to run processors on"
536 try:
537 c=self.coordinators[url]
538 except KeyError:
539 print >>sys.stderr,'request_cpus: unknown coordinator %s @ %s' % (name,url)
540 return [] # HAND BACK AN EMPTY LIST
541 self.assign_load() # CALCULATE HOW MANY CPUS EACH COORDINATOR SHOULD GET
542 self.assign_processors() # ASSIGN FREE CPUS TO COORDINATORS THAT NEED THEM
543 new_cpus=tuple(c.new_cpus) # MAKE A NEW COPY OF THE LIST OF HOSTS
544 del c.new_cpus[:] # EMPTY OUR LIST
545 return new_cpus
547 def register_processor(self,host,pid,url):
548 "record a new processor starting up"
549 try:
550 self.coordinators[url]+= (host,pid)
551 self.systemLoad[host] += 1.0 # THIS PROBABLY INCREASES LOAD BY 1
552 except KeyError:
553 pass
554 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
556 def unregister_processor(self,host,pid,url):
557 "processor shutting down, so remove it from the list"
558 try:
559 self.coordinators[url]-= (host,pid)
560 self.systemLoad[host] -= 1.0 # THIS PROBABLY DECREASES LOAD BY 1
561 if self.systemLoad[host]<0.0:
562 self.systemLoad[host]=0.0
563 for k,v in self.locks.items(): # MAKE SURE THIS PROC HAS NO LOCKS...
564 h=k.split(':')[0]
565 if h==host and v==pid:
566 del self.locks[k] # REMOVE ALL ITS PENDING LOCKS
567 except KeyError:
568 pass
569 self.load_balance() # FREEING A PROCESSOR, SO REBALANCE TO USE THIS
570 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
572 def get_resource(self,host,pid,rsrc):
573 """return a filename for the resource, or False if rule must be applied,
574 or True if client must wait to get the resource"""
575 key=host+':'+rsrc
576 try: # JUST HAND BACK THE RESOURCE
577 return self.resources[key]
578 except KeyError:
579 if key in self.locks:
580 return True # TELL CLIENT TO WAIT
581 else:
582 return False # TELL CLIENT TO ACQUIRE IT VIA RULE
584 def acquire_rule(self,host,pid,rsrc):
585 "lock the resource on this specific host, and return its production rule"
586 if rsrc not in self.rules:
587 return False # TELL CLIENT NO SUCH RULE
588 key=host+':'+rsrc
589 if key in self.locks:
590 return True # TELL CLIENT TO WAIT
591 self.locks[key]=pid # LOCK THIS RESOURCE ON THIS HOST UNTIL CONSTRUCTED
592 return self.rules[rsrc] # RETURN THE CONSTRUCTION RULE
594 def release_rule(self,host,pid,rsrc):
595 "client is done applying this rule, so now safe to give out the resource"
596 key=host+':'+rsrc
597 self.del_lock(host,rsrc)
598 self.resources[key]=self.rules[rsrc][0] # ADD THE FILE NAME TO RESOURCE LIST
599 self.resources.close() # THIS IS THE ONLY WAY I KNOW TO FLUSH THIS...
600 self.getresources()
601 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
603 def del_lock(self,host,rsrc):
604 "delete a lock on a pending resource construction process"
605 key=host+':'+rsrc
606 try:
607 del self.locks[key] # REMOVE THE LOCK
608 except KeyError:
609 print >>sys.stderr,"attempt to release non-existent lock %s:%s" \
610 %(host,rsrc)
611 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
614 def retry_unused_hosts(self):
615 "reset systemLoad for hosts that have no jobs running"
616 myhosts={}
617 for c in self.coordinators.values(): # LIST HOSTS WE'RE CURRENTLY USING
618 for host,pid in c.processors:
619 myhosts[host]=None # MARK THIS HOST AS IN USE
620 for host in self.systemLoad: # RESET LOADS FOR ALL HOSTS WE'RE NOT USING
621 if host not in myhosts:
622 self.systemLoad[host]=0.0
623 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
625 def get_status(self):
626 """get report of system loads, max loads, coordinators, rules,
627 resources, locks"""
628 l=[(name,host.maxload) for name,host in self.hosts.items()]
629 l.sort()
630 return self.name,self.errlog,self.systemLoad,l,\
631 [(c.name,c.url,c.priority,c.allocated_ncpu,len(c.processors),\
632 c.start_time) for c in self.coordinators.values()], \
633 dict(self.rules),dict(self.resources),self.locks
636 class AttrProxy(object):
637 def __init__(self,getattr_proxy,k):
638 self.getattr_proxy=getattr_proxy
639 self.k=k
640 def __getattr__(self,attr):
641 try:
642 val=self.getattr_proxy(self.k,attr) # GET IT FROM OUR PROXY
643 except:
644 raise AttributeError('unable to get proxy attr '+attr)
645 setattr(self,attr,val) # CACHE THIS ATTRIBUTE RIGHT HERE!
646 return val
648 class DictAttrProxy(dict):
649 def __init__(self,getattr_proxy):
650 dict.__init__(self)
651 self.getattr_proxy=getattr_proxy
652 def __getitem__(self,k):
653 try:
654 return dict.__getitem__(self,k)
655 except KeyError:
656 val=AttrProxy(self.getattr_proxy,k)
657 self[k]=val
658 return val
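# Illustrative sketch (added for clarity, not part of the original module):
# DictAttrProxy turns any two-argument getter into lazily-built, per-key
# attribute proxies; the Coordinator below uses it with the controller's
# get_hostinfo XMLRPC call.  The local stand-in getter here is hypothetical.
def _example_dictattrproxy():
    'look up hosts["node1"].maxload through a plain dictionary getter'
    data = {('node1', 'maxload'): 4.0}
    hosts = DictAttrProxy(lambda k, attr: data[(k, attr)])
    return hosts['node1'].maxload  # -> 4.0, then cached on the AttrProxy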
660 class Coordinator(object):
661 """Run our script as Processor on one or more client nodes, using
662 XMLRPC communication between clients and server.
663 On the server all output is logged to name.log,
664 successfully completed task IDs are stored in name.success,
665 and error task IDs are stored in name.error.
666 On the clients all output is logged to the file name_#.log in the user's
667 and/or system-specific temporary directory."""
668 xmlrpc_methods={'start_processors':0,'register_client':0,'unregister_client':0,
669 'report_success':0,'report_error':0,'next':0,
670 'get_status':0,'set_max_clients':0,'stop_client':0}
671 _dispatch=safe_dispatch # RESTRICT XMLRPC TO JUST THE METHODS LISTED ABOVE
672 max_tb=10 # MAXIMUM #STACK LEVELS TO PRINT IN TRACEBACKS
673 max_ssh_errors=5 #MAXIMUM #ERRORS TO PERMIT IN A ROW BEFORE QUITTING
674 python='python' # DEFAULT EXECUTABLE FOR RUNNING OUR CLIENTS
675 def __init__(self,name,script,it,resources,port=8888,priority=1.0,rc_url=None,
676 errlog=False,immediate=False,ncpu_limit=999999,
677 demand_ncpu=0,max_initialization_errors=3,**kwargs):
678 self.name=name
679 self.script=script
680 self.it=iter(it) # MAKE SURE it IS AN ITERATOR; IF IT'S NOT, MAKE IT SO
681 self.resources=resources
682 self.priority=priority
683 self.errlog=errlog
684 self.immediate=immediate
685 self.ncpu_limit=ncpu_limit
686 self.demand_ncpu=demand_ncpu
687 self.max_initialization_errors=max_initialization_errors
688 self.kwargs=kwargs
689 self.host=get_hostname()
690 self.user=os.environ['USER']
691 try: # MAKE SURE ssh-agent IS AVAILABLE TO US BEFORE LAUNCHING LOTS OF PROCS
692 a=os.environ['SSH_AGENT_PID']
693 except KeyError:
694 raise OSError(1,'SSH_AGENT_PID not found. No ssh-agent running?')
695 self.dir=os.getcwd()
696 self.n=0
697 self.nsuccess=0
698 self.nerrors=0
699 self.nssh_errors=0
700 self.iclient=0
701 self.max_clients=40
702 if rc_url is None: # USE DEFAULT RESOURCE CONTROLLER ADDRESS ON SAME HOST
703 rc_url='http://%s:5000' % self.host
704 self.rc_url=rc_url
705 self.rc_server=xmlrpclib.ServerProxy(rc_url) #GET CONNECTION TO RESOURCE CONTROLLER
706 self.server,self.port = get_server(self.host,port) #CREATE XMLRPC SERVER
707 self.server.register_instance(self) # WE PROVIDE ALL THE METHODS FOR THE SERVER
708 self.clients={}
709 self.pending={}
710 self.already_done={}
711 self.stop_clients={}
712 self.logfile={}
713 self.clients_starting={}
714 self.clients_initializing={}
715 self.initialization_errors={}
716 try: # LOAD LIST OF IDs ALREADY SUCCESSFULLY PROCESSED, IF ANY
717 f=file(name+'.success','rU') # text file
718 for line in f:
719 self.already_done[line.strip()]=None
720 f.close()
721 except IOError: # OK IF NO SUCCESS FILE YET, WE'LL CREATE ONE.
722 pass
723 self.successfile=file(name+'.success','a') # success FILE IS CUMULATIVE
724 self.errorfile=file(name+'.error','w') # OVERWRITE THE ERROR FILE
725 self.done=False
726 self.hosts=DictAttrProxy(self.rc_server.get_hostinfo)
727 self.register()
729 def __call__(self,*l,**kwargs):
730 "start the server, and launch a cpu request in a separate thread"
731 import threading
732 t=threading.Thread(target=self.initialize_thread)
733 t.start()
734 serve_forever(self,*l,**kwargs)
736 def initialize_thread(self):
737 "run this method in a separate thread to bootstrap our initial cpu request"
738 time.sleep(5) # GIVE serve_forever() TIME TO START SERVER
739 self.rc_server.load_balance() # NOW ASK CONTROLLER TO REBALANCE AND GIVE US CPUS
741 def start_client(self,host):
742 "start a processor on a client node"
743 import tempfile
744 if len(self.clients)>=self.ncpu_limit:
745 print >>sys.stderr,'start_client: blocked, CPU limit', \
746 len(self.clients),self.ncpu_limit
747 return # DON'T START ANOTHER PROCESS, TOO MANY ALREADY
748 if len(self.clients)>=self.max_clients:
749 print >>sys.stderr,'start_client: blocked, too many already', \
750 len(self.clients),self.max_clients
751 return # DON'T START ANOTHER PROCESS, TOO MANY ALREADY
752 try:
753 if len(self.clients_starting[host])>=self.max_ssh_errors:
754 print >>sys.stderr,\
755 'start_client: blocked, too many unstarted jobs:',\
756 host,self.clients_starting[host]
757 return # DON'T START ANOTHER PROCESS, host MAY BE DEAD...
758 except KeyError: # NO clients_starting ON host, GOOD!
759 pass
760 try:
761 if len(self.initialization_errors[host])>=self.max_initialization_errors:
762 print >>sys.stderr,\
763 'start_client: blocked, too many initialization errors:',\
764 host,self.initialization_errors[host]
765 return # DON'T START ANOTHER PROCESS, host HAS A PROBLEM
766 except KeyError: # NO initialization_errors ON host, GOOD!
767 pass
768 try:
769 sshopts=self.hosts[host].sshopts # GET sshopts VIA XMLRPC
770 except AttributeError:
771 sshopts=''
772 logfile=os.path.join(tempfile.gettempdir(), '%s_%d.log' % (self.name, self.iclient))
773 # PASS OUR KWARGS ON TO THE CLIENT PROCESSOR
774 kwargs=' '.join(['--%s=%s'%(k,v) for k,v in self.kwargs.items()])
775 cmd='cd %s;%s %s --url=http://%s:%d --rc_url=%s --logfile=%s %s %s' \
776 % (self.dir,self.python,self.script,self.host,self.port,
777 self.rc_url,logfile,self.name,kwargs)
778 # UGH, HAVE TO MIX CSH REDIRECTION (REMOTE) WITH SH REDIRECTION (LOCAL)
779 ssh_cmd="ssh %s %s '(%s) </dev/null >&%s &' </dev/null >>%s 2>&1 &" \
780 % (sshopts,host,cmd,logfile,self.errlog)
781 print >>sys.stderr,'SSH: '+ssh_cmd
782 self.logfile[logfile]=[host,False,self.iclient] # NO PID YET
783 try: # RECORD THIS CLIENT AS STARTING UP
784 self.clients_starting[host][self.iclient]=time.time()
785 except KeyError: # CREATE A NEW HOST ENTRY
786 self.clients_starting[host]={self.iclient:time.time()}
787 # RUN SSH IN BACKGROUND TO AVOID WAITING FOR IT TO TIMEOUT!!!
788 os.system(ssh_cmd) # LAUNCH THE SSH PROCESS, SHOULD RETURN IMMEDIATELY
789 self.iclient += 1 # ADVANCE OUR CLIENT COUNTER
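# For clarity (hypothetical values, added comment): with name='demo',
# script='demo.py', dir='/home/u/run', coordinator host 'head', port 8888,
# rc_url='http://head:5000' and target host 'node1', the command launched
# above looks roughly like
#   ssh  node1 '(cd /home/u/run;python demo.py --url=http://head:8888
#     --rc_url=http://head:5000 --logfile=/tmp/demo_0.log demo ) </dev/null
#     >&/tmp/demo_0.log &' </dev/null >>demo.log 2>&1 &
# i.e. the remote job detaches via csh-style redirection while the local ssh
# is itself backgrounded, so start_client() returns immediately.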
791 def start_processors(self,hosts):
792 "start processors on the list of hosts using SSH transport"
793 for host in hosts: # LAUNCH OURSELVES AS PROCESSOR ON ALL THESE HOSTS
794 self.start_client(host)
795 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
797 def register(self):
798 "register our existence with the resource controller"
799 url='http://%s:%d' % (self.host,self.port)
800 self.rc_server.register_coordinator(self.name,url,self.user,
801 self.priority,self.resources,
802 self.immediate,self.demand_ncpu)
804 def unregister(self,message):
805 "tell the resource controller we're exiting"
806 url='http://%s:%d' % (self.host,self.port)
807 self.rc_server.unregister_coordinator(self.name,url,message)
809 def register_client(self,host,pid,logfile):
810 'XMLRPC call to register client hostname and PID as starting_up'
811 print >>sys.stderr,'register_client: %s:%d' %(host,pid)
812 self.clients[(host,pid)]=0
813 try:
814 self.logfile[logfile][1]=pid # SAVE OUR PID
815 iclient=self.logfile[logfile][2] # GET ITS CLIENT ID
816 del self.clients_starting[host][iclient] #REMOVE FROM STARTUP LIST
817 except KeyError:
818 print >>sys.stderr,'no client logfile?',host,pid,logfile
819 self.clients_initializing[(host,pid)]=logfile
820 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
822 def unregister_client(self,host,pid,message):
823 'XMLRPC call to remove client from register as exiting'
824 print >>sys.stderr,'unregister_client: %s:%d %s' % (host,pid,message)
825 try:
826 del self.clients[(host,pid)]
827 except KeyError:
828 print >>sys.stderr,'unregister_client: unknown client %s:%d' % (host,pid)
829 try: # REMOVE IT FROM THE LIST OF CLIENTS TO SHUTDOWN, IF PRESENT
830 del self.stop_clients[(host,pid)]
831 except KeyError:
832 pass
833 try: # REMOVE FROM INITIALIZATION LIST
834 del self.clients_initializing[(host,pid)]
835 except KeyError:
836 pass
837 if len(self.clients)==0 and self.done: # NO MORE TASKS AND NO MORE CLIENTS
838 self.exit("Done") # SO SERVER CAN EXIT
839 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
841 def report_success(self,host,pid,success_id):
842 'mark task as successfully completed'
843 print >>self.successfile,success_id # KEEP PERMANENT RECORD OF SUCCESS ID
844 self.successfile.flush()
845 self.nsuccess += 1
846 try:
847 self.clients[(host,pid)] += 1
848 except KeyError:
849 print >>sys.stderr,'report_success: unknown client %s:%d' % (host,pid)
850 try:
851 del self.pending[success_id]
852 except KeyError:
853 print >>sys.stderr,'report_success: unknown ID %s' % str(success_id)
854 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
856 def report_error(self,host,pid,id,tb_report):
857 "get traceback report from client as text"
858 print >>sys.stderr,"TRACEBACK: %s:%s ID %s\n%s" % \
859 (host,str(pid),str(id),tb_report)
860 if (host,pid) in self.clients_initializing:
861 logfile=self.clients_initializing[(host,pid)]
862 try:
863 self.initialization_errors[host].append(logfile)
864 except KeyError:
865 self.initialization_errors[host]=[logfile]
866 try:
867 del self.pending[id]
868 except KeyError: # NOT ASSOCIATED WITH AN ACTUAL TASK ID, SO DON'T RECORD
869 if id is not None and id is not False:
870 print >>sys.stderr,'report_error: unknown ID %s' % str(id)
871 else:
872 print >>self.errorfile,id # KEEP PERMANENT RECORD OF FAILURE ID
873 self.nerrors+=1
874 self.errorfile.flush()
875 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
877 def next(self,host,pid,success_id):
878 'return next ID from iterator to the XMLRPC caller'
879 if (host,pid) not in self.clients:
880 print >>sys.stderr,'next: unknown client %s:%d' % (host,pid)
881 return False # HAND BACK "NO MORE FOR YOU TO DO" SIGNAL
882 try: # INITIALIZATION DONE, SO REMOVE FROM INITIALIZATION LIST
883 del self.clients_initializing[(host,pid)]
884 except KeyError:
885 pass
886 if success_id is not False:
887 self.report_success(host,pid,success_id)
888 if self.done: # EXHAUSTED OUR ITERATOR, SO SHUT DOWN THIS CLIENT
889 return False # HAND BACK "NO MORE FOR YOU TO DO" SIGNAL
890 try: # CHECK LIST FOR COMMAND TO SHUT DOWN THIS CLIENT
891 del self.stop_clients[(host,pid)] # IS IT IN stop_clients?
892 return False # IF SO, HAND BACK "NO MORE FOR YOU TO DO" SIGNAL
893 except KeyError: # DO ONE MORE CHECK: ARE WE OVER OUR MAX ALLOWED LOAD?
894 if len(self.clients)>self.max_clients: # YES, BETTER THROTTLE DOWN
895 print >>sys.stderr,'next: halting %s:too many processors (%d>%d)' \
896 % (host,len(self.clients),self.max_clients)
897 return False # HAND BACK "NO MORE FOR YOU TO DO" SIGNAL
898 for id in self.it: # GET AN ID WE CAN USE
899 if str(id) not in self.already_done:
900 self.n+=1 # GREAT, WE CAN USE THIS ID
901 self.lastID=id
902 self.pending[id]=(host,pid,time.time())
903 print >>sys.stderr,'giving id %s to %s:%d' %(str(id),host,pid)
904 return id
905 print >>sys.stderr,'exhausted all items from iterator!'
906 self.done=True # EXHAUSTED OUR ITERATOR
907 self.priority=0.0 # RELEASE OUR CLAIMS ON ANY FURTHER PROCESSOR ALLOCATION
908 self.register() # AND INFORM THE RESOURCE CONTROLLER
909 return False # False IS CONFORMABLE BY XMLRPC...
911 def get_status(self):
912 "return basic status info on number of jobs finished, client list etc."
913 client_report=[client+(nsuccess,) for client,nsuccess in self.clients.items()]
914 pending_report=[(k,)+v for k,v in self.pending.items()]
915 return self.name,self.errlog,self.n,self.nsuccess,self.nerrors,client_report,\
916 pending_report,self.logfile
917 def set_max_clients(self,n):
918 "change the maximum number of clients we should have running"
919 self.max_clients=int(n) # MAKE SURE n IS CONVERTIBLE TO int
920 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
921 def stop_client(self,host,pid):
922 "set signal forcing this client to exit on next iteration"
923 self.stop_clients[(host,pid)]=None
924 return True # USE THIS AS DEFAULT XMLRPC RETURN VALUE
925 def exit(self,message):
926 "clean up and close this server"
927 self.unregister(message)
928 self.successfile.close()
929 self.errorfile.close()
930 sys.exit()
933 try:
934 class ResourceFile(file):
935 """wrapper around some locking behavior, to ensure only one copy operation
936 performed for a given resource on a given host.
937 Otherwise, it's just a regular file object."""
938 def __init__(self,resource,rule,mode,processor):
939 "resource is name of the resource; rule is (localFile,cpCommand)"
940 self.resource=resource
941 self.processor=processor
942 localFile,cpCommand=rule
943 if not os.access(localFile,os.R_OK):
944 cmd=cpCommand % localFile
945 print 'copying data:',cmd
946 os.system(cmd)
947 file.__init__(self,localFile,mode) # NOW INITIALIZE AS A REAL FILE OBJECT
949 def close(self):
950 self.processor.release_rule(self.resource) # RELEASE THE LOCK WE PLACED ON THIS RULE
951 file.close(self)
952 except TypeError:
953 pass
958 class Processor(object):
959 'provides an iterator interface to an XMLRPC ID server'
960 max_errors_in_a_row=10 # LOOKS LIKE NOTHING WORKS HERE, SO QUIT!
961 max_tb=10 # DON'T SHOW MORE THAN 10 STACK LEVELS FOR A TRACEBACK
962 report_frequency=600
963 overload_max=5 # MAXIMUM NUMBER OF OVERLOAD EVENTS IN A ROW BEFORE WE EXIT
964 def __init__(self,url="http://localhost:8888",
965 rc_url='http://localhost:5000',logfile=False,**kwargs):
966 self.url=url
967 self.logfile=logfile
968 self.server=xmlrpclib.ServerProxy(url)
969 self.rc_url=rc_url
970 self.rc_server=xmlrpclib.ServerProxy(rc_url)
971 self.host=get_hostname()
972 self.pid=os.getpid()
973 self.user=os.environ['USER']
974 self.success_id=False
975 self.pending_id=False
976 self.exit_message='MYSTERY-EXIT please debug'
977 self.overload_count=0
979 def register(self):
980 "add ourselves to list of processors for this server"
981 self.server.register_client(self.host,self.pid,self.logfile)
982 self.rc_server.register_processor(self.host,self.pid,self.url)
983 print >>sys.stderr,'REGISTERED:',self.url,self.rc_url
985 def unregister(self,message):
986 "remove ourselves from list of processors for this server"
987 if self.success_id is not False: # REPORT THAT LAST JOB SUCCEEDED!
988 self.report_success(self.success_id)
989 self.server.unregister_client(self.host,self.pid,message)
990 self.rc_server.unregister_processor(self.host,self.pid,self.url)
991 print >>sys.stderr,'UNREGISTERED:',self.url,self.rc_url,message
993 def __iter__(self):
994 return self
996 def next(self):
997 "get next ID from server"
998 # REPORT LAST JOB SUCCESSFULLY COMPLETED, IF ANY
999 while 1:
1000 id=self.server.next(self.host,self.pid,self.success_id)
1001 self.success_id=False # ERASE SUCCESS ID
1002 if id is True: # WE'RE BEING TOLD TO JUST WAIT
1003 time.sleep(60) # SO GO TO SLEEP FOR A MINUTE
1004 else:
1005 break
1006 if id is False: # NO MORE ids FOR US TO PROCESS, SO QUIT
1007 self.serverStopIteration=True # RECORD THIS AS GENUINE END EVENT
1008 raise StopIteration
1009 else: # HAND BACK THE id TO THE USER
1010 self.pending_id=id
1011 return id
1013 def report_success(self,id):
1014 "report successful completion of task ID"
1015 self.server.report_success(self.host,self.pid,id)
1017 def report_error(self,id):
1018 "report an error using traceback.print_exc()"
1019 import StringIO
1020 err_report=StringIO.StringIO()
1021 traceback.print_exc(self.max_tb,sys.stderr) #REPORT TB TO OUR LOG
1022 traceback.print_exc(self.max_tb,err_report) #REPORT TB TO SERVER
1023 self.server.report_error(self.host,self.pid,id,err_report.getvalue())
1024 err_report.close()
1026 def report_load(self):
1027 "report system load"
1028 load=os.getloadavg()[0] # GET 1 MINUTE LOAD AVERAGE
1029 if self.rc_server.report_load(self.host,self.pid,load) is False:
1030 self.overload_count+=1 # ARE WE CONSISTENTLY OVERLOADED FOR EXTENDED PERIOD?
1031 if self.overload_count>self.overload_max: # IF EXCEEDED LIMIT, EXIT
1032 self.exit('load too high')
1033 else:
1034 self.overload_count=0
1036 def open_resource(self,resource,mode):
1037 "get a file object for the requested resource, opened in mode"
1038 while 1:
1039 rule=self.rc_server.get_resource(self.host,self.pid,resource)
1040 if rule is False: # WE HAVE TO LOCK AND APPLY A RULE...
1041 rule=self.acquire_rule(resource)
1042 if rule is True: # HMM, LOOKS LIKE A RACE CONDITION. KEEP WAITING
1043 time.sleep(60) # WAIT A MINUTE BEFORE ASKING FOR RESOURCE AGAIN
1044 continue
1045 return ResourceFile(resource,rule,mode,self) #CONSTRUCT THE RESOURCE
1046 elif rule is True: # RULE IS LOCKED BY ANOTHER PROCESSOR
1047 time.sleep(60) # WAIT A MINUTE BEFORE ASKING FOR RESOURCE AGAIN
1048 else: # GOT A REGULAR FILE, SO JUST OPEN IT
1049 return file(rule,mode)
1051 def acquire_rule(self,resource):
1052 "lock the specified resource rule for this host, so it's safe to build it"
1053 rule=self.rc_server.acquire_rule(self.host,self.pid,resource)
1054 if rule is False: # NO SUCH RESOURCE?!?
1055 self.exit('invalid resource: '+resource)
1056 return rule
1058 def release_rule(self,resource):
1059 "release our lock on this resource rule, so others can use it"
1060 self.rc_server.release_rule(self.host,self.pid,resource)
1062 def exit(self,message):
1063 "save message for self.unregister() and force exit"
1064 self.exit_message=message
1065 raise SystemExit
1067 def run_all(self,resultGenerator,**kwargs):
1068 "run until all task IDs completed, trap & report all errors"
1069 errors_in_a_row=0
1070 it=resultGenerator(self,**kwargs) # GET ITERATOR FROM GENERATOR
1071 report_time=time.time()
1072 self.register() # REGISTER WITH RESOURCE CONTROLLER & COORDINATOR
1073 initializationError=None
1074 try: # TRAP ERRORS BOTH IN USER CODE AND coordinator CODE
1075 while 1:
1076 try: # TRAP AND REPORT ALL ERRORS IN USER CODE
1077 id=it.next() # THIS RUNS USER CODE FOR ONE ITERATION
1078 self.success_id=id # MARK THIS AS A SUCCESS...
1079 errors_in_a_row=0
1080 initializationError=False
1081 except StopIteration: # NO MORE TASKS FOR US...
1082 if not hasattr(self,'serverStopIteration'): # WEIRD!!
1083 # USER CODE RAISED StopIteration?!?
1084 self.report_error(self.pending_id) # REPORT THE PROBLEM
1085 self.exit_message='user StopIteration error'
1086 elif initializationError:
1087 self.exit_message='initialization error'
1088 else:
1089 self.exit_message='done'
1090 break
1091 except SystemExit: # sys.exit() CALLED
1092 raise # WE REALLY DO WANT TO EXIT.
1093 except: # MUST HAVE BEEN AN ERROR IN THE USER CODE
1094 if initializationError is None: # STILL IN INITIALIZATION
1095 initializationError=True
1096 self.report_error(self.pending_id) # REPORT THE PROBLEM
1097 errors_in_a_row +=1
1098 if errors_in_a_row>=self.max_errors_in_a_row:
1099 self.exit_message='too many errors'
1100 break
1101 if time.time()-report_time>self.report_frequency:
1102 self.report_load() # SEND A ROUTINE LOAD REPORT
1103 report_time=time.time()
1104 except SystemExit: # sys.exit() CALLED
1105 pass # WE REALLY DO WANT TO EXIT.
1106 except: # IMPORTANT TO TRAP ALL ERRORS SO THAT WE UNREGISTER!!
1107 traceback.print_exc(self.max_tb,sys.stderr) #REPORT TB TO OUR LOG
1108 self.exit_message='error trap'
1109 self.unregister('run_all '+self.exit_message) # MUST UNREGISTER!!
1111 def run_interactive(self,it,n=1,**kwargs):
1112 "run n task IDs, with no error trapping"
1113 if not hasattr(it,'next'):
1114 it=it(self,**kwargs) # ASSUME it IS GENERATOR, USE IT TO GET ITERATOR
1115 i=0 # COUNT TASKS COMPLETED IN THIS CALL
1116 self.register() # REGISTER WITH RESOURCE CONTROLLER & COORDINATOR
1117 try: # EVEN IF ERROR OCCURS, WE MUST UNREGISTER!!
1118 for id in it:
1119 self.success_id=id
1120 i+=1
1121 if i>=n:
1122 break
1123 except:
1124 self.unregister('run_interactive error') # MUST UNREGISTER!!!
1125 raise # SHOW THE ERROR INTERACTIVELY
1126 self.unregister('run_interactive exit')
1127 return it # HAND BACK ITERATOR IN CASE USER WANTS TO RUN MORE...
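# Illustrative sketch (added for clarity, not part of the original module):
# end-to-end use of the resource/rule protocol implemented by
# Processor.open_resource()/acquire_rule()/release_rule() above.  A rule maps
# a resource name to (localFile, cpCommand); cpCommand gets the local file
# name substituted in via '%'.  All names, paths and commands are made up,
# and 'processor' must be a live, registered Processor.
def _example_resource_rule(processor):
    'register a copy rule on the controller, then open the resource locally'
    processor.rc_server.setrule('genome',
                                ('/tmp/genome.fa',
                                 'scp fileserver:/data/genome.fa %s'))
    f = processor.open_resource('genome', 'r')  # copies and locks as needed
    try:
        return f.read(80)
    finally:
        f.close()  # ResourceFile.close() also releases the per-host lock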
1130 def parse_argv():
1131 "parse sys.argv into a dictionary of GNU-style args --foo=bar and list of other args"
1132 d={}
1133 l=[]
1134 for v in sys.argv[1:]:
1135 if v[:2]=='--':
1136 try:
1137 k,v=v[2:].split('=')
1138 d[k]=v
1139 except ValueError:
1140 d[v[2:]]=None
1141 else:
1142 l.append(v)
1143 return d,l
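# For clarity (hypothetical command line, added comment): running
#   python yourscript.py --url=http://head:8888 --rc_url=http://head:5000 demo
# makes parse_argv() return
#   d == {'url': 'http://head:8888', 'rc_url': 'http://head:5000'}, l == ['demo']
# which start_client_or_server() below uses to decide we are a Processor
# ('url' present) rather than a Coordinator or the ResourceController.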
1145 def start_client_or_server(clientGenerator,serverGenerator,resources,script):
1146 """start controller, client or server depending on whether
1147 we get coordinator argument from the command-line args.
1149 Client must be a generator function that takes Processor as argument,
1150 and uses it as an iterator.
1151 Also, clientGenerator must yield the IDs that the Processor provides
1152 (this structure allows us to trap all exceptions from clientGenerator,
1153 while allowing it to do resource initializations that would be
1154 much less elegant in a callback function.)
1156 Server must be a function that returns an iterator (e.g. a generator).
1157 Resources is a list of strings naming the resources we need
1158 copied to local host for client to be able to do its work.
1160 Both client and server constructors use **kwargs to get command
1161 line arguments (passed as GNU-style --foo=bar);
1162 see the constructor arguments for the list of
1163 options that each can accept.
1165 #CALL LIKE THIS FROM yourscript.py:
1166 import coordinator
1167 if __name__=='__main__':
1168 coordinator.start_client_or_server(clientGen,serverGen,resources,__file__)
1170 To start the resource controller:
1171 python coordinator.py --rc=NAME [options]
1173 To start a job coordinator:
1174 python yourscript.py NAME [--rc_url=URL] [options]
1176 To start a job processor:
1177 python yourscript.py --url=URL --rc_url=URL [options]"""
1178 d,l=parse_argv()
1179 if 'url' in d: # WE ARE A CLIENT!
1180 client=Processor(**d)
1181 time.sleep(5) # GIVE THE SERVER SOME BREATHING SPACE
1182 client.run_all(clientGenerator,**d)
1183 elif 'rc' in d: # WE ARE THE RESOURCE CONTROLLER
1184 rc_server=ResourceController(**d) # NAME FOR THIS CONTROLLER...
1185 detach_as_demon_process(rc_server)
1186 rc_server() # START THE SERVER
1187 else: # WE ARE A SERVER
1188 try: # PASS OUR KWARGS TO THE SERVER FUNCTION
1189 it=serverGenerator(**d)
1190 except TypeError: # DOESN'T WANT ANY ARGS?
1191 it=serverGenerator()
1192 server=Coordinator(l[0],script,it,resources,**d)
1193 detach_as_demon_process(server)
1194 server() # START THE SERVER
1197 class CoordinatorMonitor(object):
1198 "Monitor a Coordinator."
1199 def __init__(self,coordInfo):
1200 self.name,self.url,self.priority,self.allocated_ncpu,self.ncpu,\
1201 self.start_time=coordInfo
1202 self.server=xmlrpclib.ServerProxy(self.url)
1203 self.get_status()
1204 def get_status(self):
1205 self.name,self.errlog,self.n,self.nsuccess,self.nerrors,self.client_report,\
1206 self.pending_report,self.logfile=self.server.get_status()
1207 print "Got status from Coordinator:",self.name,self.url
1208 def __getattr__(self,attr):
1209 "just pass on method requests to our server"
1210 return getattr(self.server,attr)
1212 class RCMonitor(object):
1213 """monitor a ResourceController. Useful methods:
1214 get_status()
1215 load_balance()
1216 setrule(rsrc,rule)
1217 delrule(rsrc)
1218 set_hostinfo(host,attr,val)
1219 retry_unused_hosts()
1220 Documented in ResourceController docstrings."""
1221 def __init__(self,host=None,port=5000):
1222 host=get_hostname(host) # GET FQDN
1223 self.rc_url='http://%s:%d' %(host,port)
1224 self.rc_server=xmlrpclib.ServerProxy(self.rc_url)
1225 self.get_status()
1227 def get_status(self):
1228 self.name,self.errlog,self.systemLoad,self.hosts,coordinators, \
1229 self.rules,self.resources,self.locks=self.rc_server.get_status()
1230 print "Got status from ResourceController:",self.name,self.rc_url
1231 self.coordinators={}
1232 for cinfo in coordinators:
1233 try: # IF COORDINATOR HAS DIED, STILL WANT TO RETURN RCMonitor...
1234 self.coordinators[cinfo[0]]=CoordinatorMonitor(cinfo)
1235 except socket.error,e: # JUST COMPLAIN, BUT CONTINUE...
1236 print >>sys.stderr,"Unable to connect to coordinator:",cinfo,e
1238 def __getattr__(self,attr):
1239 "just pass on method requests to our rc_server"
1240 return getattr(self.rc_server,attr)
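# Illustrative sketch (added for clarity, not part of the original module):
# inspecting a running ResourceController interactively.  Host and port are
# hypothetical and must point at an actual controller.
def _example_monitor(host='head.example.com', port=5000):
    'print per-host loads and per-coordinator progress, then return the monitor'
    m = RCMonitor(host, port)
    for h, load in m.systemLoad.items():
        print 'load:', h, load
    for name, cm in m.coordinators.items():
        print name, cm.nsuccess, 'done,', cm.nerrors, 'errors'
    m.retry_unused_hosts()  # unknown attrs forward to the controller proxy
    return m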
1242 def test_client(server,**kwargs):
1243 for id in server:
1244 print 'ID',id
1245 yield id
1246 time.sleep(1)
1248 def test_server():
1249 return range(1000)
1251 if __name__=='__main__':
1252 start_client_or_server(test_client,test_server,[],__file__)