.
[corvix.git] / var / deb-package / corvix-cluster / opt / cluster / sbin / rgang.py
blob55bbdb5b5ad35460f4f8ba34ac25963745625580
1 #!/usr/bin/env python
2 # This file (rgang.py) was created by Ron Rechenmacher <ron@fnal.gov> on
3 # Apr 13, 2001. "TERMS AND CONDITIONS" governing this file are in the README
4 # or COPYING file. If you do not have such a file, one can be obtained by
5 # contacting Ron or Fermi Lab in Batavia IL, 60510, phone: 630-840-3000.
6 # $RCSfile: rgang.py,v $ $Revision: 1.112 $ $Date: 2005/05/04 17:25:34 $
8 import os.path # basename
9 import sys # argv
10 import re # findall
11 import string # split
12 import time # time
# Default remote-shell / remote-copy commands (overridable via --rsh/--rcp).
DFLT_RSH='rsh'
DFLT_RCP='rcp'
# Default timeouts, as strings holding float seconds; they seed the
# --rshto/--rcpto option defaults (basically, let the user decide).
DFLT_RSHTO='150.0'
DFLT_RCPTO='3600.0'     # large because copies may involve big files

VERSION='2.7 cvs: $Revision: 1.112 $$Date: 2005/05/04 17:25:34 $'
APP = os.path.basename( sys.argv[0] )
# Base usage text; getopts() appends the generated per-option list.
USAGE=\
"%s: run cmds on hosts\n"%(APP,)+\
"Usage: %s [options] <nodespec> [cmd]\n"%(APP,)+\
" %s [options] -<c|C> <nodespec> 'srcfile' 'dstdfile'\n"%(APP,)+\
" %s [--skip <nodespec>] --list [<nodespec>]\n"%(APP,)+\
" %s -d\n"%(APP,)+\
" nodespec can be a \"farmlet\", \"expandable node list\", or \"-\".\n"+\
" when nodespec is \"-\", nodes are read from stdin (1 per line)\n"+\
" until a line containing a single \".\" is encountered\n"+\
" The nodespec is evaluated in the following order:\n"+\
" is it the stdin (\"-\")\n"+\
" is it a file in \"farmlets\" directory\n"+\
" is it a file in current working directory\n"+\
" assume \"expandable node list\"\n"+\
"Examples: %s -c root@qcd01{01-4} .profile .\n"%(APP,)+\
" %s all ls\n"%(APP,)+\
" %s all \"echo 'hi there'\"\n"%(APP,)+\
" %s qcd01{01-4} '%s qcd01{01-4} \"echo hi\"'\n"%(APP,APP)+\
" %s -l root qcd0{1-2}{04-10} \"echo 'hi there'\"\n"%(APP,)+\
" %s --skip node0{b-d} --list \"node{04-0x44,4f-0x55}\"\n"%(APP,)+\
" to test a large # of nodes:\n"+\
" %s 'qcd{,,,}{01-8}{01-10}' echo hi\n"%(APP,)+\
"Note: when using node expansion, don't forget to quote the expression to\n"+\
" stop shell expansion (as bash would do with the \"{,}\" syntax in\n"+\
" above example)"
# Option specification consumed by getopts().  Each key is an option name
# (a single letter is used as "-x", a longer name as "--name"); each value
# is a dict whose recognised fields are:
#  'desc'  Short description; printed out along with "USAGE"
#  'init'  The initialized value, i.e. when option is not specified;
#          usually string, but can be other if accompanied by specific
#          processing (default='')
#  'alias' a list of aliases (i.e. short form?)
#  'arg'   the type of option argument, an integer:
#            0 => option does not take an argument (default)
#            1 => option does take an argument
#            2 => option takes an optional argument where if it is a
#                 multiple letter (long [--] form) option there must
#                 be an '=<value>', else if it is a single letter,
#                 the arg will follow
#  'opt'   for optional argument type options (arg=2), the
#          value of the option when specified without an argument
# getopts() fills in any field an entry leaves out with the defaults above
# and adds the 'help' and 'version' entries itself.
OPTSPEC={ 'd':{'desc':"List farmlet names"},
          's':{'desc':"Skip current (local) node"},
          'c':{'desc':"Copy input output"},
          'C':{'desc':"Copy and skip current (local) node. Equiv to -sc"},
          'p':{'desc':"Same as rcp -p (only applicable if -c/C)"},
          'l':{'desc':"Same as rsh -l (only applicable if not -c/C)",'arg':1},
          'x':{'desc':"Same as rsh/rcp -x (turns on encryption)"},
          'X':{'desc':"Same as rsh/rcp -X (turns off encryption)"},
          'f':{'desc':"Same as rsh -f (cause nonforwardable credentials to be forwarded)"},
          'F':{'desc':"Same as rsh/rcp -F (cause forwardable credentials to be forwarded)"},
          'N':{'desc':"Same as rsh/rcp -N (prevent credentials from being forwarded)"},
          # header style used by header(): '1' node=, '2' separator, '3' separator+cmd
          'n':{'desc':"-n or -n0: no header, -n1 or -nn: node=, -n2: ---, -n3: --- and cmd",
               'arg':2,'init':'','opt':'0'},
          'list'    :{'desc':"list farmlets (there contents)"},
          'farmlets':{'desc':"override farmlets dir /opt/cluster/etc/rgang",
                      'arg':1,'init':'/opt/cluster/etc/rgang'},
          'serial'  :{'desc':"do not fork all commands before reading output"},
          'skip'    :{'desc':'skip this specific list of nodes','arg':1},
          'rsh'     :{'desc':'override default rsh','arg':1,'init':DFLT_RSH},
          'rcp'     :{'desc':'override default rcp','arg':1,'init':DFLT_RCP},
          # NOTE: the init here is an int, unlike most other options
          'nway'    :{'desc':'number of branches for each node of the tree OR if --serial, # of groups (def=200)',
                      'arg':1,'init':200},
          'combine' :{'desc':"spawn commands with stderr dupped onto stdout"},
          'do-local':{'desc':"do 1st node in current (local) environment; do not rsh"},
          # NOTE(review): init is the string '0' and opt the int 1, yet TRACE()
          # applies '&' to g_opt['tlvlmsk']; presumably converted to an int
          # elsewhere -- confirm.
          'tlvlmsk' :{'desc':"debugging; set hex debug mask i.e. 0xf, \"1<<9\"",'arg':2,'opt':1,'init':'0'},
          'pty'     :{'desc':"use pseudo term -- good for when ssh wants passwd"},
          'rshto'   :{'desc':"change the default timeout value %s (float seconds) for non-copy"%DFLT_RSHTO,
                      'arg':1,'init':DFLT_RSHTO},
          'rcpto'   :{'desc':"change the default timeout value %s (float seconds) for the copy"%DFLT_RCPTO,
                      'arg':1,'init':DFLT_RCPTO},
          'err-file':{'desc':"file to write timedout/error nodes to (could be used for retry)",
                      'arg':1},
          'verbose' :{'desc':"verbose",'alias':['v']},
          'pyret'   :{'desc':"don't output to stdout/err (used when rgang is imported in other .py scripts)"},
          'pyprint' :{'desc':"implies pyret but *final* result is printed"},
          'pypickle':{'desc':"implies pyret but *final* result is pickled and printed (preceeded by 8 length characters)"},
          # default: the directory this script was started from
          'path'    :{'desc':"prepend to path when rsh rgang",'arg':1,'init':os.path.abspath(os.path.dirname(sys.argv[0]))},
          'app'     :{'desc':"change application name from default \"%s\" (use when call from script)"%(APP,),'arg':1,'init':APP},
          'input-to-all-branches' :{'desc':"send input to all branches, not just currently processed"},
          # internal options: added by spawn_cmd() when it rsh's a remote rgang
          'adjust-copy-args':{'desc':"internal - applicable for \"-c\" (copy-mode)"},
          'mach_idx_offset' :{'desc':"internal - used to determine \"root\" machine",'arg':1},
def getopts( optspec, argv, usage_in, app, version='' ):
    """Parse the leading options of *argv* as described by *optspec*.

    optspec  -- dict in the OPTSPEC format.  It is updated in place: the
                'help' and 'version' options are added and every entry is
                completed with default fields.
    argv     -- argument list (without the program name).  Leading option
                words, and any option arguments, are popped off it.
    usage_in -- base usage text; the generated option list is appended.
    app      -- application name.  It is used to build environment
                variable names (<APP>_<OPTION>, upper case, '.'/'-' -> '_')
                and in error messages.
    version  -- text printed for --version (the global VERSION is used
                when this is '' and VERSION exists).

    Long options may be abbreviated to any unambiguous prefix.  Single
    letter options may be bundled ("-sc"), and a letter taking an argument
    may have it attached ("-lroot").

    Returns (env_opts+opts, argv, opt, usage_out):
      env_opts  -- option words equivalent to the environment settings
      opts      -- the option words consumed from argv
      argv      -- the remaining (non-option) arguments
      opt       -- option name -> value ('1' for flags, init when unset)
      usage_out -- the complete usage text
    Exits the process on a parse error (status 1), --help or --version (0).
    """
    import os   # environ
    import sys  # exit, stdout, stderr
    import re   # sub, findall, escape
    optspec.update( {'help'   :{'alias':['h','?'],'desc':'print usage/help'}} )
    optspec.update( {'version':{'alias':['V'],'desc':'print cvs version/date'}} )
    env_prefix = re.sub('[.-]','_',app).upper()
    opt_map = {}        # option or alias name -> canonical option name
    opt = {}            # local master options dictionary
    long_opts = []      # every option/alias name longer than one letter
    env_opts = []
    for op in optspec.keys():
        # fill in the fields this spec entry left out
        default = {'desc':'', 'init':'', 'alias':[], 'arg':0, 'opt':''}
        default.update( optspec[op] ); optspec[op].update( default )
        opt_map[op] = op; dashes = '-'
        if len(op) > 1:
            long_opts.append(op); dashes = '--'
        for alias in optspec[op]['alias']:
            opt_map[alias] = op
            if len(alias) > 1: long_opts.append(alias)
        # an option can also be supplied as <APP>_<OPTION> in the environment
        env = env_prefix+'_'+op.replace('-','_').upper()
        if env in os.environ:
            if optspec[op]['arg']:
                ee = os.environ[env]
                opt[op] = ee; env_opts.append(dashes+op+'='+ee)
            else:
                opt[op] = '1'; env_opts.append(dashes+op)
        else:
            opt[op] = optspec[op]['init']
    usage_out = usage_in+"\nNote: all options can be specified via environment variables of the form\n%s_<OPTION> where option is all uppercase\nOptions:\n"%(env_prefix,)
    sorted_ops = list(optspec.keys()); sorted_ops.sort()
    for op in sorted_ops:
        dash = ''
        for op_ in [op]+optspec[op]['alias']:
            if len(op_) == 1: dash = dash+',-'+op_
            else: dash = dash+',--'+op_
            if len(op_) == 1 and optspec[op]['arg'] == 1: dash = dash+'<val>'
            elif optspec[op]['arg'] == 1: dash = dash+'<=val>'
            if len(op_) == 1 and optspec[op]['arg'] == 2: dash = dash+'[val]'
            elif optspec[op]['arg'] == 2: dash = dash+'[=val]'
        usage_out = usage_out + ' %-20s %s\n'%(dash[1:],optspec[op]['desc'])
    long_space_separated = ' '+' '.join(long_opts)
    opts = []           # to remember all option words consumed
    while argv and argv[0][0] == '-' and len(argv[0]) > 1:
        op = argv[0][1:]; opts.append(argv.pop(0))     # save all opts
        long_flg = 0    # SHORT FORM is default
        op_arg = ''
        if op[0] == '-':
            long_flg = 1    # LONG FORM
            # Appending '=' guarantees a two-way split.  op_arg keeps that
            # trailing '=', so it is non-empty exactly when '=' was given
            # ("--opt=" gives an empty value).
            op,op_arg = (op[1:]+'=').split('=',1)
            op_grp = [op]
        else:
            op_grp = list(op)   # bundled single-letter options
        while op_grp:
            op_ = op_grp.pop(0)
            if long_flg and op_ not in long_opts:
                # accept any unambiguous prefix of a long option
                possibles = re.findall(" "+re.escape(op_)+"[^ ]*",long_space_separated)
                if len(possibles) == 1: op_ = possibles[0][1:]
                elif len(possibles) > 1:
                    pp = ''.join(possibles)
                    sys.stderr.write('ambiguous "long" option: %s\ncould be:%s\n'%(op_,pp))
                    sys.exit(1)
                else:
                    sys.stderr.write('%s: unknown "long" option: %s\n'%(app,op_)); sys.exit(1)
            elif not long_flg and op_ not in opt_map:
                sys.stderr.write('unknown option: %s\n'%(op_,)); sys.exit(1)
            op_ = opt_map[op_]  # unalias
            arg_type = optspec[op_]['arg']
            if arg_type == 0:                       ### NO OPTION ARG
                if long_flg and op_arg:
                    sys.stderr.write('option %s does not take an argument\n'%(op_,))
                    sys.exit(1)
                opt[op_] = '1'
            elif arg_type == 1:                     ### OPTION ARG
                if long_flg and op_arg:
                    opt[op_] = op_arg[:-1]          # strip off added '='
                elif not long_flg and op_grp:
                    opt[op_] = ''.join(op_grp); op_grp = []     # "-lroot"
                elif not argv:
                    sys.stderr.write('option %s requires an argument\n'%(op_,)); sys.exit(1)
                else:
                    opt[op_] = argv[0]; opts.append(argv.pop(0))   # save all opts
            elif long_flg and op_arg:               ### OPTIONAL OPTION ARG
                opt[op_] = op_arg[:-1]              # strip off added '='
            elif not long_flg and op_grp:
                opt[op_] = ''.join(op_grp); op_grp = []
            else:
                opt[op_] = optspec[op_]['opt']
    if opt['version']:
        if version == '':
            try: version = VERSION  # generic code: VERSION may not be defined
            except NameError: pass  # version will just remain ''
        sys.stdout.write('version: '+str(version)+'\n'); sys.exit( 0 )
    if opt['help']:
        sys.stdout.write(usage_out+'\n'); sys.exit( 0 )
    return env_opts+opts,argv,opt,usage_out
#getopts
# TRACE reads the globals g_opt['tlvlmsk'] (must be an int, since '&' is
# applied to it) and g_thisnode (a NodeInfo).
def TRACE( lvl, fmt_s, *args ):
    """Append one debug line to "<hostname>.<pid>.trc" when bit *lvl* of
    g_opt['tlvlmsk'] is set.  Does nothing otherwise.

    The line has the form "<time>:<hostname>:<mach_idx>:<lvl>:<fmt_s % args>".
    """
    import socket
    # g_opt/g_thisnode are only read here, so no "global" declaration is needed.
    if not (g_opt['tlvlmsk'] & (1<<lvl)):
        return
    # Format before opening, so a bad format string does not leave a file open.
    msg = '%.2f:%s:%s:%d:%s\n'%(time.time(),socket.gethostname(),g_thisnode.mach_idx,lvl,fmt_s%args)
    fo = open( "%s.%s.trc"%(g_thisnode.hostnames_l[0],os.getpid()), "a+" )
    try:
        fo.write( msg )
    finally:
        fo.close()
# TRACE
###############################################################################
# General Regular Expression class that allows for:
#     xx = Re( re )
#     ...
#     if xx.search( line ):
#         yy = xx.match_obj.group( x )
class Re:
    """Regex wrapper that remembers the result of its last search."""
    import re   # class attribute; reached as self.re
    def __init__( self, reg_ex=None,flags=0 ):
        """Compile *reg_ex* with *flags* when given.  Without a pattern,
        only the two-argument form of search() can be used."""
        if reg_ex: self.compiled = self.re.compile(reg_ex,flags)
        self.match_obj=None
    def search(self,arg1,string=None):
        """search(text)          -> compiled pattern searched in *text*
        search(pattern, text) -> *pattern* searched in *text*

        The match object (or None) is returned and kept in self.match_obj.
        An empty *text* ("") is searched as given.  Previously, because
        "" is false, it fell back to searching the compiled pattern in
        *pattern*.
        """
        if string is not None: self.match_obj = self.re.search(arg1, string)
        else: self.match_obj = self.compiled.search( arg1 )
        return self.match_obj
# Re
# Patterns used for node-list expansion and for parsing child output.
# (The r"" prefix selects Python raw-string notation.)

# Numeric range "<start>-<end>".  Group 3 is None, "0" (octal) or "0x" (hex),
# depending on how <end> is written.
re_numeric = Re( r"([0-9a-f]+)-((0x{0,1}){0,1}[0-9a-f]+)" )
# Single-letter range "<a>-<z>".
re_1alpha  = Re( r"([a-zA-Z])-([a-zA-Z])" )

# The connect string is the first thing a remote command is *supposed to*
# print; spawn_cmd() puts an echo of it first in every rsh'ed command.
CONNECT_MAGIC = "__rg_connect__"
re_connect = Re( r"(.*)" + CONNECT_MAGIC )

# Marker echoed, followed by the exit status digit, after a single-node
# command.  spawn_cmd() writes it as STATUS_MAGIC""0 / ""1, so the command
# text itself never contains STATUS_MAGIC immediately followed by a digit.
STATUS_MAGIC = "__rg_sts__:"
# That echo ends with a newline, so a plain (non-MULTILINE) search is used.
# The author recalled problems with re.MULTILINE here.
re_status = Re( r"(.*)" + STATUS_MAGIC + r"([0-9]+)" )

# "PICKLE:<8 hex digits>:" header (8 length characters, see --pypickle).
re_pickle = Re( r"PICKLE:([0-9a-f]{8}):" )

# Unescaped $RGANG_MACH_ID or ${RGANG_MACH_ID} inside an rcp file name.
re_mach_id = Re( r"(.*[^\\]|^)\$(RGANG_MACH_ID|{RGANG_MACH_ID})([^a-zA-Z_].*|$)" )

# First whitespace-delimited token of a line, stopping at any '#'.
re_mach = Re( r"^[^#\S]*([^#\s]+)" )
def findall_expands( ss ):
    """Return the outermost "{...}" groups of *ss*, braces included.

    Nested braces stay inside their enclosing group.  A group left
    unterminated at the end of *ss* is dropped.
    """
    groups = []
    depth = 0
    for ch in ss:
        if ch == '{':
            depth += 1
            if depth == 1:
                groups.append('')   # a new outermost group starts
        if depth > 0:
            groups[-1] += ch
        if ch == '}':
            depth -= 1
    if depth != 0:
        groups.pop()    # discard the unbalanced trailing group
    return groups
# findall_expands
def numeric_expand( ss_l ):
    """Expand the range expression in each element of *ss_l*.

    Each element is searched (unanchored) first for a single-letter range
    "a-f" -> a,b,...,f, then for a numeric range "S-E".  The numeric form
    is zero-padded to len(S), and its radix comes from how E is written:
    plain -> decimal, leading "0" -> octal, leading "0x" -> hex (S is read
    in the same radix).  Elements without a range pass through unchanged.
    A reversed range yields nothing.  Invalid digits raise ValueError,
    which expand() reports before exiting.
    """
    ret = []
    for sss in ss_l:
        # single alpha check 1st so {a-f} is not mistaken for
        # integer (not hex) numeric expand
        if re_1alpha.search( sss ):
            first = ord(re_1alpha.match_obj.group(1))
            last  = ord(re_1alpha.match_obj.group(2))
            for code in range(first, last+1):   # empty when reversed
                ret.append( chr(code) )
        elif re_numeric.search( sss ):
            start = re_numeric.match_obj.group(1)
            end   = re_numeric.match_obj.group(2)
            bb    = re_numeric.match_obj.group(3)   # None, '0' or '0x'
            # int(text, base) instead of eval(): same values for valid
            # digits, but user-supplied text is never executed.
            if bb == None:
                base, fmt = 10, '%0*d'
            elif bb == '0':
                base, fmt = 8, '%0*o'
            else:
                base, fmt = 16, '%0*x'
            for num in range(int(start, base), int(end, base)+1):
                ret.append( fmt%(len(start),num) )
        else: ret.append( sss )
    TRACE( 17, 'numeric_expand returning %s', ret )
    return ret
# numeric_expand
def expand( ss ):
    """Expand a node-list expression into a list of node names.

    Comma-separated items are expanded independently.  "{...}" groups
    (which may nest and may contain comma lists) are expanded recursively,
    and a range inside a group is expanded by numeric_expand().
    Example: "n{1-3}x,m" -> ['n1x', 'n2x', 'n3x', 'm'].
    On any error a diagnostic is written to stderr and the process exits
    with status 1.
    """
    import re
    import traceback    # only needed when reporting an error with tracing on
    TRACE( 18, 'expand(%s)', ss )
    ssIn = ss
    try:
        expands = findall_expands( ss )
        if not expands: return ss.split(',')
        # Replace each top-level {...} group with a numbered placeholder
        # "<n>" so the rest of the text can be split safely on commas.
        placeholder_idx = 0
        for exp in expands:
            ss = ss.replace( exp, '<%d>'%(placeholder_idx,), 1 )
            placeholder_idx = placeholder_idx + 1
        exp_result = []
        placeholder_idx = 0     # number of the first placeholder in this item
        for sss in ss.split(','):
            TRACE( 19, 'expand sss=%s of ss=%s', sss, ss )
            place_holders = re.findall( '<[0-9]+>', sss )
            for idx in range(len(place_holders)):
                p_holder = '<%d>'%(placeholder_idx+idx,)
                expanding = expand( expands[placeholder_idx+idx][1:-1] ) #Recursive call
                expanding = numeric_expand( expanding )
                # substitute every expansion into each partial result so far
                result = []
                for ssss in sss.split(','):
                    holder_idx = ssss.find(p_holder)
                    if holder_idx != -1:
                        pre = ssss[:holder_idx]
                        post = ssss[holder_idx+len(p_holder):]
                        for expanded in expanding:
                            result.append( pre+expanded+post )
                sss = ','.join(result)
            exp_result = exp_result + sss.split(',')
            placeholder_idx = placeholder_idx + len(place_holders)
    except (SystemExit, KeyboardInterrupt):
        # A nested expand() has already reported its error (or the user hit
        # ^C); do not report it again at every recursion level.
        raise
    except: # any
        exc, value, tb = sys.exc_info()
        sys.stderr.write('Error expanding node list "%s": %s: %s\n'%(ssIn,exc,value) )
        sys.stderr.write('Perhaps an invalid decimal/octal/hex digit\n' )
        sys.stderr.write('remember: in the \'{seq1-seq2}\' syntax, seq2\n' )
        sys.stderr.write('can begin with \'0x\' to force hex or \'0\' to\n' )
        sys.stderr.write('force octal\n' )
        if g_opt['tlvlmsk']:
            for ln in traceback.format_exception( exc, value, tb ):
                sys.stderr.write(ln)
        sys.exit(1)

    return exp_result
# expand, numeric_expand, findall_expands
def build_quoted_str( args ):
    """Return *args* joined by single spaces, each one wrapped in quotes.

    The quote character is the one Python's repr() picks for the argument:
    single quotes normally, double quotes when repr() switches to them
    (e.g. the argument contains a single quote).
    """
    def _wrap( arg ):
        # Wrap one argument in the quote style chosen by repr().
        if repr(arg).startswith("'"):
            return "'%s'"%(arg,)
        return '"%s"'%(arg,)
    return ' '.join([ _wrap(arg) for arg in args ])
# build_quoted_str
def build_sh_single_quoted_str( i_str ):
    """Return *i_str* as a single /bin/sh single-quoted word.

    Each embedded ' is written as '"'"' (close the quote, emit a
    double-quoted quote, reopen), so the shell reproduces *i_str*
    exactly.  The replacement is done in one pass, so text that happens
    to contain the old "<<0>>" placeholder is no longer corrupted.
    """
    return "'%s'"%(i_str.replace("'", '\'"\'"\''),)
# build_sh_single_quoted_str
def build_sh_doubly_single_quoted_str( i_str ):
    """Escape *i_str* for use inside an already-open /bin/sh single-quoted
    string.  No surrounding quotes are added.

    Each ' becomes '"'"'.  The replacement is done in one pass, so text
    containing the old "<<0>>" placeholder is left intact.
    """
    return i_str.replace("'", '\'"\'"\'')
# build_sh_doubly_single_quoted_str
# this routine needs the globals:
#   g_opt={'tlvlmsk':0,'pty':''}   (and TRACE)
g_num_spawns = 0    # total number of spawn() calls for the life of the process
def spawn( cmd, args, combine_stdout_stderr=0 ):
    """Fork and exec *cmd* (split on whitespace) with *args* appended.

    Returns (pid, child_stdin, child_stdout, child_stderr).  The last three
    are raw file descriptors kept by the parent.  child_stderr is None when
    combine_stdout_stderr is true, because the child's stderr is then sent
    to its stdout.  With g_opt['pty'] set, the child runs under pty.fork(),
    and child_stdin/child_stdout are both the parent's pty descriptor.
    If the exec fails, the child reports the error on its stderr and exits
    with status 127 instead of returning into the caller's code.
    """
    import os   # fork, pipe, dup2, execvp, _exit
    import pty  # fork
    global g_num_spawns     # keep track for total life of process
    g_num_spawns = g_num_spawns + 1
    cmd_list = cmd.split()  # support cmd='cmd -opt'

    # (read,write) descriptor pairs for the child's stdin/stdout/stderr.
    # In pty mode pty.fork() provides stdin/stdout, so fd 0/1 stand in.
    if g_opt['pty']:
        pipe0 = [0,0]; pipe1 = [1,1]; pipe2 = os.pipe()
        pid,fd = pty.fork()
    else:
        pipe0 = os.pipe(); pipe1 = os.pipe(); pipe2 = os.pipe()
        pid = os.fork()

    if pid == 0:
        # child -- must never return into the caller's code
        try:
            # combining stdout and stderr helps keep the output in order
            if combine_stdout_stderr: os.dup2( pipe1[1], 2 )
            else: os.dup2( pipe2[1], 2 )
            os.close( pipe2[1] )
            if not g_opt['pty']:
                os.close( pipe0[1] )
                os.close( pipe1[0] )
                os.close( pipe2[0] )
                os.dup2( pipe0[0], 0 ); os.close( pipe0[0] )
                os.dup2( pipe1[1], 1 ); os.close( pipe1[1] )
            # drop inherited descriptors (the default nway=200 with 3 fds per
            # child can leave several hundred open)
            for ii in range(3,750):
                try: os.close(ii)
                except OSError: pass
            os.execvp( cmd_list[0], cmd_list+args )
            # bye-bye python
        except:     # anything at all: the child must not continue
            try:
                sys.stderr.write( 'spawn: cannot exec %s: %s\n'%(cmd_list[:1],sys.exc_info()[1]) )
                sys.stderr.flush()
            except: pass
        os._exit( 127 )

    # parent
    TRACE( 20, 'spawn: pid=%d p0=%s p1=%s p2=%s execvp( %s, %s )', pid, pipe0, pipe1, pipe2, cmd_list[0], cmd_list+args )
    if g_opt['pty']:
        # fd is read/write and only valid in parent; pty handles child stdin/stdout
        pipe0[1] = fd   # stdin
        pipe1[0] = fd   # stdout
    else:
        os.close( pipe0[0] )    # parent doesn't read from child's stdin
        os.close( pipe1[1] )    # parent doesn't write to child's stdout
    os.close( pipe2[1] )        # parent doesn't write to child's stderr
    child_stdin = pipe0[1]
    child_stdout = pipe1[0]
    if combine_stdout_stderr:
        # nothing is ever read from the stderr pipe in this mode; don't leak it
        os.close( pipe2[0] )
        child_stderr = None
    else:
        child_stderr = pipe2[0]
    return pid,child_stdin,child_stdout,child_stderr
# spawn
def _rsh_opt_args():
    """Return the rsh options selected by -l, -x, -X, -f, -F, -N (in that order)."""
    sp_args = []
    if g_opt['l']: sp_args = sp_args + ['-l',g_opt['l']]
    for flag in ('x','X','f','F','N'):
        if g_opt[flag]: sp_args = sp_args + ['-'+flag]
    return sp_args

# node_info == g_internal_info[x]
def spawn_cmd( node_info, mach_idx, opts, args, branch_nodes, do_local ):
    """Start the command(s) for one branch of the rgang tree.

    node_info    -- per-branch state dict.  'stage', 'ret_info'['name'] and
                    'gbl_branch_idx' are read, and 'stage' is updated.
    mach_idx     -- added to g_mach_idx_offset to form RGANG_MACH_ID and
                    the --mach_idx_offset passed to a remote rgang.
    opts         -- rgang option words to forward to a remote rgang.
    args         -- the user command words, or in copy mode the source
                    file(s) followed by the destination.  In copy mode with
                    --adjust-copy-args, the source entries are rewritten
                    IN PLACE.
    branch_nodes -- node names handled by this branch.
    do_local     -- run a single node that is this machine without rsh.

    Modes:
      * copy (-c): when stage is None, rcp the file(s) to the branch
        node; otherwise rsh an rgang there that continues the copy to
        branch_nodes (written to its stdin, ended by a "." line).
      * one node that is this machine (with do_local): /bin/sh -c.
      * one remote node: rsh <node> /bin/sh -c '<cmd>'.  The command
        is followed by an echo of STATUS_MAGIC with 0 (success) or 1.
      * several nodes: rsh an rgang on the branch node; branch_nodes are
        written to its stdin.
    Every rsh'ed command first echoes CONNECT_MAGIC, and
    g_connects_expected is incremented for it.

    Returns spawn()'s (pid, child_stdin, child_stdout, child_stderr).
    Raises RuntimeError when branch_nodes is empty (outside copy mode).
    """
    import os.path  # basename, isdir
    import os       # environ, write
    global g_mach_idx_offset    # declaration necessary because I'm setting it
    global g_connects_expected  # declaration necessary because I'm setting it
    TRACE( 21, 'spawn_cmd args=%s', args )
    if g_opt['c']:
        # rgang COPY mode
        dest = args[-1]
        # Special $RGANG_MACH_ID processing.  On the initiator node
        # (adjust-copy-args='') args are used as given.  On a node reached
        # by a parent rgang (adjust-copy-args set), the file(s) are already
        # at the destination, so that path becomes the source of the next hop.
        if g_opt['adjust-copy-args']:
            g_opt['adjust-copy-args'] = ''  # do this once, not for each branch
            sour = args[-1]
            if re_mach_id.search(sour):
                sour = '%s%d%s'%(re_mach_id.match_obj.group(1),g_mach_idx_offset,re_mach_id.match_obj.group(3))
            for ii in range(len(args)-1):
                if os.path.isdir(dest):
                    if sour[-1] == '/': args[ii] = sour+os.path.basename(args[ii])
                    else: args[ii] = sour+'/'+os.path.basename(args[ii])
                else:
                    # THERE SHOULD BE JUST ONE source in this case
                    args[ii] = sour
            # TRICKY - affects the dest processing below and the
            # --mach_idx_offset passed to the next rgang
            g_mach_idx_offset = g_mach_idx_offset + 1
        if re_mach_id.search(dest):
            dest = '%s%d%s'%(re_mach_id.match_obj.group(1),g_mach_idx_offset+mach_idx,re_mach_id.match_obj.group(3))

        if node_info['stage'] == None:
            # RECALL: rcp is always first.  With multiple nodes per branch
            # rgang uses 2 steps - 1st rcp, then rsh the rgang (for
            # simplicity rgang is relied on even for the remaining nodes).
            sp_args = args[:-1]+['%s:%s'%(node_info['ret_info']['name'],dest)]
            # each flag is prepended, giving the order -N -F -X -x -p
            for flag in ('p','x','X','F','N'):
                if g_opt[flag]: sp_args = ['-'+flag]+sp_args
            sp_info = spawn( g_opt['rcp'], sp_args, g_opt['combine'] )
            node_info['stage'] = 'rcp'
            timeout_add( node_info['gbl_branch_idx'], float(g_opt['rcpto']) )
        else:
            # stage is rcp (done); now rsh rgang to copy on to the other node(s)
            sp_args = _rsh_opt_args() + [node_info['ret_info']['name']]
            q_user_arg_s = build_quoted_str( args )
            sp_args = sp_args + ['/bin/sh','-c']
            # HERE'S THE 1ST PLACE WHERE I NEED TO ADD CONNECT_MAGIC
            sh_cmd_s = "'"      # I want only sh to interpret things
            sh_cmd_s = sh_cmd_s+'echo %s;'%(CONNECT_MAGIC,)
            g_connects_expected = g_connects_expected + 1
            sh_cmd_s = sh_cmd_s+'PATH=%s:$PATH;'%(g_opt['path'],)  # see option init
            sh_cmd_s = sh_cmd_s+'RGANG_MACH_ID=%d;export RGANG_MACH_ID;'%(mach_idx+g_mach_idx_offset,)
            sh_cmd_s = sh_cmd_s+'if [ -r $HOME/.rgangrc ];then . $HOME/.rgangrc;fi;'  # stdout from .rgangrc should be OK; I search for PICKLE:
            sh_cmd_s = sh_cmd_s+'%s '%(g_opt['app'],)
            sh_cmd_s = sh_cmd_s+'--pypickle --mach_idx_offset=%d --adjust-copy-args '%(g_mach_idx_offset+mach_idx,)
            for rgang_opt in opts:
                if rgang_opt.startswith('--mach_idx_offset'): continue
                if rgang_opt.startswith('--pypickle'): continue
                if rgang_opt.startswith('--adjust-copy-args'): continue
                # build_quoted_str preserves e.g. --rsh="rsh -F"
                # (equivalent to "--rsh=rsh -F")
                sh_cmd_s = sh_cmd_s+build_sh_doubly_single_quoted_str(build_quoted_str([rgang_opt]))+' '
            sh_cmd_s = sh_cmd_s+'- %s'%(build_sh_doubly_single_quoted_str(q_user_arg_s),)
            sh_cmd_s = sh_cmd_s+"'"    # end the sh cmd string
            TRACE( 22, 'spawn_cmd rcp rgang sh_cmd_ss=>%s<', sh_cmd_s )
            sp_args = sp_args + [sh_cmd_s]
            sp_info = spawn( g_opt['rsh'], sp_args, 0 )    # never combine stderr/out of rsh rgang
            # the remote rgang reads its nodes from stdin ("-" nodespec)
            for machine in branch_nodes:
                os.write( sp_info[1], machine )
                os.write( sp_info[1], '\n' )
            os.write( sp_info[1], '.\n' )
            node_info['stage'] = 'rgang'
            timeout_add( node_info['gbl_branch_idx'], float(g_opt['rshto']) )
    elif len(branch_nodes) == 1 and do_local and g_thisnode.is_me(branch_nodes[0]):
        # local, no need to rsh to ourselves
        sh_cmd_s = 'RGANG_MACH_ID=%d;export RGANG_MACH_ID;'%(mach_idx+g_mach_idx_offset,)
        # RGANG_INITIATOR, RGANG_PARENT, and RGANG_PARENT_ID should already be set
        sh_cmd_s = sh_cmd_s+'if [ -r $HOME/.rgangrc ];then . $HOME/.rgangrc;fi;'
        sh_cmd_s = sh_cmd_s+' '.join(args)
        TRACE( 22, 'spawn_cmd local sh_cmd_ss=>%s<', sh_cmd_s )
        sp_info = spawn( '/bin/sh', ['-c',sh_cmd_s], g_opt['combine'] )
        node_info['stage'] = 'local'
    elif len(branch_nodes) == 1:
        # rsh the user command
        sp_args = _rsh_opt_args() + [node_info['ret_info']['name']]
        # NOTE: 'exec','sh'... cannot be used because of the "&& echo..."
        # appended below, which must follow sh -c 'cmd' (not be put at the
        # end of cmd) because the user cmd might end with "&".
        sp_args = sp_args + ['/bin/sh','-c']
        # HERE'S THE 2ND PLACE WHERE I NEED TO ADD CONNECT_MAGIC
        sh_cmd_s = "'"      # I want only sh to interpret things
        sh_cmd_s = sh_cmd_s+'echo %s;'%(CONNECT_MAGIC,)
        g_connects_expected = g_connects_expected + 1
        sh_cmd_s = sh_cmd_s+'RGANG_MACH_ID=%d;export RGANG_MACH_ID;'%(mach_idx+g_mach_idx_offset,)
        sh_cmd_s = sh_cmd_s+'RGANG_INITIATOR=%s;export RGANG_INITIATOR;'%(os.environ['RGANG_INITIATOR'],)
        sh_cmd_s = sh_cmd_s+'RGANG_PARENT=%s;export RGANG_PARENT;'%(g_thisnode.hostnames_l[0],)
        sh_cmd_s = sh_cmd_s+'RGANG_PARENT_ID=%s;export RGANG_PARENT_ID;'%(g_thisnode.mach_idx,)
        sh_cmd_s = sh_cmd_s+'if [ -r $HOME/.rgangrc ];then . $HOME/.rgangrc;fi;'
        sh_cmd_s = sh_cmd_s+build_sh_doubly_single_quoted_str(' '.join(args))
        sh_cmd_s = sh_cmd_s+"'"    # end the sh cmd string
        TRACE( 22, 'spawn_cmd rsh sh_cmd_s=>%s<', sh_cmd_s )
        sp_args = sp_args + [sh_cmd_s]
        sp_args = sp_args+[' && echo %s""0 || echo %s""1'%(STATUS_MAGIC,STATUS_MAGIC)]
        sp_info = spawn( g_opt['rsh'], sp_args, g_opt['combine'] )
        node_info['stage'] = 'rsh'
        timeout_add( node_info['gbl_branch_idx'], float(g_opt['rshto']) )
    elif len(branch_nodes) >= 1:
        # rsh rgang (not user command!):
        #   rsh <rsh_opts>... <node> /bin/sh -c
        #       '<env setup>;rgang_app <quoted rgang opts> - <quoted user args>'
        sp_args = _rsh_opt_args() + [node_info['ret_info']['name']]
        sp_args = sp_args + ['/bin/sh','-c']
        # HERE'S THE 3RD PLACE WHERE I NEED TO ADD CONNECT_MAGIC
        sh_cmd_s = "'"      # I want only sh to interpret things
        sh_cmd_s = sh_cmd_s+'echo %s;'%(CONNECT_MAGIC,)
        g_connects_expected = g_connects_expected + 1
        sh_cmd_s = sh_cmd_s+'PATH=%s:$PATH;'%(g_opt['path'],)  # see option init
        sh_cmd_s = sh_cmd_s+'RGANG_MACH_ID=%d;export RGANG_MACH_ID;'%(mach_idx+g_mach_idx_offset,)
        sh_cmd_s = sh_cmd_s+'RGANG_INITIATOR=%s;export RGANG_INITIATOR;'%(os.environ['RGANG_INITIATOR'],)
        sh_cmd_s = sh_cmd_s+'RGANG_PARENT=%s;export RGANG_PARENT;'%(g_thisnode.hostnames_l[0],)
        sh_cmd_s = sh_cmd_s+'RGANG_PARENT_ID=%s;export RGANG_PARENT_ID;'%(g_thisnode.mach_idx,)
        sh_cmd_s = sh_cmd_s+'if [ -r $HOME/.rgangrc ];then . $HOME/.rgangrc;fi;'  # stdout from .rgangrc should be OK; I search for PICKLE:
        sh_cmd_s = sh_cmd_s+'%s '%(g_opt['app'],)
        sh_cmd_s = sh_cmd_s+'--pypickle --mach_idx_offset=%d '%(g_mach_idx_offset+mach_idx,)
        for rgang_opt in opts:
            if rgang_opt.startswith('--mach_idx_offset'): continue
            if rgang_opt.startswith('--pypickle'): continue
            # build_quoted_str preserves e.g. --rsh="rsh -F"
            # (equivalent to "--rsh=rsh -F")
            sh_cmd_s = sh_cmd_s+build_sh_doubly_single_quoted_str(build_quoted_str([rgang_opt]))+' '
        sh_cmd_s = sh_cmd_s+'- '    # "-" is the nodespec (nodes come on stdin)
        for arg_s in args:
            sh_cmd_s = sh_cmd_s+build_sh_doubly_single_quoted_str( build_sh_single_quoted_str(arg_s) ) + ' '
        sh_cmd_s = sh_cmd_s+"'"    # end the sh cmd string
        TRACE( 22, 'spawn_cmd rang sh_cmd_s=>%s<', sh_cmd_s )
        sp_args = sp_args + [sh_cmd_s]
        sp_info = spawn( g_opt['rsh'], sp_args, 0 )    # never combine stderr/out of rsh rgang
        TRACE( 23, 'spawn_cmd sending nodes: %s', branch_nodes )
        for machine in branch_nodes:
            os.write( sp_info[1], machine )
            os.write( sp_info[1], '\n' )
        os.write( sp_info[1], '.\n' )
        node_info['stage'] = 'rgang'
        timeout_add( node_info['gbl_branch_idx'], float(g_opt['rshto']) )
    else:
        # program error.  String exceptions are invalid since Python 2.6,
        # so raise a real exception class.
        raise RuntimeError( 'Program Error: unexpected branch_nodes list len=%s'%(len(branch_nodes),) )
    return sp_info  # pid,child_stdin,child_stdout,child_stderr
# spawn_cmd, spawn
class NodeInfo:
    """Host names, aliases and IP addresses by which this machine is known."""
    import socket   # gethostname(); class attribute kept for compatibility
    def __init__( self ):
        """Collect names/addresses for the local host and, best effort,
        for every interface address reported by ifconfig."""
        import socket
        import os
        xx = socket.gethostbyaddr( socket.gethostname() )
        self.hostnames_l = [xx[0]]
        self.alias_l = xx[1]
        self.ip_l = xx[2]
        self.shortnames_l = [xx[0].split('.')[0]]
        # Interface addresses.  The sed accepts both the older
        # "inet addr:1.2.3.4" and the newer "inet 1.2.3.4" ifconfig formats.
        # This must be best effort: ifconfig is not available on every OS.
        try:
            os_fo = os.popen( "ifconfig 2>/dev/null | sed -n -e 's/^.*inet \\(addr:\\)\\{0,1\\}\\([0-9][0-9.]*\\).*$/\\2/p'" )
            try:
                inet_addrs = os_fo.readlines()
            finally:
                os_fo.close()
        except (OSError, IOError):
            inet_addrs = []
        for inet_addr in inet_addrs:
            inet_addr = inet_addr.strip()
            if not inet_addr: continue
            try:
                xx = socket.gethostbyaddr( inet_addr )
            except socket.error:
                # No reverse mapping for this address: still recognise the
                # address itself, and carry on with the other interfaces.
                self.ip_l = self.ip_l + [inet_addr]
                continue
            self.hostnames_l = self.hostnames_l + [xx[0]]
            self.alias_l = self.alias_l + xx[1]
            self.ip_l = self.ip_l + xx[2]
            self.shortnames_l = self.shortnames_l + [xx[0].split('.')[0]]
        self.mach_idx = '?'     # rgang specific
    def is_me( self, node ):
        """Return 1 if *node* names this machine, else 0.

        A name containing '.' is compared with the full host names, a name
        without one with the short names.  Aliases and IP addresses are
        checked in both cases.
        """
        # NOTE: I feel that using gethostbyaddr, which potentially
        # contacts the names server, is not the right choice.
        if node.find(".") == -1: names = self.shortnames_l
        else: names = self.hostnames_l
        if node in names or node in self.alias_l or node in self.ip_l: return 1
        return 0
# NodeInfo
def get_nway_indexes( nway, nth, list_length, minus_idx0=0 ):
    """Return (start, end) bounds of part *nth* when *list_length* items are
    divided into *nway* nearly equal parts.

    Part nth covers indexes start..end-1, and its end equals part nth+1's
    start.  When minus_idx0 is true, element 0 is left out of the split and
    both bounds are shifted by one.
    """
    if minus_idx0:
        minus_idx0 = 1
        list_length = list_length - 1
    part_len = float(list_length) / nway
    def _boundary( k ):
        # first index of part k, rounded to the nearest integer
        return int( k*part_len + 0.5 ) + minus_idx0
    start = _boundary( nth )
    end = _boundary( nth+1 )
    TRACE( 23, 'get_nway_indexes(nway=%s,nth=%s,list_length=%s,minus_idx0=%s)=(start=%s,end=%s)',
           nway, nth, list_length, minus_idx0, start, end )
    return start,end
# get_nway_indexes
def determ( index, list_length, nway, minus_idx0=0 ):
    """Inverse of get_nway_indexes().

    Return (nth, idx_in_nth): the part that list position *index* falls
    in, and its offset within that part.  Uses the same split of
    *list_length* into *nway* parts, and the same minus_idx0 handling.
    """
    offset = 0
    if minus_idx0:
        offset = 1
        list_length = list_length - 1
    part_len = float(list_length) / nway
    pos = index - offset
    boundary = lambda k: int( k*part_len + 0.5 )   # first position of part k
    nth = int( pos/part_len )       # estimate, may be one short due to rounding
    if pos >= boundary( nth+1 ):
        nth = nth + 1
    return nth, pos - boundary( nth )
# determ, get_nway_indexes
def header( machine, args ):
    """Write the per-node header selected by the -n option.

    '1': "<machine>= " written straight to fd 1 (no newline);
    '2': a dashed separator line containing the machine name;
    '3': the separator plus the rcp/rsh command that will be run.
    stdout is flushed afterwards.
    """
    mode = g_opt['n']
    if mode == '1':
        os.write( sys.stdout.fileno(), "%s= "%(machine,) )
    elif mode in ('2', '3'):
        sys.stdout.write( "\n- - - - - - - - - - - - - - %s - - - - - - - - - - - - - -"%(machine,) + '\n' )
        if mode == '3':
            if g_opt['c']:
                sp_args = args[:-1]+['%s:%s'%(machine,args[-1])]
                sys.stdout.write( '%s %s'%(g_opt['rcp'],' '.join(sp_args)) + '\n' )
            else:
                sys.stdout.write( '%s %s %s'%(g_opt['rsh'],machine,build_quoted_str(args)) + '\n' )
    sys.stdout.flush()
# header
# returns string,status where:
#   status=0 == line  (string ends with '\n')
#   status=1 == tmo   (no further byte arrived within 0.9 s)
#   status=2 == eof   (or the first read failed)
def try_line( fd ):
    """Read one line from raw descriptor *fd*, one byte at a time.

    The first byte is read directly (presumably the caller has already
    seen fd become readable).  Each following byte is waited for at most
    0.9 s.  Returns (data_read, status) as listed above.
    """
    import select
    sts = 0
    try:
        final_s = s = os.read( fd, 1 )
    except KeyboardInterrupt:
        raise           # let ^C reach main()'s KeyboardInterrupt handler
    except:             # any other error (e.g. a closed descriptor) == eof
        return '',2
    try:
        TRACE( 24, 'try_line looking for line' )
        while s != '\n' and s != '':
            rr,ww,ee = select.select([fd],[],[],0.9)
            if not rr:
                sts = 1
                break
            s = os.read( fd, 1 )
            final_s = final_s + s
        if s == '': sts = 2     # eof, including on the very first read
    except:
        sys.stderr.write( 'while... fd=%s\n'%(fd,) )
        raise           # re-raise with the original traceback
    TRACE( 25, 'try_line returning %s', (final_s,sts) )
    return final_s,sts
# try_line
def get_output( sel_l, fo2node, wait ):
    """Wait for one readable fd in *sel_l* and return what was read from it.

    sel_l   -- fds to select() on (child stdout/stderr, possibly main stdin)
    fo2node -- map fd -> {'mach_idx':..., 'std':...} (filled by info_update)
    wait    -- select() timeout in seconds, or None to block

    Returns (chk_exit, data, fd, mach_idx, sh_exit_status):
      chk_exit       -- 1 when the fd returned no data (EOF; its process
                        probably exited), else 0
      data           -- what was read, with CONNECT/STATUS magic stripped
      fd             -- the fd that was serviced; None on timeout
      mach_idx       -- node index owning fd; -1 on timeout
      sh_exit_status -- int exit status when the STATUS magic was seen,
                        else None

    A child stdout whose node stage is 'rsh', or 'rgang' not yet connected,
    is read a line at a time with try_line() so the magic lines can be
    recognized. Everything else (including main stdin) is read with a
    single os.read() of up to 8192 bytes.

    On the CONNECT magic the node is marked connected, g_num_connects is
    incremented and the branch's connect timeout is cancelled.

    Raises RuntimeError if select() reports an exceptional condition with
    nothing readable.
    """
    import errno
    import select
    import os
    global g_num_connects # because I'm modifying it

    mach_idx=-1; s='' # init for TRACE below
    chk_exit=0; final_s=''; fd=None; sh_exit_status=None
    while 1: # only loops again after EAGAIN (see below)
        ready = select.select( sel_l, [], sel_l, wait )
        if not ready[0]:
            # timeout (or error, but assume timeout; errors are not processed)
            if ready[2]:
                # A string exception here would itself fail with TypeError
                # on Python >= 2.6, so raise a real exception class.
                raise RuntimeError( 'Program Error: select error' )
            break

        fd = ready[0][0]
        mach_idx = fo2node[fd]['mach_idx']
        node = g_internal_info[mach_idx]
        is_stdout = fo2node[fd]['std'] == sys.stdout.fileno()
        # NOTE: When in pty mode, the input is NOT echoed locally and
        # currently it will take time to check for the STATUS_MAGIC
        # before echoing the typed characters.
        try:
            # ['stage'] should be (see spawn_cmd) one of:
            #   copy-mode:  'rcp', 'rgang' (need "connect")
            #   cmd-mode:   'local', 'rsh' (need "connect", then "status"),
            #               'rgang' (need "connect")
            if is_stdout \
               and ( node['stage'] == 'rsh' \
                     or ( node['stage'] == 'rgang' and node['connected'] == 0 ) ):
                s,sts = try_line( fd )
            else:
                # child stderr, stdout past the magic-line stage, or main stdin
                final_s = os.read( fd, 8192 )
                if not final_s: chk_exit = 1
                return chk_exit, final_s, fd, mach_idx, sh_exit_status
        except (IOError, OSError):
            # os.read() signals EAGAIN with OSError, so catch both classes.
            detail = sys.exc_info()[1]
            if getattr( detail, 'errno', None ) == errno.EAGAIN:
                sys.stderr.write( "EAGAIN\n" )
                continue
            raise   # keep the original traceback

        if not s:
            chk_exit = 1
            break

        if is_stdout:
            if ( node['stage'] == 'rsh' or node['stage'] == 'rgang' ) \
               and node['connected'] == 0:
                # Look for the STATUS_MAGIC or CONNECT_MAGIC.
                # NOTE! THERE IS THE SMALL POSSIBILITY THAT THIS PROCESSING
                # WILL FAIL B/C OF A DELAY IN THE TRANSMISSION OF THE
                # STATUS LINE (GREATER THAN THE TIMEOUT IN try_line)
                if re_connect.search( s ):
                    TRACE( 29, 'get_output mach_idx=%d: yes connect magic', mach_idx )
                    final_s = re_connect.match_obj.group(1)
                    node['connected'] = 1
                    g_num_connects = g_num_connects + 1
                    timeout_cancel( node['gbl_branch_idx'] )
                else:
                    final_s = s
            elif node['stage'] == 'rsh': # and connected
                if re_status.search( s ):
                    TRACE( 29, 'get_output mach_idx=%d: yes status magic', mach_idx )
                    final_s = re_status.match_obj.group(1)
                    sh_exit_status = int(re_status.match_obj.group(2))
                else:
                    final_s = s
            else: # either rcp, local or (rgang and connected)
                final_s = s
        else:
            final_s = s
        break
    return chk_exit, final_s, fd, mach_idx, sh_exit_status
# get_output
def do_output( mach_idx, processing_idx ):
    """Return 1 if output from node *mach_idx* should go straight to our
    stdout/stderr now, else 0 (the caller then buffers it in ret_info).

    Output is passed through only for the node currently being processed,
    when no python-style return (g_opt['pyret']) was requested, and when
    the node is not at the 'rgang' stage.
    """
    if mach_idx != processing_idx:
        return 0
    if g_opt['pyret']:
        return 0
    if g_internal_info[mach_idx]['stage'] == 'rgang':
        return 0
    return 1
# do output
def info_update( mach_idx, fo2node_map, sp_info, select_l ):
    """Register the child spawned for node *mach_idx*.

    sp_info is [pid, child_stdin, child_stdout, child_stderr]. It is stored
    in g_internal_info[mach_idx]['sp_info']. The child's fds are mapped in
    fo2node_map; stdin gets std=None. The stdout fd, and the stderr fd when
    present (truthy), are added to the select list.
    """
    g_internal_info[mach_idx]['sp_info'] = sp_info
    child_in = sp_info[1]
    child_out = sp_info[2]
    fo2node_map[child_in] = {'mach_idx':mach_idx,'std':None}
    fo2node_map[child_out] = {'mach_idx':mach_idx,'std':sys.stdout.fileno()}
    # Children always go to the FRONT of select_l; the main stdin entry must
    # not be followed by appended child fds. (get_output services the first
    # ready fd, so presumably this favors child output over stdin.)
    select_l.insert( 0, child_out )
    child_err = sp_info[3]
    if child_err:
        fo2node_map[child_err] = {'mach_idx':mach_idx,'std':sys.stderr.fileno()}
        select_l.insert( 0, child_err )
# info_update
def info_clear( fd, fo2node_map, select_l ):
    """Stop watching *fd*: drop it from select_l and fo2node_map, then close it.

    Raises ValueError / KeyError if *fd* is not registered in the
    respective container.
    """
    TRACE( 2, "info_clear clearing fd=%s", fd )
    select_l.remove( fd )
    del fo2node_map[fd]
    os.close( fd )
# info_clear
# this routine requires the global g_timeout_l (a list) to exist
def timeout_add( gbl_br_idx, timeout_period ): # timeout_period is either float(g_opt['rshto']) or float(g_opt['rcpto'])
    """Schedule a connect timeout for branch *gbl_br_idx*.

    An entry {'timeout_expires': now+timeout_period, 'gbl_branch_idx': ...}
    is placed in g_timeout_l. The list is kept sorted by expiry time
    (ties go after existing entries), so g_timeout_l[0] is always the
    next timeout to expire.

    rsh and rcp periods can differ, so a new entry does not always belong
    at the end. time.time() can also step backwards. For both reasons the
    entry is inserted in sorted position instead of blindly appended.
    """
    import time
    import bisect
    expire_tm = time.time()+timeout_period
    entry = {'timeout_expires':expire_tm,'gbl_branch_idx':gbl_br_idx}
    if not g_timeout_l or expire_tm >= g_timeout_l[-1]['timeout_expires']:
        g_timeout_l.append( entry )   # common case: expires last, O(1)
    else:
        keys = [ee['timeout_expires'] for ee in g_timeout_l]
        g_timeout_l.insert( bisect.bisect_right( keys, expire_tm ), entry )
# timeout_add
def timeout_cancel( gbl_br_idx ):
    """Remove the first pending timeout for branch *gbl_br_idx*.

    It is not an error if the branch has no pending timeout.
    """
    found = 0
    for pos, entry in enumerate( g_timeout_l ):
        if entry['gbl_branch_idx'] == gbl_br_idx:
            del g_timeout_l[pos]   # safe: we stop iterating right away
            found = 1
            break
    TRACE( 31, "timeout_cancel branch_idx=%d found=%d", gbl_br_idx, found )
# timeout_cancel
def timeout_connect_process():
    """Handle the earliest pending connect timeout, g_timeout_l[0] (only that one).

    The entry is removed, and 'rgang timeout expired' is appended to the
    stderr buffer of the branch's active head node. Its spawned process is
    then sent HUP, INT, TERM, QUIT, KILL in turn (about 50 ms apart) until
    a non-blocking waitpid() reaps it. If it is reaped and no remote status
    was recorded yet, rmt_sh_sts is set to 8.

    Returns the timed-out node's mach_idx; the caller treats it like an
    exited process (chk_exit=1), as it would a get_output result.
    """
    import select # select
    branch_idx = g_timeout_l[0]['gbl_branch_idx']
    mach_idx = g_branch_info_l[branch_idx]['active_head']
    node = g_internal_info[mach_idx]
    connected = node['connected']
    # recall: sp_info=[pid,child_stdin,child_stdout,child_stderr]
    pid = node['sp_info'][0]
    g_timeout_l.pop(0)
    TRACE( 30, "timeout_connect_process branch_idx=%d connected=%d pidToKill=%d br_in=%s g_tmo_l=%s",
           branch_idx, connected, pid, node['sp_info'][1], g_timeout_l )

    ret_info = node['ret_info']
    ret_info['stderr'] = ret_info['stderr']+'rgang timeout expired\n'

    # 1=HUP, 2=INT(i.e.^C), 15=TERM(default "kill"), 3=QUIT(i.e.^\), 9=KILL
    for sig in (1,2,15,3,9):
        try:
            rpid,status = os.waitpid(pid,os.WNOHANG)
        except OSError:
            # e.g. (OSError, '[Errno 10] No child processes'). Only OS errors
            # are absorbed, so ^C and the SystemExit raised by the signal
            # handler (cleanup) still propagate.
            break
        if rpid == pid: # OK, process is out-of-there
            if ret_info['rmt_sh_sts'] is None:
                TRACE( 31, "timeout_connect_process status=%d", status>>8 )
                ret_info['rmt_sh_sts'] = 8
            break
        os.kill(pid,sig)
        TRACE( 31, "timeout_connect_process os.kill(%d,%d)", pid, sig )
        select.select([],[],[],0.05) # use select to sleep sub second
    return mach_idx # need to return "chk_exit,..." like get_output
# timeout_connect_process
def initiator_node_status( mach_idx ):
    """On the initiator node, record a failed node in the --err-file.

    Does nothing unless this process is the initiator (the mach_idx_offset
    option is empty) and an err-file was given. A node counts as failed
    when its ret_info['rmt_sh_sts'] is anything but 0 (None included).
    For a failed node, one line "<name> # mach_idx=<n> sts=<sts>" is
    appended to the file.
    """
    if g_opt['mach_idx_offset'] != '' or g_opt['err-file'] == '':
        return  # not the initiator node, or no err-file requested
    ret_info = g_internal_info[mach_idx]['ret_info']
    sts = ret_info['rmt_sh_sts']
    if sts == 0:
        return
    TRACE( 28, 'initiator_node_status mach_idx=%d sts=%s', mach_idx,sts )
    line = "%s # mach_idx=%d sts=%s\n"%(ret_info['name'],mach_idx,sts)
    fo = open( g_opt['err-file'], 'a+' )
    try:
        fo.write( line )
    finally:
        fo.close()   # do not leak the handle if the write fails
# initiator_node_status
def node_list_from_file( listfile ):
    """Return the node names found in file *listfile*.

    Each line matched by re_mach contributes its group(1); other lines are
    ignored. The file is closed even if reading or matching raises.
    """
    mach_l = []
    fo = open(listfile)
    try:
        TRACE( 2, "node_list_from_file fo.fileno()=%d", fo.fileno() )
        for xx in fo:
            if re_mach.search(xx): mach_l.append(re_mach.match_obj.group(1))
    finally:
        fo.close()
    return mach_l
def node_list_from_spec( spec ):
    """Turn a nodespec into a list of node names.

    The spec is tried, in this order, as:
      "-"              names read from stdin, one per line, until a line
                       consisting of a single "." or end of input
      ".<anything>"    a readable file relative to the current directory
      farmlet          a readable file g_opt['farmlets']/<spec>
      file             a readable file <spec>
      otherwise        an "expandable node list" handed to expand()
    Lines from stdin or files contribute re_mach's group(1); lines it does
    not match are ignored.
    """
    if spec == '-':
        mach_l = []
        stdin_fd = sys.stdin.fileno()
        while 1:
            xx,sts = try_line( stdin_fd )
            # try_line returns a partial line (sts==1) when the next byte is
            # slow to arrive (slow pipe or typing). Keep reading until the
            # line is complete so one name is not split into bogus ones.
            while sts == 1:
                more,sts = try_line( stdin_fd )
                xx = xx + more
            if xx == '.\n' or xx == '':
                break
            if re_mach.search(xx): mach_l.append(re_mach.match_obj.group(1))
        return mach_l
    if spec[0]=='.' and os.access(spec,os.R_OK):
        return node_list_from_file( spec )
    farmlet = g_opt['farmlets']+'/'+spec
    if os.access(farmlet,os.R_OK): # you can always specify --farmlets=.
        return node_list_from_file( farmlet )
    if os.access(spec,os.R_OK):
        return node_list_from_file( spec )
    if g_opt['verbose']: sys.stderr.write('assuming expandable node list\n')
    return expand( spec )
def clean():
    """Reset the controlling terminal with "stty sane".

    This undoes the stty settings applied when --pty is in effect.
    """
    os.system( "stty sane" )
# clean
def cleanup(signum,frame):
    """Signal handler (installed for signals 2 and 15 in --pty mode).

    Restores the terminal via clean() and exits with status 1.
    """
    clean()
    sys.exit( 1 )
# cleanup
# Default option table, set at import time so that a test script importing
# rgang (to exercise rgang.expand(), for example) need not initialize it.
g_opt = dict( tlvlmsk=0 )
988 def rgang( opts_n_args ):
989 import os # system
990 import pickle # dumps
991 import traceback # format_exception
992 import pprint # pprint
993 import signal # signal
994 global g_opt
995 global g_thisnode #
996 global g_timeout_l #
997 global g_internal_info #
998 global g_branch_info_l #
999 global g_mach_idx_offset #
1000 global g_num_connects # needed for robust "input-to-all-branches" see (get_output)
1001 global g_connects_expected # needed for robust "input-to-all-branches" see (spawn_cmd and below)
1002 # --------------------------------- # NOTE: currently, there is the
1003 # possibility of a hang on the write to a branch if all nodes in
1004 # the rgang sub tree fail and stdin is large.
1006 opts,args,g_opt,usage = getopts( OPTSPEC, opts_n_args, USAGE, APP )
1007 try: g_opt['tlvlmsk'] = int( eval(g_opt['tlvlmsk']) ) # 0x hex or normal decimal numbers OK
1008 except: sys.stderr.write('invalid tlvlmsk value; must be integer/hex\n');return 1,[]
1009 g_thisnode = NodeInfo() # needed before 1st TRACE
1010 TRACE( 3, 'rgang g_opt is %s', g_opt )
1012 if g_opt['d']:
1013 c="ls %s"%(g_opt['farmlets'],); os.system(c); return 0,[]
1014 elif g_opt['list'] and not args:
1015 if os.access(g_opt['farmlets']+'/.',os.R_OK):
1016 c1="for i in *;do echo FARMLET $i:; cat $i;done"
1017 c="sh -c 'cd %s;%s'"%(g_opt['farmlets'],c1)
1018 os.system(c)
1019 else:
1020 print 'farmlets directory %s not readable'%(g_opt['farmlets'],)
1021 return 0,[]
1022 if not args: print 'no args\n'+usage; return 0,[]
1024 mach_l = node_list_from_spec( args.pop(0) )
1026 if g_opt['pyprint']: g_opt['pyret']='1'
1027 if g_opt['pypickle']: g_opt['pyret']='1'
1028 if g_opt['pypickle'] and g_opt['pyprint']: g_opt['pyprint']='' # pypickle wins
1029 # clean skips
1030 if g_opt['skip']:
1031 skip_l = node_list_from_spec( g_opt['skip'] )
1032 for sk in skip_l:
1033 ii = 0; mach_l_len = len( mach_l )
1034 while ii < mach_l_len:
1035 if mach_l[ii] == sk: mach_l.pop(ii); mach_l_len = mach_l_len-1
1036 else: ii = ii + 1
1037 if g_opt['s']: # skip current (local) node
1038 ii = 0; mach_l_len = len( mach_l )
1039 while ii < mach_l_len:
1040 if g_thisnode.is_me(mach_l[ii]):
1041 mach_l.pop(ii); mach_l_len = mach_l_len - 1
1042 else: ii = ii + 1
1043 # mach_l is now set.
1045 if g_opt['list']:
1046 if not g_opt['pyret']:
1047 for mach in mach_l: print mach
1048 overall_status = 0; ret_info = mach_l # ret_info can have different formats
1049 if g_opt['pyprint']: pprint.pprint( ret_info )
1050 elif g_opt['pypickle']:
1051 dumps = pickle.dumps(ret_info)
1052 sys.stdout.write( 'PICKLE:%08x:'%(len(dumps),) )
1053 sys.stdout.write( dumps )
1054 return overall_status,ret_info
1056 TRACE( 4, 'rgang args is %s', args )
1057 if g_opt['C']: g_opt['s']='1'; g_opt['c']='1'
1058 if g_opt['n'] == '':
1059 if len(mach_l) == 1: g_opt['n']='0'
1060 else: g_opt['n']='1'
1061 elif g_opt['n'] == 'n': g_opt['n']='1'
1062 if len(g_opt['n'])>1 or not '0'<=g_opt['n']<='3':
1063 sys.stderr.write('invalid optional argument "%s" for -n option\n'%(g_opt['n'],))
1064 return 1,[]
1066 if g_opt['c'] and len(args) < 2:
1067 sys.stderr.write('copy mode must have at least 2 arguments\n')
1068 return 1,[]
1070 if len(args) == 0:
1071 sys.stderr.write('interactive not yet supported\n'); return 1,[]
1073 try: nway = int( g_opt['nway'] ) # int(string)=>int and int(int)=>int
1074 except: sys.stderr.write('invalid nway value; must be integer >= 0\n');return 1,[]
1075 if nway < 0: sys.stderr.write('invalid nway value; must be integer >= 0\n');return 1,[]
1077 if g_opt['mach_idx_offset'] != '': # note: also used to determine initiator node
1078 try: g_mach_idx_offset = int( g_opt['mach_idx_offset'] ) # int(string)=>int and int(int)=>int
1079 except: sys.stderr.write('invalid mach_idx_offset value; must be decimal integer\n');return 1,[]
1080 g_thisnode.mach_idx = '%d'%(g_mach_idx_offset,) # for spawn_cmd rsh rgang
1081 g_opt['do-local'] = 1 # I'm assuming I've already rsh'd (rsh rgang...), so don't do rsh again
1082 else: # assume I'm the "initiator" node
1083 os.environ['RGANG_INITIATOR'] = g_thisnode.hostnames_l[0]
1084 # set the following now, in case we are also the root node and
1085 # opt['do-local']; this make spawn_cmd easier
1086 os.environ['RGANG_PARENT'] = ''
1087 os.environ['RGANG_PARENT_ID'] = ''
1088 g_mach_idx_offset = 0 # for branches (see spawn_cmd)
1089 g_thisnode.mach_idx='' # for spawn_cmd rsh rgang; init for TRACE
1090 for ii in range(len(mach_l)):
1091 if g_thisnode.is_me(mach_l[ii]): g_thisnode.mach_idx='%d'%(ii,); break # for spawn_cmd rsh rgang
1092 if g_opt['err-file'] != '': # init err-file
1093 fo = open( g_opt['err-file'], 'w+' )
1094 TRACE( 2, "rgang err-file fd=%d", fo.fileno() )
1095 fo.close()
1097 if g_opt['serial']:
1098 if g_opt['input-to-all-branches']:
1099 sys.stderr.write('invalid --serial/--input-to-all-branches configuration\n');return 1,[]
1100 outer_nway = nway; inner_nway = 0
1101 else:
1102 outer_nway = 1; inner_nway = nway
1103 mach_l_len = len( mach_l )
1104 if outer_nway > mach_l_len or outer_nway == 0: outer_nway = mach_l_len # inner_nway is handled below
1107 ####### OK done with ALL the OPTIONS PROCESSING
1109 if g_opt['pty']:
1110 signal.signal(2,cleanup)
1111 signal.signal(15,cleanup)
1112 os.system("stty -echo -icanon min 1 time 0" )
1113 os.system("stty -inlcr -icrnl" ) # no translations
1114 #os.system("stty ignbrk -ixon -isig" )
1115 os.system("stty ignbrk -ixon" )
1118 ####### NOW, DO THE WORK!!!!
1120 stdin_bytes = 0L # could be counting > 2G bytes
1121 # build/initialize the array so we can add stdin at the end
1122 ret_info = []; g_internal_info=[]
1123 for ii in range(mach_l_len):
1124 ret_info.append(None); g_internal_info.append(None)
1126 g_num_connects = 0
1127 g_connects_expected = 0
1128 select_l = []
1129 fo2node_map = {}
1130 if not g_opt['input-to-all-branches']:
1131 # add in (kludge in??) stdin --> index mach_l_len
1132 g_internal_info.append( {'gbl_branch_idx':None,
1133 'ret_info':None, # ret_info NOT NEEDED!
1134 'stage':None,'sp_info':None,'connected':1} )
1135 select_l.append(sys.stdin.fileno())
1136 fo2node_map[sys.stdin.fileno()] = {'mach_idx':mach_l_len,'std':None}
1137 need_stdin_after_connects = 0
1138 else:
1139 need_stdin_after_connects = 1
1141 branch_input_l = []
1142 g_branch_info_l = []
1143 g_timeout_l = []
1145 have_me = 0 # basicaly, flag to skip past idx 0 in branch processing below
1146 if not g_opt['c'] and not g_opt['serial'] and g_thisnode.is_me(mach_l[0]):
1147 have_me = 1
1148 ret_info[0] = {'name':mach_l[0],'stdout':'','stderr':'','rmt_sh_sts':None}
1149 g_branch_info_l.append( {'active_head':0,'branch_end_idx':1} )
1150 gbl_branch_idx = len(g_branch_info_l)-1 # len would be 1 here ==> gbl_branch_idx = 0
1151 g_internal_info[0] = {'gbl_branch_idx':gbl_branch_idx,
1152 'ret_info':ret_info[0], # ptr
1153 'stage':None,'sp_info':None,'connected':0}
1154 TRACE( 5, 'rgang local spawn_cmd' )
1155 mach_idx = 0
1156 sp_info = spawn_cmd( g_internal_info[mach_idx], mach_idx, opts, args, [mach_l[0]], g_opt['do-local'] ) #1 not g_opt['c'] and g_thisnode.is_me(mach_l[0])
1157 info_update( 0, fo2node_map, sp_info, select_l )
1158 branch_input_l.append( sp_info[1] )
1159 # no timeout
1161 overall_status = 0
1162 START = 0; END = 1
1163 processing_idx = 0
1164 # outer loops result when --serial is specified. It is processed in
1165 # conjunction with the --nway option to specify the number of outer loops.
1166 # (nway==0 means all nodes so just specifying --serial with --nway=0 gives
1167 # "completely serial" operation. --serial with --nway=2 with 400 nodes
1168 # would do 2 set of 200_parallel_spawns This can be demonstrated via:
1169 # rgang.py --serial --nway=2 "192.168.1.136{,,,,,}" 'sleep 8;date'
1170 for outer_group_idx in range(outer_nway):
1171 grp_idxs = get_nway_indexes( outer_nway, outer_group_idx, mach_l_len, have_me)
1173 # START EACH BRANCH
1174 # need to get:
1175 # - list for select for get output_line and
1176 # - map of select fo to node
1177 group_len = grp_idxs[END] - grp_idxs[START]
1178 if inner_nway > group_len or inner_nway == 0: inner_nway = group_len
1180 for inner_branch_idx in range(inner_nway):
1181 branch_idxs = get_nway_indexes( inner_nway, inner_branch_idx, group_len)
1182 branch_len = branch_idxs[END] - branch_idxs[START]
1183 mach_idx = grp_idxs[START] + branch_idxs[START]
1184 branch_end_idx = mach_idx + branch_len
1185 g_branch_info_l.append( {'active_head':mach_idx,'branch_end_idx':branch_end_idx} )
1186 gbl_branch_idx = len(g_branch_info_l)-1
1187 for ii in range(branch_len):
1188 ret_info[mach_idx+ii] = {'name':mach_l[mach_idx+ii],
1189 'stdout':'','stderr':'','rmt_sh_sts':None}
1190 g_internal_info[mach_idx+ii] = {'gbl_branch_idx':gbl_branch_idx,
1191 'ret_info':ret_info[mach_idx+ii], # ptr
1192 'stage':None,'sp_info':None,'connected':0}
1193 branch_nodes = mach_l[mach_idx:branch_end_idx]
1194 sp_info = spawn_cmd( g_internal_info[mach_idx], mach_idx, opts, args, branch_nodes, 0 ) #2 for inner_branch_idx in ...
1195 TRACE( 6, 'rgang after initial spawn_cmd gbl_branch_idx=%d branch_len=%d sp_info=%s', gbl_branch_idx, branch_len, sp_info )
1196 info_update( mach_idx, fo2node_map, sp_info, select_l )
1197 branch_input_l.append( sp_info[1] )
1199 # NOW DO THE PROCESSING
1201 if not g_opt['pyret']: header( mach_l[processing_idx], args )
1202 while processing_idx < grp_idxs[END]:
1204 if need_stdin_after_connects and g_num_connects == g_connects_expected:
1205 #time.sleep(30)
1206 g_internal_info.append( {'gbl_branch_idx':None,
1207 'ret_info':None, # ret_info NOT NEEDED!
1208 'stage':None,'sp_info':None,'connected':1} )
1209 select_l.insert(0,sys.stdin.fileno())
1210 fo2node_map[sys.stdin.fileno()] = {'mach_idx':mach_l_len,'std':None}
1211 need_stdin_after_connects = 0
1214 if g_timeout_l:
1215 # 3 cases:
1216 # 1) "connect" timeout period expires while waiting at select
1217 # 2) "connect" timeout period expires while processing for some
1218 # node
1219 # 3) "connect" timeout period never expires as all nodes
1220 # "connect" promptly
1221 timeout_wait = g_timeout_l[0]['timeout_expires'] - time.time()
1222 if timeout_wait < 0: timeout_wait = 0
1223 else: timeout_wait = None
1225 # DO THE SELECT TO (potentially) GET SOME DATA
1226 # chk_exit indicate that select indicated a file, but the
1227 # read of the file returned 0 bytes (ss=''); the
1228 # process associated with the particular file/node
1229 # probably exited.
1230 # If check_exit==1 then the following should be true:
1231 # ss==''
1232 # fo!=None
1233 # sh_exit_stat==None
1234 # ss is the output data returned unless there is a
1235 # timeout
1236 # fo is the file/node to process, unless timeout
1237 # sh_exit_stat is set if command exit_status (STATUS_MAGIC) was
1238 # received/indicated.
1239 # When a timeout occurs, the return values will be (as initialize
1240 # in get_output):
1241 # chk_exit=0;ss='';fo=None;sh_exit_stat=None
1242 #TRACE( 7, 'rgang before get_output wait=%s select_l=%s need_stdin=%d g_num_connects=%d',
1243 # timeout_wait, select_l, need_stdin_after_connects, g_num_connects )
1244 chk_exit,ss,fo,mach_idx,sh_exit_stat = get_output( select_l, fo2node_map, timeout_wait )
1246 if fo == None:
1247 # timeout
1248 # should be able to do the processing here and not continue, but return
1249 # chk_exit=1 and mach_idx
1251 #move processing from above and do it here to properly continue and finish processing after killing process
1252 # DO CONNECT TIMEOUT PROCESSING NOW
1253 # IN THE CASE of an rgang node timing out, A NEW TIMEOUT
1254 # PERIOD WOULD BE INITIATED
1255 # WHAT HAPPENS IF IT CHANGES THE processing _idx???
1256 mach_idx = timeout_connect_process()
1257 chk_exit,ss,sh_exit_stat = 1,'',None
1258 else:
1259 # no timeout
1260 if mach_idx == mach_l_len: # SPECIAL STDIN FLAG
1261 #TRACE( 8, 'rgang stdin processing_idx=%d', processing_idx )
1262 if len(ss):
1263 #stdin_bytes = stdin_bytes + len(ss)
1264 if g_opt['input-to-all-branches'] or g_opt['pyret'] :
1265 #TRACE( 9, "rgang branch_input_l=%s", branch_input_l )
1266 for br_sdtin in branch_input_l:
1267 bytes_written = os.write( br_sdtin, ss )
1268 while bytes_written < len(ss):
1269 bytes_this_write = os.write( br_sdtin, ss[bytes_written:] )
1270 bytes_written = bytes_written + bytes_this_write
1271 else:
1272 os.write( g_internal_info[processing_idx]['sp_info'][1], ss )
1273 else:
1274 info_clear( sys.stdin.fileno(), fo2node_map, select_l )
1275 #TRACE( 9, "rgang closing everyone's stdin after %d bytes l=%s", stdin_bytes, branch_input_l )
1276 for br_stdin in branch_input_l:
1277 os.close( br_stdin )
1278 mi = fo2node_map[br_stdin]['mach_idx']
1279 del( fo2node_map[br_stdin] )
1280 #pid = g_internal_info[mi]['sp_info'][0]
1281 #os.kill(pid,1)
1282 #os.kill(pid,13)
1283 # this should cause (via chain reaction) the remote
1284 # cmd's to exit; the branch_input_l will be cleaned
1285 # up when they do; as our stdin is no longer in the
1286 # select_l, this whole "SPECIAL STDIN" code should
1287 # not get executed again.
1288 continue # after STDIN PROCESSING
1289 if sh_exit_stat != None: ret_info[mach_idx]['rmt_sh_sts'] = sh_exit_stat # OK if we overwrite timeout kill status
1291 # PROCESS THE DATA
1292 if do_output( mach_idx, processing_idx ):
1293 os.write( fo2node_map[fo]['std'], ss )
1294 elif fo2node_map[fo]['std'] == sys.stdout.fileno():
1295 ret_info[mach_idx]['stdout'] = ret_info[mach_idx]['stdout'] + ss
1296 else:
1297 ret_info[mach_idx]['stderr'] = ret_info[mach_idx]['stderr'] + ss
1301 # CHECK FOR BRANCH/GROUP STATUS
1303 if chk_exit:
1304 TRACE( 8, 'rgang chk_exit mach_idx=%d processing_idx=%d sp_info=%s',
1305 mach_idx, processing_idx, g_internal_info[mach_idx]['sp_info'] )
1307 if do_output( mach_idx, processing_idx ):
1308 # print any previously store stdout/err
1309 os.write( sys.stdout.fileno(), ret_info[mach_idx]['stdout'] )
1310 os.write( sys.stderr.fileno(), ret_info[mach_idx]['stderr'] )
1312 # cleanup_output_status
1313 # if the fd that trigger us was stdOUT, then we need to check stdERR
1314 pid = g_internal_info[mach_idx]['sp_info'][0]
1315 if g_internal_info[mach_idx]['sp_info'][3] \
1316 and ( ( fo != None and fo2node_map[fo]['std'] == sys.stdout.fileno() ) \
1317 or fo == None ):
1318 TRACE( 2, 'rgang CHECKing stdERR fo=%s pid=%d mach_idx=%d', fo, pid, mach_idx )
1319 chk = 0; fo2 = g_internal_info[mach_idx]['sp_info'][3] # init this as fo may be None (i.e. tmo)
1320 while not chk and fo2 != None:
1321 chk,ss,fo2,mi,dont_used = get_output( [g_internal_info[mach_idx]['sp_info'][3]], # 3 is stdERR
1322 fo2node_map, 0 )
1323 if do_output( mach_idx, processing_idx ):
1324 os.write( sys.stderr.fileno(), ss )
1325 else:
1326 ret_info[mach_idx]['stderr'] = ret_info[mach_idx]['stderr'] + ss
1327 # if the fd that trigger us was stdERR, then we need to check stdOUT
1328 if g_internal_info[mach_idx]['sp_info'][2] \
1329 and ( ( fo != None and fo2node_map[fo]['std'] == sys.stderr.fileno() ) \
1330 or fo == None ):
1331 TRACE( 2, 'rgang CHECKing stdOUT fo=%s pid=%d mach_idx=%d', fo, pid, mach_idx )
1332 # There quite possibly could be 2 iteration through
1333 # this while. If the main get_output (above) had stderr
1334 # in the select list first AND the shell is really fast AND
1335 # there is little to no output, then the CONNECT machanism
1336 # may not have happened and then the 1st loop here will
1337 # cause the a '' value to be returned for ss; chk will be 0
1338 # however.
1339 chk = 0; fo2 = g_internal_info[mach_idx]['sp_info'][2] # init this as fo may be None (i.e. tmo)
1340 while not chk and fo2 != None:
1341 chk,ss,fo2,mi,sh_exit_stat = get_output( [g_internal_info[mach_idx]['sp_info'][2]], # 2 is stdout
1342 fo2node_map, 0 )
1343 if sh_exit_stat != None:
1344 ret_info[mach_idx]['rmt_sh_sts'] = sh_exit_stat # OK if we overwrite timeout kill status
1345 if do_output( mach_idx, processing_idx ):
1346 os.write( sys.stdout.fileno(), ss )
1347 else:
1348 ret_info[mach_idx]['stdout'] = ret_info[mach_idx]['stdout'] + ss
1349 pass
1350 pass
1351 pass
1352 # done with output from process
1353 # remove sub-process's output and input fds from
1354 # the select list, fo2node_map, branch_input_l
1355 for ffo in g_internal_info[mach_idx]['sp_info'][2:]:
1356 if ffo: info_clear( ffo, fo2node_map, select_l )
1357 branch_input_l.remove( g_internal_info[mach_idx]['sp_info'][1] )
1358 if g_internal_info[mach_idx]['sp_info'][1] in fo2node_map.keys():
1359 del( fo2node_map[g_internal_info[mach_idx]['sp_info'][1]] )
1360 gbl_branch_idx = g_internal_info[mach_idx]['gbl_branch_idx']
1361 timeout_cancel( gbl_branch_idx )
1363 try:
1364 opid,status = os.waitpid( pid, 0 )
1365 if opid != pid: raise 'Program Error', 'process did not exit'
1366 TRACE( 2, 'rgang waitpid got status for pid=%d mach_idx=%d gbl_branch_idx=%d status=%s',
1367 pid, mach_idx, gbl_branch_idx, status )
1368 status = (status>>8)
1369 except: # i.e. (OSError, '[Errno 10] No child processes')
1370 # must have been killed in timeout_connect_process
1371 status = g_internal_info[mach_idx]['ret_info']['rmt_sh_sts']
1372 TRACE( 2, 'rgang waitpid NO status for pid=%d mach_idx=%d gbl_branch_idx=%d using status=%s',
1373 pid, mach_idx, gbl_branch_idx, status )
1375 # g_internal_info[x].keys = ('stage','connected','sp_info','gbl_branch_idx','ret_info')
1376 # g_internal_info[x]['ret_info'].keys =
1377 if g_internal_info[mach_idx]['stage'] == 'rgang':
1378 g_internal_info[mach_idx]['stage'] = 'done' # SEE "ACTIVE OUTPUT PROCESSING" BELOW
1379 # EEEEEEEE THIS HAS RCP OUTPUT AND RGANG RET!
1380 # BUT WAIT, NORMALLY THERE IS NO RCP STDOUT (maybe stderr
1381 #stdout should be in form: llllllll:pickle
1382 if g_internal_info[mach_idx]['connected'] == 0:
1383 g_connects_expected = g_connects_expected - 1 # if it's not connected now, it never will be
1384 ss = ret_info[mach_idx]['stdout']
1385 TRACE( 11, 'rgang looking for PICKLE in (rmt_sh_sts=%s) >%s<',
1386 ret_info[mach_idx]['rmt_sh_sts'], ss )
1387 pickle_idx = string.find( ss, "PICKLE:" )
1388 if pickle_idx != -1:
1389 try:
1390 pre = ss[:pickle_idx]
1391 pickle_idx = pickle_idx+len("PICKLE:")
1392 length = string.atoi( ss[pickle_idx:pickle_idx+8],16 )
1393 loads = pickle.loads( ss[pickle_idx+9:pickle_idx+9+length] )
1394 post = ss[pickle_idx+9+length:]
1395 if g_opt['c']: offset = 1
1396 else: offset = 0;stderr_sav=ret_info[mach_idx]['stderr']
1397 for ii in range(len(loads)):
1398 ret_info[mach_idx+offset+ii].update( loads[ii] )
1399 overall_status = overall_status | ret_info[mach_idx+offset+ii]['rmt_sh_sts']
1400 TRACE( 28, 'rgang calling initiator_node_status #1 mach_idx=%d', mach_idx+offset+ii )
1401 initiator_node_status( mach_idx+offset+ii )
1402 # NOTE??? for "copy", previous stdout??? may already be printed b/c of rcp stage
1403 if g_opt['c']: ret_info[mach_idx]['stdout'] = pre + post
1404 else:
1405 ret_info[mach_idx]['stdout'] = pre + ret_info[mach_idx]['stdout'] + post
1406 ret_info[mach_idx]['stderr'] = stderr_sav + ret_info[mach_idx]['stderr']
1407 except:
1408 sys.stderr.write('EEEEE-pickle exception mach_idx=%s g_opt[c]=%s ii=%s\n'%(mach_idx,g_opt['c'],ii))
1409 sys.stderr.write( 'EEEEEEEEE - ss[pickle_idx:pickle_idx+9+length=%d]=>%s<\n'%(length,ss[pickle_idx:pickle_idx+9+length]) )
1410 pickle_idx = -1
1411 exc, value, tb = sys.exc_info()
1412 for ln in traceback.format_exception( exc, value, tb ):
1413 sys.stderr.write(ln)
1414 if pickle_idx == -1: # status must be bad
1415 if g_opt['c']:
1416 ret_info[mach_idx]['stderr'] = ret_info[mach_idx]['stderr'] + '%s: warning: "rcp" %s failed\n'%(APP,APP)
1417 else:
1418 if ret_info[mach_idx]['rmt_sh_sts'] == None:
1419 ret_info[mach_idx]['rmt_sh_sts'] = 2
1420 TRACE( 12, 'rgang #1 rmt_sh_sts=2' ) # rsh failure or shell abort (i.e. syntax error)
1421 ret_info[mach_idx]['rmt_sh_sts'] = ret_info[mach_idx]['rmt_sh_sts'] | status
1422 overall_status = overall_status | ret_info[mach_idx]['rmt_sh_sts']
1423 TRACE( 28, 'rgang calling initiator_node_status #2 mach_idx=%d', mach_idx )
1424 initiator_node_status( mach_idx )
1425 gbl_branch_idx = g_internal_info[mach_idx]['gbl_branch_idx']
1426 branch_end_idx = g_branch_info_l[gbl_branch_idx]['branch_end_idx']
1427 branch_nodes = mach_l[mach_idx+1:branch_end_idx]
1428 if branch_nodes:
1429 TRACE( 13, 'rgang spawn_cmd branch_node is %s', branch_nodes )
1430 g_branch_info_l[gbl_branch_idx]['active_head'] = mach_idx+1
1431 #timeout_cancel( gbl_branch_idx ) # cancel 1st timeout for this gbl_branch_idx
1432 sp_info = spawn_cmd( g_internal_info[mach_idx+1], mach_idx+1, opts, args, branch_nodes, 0 ) #3 after bad rgang
1433 info_update( mach_idx+1, fo2node_map, sp_info, select_l )
1434 branch_input_l.append( sp_info[1] )
1435 if mach_idx == processing_idx and not g_opt['pyret']:
1436 os.write( sys.stdout.fileno(), ret_info[processing_idx]['stdout'])
1437 os.write( sys.stderr.fileno(), ret_info[processing_idx]['stderr'])
1438 elif g_internal_info[mach_idx]['stage'] == 'rcp':
1439 # STRIP OFF NODE HERE
1440 if ret_info[mach_idx]['rmt_sh_sts'] != None: sys.stderr.write('1EEEEEEEEEE\n')
1441 ret_info[mach_idx]['rmt_sh_sts'] = status
1442 overall_status = overall_status | ret_info[mach_idx]['rmt_sh_sts']
1443 TRACE( 28, 'rgang calling initiator_node_status #3 mach_idx=%d', mach_idx )
1444 initiator_node_status( mach_idx )
1445 gbl_branch_idx = g_internal_info[mach_idx]['gbl_branch_idx']
1446 branch_end_idx = g_branch_info_l[gbl_branch_idx]['branch_end_idx']
1447 branch_nodes = mach_l[mach_idx+1:branch_end_idx]
1448 if status == 0 and branch_nodes:
1449 TRACE( 14, 'rgang rcp spawn_cmd branch_nodes is %s', branch_nodes )
1450 #timeout_cancel( gbl_branch_idx )
1451 sp_info = spawn_cmd( g_internal_info[mach_idx], mach_idx, opts, args, branch_nodes, 0 ) #4 the rgang after good rcp
1452 info_update( mach_idx, fo2node_map, sp_info, select_l )
1453 branch_input_l.append( sp_info[1] )
1454 g_branch_info_l[gbl_branch_idx]['active_head'] = mach_idx
1455 elif status != 0 and branch_nodes:
1456 # start rcp stage on next node
1457 TRACE( 15, 'rgang rcp spawn_cmd branch_nodes is %s', branch_nodes )
1458 #timeout_cancel( gbl_branch_idx )
1459 sp_info = spawn_cmd( g_internal_info[mach_idx+1], mach_idx+1, opts, args, branch_nodes, 0 ) #5 bad rcp, next node rcp
1460 info_update( mach_idx+1, fo2node_map, sp_info, select_l )
1461 branch_input_l.append( sp_info[1] )
1462 g_branch_info_l[gbl_branch_idx]['active_head'] = mach_idx+1
1463 else:
1464 # DONE WITH BRANCH
1465 pass
1466 pass
1467 elif g_internal_info[mach_idx]['stage'] == 'rsh': # no further branch processing
1468 if g_internal_info[mach_idx]['connected'] == 0:
1469 g_connects_expected = g_connects_expected - 1 # if it's not connected now, it never will be
1470 if ret_info[mach_idx]['rmt_sh_sts'] == None:
1471 ret_info[mach_idx]['rmt_sh_sts'] = 4
1472 TRACE( 16, 'rgang #2 rmt_sh_sts=4' ) # shell abort (i.e. syntax error)
1473 ret_info[mach_idx]['rmt_sh_sts'] = ret_info[mach_idx]['rmt_sh_sts'] | status
1474 overall_status = overall_status | ret_info[mach_idx]['rmt_sh_sts']
1475 TRACE( 28, 'rgang calling initiator_node_status #4 mach_idx=%d', mach_idx )
1476 initiator_node_status( mach_idx )
1477 else: # local - no further branch processing
1478 if ret_info[mach_idx]['rmt_sh_sts'] != None: sys.stderr.write('4EEEEEEEEEE\n')
1479 ret_info[mach_idx]['rmt_sh_sts'] = status
1480 overall_status = overall_status | ret_info[mach_idx]['rmt_sh_sts']
1481 TRACE( 28, 'rgang calling initiator_node_status #5 mach_idx=%d', mach_idx )
1482 initiator_node_status( mach_idx )
1484 # ACTIVE OUTPUT PROCESSING
1485 if mach_idx == processing_idx:
1486 if g_internal_info[processing_idx]['stage'] == 'rgang': continue # continue if "inprogress" rgang
1487 processing_idx = processing_idx + 1
1488 while processing_idx < grp_idxs[END]:
1489 if not g_opt['pyret']: header( mach_l[processing_idx], args )
1490 if g_internal_info[processing_idx]['stage'] == 'rgang': break
1491 if ret_info[processing_idx]['rmt_sh_sts'] == None:
1492 if not g_opt['pyret']:
1493 # print-n-flush and clear stdout/err
1494 os.write( sys.stdout.fileno(),ret_info[processing_idx]['stdout'])
1495 ret_info[processing_idx]['stdout'] = ''
1496 os.write( sys.stderr.fileno(),ret_info[processing_idx]['stderr'])
1497 ret_info[processing_idx]['stderr'] = ''
1498 break
1499 if not g_opt['pyret']:
1500 os.write( sys.stdout.fileno(),ret_info[processing_idx]['stdout'])
1501 os.write( sys.stderr.fileno(),ret_info[processing_idx]['stderr'])
1502 processing_idx = processing_idx + 1
1503 pass
1504 pass
1505 pass # end of "if chk_exit" processing
1507 pass # while processing_idx < grp_idxs[END]:
1509 if g_opt['pty']: clean()
1510 if not g_opt['pyret'] and g_opt['n']=='2': print
1511 if g_opt['pyprint']: pprint.pprint( ret_info )
1512 elif g_opt['pypickle']:
1513 dumps = pickle.dumps(ret_info)
1514 sys.stdout.write( 'PICKLE:%08x:'%(len(dumps),) )
1515 sys.stdout.write( dumps )
1516 return overall_status,ret_info
1517 # rgang
1520 ###############################################################################
def _reap_or_kill( pid, ret_info ):
    """Make sure child process `pid` is gone, escalating signals as needed.

    Before each signal in (HUP, INT, TERM, QUIT, KILL) the child is polled
    with a non-blocking waitpid().  If it has already exited, its
    ret_info['rmt_sh_sts'] is set to 0x10 (unless a status was already
    recorded) and we return.  Otherwise the next signal is sent, followed by
    a ~0.05 s sleep.  If waitpid() raises OSError (e.g. "[Errno 10] No child
    processes" -- the child was already reaped), this child is abandoned.

    pid      -- process id taken from g_internal_info[idx]['sp_info'][0]
    ret_info -- the node's ret_info dict, or None
    """
    import select                       # select
    for sig in (1,2,15,3,9): # 1=HUP, 2=INT(i.e.^C), 15=TERM(default "kill"), 3=QUIT(i.e.^\), 9=KILL
        try:
            rpid,status = os.waitpid( pid, os.WNOHANG )
        except OSError:  # i.e. (OSError, '[Errno 10] No child processes')
            return
        if rpid == pid: # OK, process is out-of-there
            # There is a case, for example: rgang <node> 'sleep 5 &'
            # where rgang receives the "remote shell status", but because
            # stdout/err were not closed (i.e.: rgang <node> 'sleep 5 >&- 2>&- &'),
            # the remote shell hangs until the backgrounded process completes.
            # Such a node already has a 'rmt_sh_sts', so the kill is done
            # regardless of 'rmt_sh_sts' and only a missing status is filled in.
            # ret_info is checked for None because it is not guaranteed to
            # exist for every spawned node.
            if ret_info is not None and ret_info['rmt_sh_sts'] is None:
                TRACE( 30, "main KeyboardInterrupt status=%d", status>>8 )
                ret_info['rmt_sh_sts'] = 0x10
            return
        os.kill( pid, sig )
        TRACE( 30, "main os.kill(%d,%d)", pid, sig )
        select.select( [],[],[], 0.05 ) # use select to sleep sub second
# _reap_or_kill


def main():
    """Command-line entry point: run rgang on sys.argv[1:] and exit with its status.

    The overall status returned by rgang() becomes the process exit status.
    On ^C (KeyboardInterrupt), every child recorded in
    g_internal_info[*]['sp_info'] is reaped or killed via _reap_or_kill(),
    and the program exits with (1<<7)+2 == 130.  That is the status a shell
    reports for a command killed by SIGINT, which emulates the old shell
    version of rgang.
    """
    import sys                          # argv, exit
    if 1: # switch to 0 to debug (^C then propagates as a plain traceback)
        try:
            total_stat,_ = rgang( sys.argv[1:] )
        except KeyboardInterrupt:
            for internal in g_internal_info:
                sp_info = internal['sp_info']
                if sp_info is not None:
                    _reap_or_kill( sp_info[0], internal['ret_info'] )
            # emulate the old shell version of rgang when rsh is "interrupted"
            sys.exit( (1<<7)+2 )
    else:
        total_stat,_ = rgang( sys.argv[1:] )
    sys.exit( total_stat )
# main
# main() is only called when this file is executed directly.  Guarding the
# call lets the module also be imported, which makes it possible to
# *experiment with* the optimized (or plain) byte-compiled file, e.g.:
#     python -OO -c 'import rgang;rgang.main()' -nn all 'echo hi'
# and/or through a small wrapper script:
#     #!/bin/sh
#     exec python -OO -c "
#     import sys;sys.argv[0]='`basename $0`';import rgang;rgang.main()" "$@"
if __name__ == "__main__":
    main()