2 # This file (rgang.py) was created by Ron Rechenmacher <ron@fnal.gov> on
3 # Apr 13, 2001. "TERMS AND CONDITIONS" governing this file are in the README
4 # or COPYING file. If you do not have such a file, one can be obtained by
5 # contacting Ron or Fermi Lab in Batavia IL, 60510, phone: 630-840-3000.
6 # $RCSfile: rgang.py,v $ $Revision: 1.112 $ $Date: 2005/05/04 17:25:34 $
8 import os
.path
# basename
# Default timeouts.  They are kept as strings because they are used as the
# 'init' values of the 'rshto'/'rcpto' entries in OPTSPEC; spawn_cmd converts
# the option value with float() when it arms the timeout.
DFLT_RSHTO='150.0' # float seconds (basically, let the user decide)
DFLT_RCPTO='3600.0' # float seconds (big files?) (basically, let the user decide)
# Version banner printed by getopts for the version option.  The
# $Revision$/$Date$ fields look like RCS/CVS keyword expansions.
VERSION='2.7 cvs: $Revision: 1.112 $$Date: 2005/05/04 17:25:34 $'
# Basename of the path this program was invoked with; interpolated into the
# usage text and used as the 'init' value of the 'app' option.
APP = os.path.basename( sys.argv[0] )
22 "%s: run cmds on hosts\n"%(APP
,)+\
23 "Usage: %s [options] <nodespec> [cmd]\n"%(APP
,)+\
24 " %s [options] -<c|C> <nodespec> 'srcfile' 'dstdfile'\n"%(APP
,)+\
25 " %s [--skip <nodespec>] --list [<nodespec>]\n"%(APP
,)+\
27 " nodespec can be a \"farmlet\", \"expandable node list\", or \"-\".\n"+\
28 " when nodespec is \"-\", nodes are read from stdin (1 per line)\n"+\
29 " until a line containing a single \".\" is encountered\n"+\
30 " The nodespec is evaluated in the following order:\n"+\
31 " is it the stdin (\"-\")\n"+\
32 " it it a file in \"farmlets\" directory\n"+\
33 " it it a file in current working directory\n"+\
34 " assume \"expandable node list\"\n"+\
35 "Examples: %s -c root@qcd01{01-4} .profile .\n"%(APP
,)+\
36 " %s all ls\n"%(APP
,)+\
37 " %s all \"echo 'hi there'\"\n"%(APP
,)+\
38 " %s qcd01{01-4} '%s qcd01{01-4} \"echo hi\"'\n"%(APP
,APP
)+\
39 " %s -l root qcd0{1-2}{04-10} \"echo 'hi there'\"\n"%(APP
,)+\
40 " %s --skip node0{b-d} --list \"node{04-0x44,4f-0x55}\"\n"%(APP
,)+\
41 " to test a large # of nodes:\n"+\
42 " %s 'qcd{,,,}{01-8}{01-10}' echo hi\n"%(APP
,)+\
43 "Note: when using node expansion, don't forget to quote the expression to\n"+\
44 " stop shell expansion (as bash would do with the \"{,}\" syntax in\n"+\
46 # For the following, the definitions are:
47 # 'desc' Short description; printed out along with "USAGE"
48 # 'init' The initialized value, i.e. when option is not specified;
49 # usually string, but can be other if accompanied by specific
50 # processing (default='')
51 # 'alias' a list of aliases (i.e. short form?)
52 # 'arg' the type of option argument:
54 # 0 => option does not take an argument (default)
55 # 1 => option does take an argument
56 # 2 => option takes an optional argument where if it is
57 # multiple letter (long [--] form) option there must
58 # be an '=<value>', else if it is a single letter,
60 # 'opt' for optional argument type options (arg=2), the
61 # value of the option when specified without an argument
62 OPTSPEC
={ 'd':{'desc':"List farmlet names"},
63 's':{'desc':"Skip current (local) node"},
64 'c':{'desc':"Copy input output"},
65 'C':{'desc':"Copy and skip current (local) node. Equiv to -sc"},
66 'p':{'desc':"Same as rcp -p (only applicable if -c/C)"},
67 'l':{'desc':"Same as rsh -l (only applicable if not -c/C)",'arg':1},
68 'x':{'desc':"Same as rsh/rcp -x (turns on encryption)"},
69 'X':{'desc':"Same as rsh/rcp -X (turns off encryption)"},
70 'f':{'desc':"Same as rsh -f (cause nonforwardable credentials to be forwarded)"},
71 'F':{'desc':"Same as rsh/rcp -F (cause forwardable credentials to be forwarded)"},
72 'N':{'desc':"Same as rsh/rcp -N (prevent credentials from being forwarded)"},
73 'n':{'desc':"-n or -n0: no header, -n1 or -nn: node=, -n2: ---, -n3: --- and cmd",
74 'arg':2,'init':'','opt':'0'},
75 'list' :{'desc':"list farmlets (there contents)"},
76 'farmlets':{'desc':"override farmlets dir /opt/cluster/etc/rgang",
77 'arg':1,'init':'/opt/cluster/etc/rgang'},
78 'serial' :{'desc':"do not fork all commands before reading output"},
79 'skip' :{'desc':'skip this specific list of nodes','arg':1},
80 'rsh' :{'desc':'override default rsh','arg':1,'init':DFLT_RSH
},
81 'rcp' :{'desc':'override default rcp','arg':1,'init':DFLT_RCP
},
82 'nway' :{'desc':'number of branches for each node of the tree OR if --serial, # of groups (def=200)',
84 'combine' :{'desc':"spawn commands with stderr dupped onto stdout"},
85 'do-local':{'desc':"do 1st node in current (local) environment; do not rsh"},
86 'tlvlmsk' :{'desc':"debugging; set hex debug mask i.e. 0xf, \"1<<9\"",'arg':2,'opt':1,'init':'0'},
87 'pty' :{'desc':"use pseudo term -- good for when ssh wants passwd"},
88 'rshto' :{'desc':"change the default timeout value %s (float seconds) for non-copy"%DFLT_RSHTO
,
89 'arg':1,'init':DFLT_RSHTO
},
90 'rcpto' :{'desc':"change the default timeout value %s (float seconds) for the copy"%DFLT_RCPTO
,
91 'arg':1,'init':DFLT_RCPTO
},
92 'err-file':{'desc':"file to write timedout/error nodes to (could be used for retry)",
94 'verbose' :{'desc':"verbose",'alias':['v']},
95 'pyret' :{'desc':"don't output to stdout/err (used when rgang is imported in other .py scripts)"},
96 'pyprint' :{'desc':"implies pyret but *final* result is printed"},
97 'pypickle':{'desc':"implies pyret but *final* result is pickled and printed (preceeded by 8 length characters)"},
98 'path' :{'desc':"prepend to path when rsh rgang",'arg':1,'init':os
.path
.abspath(os
.path
.dirname(sys
.argv
[0]))},
99 'app' :{'desc':"change application name from default \"%s\" (use when call from script)"%(APP
,),'arg':1,'init':APP
},
100 'input-to-all-branches' :{'desc':"send input to all branches, not just currently processed"},
101 'adjust-copy-args':{'desc':"internal - applicable for \"-c\" (copy-mode)"},
102 'mach_idx_offset' :{'desc':"internal - used to determine \"root\" machine",'arg':1},
106 def getopts( optspec
, argv
, usage_in
, app
, version
='' ):
108 import string
# replace, split
111 optspec
.update( {'help' :{'alias':['h','?'],'desc':'print usage/help'}} )
112 optspec
.update( {'version':{'alias':['V'],'desc':'print cvs version/date'}} )
113 opt_map
={} # handles aliases
114 opt
={} # local master options dictionary
115 long_opts
= []; env_opts
= []
116 for op
in optspec
.keys():
117 default
= {'desc':'', 'init':'', 'alias':[], 'arg':0, 'opt':''}
118 default
.update( optspec
[op
] ); optspec
[op
].update( default
)
119 opt_map
[op
] = op
; dashes
='-'
120 if len(op
) > 1: long_opts
.append(op
); dashes
='--'
121 for alias
in optspec
[op
]['alias']:
123 if len(alias
) > 1: long_opts
.append(alias
)
124 env
=string
.upper(re
.sub('[.-]','_',app
)+'_'+string
.replace(op
,'-','_'))
125 if env
in os
.environ
.keys():
126 if optspec
[op
]['arg']:
128 opt
[op
]=ee
; env_opts
.append(dashes
+op
+'='+ee
)
129 else: opt
[op
]='1'; env_opts
.append(dashes
+op
)
130 else: opt
[op
]=optspec
[op
]['init']
131 usage_out
= usage_in
+"\n\
132 Note: all options can be specified via environment variables of the form\n\
133 %s_<OPTION> where option is all uppercase\n\
134 Options:\n"%(string
.upper(re
.sub('[.-]','_',app
)),)
135 xx
=optspec
.keys(); xx
.sort()
138 for op_
in [op
]+optspec
[op
]['alias']:
139 if len(op_
) == 1: dash
=dash
+',-'+op_
140 else: dash
=dash
+',--'+op_
141 if len(op_
) == 1 and optspec
[op
]['arg'] == 1: dash
=dash
+'<val>'
142 elif optspec
[op
]['arg'] == 1: dash
=dash
+'<=val>'
143 if len(op_
) == 1 and optspec
[op
]['arg'] == 2: dash
=dash
+'[val]'
144 elif optspec
[op
]['arg'] == 2: dash
=dash
+'[=val]'
145 usage_out
= usage_out
+ ' %-20s %s\n'%(dash
[1:],optspec
[op
]['desc'])
146 long_space_separated
= ' '+string
.join(long_opts
)
147 opts
=[] # to remember all args passwd
148 while argv
and argv
[0][0] == '-' and len(argv
[0]) > 1:
149 op
= argv
[0][1:]; opts
.append(argv
.pop(0)) # save all opts
150 long_flg
= 0 # SHORT FORM is default
152 long_flg
= 1 # LONG FORM
154 # check for '=' and prepare possible op_arg. (adding the '=' is a
155 op
,op_arg
= string
.split( op
+'=', '=', 1 ) #trick, it gets removed)
157 else: op_grp
= map( lambda x
:x
, op
) # convert string to list
160 if long_flg
and not op_
in long_opts
:
161 possibles
= re
.findall(" "+op_
+"[^ ]*",long_space_separated
)
162 if len(possibles
) == 1: op_
= possibles
[0][1:]
163 elif len(possibles
) > 1:
164 pp
= string
.join(possibles
,'')
165 sys
.stderr
.write('ambiguous "long" option: %s\ncould be:%s\n'%(op_
,pp
))
167 else: sys
.stderr
.write('%s: unknown "long" option: %s\n'%(APP
,op_
)); sys
.exit(1)
168 elif not long_flg
and not op_
in opt_map
.keys():#OK, no short list.
169 sys
.stderr
.write('unknown option: %s\n'%(op_
,)); sys
.exit(1)
170 op_
= opt_map
[op_
] # unalias
171 if optspec
[op_
]['arg'] == 0: ### NO OPTION ARG
172 if long_flg
and op_arg
:
173 sys
.stderr
.write('option %s does not take an argument\n'%(op_
,))
176 elif optspec
[op_
]['arg'] == 1: ### OPTION ARG
177 if long_flg
and op_arg
:
178 opt
[op_
] = op_arg
[:-1] # strip off added '='
179 elif not long_flg
and op_grp
:
180 opt
[op_
] = string
.join(op_grp
,''); op_grp
= []
182 sys
.stderr
.write('option %s requires and argument\n'%(op_
,)); sys
.exit(1)
183 else: opt
[op_
]=argv
[0]; opts
.append(argv
.pop(0))# save all opts
184 elif long_flg
and op_arg
: ### OPTIONAL OPTION ARG
185 opt
[op_
] = op_arg
[:-1] # strip off added '='
186 elif not long_flg
and op_grp
:
187 opt
[op_
] = string
.join(op_grp
,''); op_grp
= []
188 else: opt
[op_
] = optspec
[op_
]['opt']
191 try: version
= VERSION
# incase VERSION is not set (this is generic code) -
192 except: pass # version will just remain ''
193 print 'version:',version
; sys
.exit( 0 )
194 if opt
['help']: print usage_out
; sys
.exit( 0 )
195 return env_opts
+opts
,argv
,opt
,usage_out
199 # this needs g_opt['tlvlmsk']
200 def TRACE( lvl
, fmt_s
, *args
):
202 # how is g_opt working w/o "global" declaration? - must default if read
203 fd
= sys
.stderr
.fileno() # default
204 if g_opt
['tlvlmsk'] & (1<<lvl
): # and g_opt['mach_idx_offset']=='':
205 fo
= open( "%s.%s.trc"%(g_thisnode
.hostnames_l
[0],os
.getpid()), "a+" )
207 os
.write( fd
, '%.2f:%s:%s:%d:%s\n'%(time
.time(),socket
.gethostname(),g_thisnode
.mach_idx
,lvl
,fmt_s
%args
) )
211 ###############################################################################
212 # General Regular Expression class that allows for:
215 # if xx.search( line ):
216 # yy = xx.match_obj.group( x )
221 def __init__( self
, reg_ex
=None,flags
=0 ):
222 if reg_ex
: self
.compiled
= self
.re
.compile(reg_ex
,flags
)
def search(self, arg1, string=None):
    """Run a regular-expression search and remember the resulting match.

    Two calling forms are supported:

      search(text)           -- search *text* with the pattern that the
                                constructor compiled into self.compiled.
      search(pattern, text)  -- search *text* with the ad-hoc *pattern*
                                via self.re.search (self.re is presumably
                                the ``re`` module bound on the class; the
                                class header is not visible here).

    The match object (or None when nothing matched) is stored in
    self.match_obj and also returned, which allows
    ``if xx.search(line): yy = xx.match_obj.group(n)``.
    """
    # Compare against None instead of relying on truthiness: an empty
    # subject string is a legitimate second argument, and must not be
    # mistaken for the one-argument form (which would search the pattern
    # text itself with self.compiled).
    if string is not None:
        self.match_obj = self.re.search(arg1, string)
    else:
        self.match_obj = self.compiled.search(arg1)
    return self.match_obj
# Range patterns read by numeric_expand:
#   re_numeric: "<start>-<end>"; group(1) is the start digits, group(2) the
#               end (including any prefix), group(3) the optional "0" or
#               "0x" prefix on the end value.
#   re_1alpha:  "<letter>-<letter>"; group(1)/group(2) are the two letters.
re_numeric = Re( r"([0-9a-f]+)-((0x{0,1}){0,1}[0-9a-f]+)" ) # the "r" prefix --> use Python's raw string notation
re_1alpha = Re( r"([a-zA-Z])-([a-zA-Z])" ) # the "r" prefix --> use Python's raw string notation
# the connect str will be the first thing that is *supposed to be* printed out.
# (spawn_cmd puts "echo <CONNECT_MAGIC>;" at the front of each remote sh command.)
CONNECT_MAGIC = "__rg_connect__"
# group(1) is whatever precedes the magic string.
re_connect = Re( r"(.*)%s"%(CONNECT_MAGIC,) ) # the "r" prefix --> use Python's raw string notation
STATUS_MAGIC = "__rg_sts__:" # Can I manipulate this so it is TRACE-able? --> note ("") appended in spawn_cmd
#re_status = Re( r"(.*)%s([0-9]+)$"%(STATUS_MAGIC,),re.MULTILINE ) # the "r" prefix --> use Python's raw string notation
# I'll assume that with the echo of the STATUS_MAGIC that ends with newline as
# defined in spawn_cmd, the re search does not need to be multiline (actually,
# I recall there may be a problem with MULTILINE
# group(1) is the text before the magic string, group(2) the decimal status
# digits that follow it (spawn_cmd echoes the magic followed by 0 or 1).
re_status = Re( r"(.*)%s([0-9]+)"%(STATUS_MAGIC,) ) # the "r" prefix --> use Python's raw string notation
# "PICKLE:" followed by exactly 8 lowercase-hex characters and ":".
# NOTE(review): group(1) is presumably the pickled-data length that the
# 'pypickle' option description mentions -- confirm against the reader code.
re_pickle = Re( r"PICKLE:([0-9a-f]{8}):" )
# Matches $RGANG_MACH_ID or ${RGANG_MACH_ID} when the "$" is not preceded by a
# backslash.  group(1) is the text before the "$", group(3) the text after the
# variable name; spawn_cmd rebuilds the string with a machine index in between.
re_mach_id = Re( r"(.*[^\\]|^)\$(RGANG_MACH_ID|{RGANG_MACH_ID})([^a-zA-Z_].*|$)" ) # for rcp
# Skips leading whitespace, then group(1) captures a run of characters that
# are neither "#" nor whitespace.
# NOTE(review): presumably pulls the node name out of a farmlet/stdin line
# while ignoring "#" comments -- the user of this pattern is not visible here.
re_mach = Re( r"^[^#\S]*([^#\s]+)" )
251 def findall_expands( ss
):
252 result
= []; result_idx
= 0; brace_lvl
= 0
255 brace_lvl
= brace_lvl
+ 1
256 if brace_lvl
== 1: result
.append('')
257 if brace_lvl
> 0: result
[result_idx
] = result
[result_idx
] + cc
259 brace_lvl
= brace_lvl
- 1
260 if brace_lvl
== 0: result_idx
= result_idx
+ 1
261 if brace_lvl
!= 0: result
.pop()
266 def numeric_expand( ss_l
):
269 # single alpha check 1st so {a-f} is not mistaken for
270 # integer (not hex) numeric expand
271 if re_1alpha
.search( sss
):
272 start
= re_1alpha
.match_obj
.group(1)
273 end
= re_1alpha
.match_obj
.group(2)
274 end
= chr(ord(end
)+1)
277 start
= chr(ord(start
)+1)
278 elif re_numeric
.search( sss
):
279 start
= re_numeric
.match_obj
.group(1)
280 end
= re_numeric
.match_obj
.group(2)
281 bb
= re_numeric
.match_obj
.group(3)
283 for num
in range(int(start
),eval(end
)+1):
284 ret
.append( '%0*d'%(len(start
),num
) )
286 for num
in range(eval('0%s'%(start
,)),eval(end
)+1):
287 ret
.append( '%0*o'%(len(start
),num
) )
289 for num
in range(eval('0x%s'%(start
,)),eval(end
)+1):
290 ret
.append( '%0*x'%(len(start
),num
) )
291 else: ret
.append( sss
)
292 TRACE( 17, 'numeric_expand returning %s', ret
)
299 TRACE( 18, 'expand(%s)', ss
)
303 expands
= findall_expands( ss
)
304 if not expands
: return string
.split(ss
,',')
307 ss
= string
.replace( ss
, exp
, '<%d>'%(placeholder_idx
,),1 )
308 placeholder_idx
= placeholder_idx
+ 1
310 for sss
in string
.split(ss
,','):
311 TRACE( 19, 'expand sss=%s of ss=%s', sss
, ss
)
312 place_holders
= re
.findall( '<[0-9]+>', sss
)
313 for idx
in range(len(place_holders
)):
314 p_holder
= '<%d>'%(placeholder_idx
+idx
,)
315 expanding
= expand( expands
[placeholder_idx
+idx
][1:-1] ) #Recursive call
316 expanding
= numeric_expand( expanding
)
318 for ssss
in string
.split(sss
, ','):
319 holder_idx
= string
.find(ssss
,p_holder
)
321 pre
= ssss
[:holder_idx
]
322 post
= ssss
[holder_idx
+len(p_holder
):]
323 for expanded
in expanding
:
324 result
.append( pre
+expanded
+post
)
325 sss
= string
.join(result
,',')
326 exp_result
= exp_result
+ string
.split(sss
,',')
327 placeholder_idx
= placeholder_idx
+ len(place_holders
)
329 exc
, value
, tb
= sys
.exc_info()
330 sys
.stderr
.write('Error expanding node list "%s": %s: %s\n'%(ssIn
,exc
,value
) )
331 sys
.stderr
.write('Prehaps an invalid decimal/octal/hex digit\n' )
332 sys
.stderr
.write('remember: in the \'{seq1-seq2}\' syntax, seq2\n' )
333 sys
.stderr
.write('can begin with \'0x\' to force hex or \'0\' to\n' )
334 sys
.stderr
.write('force octal\n' )
336 for ln
in traceback
.format_exception( exc
, value
, tb
):
341 # expand, numeric_expand, findall_expands
344 def build_quoted_str( args
):
348 if repr(arg
)[0] == "'": quoted_args
.append( "'%s'"%(arg
,) )
349 else: quoted_args
.append( '"%s"'%(arg
,) )
350 return string
.join( quoted_args
)
353 def build_sh_single_quoted_str( i_str
):
354 o_str
= re
.sub("'","<<0>>",i_str
,0)
355 o_str
= re
.sub("<<0>>",'\'"\'"\'',o_str
,0)
356 o_str
= "'%s'"%(o_str
,)
358 # build_sh_single_quoted_str
360 def build_sh_doubly_single_quoted_str( i_str
):
361 o_str
= re
.sub("'","<<0>>",i_str
,0)
362 o_str
= re
.sub("<<0>>",'\'"\'"\'',o_str
,0)
364 # build_sh_doubly_single_quoted_str
366 # this routine needs:
367 # g_opt={'tlvlmsk':0,'pty':''}
369 def spawn( cmd
, args
, combine_stdout_stderr
=0 ):
370 import os
# fork, pipe
372 import string
# split
373 global g_num_spawns
# keep track for total life of process
375 g_num_spawns
= g_num_spawns
+ 1
376 cmd_list
= string
.split(cmd
) # support cmd='cmd -opt'
378 # for stdin/out/err for new child. Note: (read,write)=os.pipe()
379 if g_opt
['pty']: pipe0
= [0,0] ; pipe1
= [1,1]; pipe2
= os
.pipe()
380 else: pipe0
= os
.pipe(); pipe1
= os
.pipe(); pipe2
= os
.pipe()
382 if g_opt
['pty']: pid
,fd
= pty
.fork()
383 else: pid
= os
.fork()
387 # combining stdout and stderr helps (when simply printing output)
388 # get the output in the same order
389 if combine_stdout_stderr
: os
.dup2( pipe1
[1], 2 ); os
.close( pipe2
[1] )#; TRACE( 20, "child close %d", pipe2[1]) # close either way as we
390 else: os
.dup2( pipe2
[1], 2 ); os
.close( pipe2
[1] )#; TRACE( 20, "child close %d", pipe2[1]) # are done with it.
392 pass # all done for use in pyt.fork() (except our combining above)
394 os
.close( pipe0
[1] )#; TRACE( 20, "child close %d", pipe0[1] )
395 os
.close( pipe1
[0] )#; TRACE( 20, "child close %d", pipe1[0] )
396 os
.close( pipe2
[0] )#; TRACE( 20, "child close %d", pipe2[0] )
397 os
.dup2( pipe0
[0], 0 ); os
.close( pipe0
[0] )#; TRACE( 20, "child close %d", pipe0[0] )
398 os
.dup2( pipe1
[1], 1 ); os
.close( pipe1
[1] )#; TRACE( 20, "child close %d", pipe1[1] )
399 for ii
in range(3,750): # if default nway=200, and there are 3 fd's per process...
400 try: os
.close(ii
)#; TRACE( 20, "child successfully closed %d", ii )
403 os
.execvp( cmd_list
[0], cmd_list
+args
)
407 TRACE( 20, 'spawn: pid=%d p0=%s p1=%s p2=%s execvp( %s, %s )', pid
, pipe0
, pipe1
, pipe2
, cmd_list
[0], cmd_list
+args
)
409 pipe0
[1] = fd
# stdin (fd is read/write and only valid in parent; pty takes care of child stdin )
410 pipe1
[0] = fd
# stdout (fd is read/write and only valid in parent; pty takes care of child stdout )
411 os
.close( pipe2
[1] ) # parent doesn't need to write to child's stderr (pty does not take care of stderr)
413 os
.close( pipe0
[0] ) # parent doesn't need to read from child's stdin
414 os
.close( pipe1
[1] ) # parent doesn't need to write to child's stdout
415 os
.close( pipe2
[1] ) # parent doesn't need to write to child's stderr
416 child_stdin
= pipe0
[1]
417 child_stdout
= pipe1
[0]
418 if combine_stdout_stderr
: child_stderr
= None
419 else: child_stderr
= pipe2
[0]
420 return pid
,child_stdin
,child_stdout
,child_stderr
424 # node_info == g_internal_info[x]
425 def spawn_cmd( node_info
, mach_idx
, opts
, args
, branch_nodes
, do_local
):
426 import os
.path
# basename, isdir
428 global g_mach_idx_offset
# declaration necessary because I'm setting it
429 global g_connects_expected
# declaration necessary because I'm setting it
430 TRACE( 21, 'spawn_cmd args=%s', args
)
434 # do special $RGANG_MACH_ID processing
435 # 3 cases: 1 for \"initiator\" node (adjust-copy-args='')
436 # 2 for non-initiator node (adjust-copy-args='1' and '2')
437 if g_opt
['adjust-copy-args']:
438 g_opt
['adjust-copy-args'] = '' # do this once, not for each branch
440 if re_mach_id
.search(sour
): sour
= '%s%d%s'%(re_mach_id
.match_obj
.group(1),g_mach_idx_offset
,re_mach_id
.match_obj
.group(3))
441 for ii
in range(len(args
[:-1])):
442 if os
.path
.isdir(dest
):
443 if sour
[-1] == '/': args
[ii
] = sour
+os
.path
.basename(args
[ii
])
444 else: args
[ii
] = sour
+'/'+os
.path
.basename(args
[ii
])
446 # THERE SHOULD BE JUST ONE
448 g_mach_idx_offset
= g_mach_idx_offset
+ 1 # TRICKY - correct effects dest node processing and rgang --mach_idx_offset
449 if re_mach_id
.search(dest
): dest
= '%s%d%s'%(re_mach_id
.match_obj
.group(1),g_mach_idx_offset
+mach_idx
,re_mach_id
.match_obj
.group(3))
451 if node_info
['stage'] == None:
452 # RECALL: rcp is always first; when there are multiple nodes
453 # per branch, rgang uses a 2 step process - 1st/always rcp, then
454 # 2nd, rsh the rgang (if it were one node, we could just rsh, but
455 # for the sake of simplicity, we just use/rely on rgang).
456 sp_args
= args
[:-1]+['%s:%s'%(node_info
['ret_info']['name'],dest
)]
457 if g_opt
['p']: sp_args
= ['-p']+sp_args
458 if g_opt
['x']: sp_args
= ['-x']+sp_args
459 if g_opt
['X']: sp_args
= ['-X']+sp_args
460 if g_opt
['F']: sp_args
= ['-F']+sp_args
461 if g_opt
['N']: sp_args
= ['-N']+sp_args
462 sp_info
= spawn( g_opt
['rcp'], sp_args
, g_opt
['combine'] )
463 node_info
['stage'] = 'rcp'
464 timeout_add( node_info
['gbl_branch_idx'], float(g_opt
['rcpto']) )
466 # assume stage is rgang; it would be stage==rcp (with additional
469 if g_opt
['l']: sp_args
= sp_args
+ ['-l',g_opt
['l']]
470 if g_opt
['x']: sp_args
= sp_args
+ ['-x']
471 if g_opt
['X']: sp_args
= sp_args
+ ['-X']
472 if g_opt
['f']: sp_args
= sp_args
+ ['-f']
473 if g_opt
['F']: sp_args
= sp_args
+ ['-F']
474 if g_opt
['N']: sp_args
= sp_args
+ ['-N']
475 sp_args
= sp_args
+ [node_info
['ret_info']['name']]
476 q_user_arg_s
= build_quoted_str( args
)
477 sp_args
= sp_args
+ ['/bin/sh','-c']
478 # HERE'S THE 1ST PLACE WHERE I NEED TO ADD CONNECT_MAGIC
479 sh_cmd_s
= "'" # I want only sh to interpret things
480 sh_cmd_s
= sh_cmd_s
+'echo %s;'%(CONNECT_MAGIC
,)
481 g_connects_expected
= g_connects_expected
+ 1
482 sh_cmd_s
= sh_cmd_s
+'PATH=%s:$PATH;'%(g_opt
['path'],) # see option init
483 sh_cmd_s
= sh_cmd_s
+'RGANG_MACH_ID=%d;export RGANG_MACH_ID;'%(mach_idx
+g_mach_idx_offset
,)
484 sh_cmd_s
= sh_cmd_s
+'if [ -r $HOME/.rgangrc ];then . $HOME/.rgangrc;fi;' # stdout from .rgangrc should be OK; I search for PICKLE:
485 sh_cmd_s
= sh_cmd_s
+'%s '%(g_opt
['app'],)
486 sh_cmd_s
= sh_cmd_s
+'--pypickle --mach_idx_offset=%d --adjust-copy-args '%(g_mach_idx_offset
+mach_idx
,)
487 for rgang_opt
in opts
:
488 if string
.find(rgang_opt
,'--mach_idx_offset') == 0: continue
489 if string
.find(rgang_opt
,'--pypickle') == 0: continue
490 if string
.find(rgang_opt
,'--adjust-copy-args')== 0: continue
491 # use build_quoted_str to preserve, i.e., --rsh="rsh -F" (which is
492 # equiv to "--rsh=rsh -F")
493 sh_cmd_s
= sh_cmd_s
+build_sh_doubly_single_quoted_str(build_quoted_str([rgang_opt
]))+' '
495 sh_cmd_s
= sh_cmd_s
+'- %s'%(build_sh_doubly_single_quoted_str(q_user_arg_s
),)
497 sh_cmd_s
= sh_cmd_s
+"'" # end the sh cmd string
499 TRACE( 22, 'spawn_cmd rcp rgang sh_cmd_ss=>%s<', sh_cmd_s
)
501 sp_args
= sp_args
+ [sh_cmd_s
]
502 sp_info
= spawn( g_opt
['rsh'], sp_args
, 0 ) # never combine stderr/out of rsh rgang
503 for machine
in branch_nodes
:
504 os
.write( sp_info
[1], machine
)
505 os
.write( sp_info
[1], '\n' )
506 os
.write( sp_info
[1], '.\n' )
507 #sp_info[1].write( machine )
508 #sp_info[1].write( '\n' )
509 #sp_info[1].write( '.\n' ); sp_info[1].flush()
510 node_info
['stage'] = 'rgang'
511 timeout_add( node_info
['gbl_branch_idx'], float(g_opt
['rshto']) )
512 elif len(branch_nodes
) == 1 and do_local
and g_thisnode
.is_me(branch_nodes
[0]): # local, no need to rsh to ourselves
514 sh_cmd_s
= sh_cmd_s
+'RGANG_MACH_ID=%d;export RGANG_MACH_ID;'%(mach_idx
+g_mach_idx_offset
,)
515 # RGANG_INITIATOR, RGANG_PARENT, and RGANG_PARENT_ID should already be set
516 sh_cmd_s
= sh_cmd_s
+'if [ -r $HOME/.rgangrc ];then . $HOME/.rgangrc;fi;'
517 sh_cmd_s
= sh_cmd_s
+string
.join(args
)
518 TRACE( 22, 'spawn_cmd local sh_cmd_ss=>%s<', sh_cmd_s
)
519 sp_args
= ['-c',sh_cmd_s
]
520 sp_info
= spawn( '/bin/sh', sp_args
, g_opt
['combine'] )
521 node_info
['stage'] = 'local'
522 elif len(branch_nodes
) == 1: # rsh
524 if g_opt
['l']: sp_args
= sp_args
+ ['-l',g_opt
['l']]
525 if g_opt
['x']: sp_args
= sp_args
+ ['-x']
526 if g_opt
['X']: sp_args
= sp_args
+ ['-X']
527 if g_opt
['f']: sp_args
= sp_args
+ ['-f']
528 if g_opt
['F']: sp_args
= sp_args
+ ['-F']
529 if g_opt
['N']: sp_args
= sp_args
+ ['-N']
530 sp_args
= sp_args
+ [node_info
['ret_info']['name']]
531 q_user_arg_s
= build_quoted_str( args
)
532 sp_args
= sp_args
+ ['/bin/sh','-c'] # NOTE: I cannot use 'exec','sh'... b/c
533 # of "&& echo..." appended below. And "&& echo..." needs to be appended
534 # after sh -c 'cmd' (as opposed to the end of cmd) b/c usr cmd
536 # HERE'S THE 2ND PLACE WHERE I NEED TO ADD CONNECT_MAGIC
537 sh_cmd_s
= "'" # I want only sh to interpret things
538 sh_cmd_s
= sh_cmd_s
+'echo %s;'%(CONNECT_MAGIC
,)
539 g_connects_expected
= g_connects_expected
+ 1
540 sh_cmd_s
= sh_cmd_s
+'RGANG_MACH_ID=%d;export RGANG_MACH_ID;'%(mach_idx
+g_mach_idx_offset
,)
541 sh_cmd_s
= sh_cmd_s
+'RGANG_INITIATOR=%s;export RGANG_INITIATOR;'%(os
.environ
['RGANG_INITIATOR'],)
542 sh_cmd_s
= sh_cmd_s
+'RGANG_PARENT=%s;export RGANG_PARENT;'%(g_thisnode
.hostnames_l
[0],)
543 sh_cmd_s
= sh_cmd_s
+'RGANG_PARENT_ID=%s;export RGANG_PARENT_ID;'%(g_thisnode
.mach_idx
,)
544 sh_cmd_s
= sh_cmd_s
+'if [ -r $HOME/.rgangrc ];then . $HOME/.rgangrc;fi;'
546 sh_cmd_s
= sh_cmd_s
+build_sh_doubly_single_quoted_str(string
.join(args
,' '))
548 sh_cmd_s
= sh_cmd_s
+"'" # end the sh cmd string
550 TRACE( 22, 'spawn_cmd rsh sh_cmd_s=>%s<', sh_cmd_s
)
552 sp_args
= sp_args
+ [sh_cmd_s
]
553 sp_args
= sp_args
+[' && echo %s""0 || echo %s""1'%(STATUS_MAGIC
,STATUS_MAGIC
)]
555 sp_info
= spawn( g_opt
['rsh'], sp_args
, g_opt
['combine'] )
556 node_info
['stage'] = 'rsh'
557 timeout_add( node_info
['gbl_branch_idx'], float(g_opt
['rshto']) )
558 elif len(branch_nodes
) >= 1: # rsh rgang (not user command!)
559 # need rsh <rsh_opts>... <node> <sh_cmd>
560 # sh_cmd is appended/quoted_str of "sh -c 'python_n_rgangapp_path_var_set;rgang_app '"
561 # ['"]quoted_rgang_opts['"]
563 # ['"]quoted_user_args['"]
565 if g_opt
['l']: sp_args
= sp_args
+ ['-l',g_opt
['l']]
566 if g_opt
['x']: sp_args
= sp_args
+ ['-x']
567 if g_opt
['X']: sp_args
= sp_args
+ ['-X']
568 if g_opt
['f']: sp_args
= sp_args
+ ['-f']
569 if g_opt
['F']: sp_args
= sp_args
+ ['-F']
570 if g_opt
['N']: sp_args
= sp_args
+ ['-N']
571 sp_args
= sp_args
+ [node_info
['ret_info']['name']]
572 q_user_arg_s
= build_quoted_str( args
)
573 sp_args
= sp_args
+ ['/bin/sh','-c']
574 # HERE'S THE 3RD PLACE WHERE I NEED TO ADD CONNECT_MAGIC
575 sh_cmd_s
= "'" # I want only sh to interpret things
576 sh_cmd_s
= sh_cmd_s
+'echo %s;'%(CONNECT_MAGIC
,)
577 g_connects_expected
= g_connects_expected
+ 1
578 sh_cmd_s
= sh_cmd_s
+'PATH=%s:$PATH;'%(g_opt
['path'],) # see option init
579 sh_cmd_s
= sh_cmd_s
+'RGANG_MACH_ID=%d;export RGANG_MACH_ID;'%(mach_idx
+g_mach_idx_offset
,)
580 sh_cmd_s
= sh_cmd_s
+'RGANG_INITIATOR=%s;export RGANG_INITIATOR;'%(os
.environ
['RGANG_INITIATOR'],)
581 sh_cmd_s
= sh_cmd_s
+'RGANG_PARENT=%s;export RGANG_PARENT;'%(g_thisnode
.hostnames_l
[0],)
582 sh_cmd_s
= sh_cmd_s
+'RGANG_PARENT_ID=%s;export RGANG_PARENT_ID;'%(g_thisnode
.mach_idx
,)
583 sh_cmd_s
= sh_cmd_s
+'if [ -r $HOME/.rgangrc ];then . $HOME/.rgangrc;fi;' # stdout from .rgangrc should be OK; I search for PICKLE:
584 sh_cmd_s
= sh_cmd_s
+'%s '%(g_opt
['app'],)
585 sh_cmd_s
= sh_cmd_s
+'--pypickle --mach_idx_offset=%d '%(g_mach_idx_offset
+mach_idx
,)
586 for rgang_opt
in opts
:
587 if string
.find(rgang_opt
,'--mach_idx_offset') == 0: continue
588 if string
.find(rgang_opt
,'--pypickle') == 0: continue
589 # use build_quoted_str to preserve, i.e., --rsh="rsh -F" (which is
590 # equiv to "--rsh=rsh -F")
591 sh_cmd_s
= sh_cmd_s
+build_sh_doubly_single_quoted_str(build_quoted_str([rgang_opt
]))+' '
593 sh_cmd_s
= sh_cmd_s
+'- ' # "-" is the nodespec
596 sh_cmd_s
= sh_cmd_s
+build_sh_doubly_single_quoted_str( build_sh_single_quoted_str(arg_s
) ) + ' '
598 sh_cmd_s
= sh_cmd_s
+"'" # end the sh cmd string
600 TRACE( 22, 'spawn_cmd rang sh_cmd_s=>%s<', sh_cmd_s
)
602 sp_args
= sp_args
+ [sh_cmd_s
]
603 sp_info
= spawn( g_opt
['rsh'], sp_args
, 0 ) # never combine stderr/out of rsh rgang
604 TRACE( 23, 'spawn_cmd sending nodes: %s', branch_nodes
)
605 for machine
in branch_nodes
:
606 os
.write( sp_info
[1], machine
)
607 os
.write( sp_info
[1], '\n' )
608 os
.write( sp_info
[1], '.\n' )
609 node_info
['stage'] = 'rgang'
610 timeout_add( node_info
['gbl_branch_idx'], float(g_opt
['rshto']) )
611 else: # program error
612 raise 'Program Error', 'unexpected branch_nodes list len=%s'%(len(branch_nodes
),)
613 return sp_info
#pid,child_stdin,child_stdout,child_stderr
618 import socket
# gethostname()
619 def __init__( self
):
621 import string
# split
623 xx
= socket
.gethostbyaddr( socket
.gethostname() )
624 self
.hostnames_l
= [xx
[0]]
627 ss
= string
.split(xx
[0],".")
628 if len(ss
) == 1: self
.shortnames_l
= self
.hostnames_l
629 else: self
.shortnames_l
= [ss
[0]]
630 try: # must "try" because this aint going to work under every os
631 # get a list of inet address for all interfaces and aliases
632 os_fo
= os
.popen( "ifconfig 2>/dev/null | grep 'inet addr:' | sed -e 's/.*addr://' -e 's/ .*//'" )
633 for inet_addr
in os_fo
.readlines():
634 xx
= socket
.gethostbyaddr(inet_addr
[:-1])
635 self
.hostnames_l
= self
.hostnames_l
+ [xx
[0]]
636 self
.alias_l
= self
.alias_l
+ xx
[1]
637 self
.ip_l
= self
.ip_l
+ xx
[2]
638 ss
= string
.split(xx
[0],".")
639 if len(ss
) == 1: self
.shortnames_l
= self
.shortnames_l
+ self
.hostnames_l
640 else: self
.shortnames_l
= self
.shortnames_l
+ [ss
[0]]
643 self
.mach_idx
= '?' # rgang specific
644 def is_me( self
, node
):
646 # NOTE: I feel that using gethostbyaddr, which potentially
647 # contacts the names server, is not the right choice.
648 if string
.find(node
,".") == -1:
649 if node
in self
.shortnames_l \
650 or node
in self
.alias_l \
651 or node
in self
.ip_l
: return 1
654 if node
in self
.hostnames_l \
655 or node
in self
.alias_l \
656 or node
in self
.ip_l
: return 1
def get_nway_indexes( nway, nth, list_length, minus_idx0=0 ):
    """Return the (start, end) bounds of group number nth out of nway groups.

    list_length entries are divided into nway nearly equal groups; each
    boundary is the corresponding multiple of list_length/nway rounded to
    the nearest integer (int(x+0.5)).

    nway        -- number of groups; used as a divisor, so it must be non-zero
    nth         -- zero-based number of the group wanted
    list_length -- total number of entries
    minus_idx0  -- when true, one entry fewer is divided and both bounds
                   are shifted up by one, i.e. index 0 is left out

    The result is a (start, end) tuple; callers pick the elements out with
    the START and END indexes.
    """
    if minus_idx0: minus_idx0=1; list_length = list_length - 1
    split_num = float(list_length) / nway
    start = int( ((nth)*split_num)+0.5 ) + minus_idx0
    end   = int( ((nth+1)*split_num)+0.5 ) + minus_idx0
    # list_length and minus_idx0 are traced in their adjusted form
    TRACE( 23, 'get_nway_indexes(nway=%s,nth=%s,list_length=%s,minus_idx0=%s)=(start=%s,end=%s)',
           nway, nth, list_length, minus_idx0, start, end )
    # without this return the callers' grp_idxs[START] / grp_idxs[END]
    # lookups would be applied to None
    return start,end
def determ( index, list_length, nway, minus_idx0=0 ):
    """Locate index within an nway split of list_length entries.

    Returns (nth, idx_in_nth): the zero-based group number that holds
    index and the position of index inside that group.  Group boundaries
    are multiples of list_length/nway rounded with int(x+0.5).

    When minus_idx0 is true, one entry fewer is divided and index is
    taken relative to entry 1 instead of entry 0.
    nway (and the adjusted list_length) must be non-zero; both end up as
    divisors.
    """
    skipped = 0
    if minus_idx0:
        skipped = 1
        list_length = list_length - 1

    per_group = float(list_length) / nway
    rel_idx = index - skipped

    # first estimate by plain truncation ...
    group = int(rel_idx/per_group)
    # ... which is one too low when the rounded upper boundary of that
    # group is already at or below rel_idx
    upper = int( ((group+1)*per_group)+0.5 )
    if rel_idx >= upper:
        group = group + 1

    group_start = int( ((group)*per_group)+0.5 )
    return group,rel_idx - group_start
680 # determ, get_nway_indexes
683 def header( machine
, args
):
684 if g_opt
['n'] == '1': os
.write( sys
.stdout
.fileno(), "%s= "%(machine
,) )
685 elif g_opt
['n'] == '2' or g_opt
['n'] == '3': print "\n\
686 - - - - - - - - - - - - - - %s - - - - - - - - - - - - - -"%(machine
,)
687 if g_opt
['c'] and g_opt
['n']=='3':
688 sp_args
= args
[:-1]+['%s:%s'%(machine
,args
[-1])]
689 print '%s %s'%(g_opt
['rcp'],string
.join(sp_args
))
690 elif not g_opt
['c'] and g_opt
['n']=='3':
691 print '%s %s %s'%(g_opt
['rsh'],machine
,build_quoted_str(args
))
696 # returns string,status where:
704 #final_s = s = fd.read( 1 )
705 #final_s = s = os.read(fd.fileno(), 1 )
706 final_s
= s
= os
.read(fd
, 1 )
707 except: # any error - THIS CAN MESS UP ^C (main()'s except KeyboardInterrupt...)
709 #TRACE( 0, 'try_line 1st read - fd=%d', fd )
711 #exc, value, tb = sys.exc_info()
714 TRACE( 24, 'try_line looking for line' )
715 while s
!= '\n' and s
!= '':
716 rr
,ww
,ee
= select
.select([fd
],[],[],0.9)
717 #rr,ww,ee = select.select([fd.fileno()],[],[],1)
718 if not rr
: sts
=1;break
720 #s = os.read(fd.fileno(), 1 )
722 final_s
= final_s
+ s
725 sys
.stderr
.write( 'while... fd=%s\n'%(fd
,) )
727 exc
, value
, tb
= sys
.exc_info()
729 TRACE( 25, 'try_line returning %s', (final_s
,sts
) )
734 def get_output( sel_l
, fo2node
, wait
):
737 global g_num_connects
# because I'm modifying it
739 mach_idx
=-1; s
='' # init for TRACE below
740 chk_exit
=0; final_s
=''; fd
=None; sh_exit_status
=None
741 while 1: # debugging (EAAGAIN; see below)
742 #if sys.stdin in sel_l: TRACE( 26, 'get_output select checking stdin' )
743 #TRACE( 26, 'get_output sel_l=%s wait=%s', sel_l, wait )
745 ready
= select
.select( sel_l
, [], sel_l
, wait
)
747 #TRACE( 27, 'get_output ready is %s', ready )
749 # timeout (or error, but assume timeout; I'm not processing errors)
750 if ready
[2]: raise 'Program Error', 'select error'
754 mach_idx
= fo2node
[fd
]['mach_idx']
756 #if fo2node[fd]['std'] == sys.stdout.fileno():
757 # TRACE( 27, 'get_output select processing for mach_idx=%d stdout', mach_idx )
758 #elif fo2node[fd]['std'] == sys.stdin.fileno():
759 # TRACE( 27, 'get_output select processing for mach_idx=%d stderr', mach_idx )
761 # TRACE( 27, 'get_output select processing for mach_idx=%d main stdin', mach_idx )
762 # NOTE: When in pty mode, the input is NOT echoed locally and
763 # currently it will take time to check for the STATUS_MAGIC
764 # before echoing the typed characters.
766 # ['stage'] should be (see spawn_cmd) one of:
769 # 'rgang' need "connect"
772 # 'rsh' need "connect" ,then "status"
773 # 'rgang' need "connect"
774 if fo2node
[fd
]['std'] == sys
.stdout
.fileno() \
775 and ( g_internal_info
[mach_idx
]['stage'] == 'rsh' \
776 or (g_internal_info
[mach_idx
]['stage'] == 'rgang' \
777 and g_internal_info
[mach_idx
]['connected'] == 0 )):
778 s
,sts
= try_line( fd
)
780 # Could also be main stdin
781 final_s
= os
.read( fd
, 8192 )
782 if not final_s
: chk_exit
= 1
783 return chk_exit
, final_s
, fd
, mach_idx
, sh_exit_status
784 except IOError, detail
:
785 if detail
.errno
== 11: sys
.stderr
.write( "EAGAIN\n" ); continue
786 else: raise IOError, detail
787 #TRACE( 31, "get_output mach_idx=%d: ['stage']=%s connect=%s s=>%s<"
788 # , mach_idx, g_internal_info[mach_idx]['stage'], g_internal_info[mach_idx]['connected'], s )
791 if fo2node
[fd
]['std'] == sys
.stdout
.fileno():
792 if ( g_internal_info
[mach_idx
]['stage'] == 'rsh' \
793 or g_internal_info
[mach_idx
]['stage'] == 'rgang') \
794 and g_internal_info
[mach_idx
]['connected'] == 0:
795 # Look for the STATUS_MAGIC or CONNECT_MAGIC.
796 # NOTE! THERE IS THE SMALL POSSIBILITY THAT THIS PROCESSING
797 # WILL FAIL B/C OF A DELAY IN THE TRANSMISSION OF THE
798 # STATUS LINE (GREATER THAN THE TIMEOUT IN THE TRY_LINE
800 if re_connect
.search( s
):
801 TRACE( 29, 'get_output mach_idx=%d: yes connect magic', mach_idx
)
802 final_s
= re_connect
.match_obj
.group(1)
803 g_internal_info
[mach_idx
]['connected'] = 1
804 g_num_connects
= g_num_connects
+ 1
805 timeout_cancel( g_internal_info
[mach_idx
]['gbl_branch_idx'] )
808 elif g_internal_info
[mach_idx
]['stage'] == 'rsh': # and connected
809 if re_status
.search( s
):
810 TRACE( 29, 'get_output mach_idx=%d: yes status magic', mach_idx
)
811 final_s
= re_status
.match_obj
.group(1)
812 sh_exit_status
= int(re_status
.match_obj
.group(2))
815 else: # either rcp, local or (rgang and connected)
823 #TRACE( 31, 'get_output mach_idx=%d fd=%s chk_exit=%s sh_exit_status=%s returning (s=%s) >%s<',
824 # mach_idx,fd,chk_exit,sh_exit_status,s,final_s )
825 return chk_exit
, final_s
, fd
, mach_idx
, sh_exit_status
829 def do_output( mach_idx
, processing_idx
):
830 if mach_idx
== processing_idx \
831 and not g_opt
['pyret'] \
832 and g_internal_info
[mach_idx
]['stage'] != 'rgang':
838 def info_update( mach_idx
, fo2node_map
, sp_info
, select_l
):
839 # recall: sp_info=[pid,child_stdin,child_stdout,child_stderr]
840 g_internal_info
[mach_idx
]['sp_info'] = sp_info
842 fo2node_map
[sp_info
[1]] = {'mach_idx':mach_idx
,'std':None}
843 fo2node_map
[sp_info
[2]] = {'mach_idx':mach_idx
,'std':sys
.stdout
.fileno()}
844 select_l
.insert( 0, sp_info
[2] ) # order matters; do not "append" after select_l[0] (main stdin)
846 fo2node_map
[sp_info
[3]] = {'mach_idx':mach_idx
,'std':sys
.stderr
.fileno()}
847 select_l
.insert( 0, sp_info
[3] ) # order matters; do not "append" after select_l[0] (main stdin)
def info_clear( fd, fo2node_map, select_l ):
    """Forget file descriptor fd.

    fd is taken out of select_l first and then out of fo2node_map; both
    containers are modified in place.  fd has to be present in both
    (list.remove / dict deletion raise otherwise).
    """
    TRACE( 2, "info_clear clearing fd=%s", fd )
    select_l.remove( fd )
    del fo2node_map[fd]
858 # this routine requires:
859 # g_timeout_l=[];g_opt={'c':1}
def timeout_add( gbl_br_idx, timeout_period ):
    """Register a timeout for branch gbl_br_idx.

    gbl_br_idx     -- global branch index stored with the entry
    timeout_period -- seconds from now until expiry (callers pass
                      float(g_opt['rshto']); presumably also
                      float(g_opt['rcpto']) for copies)

    The entry {'timeout_expires':..., 'gbl_branch_idx':...} goes into the
    global g_timeout_l.  Outside copy mode (g_opt['c'] false), or when the
    list is empty, it is simply appended.  In copy mode rsh and rcp
    periods are mixed, so the entry is placed by binary search to keep
    the list ordered by expiry time; an entry whose expiry equals an
    existing one is placed after it.
    """
    expire_tm = time.time()+timeout_period
    entry = {'timeout_expires':expire_tm,'gbl_branch_idx':gbl_br_idx}

    if not g_opt['c'] or len(g_timeout_l) == 0:
        g_timeout_l.append( entry )
        return

    # copy mode, mix of rsh and rcp: narrow [first,last] down to one slot
    first = 0
    last = len(g_timeout_l)-1      # list is non-empty here (guarded above)
    while first < last:
        probe = first + (last - first)//2
        if expire_tm < g_timeout_l[probe]['timeout_expires']:
            last = probe
        else:
            first = probe+1        # no need to look at probe again
    # first == last: go before that entry only if we expire strictly earlier
    if expire_tm < g_timeout_l[first]['timeout_expires']:
        slot = first
    else:
        slot = first+1
    g_timeout_l.insert( slot, entry )
881 # currently OK if not found
def timeout_cancel( gbl_br_idx ):
    """Remove the pending timeout of branch gbl_br_idx, if there is one.

    g_timeout_l is scanned from the front and the first entry whose
    'gbl_branch_idx' equals gbl_br_idx is popped; at most one entry is
    removed.  Finding nothing is not an error.
    """
    # bind the flag up front: the TRACE below formats it with %d even
    # when no entry matched
    found = 0
    for idx in range(len(g_timeout_l)):
        if g_timeout_l[idx]['gbl_branch_idx'] == gbl_br_idx:
            found = 1
            g_timeout_l.pop(idx)
            break                   # list was modified; stop iterating
    TRACE( 31, "timeout_cancel branch_idx=%d found=%d", gbl_br_idx, found )
890 # This handles 1 timeout - the 1st one!
891 def timeout_connect_process():
892 import select
# select
893 branch_idx
= g_timeout_l
[0]['gbl_branch_idx']
894 mach_idx
= g_branch_info_l
[branch_idx
]['active_head']
895 connected
= g_internal_info
[mach_idx
]['connected']
896 # recall: sp_info=[pid,child_stdin,child_stdout,child_stderr]
897 pid
= g_internal_info
[mach_idx
]['sp_info'][0]
899 TRACE( 30, "timeout_connect_process branch_idx=%d connected=%d pidToKill=%d br_in=%s g_tmo_l=%s",
900 branch_idx
, connected
, pid
, g_internal_info
[mach_idx
]['sp_info'][1], g_timeout_l
)
902 # append ?something? to stderr
903 ret_info
= g_internal_info
[mach_idx
]['ret_info']
904 ret_info
['stderr'] = ret_info
['stderr']+'rgang timeout expired\n'
906 # do kill and kill check here
907 for sig
in (1,2,15,3,9): # 1=HUP, 2=INT(i.e.^C), 15=TERM(default "kill"), 3=QUIT(i.e.^\), 9=KILL
908 try: rpid
,status
= os
.waitpid(pid
,os
.WNOHANG
);status
=(status
>>8) # but I probably won't use this status
909 except: rpid
= 0; break # i.e. (OSError, '[Errno 10] No child processes')
910 if rpid
== pid
: # OK, process is out-of-there
911 if g_internal_info
[mach_idx
]['ret_info']['rmt_sh_sts'] == None:
912 TRACE( 31, "timeout_connect_process status=%d", status
)
913 g_internal_info
[mach_idx
]['ret_info']['rmt_sh_sts'] = 8
916 TRACE( 31, "timeout_connect_process os.kill(%d,%d)", pid
, sig
)
917 select
.select([],[],[],0.05) # use select to sleep sub second
919 return mach_idx
# need to return "chk_exit,..." like get_output
920 # timeout_connect_process
923 def initiator_node_status( mach_idx
):
924 # FIRST DETERMINE IF I AM THE INITIATOR NODE
925 if g_opt
['mach_idx_offset']=='' and g_opt
['err-file']!='':
926 sts
= g_internal_info
[mach_idx
]['ret_info']['rmt_sh_sts']
928 TRACE( 28, 'initiator_node_status mach_idx=%d sts=%s', mach_idx
,sts
)
929 fo
= open( g_opt
['err-file'], 'a+' )
930 name
= g_internal_info
[mach_idx
]['ret_info']['name']
931 fo
.write( "%s # mach_idx=%d sts=%s\n"%(name
,mach_idx
,sts
) )
933 # initiator_node_status
936 def node_list_from_file( listfile
):
939 TRACE( 2, "node_list_from_file fo.fileno()=%d", fo
.fileno() )
940 for xx
in fo
.readlines():
941 if re_mach
.search(xx
): mach_l
.append(re_mach
.match_obj
.group(1))
945 def node_list_from_spec( spec
):
948 xx
,sts
= try_line( sys
.stdin
.fileno() )
949 #xx = sys.stdin.readline()
950 while xx
!= '.\n' and xx
!= '':
951 if re_mach
.search(xx
): mach_l
.append(re_mach
.match_obj
.group(1))
952 xx
,sts
= try_line( sys
.stdin
.fileno() )
953 #xx = sys.stdin.readline()
954 elif spec
[0]=='.' and os
.access(spec
,os
.R_OK
):
956 mach_l
= node_list_from_file( listfile
)
957 elif os
.access(g_opt
['farmlets']+'/'+spec
,os
.R_OK
): # you can always specify --farmlets=.
958 listfile
= g_opt
['farmlets']+'/'+spec
959 mach_l
= node_list_from_file( listfile
)
960 elif os
.access(spec
,os
.R_OK
):
962 mach_l
= node_list_from_file( listfile
)
964 if g_opt
['verbose']: sys
.stderr
.write('assuming expandable node list\n')
965 mach_l
= expand( spec
)
971 os
.system( "stty sane" )
976 def cleanup(signum
,frame
):
984 g_opt
={'tlvlmsk':0} # and init so test script importing
985 # rgang (to test rgang.expand(),
986 # for example) don't have to.
988 def rgang( opts_n_args
):
990 import pickle
# dumps
991 import traceback
# format_exception
992 import pprint
# pprint
993 import signal
# signal
997 global g_internal_info
#
998 global g_branch_info_l
#
999 global g_mach_idx_offset
#
1000 global g_num_connects
# needed for robust "input-to-all-branches" see (get_output)
1001 global g_connects_expected
# needed for robust "input-to-all-branches" see (spawn_cmd and below)
1002 # --------------------------------- # NOTE: currently, there is the
1003 # possibility of a hang on the write to a branch if all nodes in
1004 # the rgang sub tree fail and stdin is large.
1006 opts
,args
,g_opt
,usage
= getopts( OPTSPEC
, opts_n_args
, USAGE
, APP
)
1007 try: g_opt
['tlvlmsk'] = int( eval(g_opt
['tlvlmsk']) ) # 0x hex or normal decimal numbers OK
1008 except: sys
.stderr
.write('invalid tlvlmsk value; must be integer/hex\n');return 1,[]
1009 g_thisnode
= NodeInfo() # needed before 1st TRACE
1010 TRACE( 3, 'rgang g_opt is %s', g_opt
)
1013 c
="ls %s"%(g_opt
['farmlets'],); os
.system(c
); return 0,[]
1014 elif g_opt
['list'] and not args
:
1015 if os
.access(g_opt
['farmlets']+'/.',os
.R_OK
):
1016 c1
="for i in *;do echo FARMLET $i:; cat $i;done"
1017 c
="sh -c 'cd %s;%s'"%(g_opt
['farmlets'],c1
)
1020 print 'farmlets directory %s not readable'%(g_opt
['farmlets'],)
1022 if not args
: print 'no args\n'+usage
; return 0,[]
1024 mach_l
= node_list_from_spec( args
.pop(0) )
1026 if g_opt
['pyprint']: g_opt
['pyret']='1'
1027 if g_opt
['pypickle']: g_opt
['pyret']='1'
1028 if g_opt
['pypickle'] and g_opt
['pyprint']: g_opt
['pyprint']='' # pypickle wins
1031 skip_l
= node_list_from_spec( g_opt
['skip'] )
1033 ii
= 0; mach_l_len
= len( mach_l
)
1034 while ii
< mach_l_len
:
1035 if mach_l
[ii
] == sk
: mach_l
.pop(ii
); mach_l_len
= mach_l_len
-1
1037 if g_opt
['s']: # skip current (local) node
1038 ii
= 0; mach_l_len
= len( mach_l
)
1039 while ii
< mach_l_len
:
1040 if g_thisnode
.is_me(mach_l
[ii
]):
1041 mach_l
.pop(ii
); mach_l_len
= mach_l_len
- 1
1043 # mach_l is now set.
1046 if not g_opt
['pyret']:
1047 for mach
in mach_l
: print mach
1048 overall_status
= 0; ret_info
= mach_l
# ret_info can have different formats
1049 if g_opt
['pyprint']: pprint
.pprint( ret_info
)
1050 elif g_opt
['pypickle']:
1051 dumps
= pickle
.dumps(ret_info
)
1052 sys
.stdout
.write( 'PICKLE:%08x:'%(len(dumps
),) )
1053 sys
.stdout
.write( dumps
)
1054 return overall_status
,ret_info
1056 TRACE( 4, 'rgang args is %s', args
)
1057 if g_opt
['C']: g_opt
['s']='1'; g_opt
['c']='1'
1058 if g_opt
['n'] == '':
1059 if len(mach_l
) == 1: g_opt
['n']='0'
1060 else: g_opt
['n']='1'
1061 elif g_opt
['n'] == 'n': g_opt
['n']='1'
1062 if len(g_opt
['n'])>1 or not '0'<=g_opt
['n']<='3':
1063 sys
.stderr
.write('invalid optional argument "%s" for -n option\n'%(g_opt
['n'],))
1066 if g_opt
['c'] and len(args
) < 2:
1067 sys
.stderr
.write('copy mode must have at least 2 arguments\n')
1071 sys
.stderr
.write('interactive not yet supported\n'); return 1,[]
1073 try: nway
= int( g_opt
['nway'] ) # int(string)=>int and int(int)=>int
1074 except: sys
.stderr
.write('invalid nway value; must be integer >= 0\n');return 1,[]
1075 if nway
< 0: sys
.stderr
.write('invalid nway value; must be integer >= 0\n');return 1,[]
1077 if g_opt
['mach_idx_offset'] != '': # note: also used to determine initiator node
1078 try: g_mach_idx_offset
= int( g_opt
['mach_idx_offset'] ) # int(string)=>int and int(int)=>int
1079 except: sys
.stderr
.write('invalid mach_idx_offset value; must be decimal integer\n');return 1,[]
1080 g_thisnode
.mach_idx
= '%d'%(g_mach_idx_offset
,) # for spawn_cmd rsh rgang
1081 g_opt
['do-local'] = 1 # I'm assuming I've already rsh'd (rsh rgang...), so don't do rsh again
1082 else: # assume I'm the "initiator" node
1083 os
.environ
['RGANG_INITIATOR'] = g_thisnode
.hostnames_l
[0]
1084 # set the following now, in case we are also the root node and
1085 # opt['do-local']; this make spawn_cmd easier
1086 os
.environ
['RGANG_PARENT'] = ''
1087 os
.environ
['RGANG_PARENT_ID'] = ''
1088 g_mach_idx_offset
= 0 # for branches (see spawn_cmd)
1089 g_thisnode
.mach_idx
='' # for spawn_cmd rsh rgang; init for TRACE
1090 for ii
in range(len(mach_l
)):
1091 if g_thisnode
.is_me(mach_l
[ii
]): g_thisnode
.mach_idx
='%d'%(ii
,); break # for spawn_cmd rsh rgang
1092 if g_opt
['err-file'] != '': # init err-file
1093 fo
= open( g_opt
['err-file'], 'w+' )
1094 TRACE( 2, "rgang err-file fd=%d", fo
.fileno() )
1098 if g_opt
['input-to-all-branches']:
1099 sys
.stderr
.write('invalid --serial/--input-to-all-branches configuration\n');return 1,[]
1100 outer_nway
= nway
; inner_nway
= 0
1102 outer_nway
= 1; inner_nway
= nway
1103 mach_l_len
= len( mach_l
)
1104 if outer_nway
> mach_l_len
or outer_nway
== 0: outer_nway
= mach_l_len
# inner_nway is handled below
1107 ####### OK done with ALL the OPTIONS PROCESSING
1110 signal
.signal(2,cleanup
)
1111 signal
.signal(15,cleanup
)
1112 os
.system("stty -echo -icanon min 1 time 0" )
1113 os
.system("stty -inlcr -icrnl" ) # no translations
1114 #os.system("stty ignbrk -ixon -isig" )
1115 os
.system("stty ignbrk -ixon" )
1118 ####### NOW, DO THE WORK!!!!
1120 stdin_bytes
= 0L # could be counting > 2G bytes
1121 # build/initialize the array so we can add stdin at the end
1122 ret_info
= []; g_internal_info
=[]
1123 for ii
in range(mach_l_len
):
1124 ret_info
.append(None); g_internal_info
.append(None)
1127 g_connects_expected
= 0
1130 if not g_opt
['input-to-all-branches']:
1131 # add in (kludge in??) stdin --> index mach_l_len
1132 g_internal_info
.append( {'gbl_branch_idx':None,
1133 'ret_info':None, # ret_info NOT NEEDED!
1134 'stage':None,'sp_info':None,'connected':1} )
1135 select_l
.append(sys
.stdin
.fileno())
1136 fo2node_map
[sys
.stdin
.fileno()] = {'mach_idx':mach_l_len
,'std':None}
1137 need_stdin_after_connects
= 0
1139 need_stdin_after_connects
= 1
1142 g_branch_info_l
= []
1145 have_me
= 0 # basicaly, flag to skip past idx 0 in branch processing below
1146 if not g_opt
['c'] and not g_opt
['serial'] and g_thisnode
.is_me(mach_l
[0]):
1148 ret_info
[0] = {'name':mach_l
[0],'stdout':'','stderr':'','rmt_sh_sts':None}
1149 g_branch_info_l
.append( {'active_head':0,'branch_end_idx':1} )
1150 gbl_branch_idx
= len(g_branch_info_l
)-1 # len would be 1 here ==> gbl_branch_idx = 0
1151 g_internal_info
[0] = {'gbl_branch_idx':gbl_branch_idx
,
1152 'ret_info':ret_info
[0], # ptr
1153 'stage':None,'sp_info':None,'connected':0}
1154 TRACE( 5, 'rgang local spawn_cmd' )
1156 sp_info
= spawn_cmd( g_internal_info
[mach_idx
], mach_idx
, opts
, args
, [mach_l
[0]], g_opt
['do-local'] ) #1 not g_opt['c'] and g_thisnode.is_me(mach_l[0])
1157 info_update( 0, fo2node_map
, sp_info
, select_l
)
1158 branch_input_l
.append( sp_info
[1] )
1164 # outer loops result when --serial is specified. It is processed in
1165 # conjunction with the --nway option to specify the number of outer loops.
1166 # (nway==0 means all nodes so just specifying --serial with --nway=0 gives
1167 # "completely serial" operation. --serial with --nway=2 with 400 nodes
1168 # would do 2 sets of 200_parallel_spawns. This can be demonstrated via:
1169 # rgang.py --serial --nway=2 "192.168.1.136{,,,,,}" 'sleep 8;date'
1170 for outer_group_idx
in range(outer_nway
):
1171 grp_idxs
= get_nway_indexes( outer_nway
, outer_group_idx
, mach_l_len
, have_me
)
1175 # - list for select for get output_line and
1176 # - map of select fo to node
1177 group_len
= grp_idxs
[END
] - grp_idxs
[START
]
1178 if inner_nway
> group_len
or inner_nway
== 0: inner_nway
= group_len
1180 for inner_branch_idx
in range(inner_nway
):
1181 branch_idxs
= get_nway_indexes( inner_nway
, inner_branch_idx
, group_len
)
1182 branch_len
= branch_idxs
[END
] - branch_idxs
[START
]
1183 mach_idx
= grp_idxs
[START
] + branch_idxs
[START
]
1184 branch_end_idx
= mach_idx
+ branch_len
1185 g_branch_info_l
.append( {'active_head':mach_idx
,'branch_end_idx':branch_end_idx
} )
1186 gbl_branch_idx
= len(g_branch_info_l
)-1
1187 for ii
in range(branch_len
):
1188 ret_info
[mach_idx
+ii
] = {'name':mach_l
[mach_idx
+ii
],
1189 'stdout':'','stderr':'','rmt_sh_sts':None}
1190 g_internal_info
[mach_idx
+ii
] = {'gbl_branch_idx':gbl_branch_idx
,
1191 'ret_info':ret_info
[mach_idx
+ii
], # ptr
1192 'stage':None,'sp_info':None,'connected':0}
1193 branch_nodes
= mach_l
[mach_idx
:branch_end_idx
]
1194 sp_info
= spawn_cmd( g_internal_info
[mach_idx
], mach_idx
, opts
, args
, branch_nodes
, 0 ) #2 for inner_branch_idx in ...
1195 TRACE( 6, 'rgang after initial spawn_cmd gbl_branch_idx=%d branch_len=%d sp_info=%s', gbl_branch_idx
, branch_len
, sp_info
)
1196 info_update( mach_idx
, fo2node_map
, sp_info
, select_l
)
1197 branch_input_l
.append( sp_info
[1] )
1199 # NOW DO THE PROCESSING
1201 if not g_opt
['pyret']: header( mach_l
[processing_idx
], args
)
1202 while processing_idx
< grp_idxs
[END
]:
1204 if need_stdin_after_connects
and g_num_connects
== g_connects_expected
:
1206 g_internal_info
.append( {'gbl_branch_idx':None,
1207 'ret_info':None, # ret_info NOT NEEDED!
1208 'stage':None,'sp_info':None,'connected':1} )
1209 select_l
.insert(0,sys
.stdin
.fileno())
1210 fo2node_map
[sys
.stdin
.fileno()] = {'mach_idx':mach_l_len
,'std':None}
1211 need_stdin_after_connects
= 0
1216 # 1) "connect" timeout period expires while waiting at select
1217 # 2) "connect" timeout period expires while processing for some
1219 # 3) "connect" timeout period never expires as all nodes
1220 # "connect" promptly
1221 timeout_wait
= g_timeout_l
[0]['timeout_expires'] - time
.time()
1222 if timeout_wait
< 0: timeout_wait
= 0
1223 else: timeout_wait
= None
1225 # DO THE SELECT TO (potentially) GET SOME DATA
1226 # chk_exit indicate that select indicated a file, but the
1227 # read of the file returned 0 bytes (ss=''); the
1228 # process associated with the particular file/node
1230 # If check_exit==1 then the following should be true:
1233 # sh_exit_stat==None
1234 # ss is the output data returned unless there is a
1236 # fo is the file/node to process, unless timeout
1237 # sh_exit_stat is set if command exit_status (STATUS_MAGIC) was
1238 # received/indicated.
1239 # When a timeout occurs, the return values will be (as initialize
1241 # chk_exit=0;ss='';fo=None;sh_exit_stat=None
1242 #TRACE( 7, 'rgang before get_output wait=%s select_l=%s need_stdin=%d g_num_connects=%d',
1243 # timeout_wait, select_l, need_stdin_after_connects, g_num_connects )
1244 chk_exit
,ss
,fo
,mach_idx
,sh_exit_stat
= get_output( select_l
, fo2node_map
, timeout_wait
)
1248 # should be able to do the processing here and not continue, but return
1249 # chk_exit=1 and mach_idx
1251 #move processing from above and do it here to properly continue and finish processing after killing process
1252 # DO CONNECT TIMEOUT PROCESSING NOW
1253 # IN THE CASE of an rgang node timing out, A NEW TIMEOUT
1254 # PERIOD WOULD BE INITIATED
1255 # WHAT HAPPENS IF IT CHANGES THE processing _idx???
1256 mach_idx
= timeout_connect_process()
1257 chk_exit
,ss
,sh_exit_stat
= 1,'',None
1260 if mach_idx
== mach_l_len
: # SPECIAL STDIN FLAG
1261 #TRACE( 8, 'rgang stdin processing_idx=%d', processing_idx )
1263 #stdin_bytes = stdin_bytes + len(ss)
1264 if g_opt
['input-to-all-branches'] or g_opt
['pyret'] :
1265 #TRACE( 9, "rgang branch_input_l=%s", branch_input_l )
1266 for br_sdtin
in branch_input_l
:
1267 bytes_written
= os
.write( br_sdtin
, ss
)
1268 while bytes_written
< len(ss
):
1269 bytes_this_write
= os
.write( br_sdtin
, ss
[bytes_written
:] )
1270 bytes_written
= bytes_written
+ bytes_this_write
1272 os
.write( g_internal_info
[processing_idx
]['sp_info'][1], ss
)
1274 info_clear( sys
.stdin
.fileno(), fo2node_map
, select_l
)
1275 #TRACE( 9, "rgang closing everyone's stdin after %d bytes l=%s", stdin_bytes, branch_input_l )
1276 for br_stdin
in branch_input_l
:
1277 os
.close( br_stdin
)
1278 mi
= fo2node_map
[br_stdin
]['mach_idx']
1279 del( fo2node_map
[br_stdin
] )
1280 #pid = g_internal_info[mi]['sp_info'][0]
1283 # this should cause (via chain reaction) the remote
1284 # cmd's to exit; the branch_input_l will be cleaned
1285 # up when they do; as our stdin is no longer in the
1286 # select_l, this whole "SPECIAL STDIN" code should
1287 # not get executed again.
1288 continue # after STDIN PROCESSING
1289 if sh_exit_stat
!= None: ret_info
[mach_idx
]['rmt_sh_sts'] = sh_exit_stat
# OK if we overwrite timeout kill status
1292 if do_output( mach_idx
, processing_idx
):
1293 os
.write( fo2node_map
[fo
]['std'], ss
)
1294 elif fo2node_map
[fo
]['std'] == sys
.stdout
.fileno():
1295 ret_info
[mach_idx
]['stdout'] = ret_info
[mach_idx
]['stdout'] + ss
1297 ret_info
[mach_idx
]['stderr'] = ret_info
[mach_idx
]['stderr'] + ss
1301 # CHECK FOR BRANCH/GROUP STATUS
1304 TRACE( 8, 'rgang chk_exit mach_idx=%d processing_idx=%d sp_info=%s',
1305 mach_idx
, processing_idx
, g_internal_info
[mach_idx
]['sp_info'] )
1307 if do_output( mach_idx
, processing_idx
):
1308 # print any previously stored stdout/err
1309 os
.write( sys
.stdout
.fileno(), ret_info
[mach_idx
]['stdout'] )
1310 os
.write( sys
.stderr
.fileno(), ret_info
[mach_idx
]['stderr'] )
1312 # cleanup_output_status
1313 # if the fd that triggered us was stdOUT, then we need to check stdERR
1314 pid
= g_internal_info
[mach_idx
]['sp_info'][0]
1315 if g_internal_info
[mach_idx
]['sp_info'][3] \
1316 and ( ( fo
!= None and fo2node_map
[fo
]['std'] == sys
.stdout
.fileno() ) \
1318 TRACE( 2, 'rgang CHECKing stdERR fo=%s pid=%d mach_idx=%d', fo
, pid
, mach_idx
)
1319 chk
= 0; fo2
= g_internal_info
[mach_idx
]['sp_info'][3] # init this as fo may be None (i.e. tmo)
1320 while not chk
and fo2
!= None:
1321 chk
,ss
,fo2
,mi
,dont_used
= get_output( [g_internal_info
[mach_idx
]['sp_info'][3]], # 3 is stdERR
1323 if do_output( mach_idx
, processing_idx
):
1324 os
.write( sys
.stderr
.fileno(), ss
)
1326 ret_info
[mach_idx
]['stderr'] = ret_info
[mach_idx
]['stderr'] + ss
1327 # if the fd that triggered us was stdERR, then we need to check stdOUT
1328 if g_internal_info
[mach_idx
]['sp_info'][2] \
1329 and ( ( fo
!= None and fo2node_map
[fo
]['std'] == sys
.stderr
.fileno() ) \
1331 TRACE( 2, 'rgang CHECKing stdOUT fo=%s pid=%d mach_idx=%d', fo
, pid
, mach_idx
)
1332 # There quite possibly could be 2 iteration through
1333 # this while. If the main get_output (above) had stderr
1334 # in the select list first AND the shell is really fast AND
1335 # there is little to no output, then the CONNECT mechanism
1336 # may not have happened and then the 1st loop here will
1337 # cause a '' value to be returned for ss; chk will be 0
1339 chk
= 0; fo2
= g_internal_info
[mach_idx
]['sp_info'][2] # init this as fo may be None (i.e. tmo)
1340 while not chk
and fo2
!= None:
1341 chk
,ss
,fo2
,mi
,sh_exit_stat
= get_output( [g_internal_info
[mach_idx
]['sp_info'][2]], # 2 is stdout
1343 if sh_exit_stat
!= None:
1344 ret_info
[mach_idx
]['rmt_sh_sts'] = sh_exit_stat
# OK if we overwrite timeout kill status
1345 if do_output( mach_idx
, processing_idx
):
1346 os
.write( sys
.stdout
.fileno(), ss
)
1348 ret_info
[mach_idx
]['stdout'] = ret_info
[mach_idx
]['stdout'] + ss
1352 # done with output from process
1353 # remove sub-process's output and input fds from
1354 # the select list, fo2node_map, branch_input_l
1355 for ffo
in g_internal_info
[mach_idx
]['sp_info'][2:]:
1356 if ffo
: info_clear( ffo
, fo2node_map
, select_l
)
1357 branch_input_l
.remove( g_internal_info
[mach_idx
]['sp_info'][1] )
1358 if g_internal_info
[mach_idx
]['sp_info'][1] in fo2node_map
.keys():
1359 del( fo2node_map
[g_internal_info
[mach_idx
]['sp_info'][1]] )
1360 gbl_branch_idx
= g_internal_info
[mach_idx
]['gbl_branch_idx']
1361 timeout_cancel( gbl_branch_idx
)
1364 opid
,status
= os
.waitpid( pid
, 0 )
1365 if opid
!= pid
: raise 'Program Error', 'process did not exit'
1366 TRACE( 2, 'rgang waitpid got status for pid=%d mach_idx=%d gbl_branch_idx=%d status=%s',
1367 pid
, mach_idx
, gbl_branch_idx
, status
)
1368 status
= (status
>>8)
1369 except: # i.e. (OSError, '[Errno 10] No child processes')
1370 # must have been killed in timeout_connect_process
1371 status
= g_internal_info
[mach_idx
]['ret_info']['rmt_sh_sts']
1372 TRACE( 2, 'rgang waitpid NO status for pid=%d mach_idx=%d gbl_branch_idx=%d using status=%s',
1373 pid
, mach_idx
, gbl_branch_idx
, status
)
1375 # g_internal_info[x].keys = ('stage','connected','sp_info','gbl_branch_idx','ret_info')
1376 # g_internal_info[x]['ret_info'].keys =
1377 if g_internal_info
[mach_idx
]['stage'] == 'rgang':
1378 g_internal_info
[mach_idx
]['stage'] = 'done' # SEE "ACTIVE OUTPUT PROCESSING" BELOW
1379 # EEEEEEEE THIS HAS RCP OUTPUT AND RGANG RET!
1380 # BUT WAIT, NORMALLY THERE IS NO RCP STDOUT (maybe stderr
1381 #stdout should be in form: llllllll:pickle
1382 if g_internal_info
[mach_idx
]['connected'] == 0:
1383 g_connects_expected
= g_connects_expected
- 1 # if it's not connected now, it never will be
1384 ss
= ret_info
[mach_idx
]['stdout']
1385 TRACE( 11, 'rgang looking for PICKLE in (rmt_sh_sts=%s) >%s<',
1386 ret_info
[mach_idx
]['rmt_sh_sts'], ss
)
1387 pickle_idx
= string
.find( ss
, "PICKLE:" )
1388 if pickle_idx
!= -1:
1390 pre
= ss
[:pickle_idx
]
1391 pickle_idx
= pickle_idx
+len("PICKLE:")
1392 length
= string
.atoi( ss
[pickle_idx
:pickle_idx
+8],16 )
1393 loads
= pickle
.loads( ss
[pickle_idx
+9:pickle_idx
+9+length
] )
1394 post
= ss
[pickle_idx
+9+length
:]
1395 if g_opt
['c']: offset
= 1
1396 else: offset
= 0;stderr_sav
=ret_info
[mach_idx
]['stderr']
1397 for ii
in range(len(loads
)):
1398 ret_info
[mach_idx
+offset
+ii
].update( loads
[ii
] )
1399 overall_status
= overall_status | ret_info
[mach_idx
+offset
+ii
]['rmt_sh_sts']
1400 TRACE( 28, 'rgang calling initiator_node_status #1 mach_idx=%d', mach_idx
+offset
+ii
)
1401 initiator_node_status( mach_idx
+offset
+ii
)
1402 # NOTE??? for "copy", previous stdout??? may already be printed b/c of rcp stage
1403 if g_opt
['c']: ret_info
[mach_idx
]['stdout'] = pre
+ post
1405 ret_info
[mach_idx
]['stdout'] = pre
+ ret_info
[mach_idx
]['stdout'] + post
1406 ret_info
[mach_idx
]['stderr'] = stderr_sav
+ ret_info
[mach_idx
]['stderr']
1408 sys
.stderr
.write('EEEEE-pickle exception mach_idx=%s g_opt[c]=%s ii=%s\n'%(mach_idx
,g_opt
['c'],ii
))
1409 sys
.stderr
.write( 'EEEEEEEEE - ss[pickle_idx:pickle_idx+9+length=%d]=>%s<\n'%(length
,ss
[pickle_idx
:pickle_idx
+9+length
]) )
1411 exc
, value
, tb
= sys
.exc_info()
1412 for ln
in traceback
.format_exception( exc
, value
, tb
):
1413 sys
.stderr
.write(ln
)
1414 if pickle_idx
== -1: # status must be bad
1416 ret_info
[mach_idx
]['stderr'] = ret_info
[mach_idx
]['stderr'] + '%s: warning: "rcp" %s failed\n'%(APP
,APP
)
1418 if ret_info
[mach_idx
]['rmt_sh_sts'] == None:
1419 ret_info
[mach_idx
]['rmt_sh_sts'] = 2
1420 TRACE( 12, 'rgang #1 rmt_sh_sts=2' ) # rsh failure or shell abort (i.e. syntax error)
1421 ret_info
[mach_idx
]['rmt_sh_sts'] = ret_info
[mach_idx
]['rmt_sh_sts'] | status
1422 overall_status
= overall_status | ret_info
[mach_idx
]['rmt_sh_sts']
1423 TRACE( 28, 'rgang calling initiator_node_status #2 mach_idx=%d', mach_idx
)
1424 initiator_node_status( mach_idx
)
1425 gbl_branch_idx
= g_internal_info
[mach_idx
]['gbl_branch_idx']
1426 branch_end_idx
= g_branch_info_l
[gbl_branch_idx
]['branch_end_idx']
1427 branch_nodes
= mach_l
[mach_idx
+1:branch_end_idx
]
1429 TRACE( 13, 'rgang spawn_cmd branch_node is %s', branch_nodes
)
1430 g_branch_info_l
[gbl_branch_idx
]['active_head'] = mach_idx
+1
1431 #timeout_cancel( gbl_branch_idx ) # cancel 1st timeout for this gbl_branch_idx
1432 sp_info
= spawn_cmd( g_internal_info
[mach_idx
+1], mach_idx
+1, opts
, args
, branch_nodes
, 0 ) #3 after bad rgang
1433 info_update( mach_idx
+1, fo2node_map
, sp_info
, select_l
)
1434 branch_input_l
.append( sp_info
[1] )
1435 if mach_idx
== processing_idx
and not g_opt
['pyret']:
1436 os
.write( sys
.stdout
.fileno(), ret_info
[processing_idx
]['stdout'])
1437 os
.write( sys
.stderr
.fileno(), ret_info
[processing_idx
]['stderr'])
1438 elif g_internal_info
[mach_idx
]['stage'] == 'rcp':
1439 # STRIP OFF NODE HERE
1440 if ret_info
[mach_idx
]['rmt_sh_sts'] != None: sys
.stderr
.write('1EEEEEEEEEE\n')
1441 ret_info
[mach_idx
]['rmt_sh_sts'] = status
1442 overall_status
= overall_status | ret_info
[mach_idx
]['rmt_sh_sts']
1443 TRACE( 28, 'rgang calling initiator_node_status #3 mach_idx=%d', mach_idx
)
1444 initiator_node_status( mach_idx
)
1445 gbl_branch_idx
= g_internal_info
[mach_idx
]['gbl_branch_idx']
1446 branch_end_idx
= g_branch_info_l
[gbl_branch_idx
]['branch_end_idx']
1447 branch_nodes
= mach_l
[mach_idx
+1:branch_end_idx
]
1448 if status
== 0 and branch_nodes
:
1449 TRACE( 14, 'rgang rcp spawn_cmd branch_nodes is %s', branch_nodes
)
1450 #timeout_cancel( gbl_branch_idx )
1451 sp_info
= spawn_cmd( g_internal_info
[mach_idx
], mach_idx
, opts
, args
, branch_nodes
, 0 ) #4 the rgang after good rcp
1452 info_update( mach_idx
, fo2node_map
, sp_info
, select_l
)
1453 branch_input_l
.append( sp_info
[1] )
1454 g_branch_info_l
[gbl_branch_idx
]['active_head'] = mach_idx
1455 elif status
!= 0 and branch_nodes
:
1456 # start rcp stage on next node
1457 TRACE( 15, 'rgang rcp spawn_cmd branch_nodes is %s', branch_nodes
)
1458 #timeout_cancel( gbl_branch_idx )
1459 sp_info
= spawn_cmd( g_internal_info
[mach_idx
+1], mach_idx
+1, opts
, args
, branch_nodes
, 0 ) #5 bad rcp, next node rcp
1460 info_update( mach_idx
+1, fo2node_map
, sp_info
, select_l
)
1461 branch_input_l
.append( sp_info
[1] )
1462 g_branch_info_l
[gbl_branch_idx
]['active_head'] = mach_idx
+1
1467 elif g_internal_info
[mach_idx
]['stage'] == 'rsh': # no further branch processing
1468 if g_internal_info
[mach_idx
]['connected'] == 0:
1469 g_connects_expected
= g_connects_expected
- 1 # if it's not connected now, it never will be
1470 if ret_info
[mach_idx
]['rmt_sh_sts'] == None:
1471 ret_info
[mach_idx
]['rmt_sh_sts'] = 4
1472 TRACE( 16, 'rgang #2 rmt_sh_sts=4' ) # shell abort (i.e. syntax error)
1473 ret_info
[mach_idx
]['rmt_sh_sts'] = ret_info
[mach_idx
]['rmt_sh_sts'] | status
1474 overall_status
= overall_status | ret_info
[mach_idx
]['rmt_sh_sts']
1475 TRACE( 28, 'rgang calling initiator_node_status #4 mach_idx=%d', mach_idx
)
1476 initiator_node_status( mach_idx
)
1477 else: # local - no further branch processing
1478 if ret_info
[mach_idx
]['rmt_sh_sts'] != None: sys
.stderr
.write('4EEEEEEEEEE\n')
1479 ret_info
[mach_idx
]['rmt_sh_sts'] = status
1480 overall_status
= overall_status | ret_info
[mach_idx
]['rmt_sh_sts']
1481 TRACE( 28, 'rgang calling initiator_node_status #5 mach_idx=%d', mach_idx
)
1482 initiator_node_status( mach_idx
)
1484 # ACTIVE OUTPUT PROCESSING
1485 if mach_idx
== processing_idx
:
1486 if g_internal_info
[processing_idx
]['stage'] == 'rgang': continue # continue if "inprogress" rgang
1487 processing_idx
= processing_idx
+ 1
1488 while processing_idx
< grp_idxs
[END
]:
1489 if not g_opt
['pyret']: header( mach_l
[processing_idx
], args
)
1490 if g_internal_info
[processing_idx
]['stage'] == 'rgang': break
1491 if ret_info
[processing_idx
]['rmt_sh_sts'] == None:
1492 if not g_opt
['pyret']:
1493 # print-n-flush and clear stdout/err
1494 os
.write( sys
.stdout
.fileno(),ret_info
[processing_idx
]['stdout'])
1495 ret_info
[processing_idx
]['stdout'] = ''
1496 os
.write( sys
.stderr
.fileno(),ret_info
[processing_idx
]['stderr'])
1497 ret_info
[processing_idx
]['stderr'] = ''
1499 if not g_opt
['pyret']:
1500 os
.write( sys
.stdout
.fileno(),ret_info
[processing_idx
]['stdout'])
1501 os
.write( sys
.stderr
.fileno(),ret_info
[processing_idx
]['stderr'])
1502 processing_idx
= processing_idx
+ 1
1505 pass # end of "if chk_exit" processing
1507 pass # while processing_idx < grp_idxs[END]:
1509 if g_opt
['pty']: clean()
1510 if not g_opt
['pyret'] and g_opt
['n']=='2': print
1511 if g_opt
['pyprint']: pprint
.pprint( ret_info
)
1512 elif g_opt
['pypickle']:
1513 dumps
= pickle
.dumps(ret_info
)
1514 sys
.stdout
.write( 'PICKLE:%08x:'%(len(dumps
),) )
1515 sys
.stdout
.write( dumps
)
1516 return overall_status
,ret_info
1520 ###############################################################################
# main: command-line entry point.  Runs rgang() on sys.argv[1:] and exits with
# the status it returns.  A KeyboardInterrupt raised while rgang() is running
# is handled by the cleanup loop below, which finishes with
# sys.exit( (1<<7)+2 ).
# NOTE(review): the "def" line of this function, the "else:" that pairs with
# "if 1:", and a few other lines are not visible in this view -- confirm
# against the full file before relying on the notes below.
1523 import sys
# argv, exit
1524 import select
# select
1525 if 1: # switch to 0 to debug
# Normal path: hand the command-line arguments (without the program name) to
# rgang(); it returns a (status, per-node info) pair.
1526 try: total_stat
,ret_list
= rgang( sys
.argv
[1:] )
# Interrupt handling: "detail" is bound by the Python 2 except syntax but is
# not referenced in the visible handler code.
1527 except KeyboardInterrupt, detail
:
# Walk every entry of g_internal_info (indexed by machine).
1528 for mach_idx
in range(len(g_internal_info
)):
1529 # There is a case, for example: rgang <node> 'sleep 5 &'
1530 # where rgang will receive the "remote shell status", but because
1531 # stdout/err were not closed (i.e.:rgang <node> 'sleep 5 >&- 2>&- &' ), the
1532 # remote shell will hang until the backgrounded process completes.
1533 # In this case I will have a 'rmt_sh_sts' (it will NOT == None);
1534 # so I should NOT do 'rmt_sh_sts' checking.
1535 #if g_internal_info[mach_idx]['ret_info'] != None \
1536 # and g_internal_info[mach_idx]['ret_info']['rmt_sh_sts'] == None \
1537 # and g_internal_info[mach_idx]['sp_info'] != None:
# Only entries that have spawn info are considered; element 0 of 'sp_info' is
# used as the process id (it is what gets passed to os.waitpid below).
1538 if g_internal_info
[mach_idx
]['sp_info'] != None:
1539 pid
= g_internal_info
[mach_idx
]['sp_info'][0]
1540 # do kill and kill check here
# Step through the signals in the listed order.
1541 for sig
in (1,2,15,3,9): # 1=HUP, 2=INT(i.e.^C), 15=TERM(default "kill"), 3=QUIT(i.e.^\), 9=KILL
# Non-blocking check (WNOHANG) for whether the child has already exited; the
# raw wait status is shifted right by 8 bits to get the exit code.
1542 try: rpid
,status
= os
.waitpid(pid
,os
.WNOHANG
);status
=(status
>>8) # but I probably won't use this status
# Any exception from waitpid ends the signal loop for this pid.
1543 except: rpid
= 0; break # i.e. (OSError, '[Errno 10] No child processes')
1544 if rpid
== pid
: # OK, process is out-of-there
# Fill in a status only when none has been recorded yet; 0x10 is the value
# stored (presumably meaning "interrupted" -- TODO confirm).
1545 if g_internal_info
[mach_idx
]['ret_info']['rmt_sh_sts'] == None:
1546 TRACE( 30, "main KeyboardInterrupt status=%d", status
)
1547 g_internal_info
[mach_idx
]['ret_info']['rmt_sh_sts'] = 0x10
# NOTE(review): the trace text names os.kill(pid, sig), but the call itself is
# not visible in this view -- confirm it sits next to this TRACE.
1550 TRACE( 30, "main os.kill(%d,%d)", pid
, sig
)
# Pause 0.05 s (select with empty fd lists and a timeout) before the next
# loop iteration.
1551 select
.select([],[],[],0.05) # use select to sleep sub second
1553 # emulate the old shell version of rgang when rsh is "interrupted"
# (1<<7)+2 == 130, i.e. 128 plus signal number 2 (INT in the list above).
1554 sys
.exit( (1<<7)+2 )
# Same rgang() call without the KeyboardInterrupt handler; presumably the
# "switch to 0 to debug" branch of the "if 1:" above -- TODO confirm.
1558 total_stat
,ret_list
= rgang( sys
.argv
[1:] )
# Exit with the overall status returned by rgang().
1559 sys
.exit( total_stat
)
1563 # This simple "if ...main..." guard makes it possible to experiment with the
1564 # optimized (or even just the plain) byte-compiled file via:
1565 # python -OO -c 'import rgang;rgang.main()' -nn all 'echo hi'
1566 # and/or a small script:
1568 # exec python -OO -c "
1569 # import sys;sys.argv[0]='`basename $0`';import rgang;rgang.main()" "$@"
# Script entry point: call main() only when this file is executed directly;
# an importer (e.g. "import rgang; rgang.main()") calls main() itself.
if __name__ == "__main__":
    main()