Bring CHANGES up to date.
[cvs2svn.git] / svntest / factory.py
blobcb387e83e241cc3b8a03a0f15b4522bbb69c9e94
2 # factory.py: Automatically generate a (near-)complete new cmdline test
3 # from a series of shell commands.
5 # Subversion is a tool for revision control.
6 # See http://subversion.tigris.org for more information.
8 # ====================================================================
9 # Licensed to the Apache Software Foundation (ASF) under one
10 # or more contributor license agreements. See the NOTICE file
11 # distributed with this work for additional information
12 # regarding copyright ownership. The ASF licenses this file
13 # to you under the Apache License, Version 2.0 (the
14 # "License"); you may not use this file except in compliance
15 # with the License. You may obtain a copy of the License at
17 # http://www.apache.org/licenses/LICENSE-2.0
19 # Unless required by applicable law or agreed to in writing,
20 # software distributed under the License is distributed on an
21 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 # KIND, either express or implied. See the License for the
23 # specific language governing permissions and limitations
24 # under the License.
25 ######################################################################
28 ## HOW TO USE:
30 # (1) Edit the py test script you want to enhance (for example
31 # cmdline/basic_tests.py), add a new test header as usual.
32 # Insert a call to factory.make() into the empty test:
34 # def my_new_test(sbox):
35 # "my new test modifies iota"
36 # svntest.factory.make(sbox, """
37 # echo "foo" > A/D/foo
38 # svn add A/D/foo
39 # svn st
40 # svn ci
41 # """)
43 # (2) Add the test to the tests list at the bottom of the py file.
44 # [...]
45 # some_other_test,
46 # my_new_test,
47 # ]
50 # (3) Run the test, paste the output back into your new test,
51 # replacing the factory call.
53 # $ ./foo_tests.py my_new_test
54 # OR
55 # $ ./foo_tests.py my_new_test > new_test.snippet
56 # OR
57 # $ ./foo_tests.py my_new_test >> basic_tests.py
58 # Then edit (e.g.) basic_tests.py to put the script in the right place.
60 # Ensure that the py script (e.g. basic_tests.py) has these imports,
61 # so that the composed script that you pasted back finds everything
62 # that it uses:
63 # import os, shutil
64 # from svntest import main, wc, actions, verify
66 # Be aware that you have to paste the result back to the .py file.
68 # Be more aware that you have to read every single line and understand
69 # that it makes sense. If the current behaviour is wrong, you need to
70 # make the changes to expect the correct behaviour and XFail() your test.
72 # factory.make() just probes the current situation and writes a test that
73 # PASSES any success AND ANY FAILURE THAT IT FINDS. The resulting script
74 # will never fail anything (if it works correctly), not even a failure.
76 # ### TODO: some sort of intelligent pasting directly into the
77 # right place, like looking for the factory call,
78 # inserting the new test there, back-up-ing the old file.
81 # TROUBLESHOOTING
82 # If the result has a problem somewhere along the middle, you can,
83 # of course, only use the first part of the output result, maybe tweak
84 # something, and continue with another factory.make() at the end of that.
86 # Or you can first do stuff to your sbox and then call factory on it.
87 # Factory will notice if the sbox has already been built and calls
88 # sbox.build() only if you didn't already.
90 # You can also have any number of factory.make() calls scattered
91 # around "real" test code.
93 # Note that you can pass a prev_status and prev_disk to factory, to make
94 # the expected_* trees re-use a pre-existing one in the test, entirely
95 # for code beauty :P (has to match the wc_dir you will be using next).
98 # YOU ARE CORDIALLY INVITED to add/tweak/change to your needs.
99 # If you want to know what's going on, look at the switch()
100 # funtion of TestFactory below.
103 # DETAILS
104 # =======
106 # The input to factory.make(sbox, input) is not "real" shell-script.
107 # Factory goes at great lengths to try and understand your script, it
108 # parses common shell operations during tests and translates them.
110 # All arguments are tokenized similarly to shell, so if you need a space
111 # in an argument, use quotes.
112 # echo "my content" > A/new_file
113 # Quote char escaping is done like this:
114 # echo "my \\" content" > A/new_file
115 # echo 'my \\' content' > A/new_file
116 # If you use r""" echo 'my \' content' > A/new_file """ (triple quotes
117 # with a leading 'r' character), you don't need to double-escape any
118 # characters.
120 # You can either supply multiple lines, or separate the lines with ';'.
121 # factory.make(sbox, 'echo foo > bar; svn add bar')
122 # factory.make(sbox, 'echo foo > bar\n svn add bar')
123 # factory.make(sbox, r"""
124 # echo "foo\nbar" > bar
125 # svn add bar
126 # """)
129 # WORKING COPY PATHS
130 # - Factory will automatically build sbox.wc_dir if you didn't do so yet.
132 # - If you supply any path or file name, factory will prepend sbox.wc_dir
133 # to it.
134 # echo more >> iota
135 # --> main.file_append(
136 # os.path.join(sbox.wc_dir, 'iota'),
137 # "more")
138 # You can also do so explicitly.
139 # echo more >> wc_dir/iota
140 # --> main.file_append(
141 # os.path.join(sbox.wc_dir, 'iota'),
142 # "more")
144 # Factory implies the sbox.wc_dir if you fail to supply an explicit
145 # working copy dir. If you want to supply one explicitly, you can
146 # choose among these wildcards:
147 # 'wc_dir', 'wcdir', '$WC_DIR', '$WC' -- all expanded to sbox.wc_dir
148 # For example:
149 # 'svn mkdir wc_dir/A/D/X'
150 # But as long as you want to use only the default sbox.wc_dir, you usually
151 # don't need to supply any wc_dir-wildcard:
152 # 'mkdir A/X' creates the directory sbox.wc_dir/A/X
153 # (Factory tries to know which arguments of the commands you supplied
154 # are eligible to be path arguments. If something goes wrong here, try
155 # to fix factory.py to not mistake the arg for something different.
156 # You usually just need to tweak some parameters to args2svntest() to
157 # achieve correct expansion.)
159 # - If you want to use a second (or Nth) working copy, just supply any
160 # working copy wildcard with any made-up suffix, e.g. like this:
161 # 'svn st wc_dir_2' or 'svn info $WC_2'
162 # Factory will detect that you used another wc_dir and will automatically
163 # add a corresponding directory to your sbox. The directory will initially
164 # be nonexistent, so call 'mkdir', 'svn co' or 'cp' before using:
165 # 'cp wc_dir wc_dir_other' -- a copy of the current WC
166 # 'svn co $URL wc_dir_new' -- a clean checkout
167 # 'mkdir wc_dir_empty' -- an empty directory
168 # You can subsequently use any wc-dir wildcard with your suffix added.
170 # cp wc_dir wc_dir_2
171 # echo more >> wc_dir_2/iota
172 # --> wc_dir_2 = sbox.add_wc_path('2')
173 # shutil.copytrees(wc_dir, wc_dir_2)
174 # main.file_append(
175 # os.path.join(wc_dir_2, 'iota'),
176 # "more")
179 # URLs
180 # Factory currently knows only one repository, thus only one repos root.
181 # The wildcards you can use for it are:
182 # 'url', '$URL'
183 # A URL is not inserted automatically like wc_dir, you need to supply a
184 # URL wildcard.
185 # Alternatively, you can use '^/' URLs. However, that is in effect a different
186 # test from an explicit entire URL. The test needs to chdir to the working
187 # copy in order find which URL '^/' should expand to.
188 # (currently, factory will chdir to sbox.wc_dir. It will only chdir
189 # to another working copy if one of the other arguments involved a WC.
190 # ### TODO add a 'cd wc_dir_2' command to select another WC as default.)
191 # Example:
192 # 'svn co $URL Y' -- make a new nested working copy in sbox.wc_dir/Y
193 # 'svn co $URL wc_dir_2' -- create a new separate working copy
194 # 'svn cp ^/A ^/X' -- do a URL copy, creating $URL/X (branch)
197 # SOME EXAMPLES
198 # These commands should work:
200 # - "svn <subcommand> <options>"
201 # Some subcommands are parsed specially, others by a catch-all default
202 # parser (cmd_svn()), see switch().
203 # 'svn commit', 'svn commit --force', 'svn ci wc_dir_2'
204 # 'svn copy url/A url/X'
206 # - "echo contents > file" (replace)
207 # "echo contents >> file" (append)
208 # Calls main.file_write() / main.file_append().
209 # 'echo "froogle" >> A/D/G/rho' -- append to an existing file
210 # 'echo "bar" > A/quux' -- create a new file
211 # 'echo "fool" > wc_dir_2/me' -- manipulate other working copies
213 # - "mkdir <names> ..."
214 # Calls os.makedirs().
215 # You probably want 'svn mkdir' instead, or use 'svn add' after this.
216 # 'mkdir A/D/X' -- create an unversioned directory
217 # 'mkdir wc_dir_5' -- create a new, empty working copy
219 # - "rm <targets>"
220 # Calls main.safe_rmtree().
221 # You probably want to use 'svn delete' instead.
222 # 'rm A/D/G'
223 # 'rm wc_dir_2'
225 # - "mv <source> [<source2> ...] <target>"
226 # Calls shutil.move()
227 # You probably want to use 'svn move' instead.
228 # 'mv iota A/D/' -- move sbox.wc_dir/iota to sbox.wc_dir/A/D/.
230 # - "cp <source> [<source2> ...] <target>"
231 # Do a filesystem copy.
232 # You probably want to use 'svn copy' instead.
233 # 'cp wc_dir wc_dir_copy'
234 # 'cp A/D/G A/X'
236 # IF YOU NEED ANY OTHER COMMANDS:
237 # - first check if it doesn't work already. If not,
238 # - add your desired commands to factory.py! :)
239 # - alternatively, use a number of separate factory calls, doing what
240 # you need done in "real" svntest language in-between.
242 # IF YOU REALLY DON'T GROK THIS:
243 # - ask #svn-dev
244 # - ask dev@
245 # - ask neels
247 import sys, re, os, shutil, bisect, textwrap, shlex
249 import svntest
250 from svntest import main, actions, tree
251 from svntest import Failure
253 if sys.version_info[0] >= 3:
254 # Python >=3.0
255 from io import StringIO
256 else:
257 # Python <3.0
258 from cStringIO import StringIO
260 def make(wc_dir, commands, prev_status=None, prev_disk=None, verbose=True):
261 """The Factory Invocation Function. This is typically the only one
262 called from outside this file. See top comment in factory.py.
263 Prints the resulting py script to stdout when verbose is True and
264 returns the resulting line-list containing items as:
265 [ ['pseudo-shell input line #1', ' translation\n to\n py #1'], ...]"""
266 fac = TestFactory(wc_dir, prev_status, prev_disk)
267 fac.make(commands)
268 fac.print_script()
269 return fac.lines
273 class TestFactory:
274 """This class keeps all state around a factory.make() call."""
276 def __init__(self, sbox, prev_status=None, prev_disk=None):
277 self.sbox = sbox
279 # The input lines and their translations.
280 # Each translation usually has multiple output lines ('\n' characters).
281 self.lines = [] # [ ['in1', 'out1'], ['in2', 'out'], ...
283 # Any expected_status still there from a previous verification
284 self.prev_status = None
285 if prev_status:
286 self.prev_status = [None, prev_status] # svntest.wc.State
288 # Any expected_disk still there from a previous verification
289 self.prev_disk = None
290 if prev_disk:
291 reparented_prev_disk = svntest.wc.State(prev_disk.wc_dir, {});
292 reparented_prev_disk.add_state(sbox.wc_dir, prev_disk);
293 self.prev_disk = [None, reparented_prev_disk]
295 # Those command line options that expect an argument following
296 # which is not a path. (don't expand args following these)
297 self.keep_args_of = ['--depth', '--encoding', '-r',
298 '--changelist', '-m', '--message']
300 # A stack of $PWDs, to be able to chdir back after a chdir.
301 self.prevdirs = []
303 # The python variables we want to be declared at the beginning.
304 # These are path variables like "A_D = os.path.join(wc_dir, 'A', 'D')".
305 # The original wc_dir and url vars are not kept here.
306 self.vars = {}
308 # An optimized list kept up-to-date by variable additions
309 self.sorted_vars_by_pathlen = []
311 # Wether we ever used the variables 'wc_dir' and 'url' (tiny tweak)
312 self.used_wc_dir = False
313 self.used_url = False
315 # The alternate working copy directories created that need to be
316 # registered with sbox (are not inside another working copy).
317 self.other_wc_dirs = {}
320 def make(self, commands):
321 "internal main function, delegates everything except final output."
323 # keep a spacer for init
324 self.add_line(None, None)
326 init = ""
327 if not self.sbox.is_built():
328 self.sbox.build()
329 init += "sbox.build()\n"
332 try:
333 # split input args
334 input_lines = commands.replace(';','\n').splitlines()
335 for str in input_lines:
336 if len(str.strip()) > 0:
337 self.add_line(str)
339 for i in range(len(self.lines)):
340 if self.lines[i][0] is not None:
341 # This is where everything happens:
342 self.lines[i][1] = self.switch(self.lines[i][0])
344 # We're done. Add a final greeting.
345 self.add_line(
346 None,
347 "Remember, this only saves you typing. Doublecheck everything.")
349 # -- Insert variable defs in the first line --
350 # main wc_dir and url
351 if self.used_wc_dir:
352 init += 'wc_dir = sbox.wc_dir\n'
353 if self.used_url:
354 init += 'url = sbox.repo_url\n'
356 # registration of new WC dirs
357 sorted_names = self.get_sorted_other_wc_dir_names()
358 for name in sorted_names:
359 init += name + ' = ' + self.other_wc_dirs[name][0] + '\n'
361 if len(init) > 0:
362 init += '\n'
364 # general variable definitions
365 sorted_names = self.get_sorted_var_names()
366 for name in sorted_names:
367 init += name + ' = ' + self.vars[name][0] + '\n'
369 # Insert at the first line, being the spacer from above
370 if len(init) > 0:
371 self.lines[0][1] = init
373 # This usually goes to make() below (outside this class)
374 return self.lines
375 except:
376 for line in self.lines:
377 if line[1] is not None:
378 print(line[1])
379 raise
382 def print_script(self, stream=sys.stdout):
383 "Output the resulting script of the preceding make() call"
384 if self.lines is not None:
385 for line in self.lines:
386 if line[1] is None:
387 # fall back to just that line as it was in the source
388 stripped = line[0].strip()
389 if not stripped.startswith('#'):
390 # for comments, don't say this:
391 stream.write(" # don't know how to handle:\n")
392 stream.write(" " + line[0].strip() + '\n')
393 else:
394 if line[0] is not None:
395 stream.write( wrap_each_line(line[0].strip(),
396 " # ", " # ", True) + '\n')
397 stream.write(wrap_each_line(line[1], " ", " ", False) + '\n\n')
398 else:
399 stream.write(" # empty.\n")
400 stream.flush()
403 # End of public functions.
407 # "Shell" command handlers:
409 def switch(self, line):
410 "Given one input line, delegates to the appropriate sub-functions."
411 args = shlex.split(line)
412 if len(args) < 1:
413 return ""
414 first = args[0]
416 # This is just an if-cascade. Feel free to change that.
418 if first == 'svn':
419 second = args[1]
421 if second == 'add':
422 return self.cmd_svn(args[1:], False, self.keep_args_of)
424 if second in ['changelist', 'cl']:
425 keep_count = 2
426 if '--remove' in args:
427 keep_count = 1
428 return self.cmd_svn(args[1:], False, self.keep_args_of, keep_count)
430 if second in ['status','stat','st']:
431 return self.cmd_svn_status(args[2:])
433 if second in ['commit','ci']:
434 return self.cmd_svn_commit(args[2:])
436 if second in ['update','up']:
437 return self.cmd_svn_update(args[2:])
439 if second in ['switch','sw']:
440 return self.cmd_svn_switch(args[2:])
442 if second in ['copy', 'cp',
443 'move', 'mv', 'rename', 'ren']:
444 return self.cmd_svn_copy_move(args[1:])
446 if second in ['checkout', 'co']:
447 return self.cmd_svn_checkout(args[2:])
449 if second in ['propset','pset','ps']:
450 return self.cmd_svn(args[1:], False,
451 self.keep_args_of, 3)
453 if second in ['delete','del','remove', 'rm']:
454 return self.cmd_svn(args[1:], False,
455 self.keep_args_of + ['--with-revprop'])
457 # NOTE that not all commands need to be listed here, since
458 # some are already adequately handled by self.cmd_svn().
459 # If you find yours is not, add another self.cmd_svn_xxx().
460 return self.cmd_svn(args[1:], False, self.keep_args_of)
462 if first == 'echo':
463 return self.cmd_echo(args[1:])
465 if first == 'mkdir':
466 return self.cmd_mkdir(args[1:])
468 if first == 'rm':
469 return self.cmd_rm(args[1:])
471 if first == 'mv':
472 return self.cmd_mv(args[1:])
474 if first == 'cp':
475 return self.cmd_cp(args[1:])
477 # if all fails, take the line verbatim
478 return None
481 def cmd_svn_standard_run(self, pyargs, runargs, do_chdir, wc):
482 "The generic invocation of svn, helper function."
483 pychdir = self.chdir(do_chdir, wc)
485 code, out, err = main.run_svn("Maybe", *runargs)
487 if code == 0 and len(err) < 1:
488 # write a test that expects success
489 pylist = self.strlist2py(out)
490 if len(out) <= 1:
491 py = "expected_stdout = " + pylist + "\n\n"
492 else:
493 py = "expected_stdout = verify.UnorderedOutput(" + pylist + ")\n\n"
494 py += pychdir
495 py += "actions.run_and_verify_svn2('OUTPUT', expected_stdout, [], 0"
496 else:
497 # write a test that expects failure
498 pylist = self.strlist2py(err)
499 if len(err) <= 1:
500 py = "expected_stderr = " + pylist + "\n\n"
501 else:
502 py = "expected_stderr = verify.UnorderedOutput(" + pylist + ")\n\n"
503 py += pychdir
504 py += ("actions.run_and_verify_svn2('OUTPUT', " +
505 "[], expected_stderr, " + str(code))
507 if len(pyargs) > 0:
508 py += ", " + ", ".join(pyargs)
509 py += ")\n"
510 py += self.chdir_back(do_chdir)
511 return py
514 def cmd_svn(self, svnargs, append_wc_dir_if_missing = False,
515 keep_args_of = [], keep_first_count = 1,
516 drop_with_arg = []):
517 "Handles all svn calls not handled by more specific functions."
519 pyargs, runargs, do_chdir, targets = self.args2svntest(svnargs,
520 append_wc_dir_if_missing, keep_args_of,
521 keep_first_count, drop_with_arg)
523 return self.cmd_svn_standard_run(pyargs, runargs, do_chdir,
524 self.get_first_wc(targets))
527 def cmd_svn_status(self, status_args):
528 "Runs svn status, looks what happened and writes the script for it."
529 pyargs, runargs, do_chdir, targets = self.args2svntest(
530 status_args, True, self.keep_args_of, 0)
532 py = ""
534 for target in targets:
535 if not target.wc:
536 py += '# SKIPPING NON-WC ' + target.runarg + '\n'
537 continue
539 if '-q' in status_args:
540 pystatus = self.get_current_status(target.wc, True)
541 py += (pystatus +
542 "actions.run_and_verify_status(" + target.wc.py +
543 ", expected_status)\n")
544 else:
545 pystatus = self.get_current_status(target.wc, False)
546 py += (pystatus +
547 "actions.run_and_verify_unquiet_status(" + target.wc.py +
548 ", expected_status)\n")
549 return py
552 def cmd_svn_commit(self, commit_args):
553 "Runs svn commit, looks what happened and writes the script for it."
554 # these are the options that are followed by something that should not
555 # be parsed as a filename in the WC.
556 commit_arg_opts = [
557 "--depth",
558 "--with-revprop",
559 "--changelist",
560 # "-F", "--file", these take a file argument, don't list here.
561 # "-m", "--message", treated separately
564 pyargs, runargs, do_chdir, targets = self.args2svntest(
565 commit_args, True, commit_arg_opts, 0, ['-m', '--message'])
567 wc = self.get_first_wc(targets)
568 pychdir = self.chdir(do_chdir, wc)
570 code, output, err = main.run_svn("Maybe", 'ci',
571 '-m', 'log msg',
572 *runargs)
574 if code == 0 and len(err) < 1:
575 # write a test that expects success
577 output = actions.process_output_for_commit(output)
578 actual_out = tree.build_tree_from_commit(output)
579 py = ("expected_output = " +
580 self.tree2py(actual_out, wc) + "\n\n")
582 pystatus = self.get_current_status(wc)
583 py += pystatus
585 py += pychdir
586 py += ("actions.run_and_verify_commit(" + wc.py + ", " +
587 "expected_output, expected_status, " +
588 "None")
589 else:
590 # write a test that expects error
591 py = "expected_error = " + self.strlist2py(err) + "\n\n"
592 py += pychdir
593 py += ("actions.run_and_verify_commit(" + wc.py + ", " +
594 "None, None, expected_error")
596 if len(pyargs) > 0:
597 py += ', ' + ', '.join(pyargs)
598 py += ")"
599 py += self.chdir_back(do_chdir)
600 return py
603 def cmd_svn_update(self, update_args):
604 "Runs svn update, looks what happened and writes the script for it."
606 pyargs, runargs, do_chdir, targets = self.args2svntest(
607 update_args, True, self.keep_args_of, 0)
609 wc = self.get_first_wc(targets)
610 pychdir = self.chdir(do_chdir, wc)
612 code, output, err = main.run_svn('Maybe', 'up', *runargs)
614 if code == 0 and len(err) < 1:
615 # write a test that expects success
617 actual_out = svntest.wc.State.from_checkout(output).old_tree()
618 py = ("expected_output = " +
619 self.tree2py(actual_out, wc) + "\n\n")
621 pydisk = self.get_current_disk(wc)
622 py += pydisk
624 pystatus = self.get_current_status(wc)
625 py += pystatus
627 py += pychdir
628 py += ("actions.run_and_verify_update(" + wc.py + ", " +
629 "expected_output, expected_disk, expected_status, " +
630 "None, None, None, None, None, False")
631 else:
632 # write a test that expects error
633 py = "expected_error = " + self.strlist2py(err) + "\n\n"
634 py += pychdir
635 py += ("actions.run_and_verify_update(" + wc.py + ", None, None, " +
636 "None, expected_error, None, None, None, None, False")
638 if len(pyargs) > 0:
639 py += ', ' + ', '.join(pyargs)
640 py += ")"
641 py += self.chdir_back(do_chdir)
642 return py
645 def cmd_svn_switch(self, switch_args):
646 "Runs svn switch, looks what happened and writes the script for it."
648 pyargs, runargs, do_chdir, targets = self.args2svntest(
649 switch_args, True, self.keep_args_of, 0)
651 # Sort out the targets. We need one URL and one wc node, in that order.
652 if len(targets) < 2:
653 raise Failure("Sorry, I'm currently enforcing two targets for svn " +
654 "switch. If you want to supply less, remove this " +
655 "check and implement whatever seems appropriate.")
657 wc_arg = targets[1]
658 del pyargs[wc_arg.argnr]
659 del runargs[wc_arg.argnr]
660 url_arg = targets[0]
661 del pyargs[url_arg.argnr]
662 del runargs[url_arg.argnr]
664 wc = wc_arg.wc
665 if not wc:
666 raise Failure("Unexpected argument ordering to factory's 'svn switch'?")
668 pychdir = self.chdir(do_chdir, wc)
670 #if '--force' in runargs:
671 # self.really_safe_rmtree(wc_arg.runarg)
673 code, output, err = main.run_svn('Maybe', 'sw',
674 url_arg.runarg, wc_arg.runarg,
675 *runargs)
677 py = ""
679 if code == 0 and len(err) < 1:
680 # write a test that expects success
682 actual_out = tree.build_tree_from_checkout(output)
683 py = ("expected_output = " +
684 self.tree2py(actual_out, wc) + "\n\n")
686 pydisk = self.get_current_disk(wc)
687 py += pydisk
689 pystatus = self.get_current_status(wc)
690 py += pystatus
692 py += pychdir
693 py += ("actions.run_and_verify_switch(" + wc.py + ", " +
694 wc_arg.pyarg + ", " + url_arg.pyarg + ", " +
695 "expected_output, expected_disk, expected_status, " +
696 "None, None, None, None, None, False")
697 else:
698 # write a test that expects error
699 py = "expected_error = " + self.strlist2py(err) + "\n\n"
700 py += pychdir
701 py += ("actions.run_and_verify_switch(" + wc.py + ", " +
702 wc_arg.pyarg + ", " + url_arg.pyarg + ", " +
703 "None, None, None, expected_error, None, None, None, None, False")
705 if len(pyargs) > 0:
706 py += ', ' + ', '.join(pyargs)
707 py += ")"
708 py += self.chdir_back(do_chdir)
710 return py
713 def cmd_svn_checkout(self, checkout_args):
714 "Runs svn checkout, looks what happened and writes the script for it."
716 pyargs, runargs, do_chdir, targets = self.args2svntest(
717 checkout_args, True, self.keep_args_of, 0)
719 # Sort out the targets. We need one URL and one dir, in that order.
720 if len(targets) < 2:
721 raise Failure("Sorry, I'm currently enforcing two targets for svn " +
722 "checkout. If you want to supply less, remove this " +
723 "check and implement whatever seems appropriate.")
724 # We need this separate for the call to run_and_verify_checkout()
725 # that's composed in the output script.
726 wc_arg = targets[1]
727 del pyargs[wc_arg.argnr]
728 del runargs[wc_arg.argnr]
729 url_arg = targets[0]
730 del pyargs[url_arg.argnr]
731 del runargs[url_arg.argnr]
733 wc = wc_arg.wc
735 pychdir = self.chdir(do_chdir, wc)
737 #if '--force' in runargs:
738 # self.really_safe_rmtree(wc_arg.runarg)
740 code, output, err = main.run_svn('Maybe', 'co',
741 url_arg.runarg, wc_arg.runarg,
742 *runargs)
744 py = ""
746 if code == 0 and len(err) < 1:
747 # write a test that expects success
749 actual_out = tree.build_tree_from_checkout(output)
750 pyout = ("expected_output = " +
751 self.tree2py(actual_out, wc) + "\n\n")
752 py += pyout
754 pydisk = self.get_current_disk(wc)
755 py += pydisk
757 py += pychdir
759 py += ("actions.run_and_verify_checkout(" +
760 url_arg.pyarg + ", " + wc_arg.pyarg +
761 ", expected_output, expected_disk, None, None, None, None")
762 else:
763 # write a test that expects failure
764 pylist = self.strlist2py(err)
765 if len(err) <= 1:
766 py += "expected_stderr = " + pylist + "\n\n"
767 else:
768 py += "expected_stderr = verify.UnorderedOutput(" + pylist + ")\n\n"
769 py += pychdir
770 py += ("actions.run_and_verify_svn2('OUTPUT', " +
771 "[], expected_stderr, " + str(code) +
772 ", " + url_arg.pyarg + ", " + wc_arg.pyarg)
774 # Append the remaining args
775 if len(pyargs) > 0:
776 py += ', ' + ', '.join(pyargs)
777 py += ")"
778 py += self.chdir_back(do_chdir)
779 return py
782 def cmd_svn_copy_move(self, args):
783 "Runs svn copy or move, looks what happened and writes the script for it."
785 pyargs, runargs, do_chdir, targets = self.args2svntest(args,
786 False, self.keep_args_of, 1)
788 if len(targets) == 2 and targets[1].is_url:
789 # The second argument is a URL.
790 # This needs a log message. Is one supplied?
791 has_message = False
792 for arg in runargs:
793 if arg.startswith('-m') or arg == '--message':
794 has_message = True
795 break
796 if not has_message:
797 # add one
798 runargs += [ '-m', 'copy log' ]
799 pyargs = []
800 for arg in runargs:
801 pyargs += [ self.str2svntest(arg) ]
803 return self.cmd_svn_standard_run(pyargs, runargs, do_chdir,
804 self.get_first_wc(targets))
807 def cmd_echo(self, echo_args):
808 "Writes a string to a file and writes the script for it."
809 # split off target
810 target_arg = None
811 replace = True
812 contents = None
813 for i in range(len(echo_args)):
814 arg = echo_args[i]
815 if arg.startswith('>'):
816 if len(arg) > 1:
817 if arg[1] == '>':
818 # it's a '>>'
819 replace = False
820 arg = arg[2:]
821 else:
822 arg = arg[1:]
823 if len(arg) > 0:
824 target_arg = arg
826 if target_arg is None:
827 # we need an index (i+1) to exist, and
828 # we need (i+1) to be the only existing index left in the list.
829 if i+1 != len(echo_args)-1:
830 raise Failure("don't understand: echo " + " ".join(echo_args))
831 target_arg = echo_args[i+1]
832 else:
833 # already got the target. no more indexes should exist.
834 if i != len(echo_args)-1:
835 raise Failure("don't understand: echo " + " ".join(echo_args))
837 contents = " ".join(echo_args[:i]) + '\n'
839 if target_arg is None:
840 raise Failure("echo needs a '>' pipe to a file name: echo " +
841 " ".join(echo_args))
843 target = self.path2svntest(target_arg)
845 if replace:
846 main.file_write(target.runarg, contents)
847 py = "main.file_write("
848 else:
849 main.file_append(target.runarg, contents)
850 py = "main.file_append("
851 py += target.pyarg + ", " + self.str2svntest(contents) + ")"
853 return py
856 def cmd_mkdir(self, mkdir_args):
857 "Makes a new directory and writes the script for it."
858 # treat all mkdirs as -p, ignore all -options.
859 out = ""
860 for arg in mkdir_args:
861 if not arg.startswith('-'):
862 target = self.path2svntest(arg)
863 # don't check for not being a url,
864 # maybe it's desired by the test or something.
865 os.makedirs(target.runarg)
866 out += "os.makedirs(" + target.pyarg + ")\n"
867 return out
870 def cmd_rm(self, rm_args):
871 "Removes a directory tree and writes the script for it."
872 # treat all removes as -rf, ignore all -options.
873 out = ""
874 for arg in rm_args:
875 if not arg.startswith('-'):
876 target = self.path2svntest(arg)
877 if os.path.isfile(target.runarg):
878 os.remove(target.runarg)
879 out += "os.remove(" + target.pyarg + ")\n"
880 else:
881 self.really_safe_rmtree(target.runarg)
882 out += "main.safe_rmtree(" + target.pyarg + ")\n"
883 return out
886 def cmd_mv(self, mv_args):
887 "Moves things in the filesystem and writes the script for it."
888 # ignore all -options.
889 out = ""
890 sources = []
891 target = None
892 for arg in mv_args:
893 if not arg.startswith('-'):
894 if target is not None:
895 sources += [target]
896 target = self.path2svntest(arg)
898 out = ""
899 for source in sources:
900 out += "shutil.move(" + source.pyarg + ", " + target.pyarg + ")\n"
901 shutil.move(source.runarg, target.runarg)
903 return out
906 def cmd_cp(self, mv_args):
907 "Copies in the filesystem and writes the script for it."
908 # ignore all -options.
909 out = ""
910 sources = []
911 target = None
912 for arg in mv_args:
913 if not arg.startswith('-'):
914 if target is not None:
915 sources += [target]
916 target = self.path2svntest(arg)
918 if not target:
919 raise Failure("cp needs a source and a target 'cp wc_dir wc_dir_2'")
921 out = ""
922 for source in sources:
923 if os.path.exists(target.runarg):
924 raise Failure("cp target exists, remove first: " + target.pyarg)
925 if os.path.isdir(source.runarg):
926 shutil.copytree(source.runarg, target.runarg)
927 out += "shutil.copytree(" + source.pyarg + ", " + target.pyarg + ")\n"
928 elif os.path.isfile(source.runarg):
929 shutil.copy2(source.runarg, target.runarg)
930 out += "shutil.copy2(" + source.pyarg + ", " + target.pyarg + ")\n"
931 else:
932 raise Failure("cp copy source does not exist: " + source.pyarg)
934 return out
937 # End of "shell" command handling functions.
941 # Internal helpers:
944 class WorkingCopy:
945 "Defines the list of info we need around a working copy."
946 def __init__(self, py, realpath, suffix):
947 self.py = py
948 self.realpath = realpath
949 self.suffix = suffix
952 class Target:
953 "Defines the list of info we need around a command line supplied target."
954 def __init__(self, pyarg, runarg, argnr, is_url=False, wc=None):
955 self.pyarg = pyarg
956 self.runarg = runarg
957 self.argnr = argnr
958 self.is_url = is_url
959 self.wc = wc
962 def add_line(self, args, translation=None):
963 "Definition of how to add a new in/out line pair to LINES."
964 self.lines += [ [args, translation] ]
967 def really_safe_rmtree(self, dir):
968 # Safety catch. We don't want to remove outside the sandbox.
969 if dir.find('svn-test-work') < 0:
970 raise Failure("Tried to remove path outside working area: " + dir)
971 main.safe_rmtree(dir)
974 def get_current_disk(self, wc):
975 "Probes the given working copy and writes an expected_disk for it."
976 actual_disk = svntest.wc.State.from_wc(wc.realpath, False, True)
977 actual_disk.wc_dir = wc.realpath
979 make_py, prev_disk = self.get_prev_disk(wc)
981 # The tests currently compare SVNTreeNode trees, so let's do that too.
982 actual_disk_tree = actual_disk.old_tree()
983 prev_disk_tree = prev_disk.old_tree()
985 # find out the tweaks
986 tweaks = self.diff_trees(prev_disk_tree, actual_disk_tree, wc)
987 if tweaks == 'Purge':
988 make_py = ''
989 else:
990 tweaks = self.optimize_tweaks(tweaks, actual_disk_tree, wc)
992 self.remember_disk(wc, actual_disk)
994 pydisk = make_py + self.tweaks2py(tweaks, "expected_disk", wc)
995 if len(pydisk) > 0:
996 pydisk += '\n'
997 return pydisk
999 def get_prev_disk(self, wc):
1000 "Retrieves the last used expected_disk tree if any."
1001 make_py = ""
1002 # If a disk was supplied via __init__(), self.prev_disk[0] is set
1003 # to None, in which case we always use it, not checking WC.
1004 if self.prev_disk is None or \
1005 not self.prev_disk[0] in [None, wc.realpath]:
1006 disk = svntest.main.greek_state.copy()
1007 disk.wc_dir = wc.realpath
1008 self.remember_disk(wc, disk)
1009 make_py = "expected_disk = svntest.main.greek_state.copy()\n"
1010 else:
1011 disk = self.prev_disk[1]
1012 return make_py, disk
1014 def remember_disk(self, wc, actual):
1015 "Remembers the current disk tree for future reference."
1016 self.prev_disk = [wc.realpath, actual]
1019 def get_current_status(self, wc, quiet=True):
1020 "Probes the given working copy and writes an expected_status for it."
1021 if quiet:
1022 code, output, err = main.run_svn(None, 'status', '-v', '-u', '-q',
1023 wc.realpath)
1024 else:
1025 code, output, err = main.run_svn(None, 'status', '-v', '-u',
1026 wc.realpath)
1027 if code != 0 or len(err) > 0:
1028 raise Failure("Hmm. `svn status' failed. What now.")
1030 make_py, prev_status = self.get_prev_status(wc)
1032 actual_status = svntest.wc.State.from_status(output)
1034 # The tests currently compare SVNTreeNode trees, so let's do that too.
1035 prev_status_tree = prev_status.old_tree()
1036 actual_status_tree = actual_status.old_tree()
1038 # Get the tweaks
1039 tweaks = self.diff_trees(prev_status_tree, actual_status_tree, wc)
1041 if tweaks == 'Purge':
1042 # The tree is empty (happens with invalid WC dirs)
1043 make_py = "expected_status = wc.State(" + wc.py + ", {})\n"
1044 tweaks = []
1045 else:
1046 tweaks = self.optimize_tweaks(tweaks, actual_status_tree, wc)
1048 self.remember_status(wc, actual_status)
1050 pystatus = make_py + self.tweaks2py(tweaks, "expected_status", wc)
1051 if len(pystatus) > 0:
1052 pystatus += '\n'
1054 return pystatus
1056 def get_prev_status(self, wc):
1057 "Retrieves the last used expected_status tree if any."
1058 make_py = ""
1059 prev_status = None
1061 # re-use any previous status if we are still in the same WC dir.
1062 # If a status was supplied via __init__(), self.prev_status[0] is set
1063 # to None, in which case we always use it, not checking WC.
1064 if self.prev_status is None or \
1065 not self.prev_status[0] in [None, wc.realpath]:
1066 # There is no or no matching previous status. Make new one.
1067 try:
1068 # If it's really a WC, use its base revision
1069 base_rev = actions.get_wc_base_rev(wc.realpath)
1070 except:
1071 # Else, just use zero. Whatever.
1072 base_rev = 0
1073 prev_status = actions.get_virginal_state(wc.realpath, base_rev)
1074 make_py += ("expected_status = actions.get_virginal_state(" +
1075 wc.py + ", " + str(base_rev) + ")\n")
1076 else:
1077 # We will re-use the previous expected_status.
1078 prev_status = self.prev_status[1]
1079 # no need to make_py anything
1081 return make_py, prev_status
1083 def remember_status(self, wc, actual_status):
1084 "Remembers the current status tree for future reference."
1085 self.prev_status = [wc.realpath, actual_status]
1088 def chdir(self, do_chdir, wc):
1089 "Pushes the current dir onto the dir stack, does an os.chdir()."
1090 if not do_chdir:
1091 return ""
1092 self.prevdirs.append(os.getcwd())
1093 os.chdir(wc.realpath)
1094 py = ("orig_dir = os.getcwd() # Need to chdir because of '^/' args\n" +
1095 "os.chdir(" + wc.py + ")\n")
1096 return py
1098 def chdir_back(self, do_chdir):
1099 "Does os.chdir() back to the directory popped from the dir stack's top."
1100 if not do_chdir:
1101 return ""
1102 # If this fails, there's a missing chdir() call:
1103 os.chdir(self.prevdirs.pop())
1104 return "os.chdir(orig_dir)\n"
1107 def get_sorted_vars_by_pathlen(self):
1108 """Compose a listing of variable names to be expanded in script output.
1109 This is intended to be stored in self.sorted_vars_by_pathlen."""
1110 lst = []
1112 for dict in [self.vars, self.other_wc_dirs]:
1113 for name in dict:
1114 runpath = dict[name][1]
1115 if not runpath:
1116 continue
1117 strlen = len(runpath)
1118 item = [strlen, name, runpath]
1119 bisect.insort(lst, item)
1121 return lst
1124 def get_sorted_var_names(self):
1125 """Compose a listing of variable names to be declared.
1126 This is used by TestFactory.make()."""
1127 paths = []
1128 urls = []
1129 for name in self.vars:
1130 if name.startswith('url_'):
1131 bisect.insort(urls, [name.lower(), name])
1132 else:
1133 bisect.insort(paths, [name.lower(), name])
1134 list = []
1135 for path in paths:
1136 list += [path[1]]
1137 for url in urls:
1138 list += [url[1]]
1139 return list
1142 def get_sorted_other_wc_dir_names(self):
1143 """Compose a listing of working copies to be declared with sbox.
1144 This is used by TestFactory.make()."""
1145 list = []
1146 for name in self.other_wc_dirs:
1147 bisect.insort(list, [name.lower(), name])
1148 names = []
1149 for item in list:
1150 names += [item[1]]
1151 return names
1154 def str2svntest(self, str):
1155 "Like str2py(), but replaces any known paths with variable names."
1156 if str is None:
1157 return "None"
1159 str = str2py(str)
1160 quote = str[0]
1162 def replace(str, path, name, quote):
1163 return str.replace(path, quote + " + " + name + " + " + quote)
1165 # We want longer paths first.
1166 for var in reversed(self.sorted_vars_by_pathlen):
1167 name = var[1]
1168 path = var[2]
1169 str = replace(str, path, name, quote)
1171 str = replace(str, self.sbox.wc_dir, 'wc_dir', quote)
1172 str = replace(str, self.sbox.repo_url, 'url', quote)
1174 # now remove trailing null-str adds:
1175 # '' + url_A_C + ''
1176 str = str.replace("'' + ",'').replace(" + ''",'')
1177 # "" + url_A_C + ""
1178 str = str.replace('"" + ',"").replace(' + ""',"")
1180 # just a stupid check. tiny tweak. (don't declare wc_dir and url
1181 # if they never appear)
1182 if not self.used_wc_dir:
1183 self.used_wc_dir = (re.search('\bwc_dir\b', str) is not None)
1184 if not self.used_url:
1185 self.used_url = str.find('url') >= 0
1187 return str
1190 def strlist2py(self, list):
1191 "Given a list of strings, composes a py script that produces the same."
1192 if list is None:
1193 return "None"
1194 if len(list) < 1:
1195 return "[]"
1196 if len(list) == 1:
1197 return "[" + self.str2svntest(list[0]) + "]"
1199 py = "[\n"
1200 for line in list:
1201 py += " " + self.str2svntest(line) + ",\n"
1202 py += "]"
1203 return py
1206 def get_node_path(self, node, wc):
1207 "Tries to return the node path relative to the given working copy."
1208 path = node.get_printable_path()
1209 if path.startswith(wc.realpath + os.sep):
1210 path = path[len(wc.realpath + os.sep):]
1211 elif path.startswith(wc.realpath):
1212 path = path[len(wc.realpath):]
1213 return path
1216 def node2py(self, node, wc, prepend="", drop_empties=True):
1217 "Creates a line like 'A/C' : Item({ ... }) for wc.State composition."
1218 buf = StringIO()
1219 node.print_script(buf, wc.realpath, prepend, drop_empties)
1220 return buf.getvalue()
1223 def tree2py(self, node, wc):
1224 "Writes the wc.State definition for the given SVNTreeNode in given WC."
1225 # svntest.wc.State(wc_dir, {
1226 # 'A/mu' : Item(verb='Sending'),
1227 # 'A/D/G/rho' : Item(verb='Sending'),
1228 # })
1229 buf = StringIO()
1230 tree.dump_tree_script(node, stream=buf, subtree=wc.realpath,
1231 wc_varname=wc.py)
1232 return buf.getvalue()
1235 def diff_trees(self, left, right, wc):
1236 """Compares the two trees given by the SVNTreeNode instances LEFT and
1237 RIGHT in the given working copy and composes an internal list of
1238 tweaks necessary to make LEFT into RIGHT."""
1239 if not right.children:
1240 return 'Purge'
1241 return self._diff_trees(left, right, wc)
1243 def _diff_trees(self, left, right, wc):
1244 "Used by self.diff_trees(). No need to call this. See there."
1245 # all tweaks collected
1246 tweaks = []
1248 # the current tweak in composition
1249 path = self.get_node_path(left, wc)
1250 tweak = []
1252 # node attributes
1253 if ((left.contents is None) != (right.contents is None)) or \
1254 (left.contents != right.contents):
1255 tweak += [ ["contents", right.contents] ]
1257 for key in left.props:
1258 if key not in right.props:
1259 tweak += [ [key, None] ]
1260 elif left.props[key] != right.props[key]:
1261 tweak += [ [key, right.props[key]] ]
1263 for key in right.props:
1264 if key not in left.props:
1265 tweak += [ [key, right.props[key]] ]
1267 for key in left.atts:
1268 if key not in right.atts:
1269 tweak += [ [key, None] ]
1270 elif left.atts[key] != right.atts[key]:
1271 tweak += [ [key, right.atts[key]] ]
1273 for key in right.atts:
1274 if key not in left.atts:
1275 tweak += [ [key, right.atts[key]] ]
1277 if len(tweak) > 0:
1278 changetweak = [ 'Change', [path], tweak]
1279 tweaks += [changetweak]
1281 if left.children is not None:
1282 for leftchild in left.children:
1283 rightchild = None
1284 if right.children is not None:
1285 rightchild = tree.get_child(right, leftchild.name)
1286 if rightchild is None:
1287 paths = leftchild.recurse(lambda n: self.get_node_path(n, wc))
1288 removetweak = [ 'Remove', paths ]
1289 tweaks += [removetweak]
1291 if right.children is not None:
1292 for rightchild in right.children:
1293 leftchild = None
1294 if left.children is not None:
1295 leftchild = tree.get_child(left, rightchild.name)
1296 if leftchild is None:
1297 paths_and_nodes = rightchild.recurse(
1298 lambda n: [ self.get_node_path(n, wc), n ] )
1299 addtweak = [ 'Add', paths_and_nodes ]
1300 tweaks += [addtweak]
1301 else:
1302 tweaks += self._diff_trees(leftchild, rightchild, wc)
1304 return tweaks
1307 def optimize_tweaks(self, tweaks, actual_tree, wc):
1308 "Given an internal list of tweaks, make them optimal by common sense."
1309 if tweaks == 'Purge':
1310 return tweaks
1312 subtree = actual_tree.find_node(wc.realpath)
1313 if not subtree:
1314 subtree = actual_tree
1316 remove_paths = []
1317 additions = []
1318 changes = []
1320 for tweak in tweaks:
1321 if tweak[0] == 'Remove':
1322 remove_paths += tweak[1]
1323 elif tweak[0] == 'Add':
1324 additions += tweak[1]
1325 else:
1326 changes += [tweak]
1328 # combine removals
1329 removal = []
1330 if len(remove_paths) > 0:
1331 removal = [ [ 'Remove', remove_paths] ]
1333 # combine additions
1334 addition = []
1335 if len(additions) > 0:
1336 addition = [ [ 'Add', additions ] ]
1338 # find those changes that should be done on all nodes at once.
1339 def remove_mod(mod):
1340 for change in changes:
1341 if mod in change[2]:
1342 change[2].remove(mod)
1344 seen = []
1345 tweak_all = []
1346 for change in changes:
1347 tweak = change[2]
1348 for mod in tweak:
1349 if mod in seen:
1350 continue
1351 seen += [mod]
1353 # here we see each single "name=value" tweak in mod.
1354 # Check if the actual tree had this anyway all the way through.
1355 name = mod[0]
1356 val = mod[1]
1358 if name == 'contents' and val is None:
1359 continue;
1361 def check_node(node):
1362 if (
1363 (name == 'contents' and node.contents == val)
1365 (node.props and (name in node.props) and node.props[name] == val)
1367 (node.atts and (name in node.atts) and node.atts[name] == val)):
1368 # has this same thing set. count on the left.
1369 return [node, None]
1370 return [None, node]
1371 results = subtree.recurse(check_node)
1372 have = []
1373 havent = []
1374 for result in results:
1375 if result[0]:
1376 have += [result[0]]
1377 else:
1378 havent += [result[1]]
1380 if havent == []:
1381 # ok, then, remove all tweaks that are like this, then
1382 # add a generic tweak.
1383 remove_mod(mod)
1384 tweak_all += [mod]
1385 elif len(havent) < len(have) * 3: # this is "an empirical factor"
1386 remove_mod(mod)
1387 tweak_all += [mod]
1388 # record the *other* nodes' actual item, overwritten above
1389 for node in havent:
1390 name = mod[0]
1391 if name == 'contents':
1392 value = node.contents
1393 elif name in node.props:
1394 value = node.props[name]
1395 elif name in node.atts:
1396 value = node.atts[name]
1397 else:
1398 continue
1399 changes += [ ['Change',
1400 [self.get_node_path(node, wc)],
1401 [[name, value]]
1405 # combine those paths that have exactly the same changes
1406 i = 0
1407 j = 0
1408 while i < len(changes):
1409 # find other changes that are identical
1410 j = i + 1
1411 while j < len(changes):
1412 if changes[i][2] == changes[j][2]:
1413 changes[i][1] += changes[j][1]
1414 del changes[j]
1415 else:
1416 j += 1
1417 i += 1
1419 # combine those changes that have exactly the same paths
1420 i = 0
1421 j = 0
1422 while i < len(changes):
1423 # find other paths that are identical
1424 j = i + 1
1425 while j < len(changes):
1426 if changes[i][1] == changes[j][1]:
1427 changes[i][2] += changes[j][2]
1428 del changes[j]
1429 else:
1430 j += 1
1431 i += 1
1433 if tweak_all != []:
1434 changes = [ ['Change', [], tweak_all ] ] + changes
1436 return removal + addition + changes
1439 def tweaks2py(self, tweaks, var_name, wc):
1440 "Given an internal list of tweaks, write the tweak script for it."
1441 py = ""
1442 if tweaks is None:
1443 return ""
1445 if tweaks == 'Purge':
1446 return var_name + " = wc.State(" + wc.py + ", {})\n"
1448 for tweak in tweaks:
1449 if tweak[0] == 'Remove':
1450 py += var_name + ".remove("
1451 paths = tweak[1]
1452 py += self.str2svntest(paths[0])
1453 for path in paths[1:]:
1454 py += ", " + self.str2svntest(path)
1455 py += ")\n"
1457 elif tweak[0] == 'Add':
1458 # add({'A/D/H/zeta' : Item(status=' ', wc_rev=9), ...})
1459 py += var_name + ".add({"
1460 adds = tweak[1]
1461 for add in adds:
1462 path = add[0]
1463 node = add[1]
1464 py += self.node2py(node, wc, "\n ", False)
1465 py += "\n})\n"
1467 else:
1468 paths = tweak[1]
1469 mods = tweak[2]
1470 if mods != []:
1471 py += var_name + ".tweak("
1472 for path in paths:
1473 py += self.str2svntest(path) + ", "
1474 def mod2py(mod):
1475 return mod[0] + "=" + self.str2svntest(mod[1])
1476 py += mod2py(mods[0])
1477 for mod in mods[1:]:
1478 py += ", " + mod2py(mod)
1479 py += ")\n"
1480 return py
1483 def path2svntest(self, path, argnr=None, do_remove_on_new_wc_path=True):
1484 """Given an input argument, do one hell of a path expansion on it.
1485 ARGNR is simply inserted into the resulting Target.
1486 Returns a self.Target instance.
1488 wc = self.WorkingCopy('wc_dir', self.sbox.wc_dir, None)
1489 url = self.sbox.repo_url # do we need multiple URLs too??
1491 pathsep = '/'
1492 if path.find('/') < 0 and path.find('\\') >= 0:
1493 pathsep = '\\'
1495 is_url = False
1497 # If you add to these, make sure you add longer ones first, to
1498 # avoid e.g. '$WC_DIR' matching '$WC' first.
1499 wc_dir_wildcards = ['wc_dir', 'wcdir', '$WC_DIR', '$WC']
1500 url_wildcards = ['url', '$URL']
1502 first = path.split(pathsep, 1)[0]
1503 if first in wc_dir_wildcards:
1504 path = path[len(first):]
1505 elif first in url_wildcards:
1506 path = path[len(first):]
1507 is_url = True
1508 else:
1509 for url_scheme in ['^/', 'file:/', 'http:/', 'svn:/', 'svn+ssh:/']:
1510 if path.startswith(url_scheme):
1511 is_url = True
1512 # keep it as it is
1513 pyarg = self.str2svntest(path)
1514 runarg = path
1515 return self.Target(pyarg, runarg, argnr, is_url, None)
1517 for wc_dir_wildcard in wc_dir_wildcards:
1518 if first.startswith(wc_dir_wildcard):
1519 # The first path element starts with "wc_dir" (or similar),
1520 # but it has more attached to it. Like "wc_dir.2" or "wc_dir_other"
1521 # Record a new wc dir name.
1523 # try to figure out a nice suffix to pass to sbox.
1524 # (it will create a new dir called sbox.wc_dir + '.' + suffix)
1525 suffix = ''
1526 if first[len(wc_dir_wildcard)] in ['.','-','_']:
1527 # it's a separator already, don't duplicate the dot. (warm&fuzzy)
1528 suffix = first[len(wc_dir_wildcard) + 1:]
1529 if len(suffix) < 1:
1530 suffix = first[len(wc_dir_wildcard):]
1532 if len(suffix) < 1:
1533 raise Failure("no suffix supplied to other-wc_dir arg")
1535 # Streamline the var name
1536 suffix = suffix.replace('.','_').replace('-','_')
1537 other_wc_dir_varname = 'wc_dir_' + suffix
1539 path = path[len(first):]
1541 real_path = self.get_other_wc_real_path(other_wc_dir_varname,
1542 suffix,
1543 do_remove_on_new_wc_path)
1545 wc = self.WorkingCopy(other_wc_dir_varname,
1546 real_path, suffix)
1547 # found a match, no need to loop further, but still process
1548 # the path further.
1549 break
1551 if len(path) < 1 or path == pathsep:
1552 if is_url:
1553 self.used_url = True
1554 pyarg = 'url'
1555 runarg = url
1556 wc = None
1557 else:
1558 if wc.suffix is None:
1559 self.used_wc_dir = True
1560 pyarg = wc.py
1561 runarg = wc.realpath
1562 else:
1563 pathelements = split_remove_empty(path, pathsep)
1565 # make a new variable, if necessary
1566 if is_url:
1567 pyarg, runarg = self.ensure_url_var(pathelements)
1568 wc = None
1569 else:
1570 pyarg, runarg = self.ensure_path_var(wc, pathelements)
1572 return self.Target(pyarg, runarg, argnr, is_url, wc)
1575 def get_other_wc_real_path(self, varname, suffix, do_remove):
1576 "Create or retrieve the path of an alternate working copy."
1577 if varname in self.other_wc_dirs:
1578 return self.other_wc_dirs[varname][1]
1580 # see if there is a wc already in the sbox
1581 path = self.sbox.wc_dir + '.' + suffix
1582 if path in self.sbox.test_paths:
1583 py = "sbox.wc_dir + '." + suffix + "'"
1584 else:
1585 # else, we must still create one.
1586 path = self.sbox.add_wc_path(suffix, do_remove)
1587 py = "sbox.add_wc_path(" + str2py(suffix)
1588 if not do_remove:
1589 py += ", remove=False"
1590 py += ')'
1592 value = [py, path]
1593 self.other_wc_dirs[varname] = [py, path]
1594 self.sorted_vars_by_pathlen = self.get_sorted_vars_by_pathlen()
1595 return path
1598 def define_var(self, name, value):
1599 "Add a variable definition, don't allow redefinitions."
1600 # see if we already have this var
1601 if name in self.vars:
1602 if self.vars[name] != value:
1603 raise Failure("Variable name collision. Hm, fix factory.py?")
1604 # ok, it's recorded correctly. Nothing needs to happen.
1605 return
1607 # a new variable needs to be recorded
1608 self.vars[name] = value
1609 # update the sorted list of vars for substitution by str2svntest()
1610 self.sorted_vars_by_pathlen = self.get_sorted_vars_by_pathlen()
1613 def ensure_path_var(self, wc, pathelements):
1614 "Given a path in a working copy, make sure we have a variable for it."
1616 # special case: if a path is '.', simply use wc_dir.
1617 if pathelements == ['.']:
1618 return wc.py, wc.realpath
1620 name = "_".join(pathelements)
1622 if wc.suffix is not None:
1623 # This is an "other" working copy (not the default).
1624 # The suffix of the wc_dir variable serves as the prefix:
1625 # wc_dir_other ==> other_A_D = os.path.join(wc_dir_other, 'A', 'D')
1626 name = wc.suffix + "_" + name
1627 if name[0].isdigit():
1628 name = "_" + name
1629 else:
1630 self.used_wc_dir = True
1632 py = 'os.path.join(' + wc.py
1633 if len(pathelements) > 0:
1634 py += ", '" + "', '".join(pathelements) + "'"
1635 py += ')'
1637 wc_dir_real_path = wc.realpath
1638 run = os.path.join(wc_dir_real_path, *pathelements)
1640 value = [py, run]
1641 self.define_var(name, value)
1643 return name, run
1646 def ensure_url_var(self, pathelements):
1647 "Given a path in the test repository, ensure we have a url var for it."
1648 name = "url_" + "_".join(pathelements)
1650 joined = "/" + "/".join(pathelements)
1652 py = 'url'
1653 if len(pathelements) > 0:
1654 py += " + " + str2py(joined)
1655 self.used_url = True
1657 run = self.sbox.repo_url + joined
1659 value = [py, run]
1660 self.define_var(name, value)
1662 return name, run
1665 def get_first_wc(self, target_list):
1666 """In a list of Target instances, find the first one that is in a
1667 working copy and return that WorkingCopy. Default to sbox.wc_dir.
1668 This is useful if we need a working copy for a '^/' URL."""
1669 for target in target_list:
1670 if target.wc:
1671 return target.wc
1672 return self.WorkingCopy('wc_dir', self.sbox.wc_dir, None)
1675 def args2svntest(self, args, append_wc_dir_if_missing = False,
1676 keep_args_of = [], keep_first_count = 1,
1677 drop_with_arg = []):
1678 """Tries to be extremely intelligent at parsing command line arguments.
1679 It needs to know which args are file targets that should be in a
1680 working copy. File targets are magically expanded.
1682 args: list of string tokens as passed to factory.make(), e.g.
1683 ['svn', 'commit', '--force', 'wc_dir2']
1685 append_wc_dir_if_missing: It's a switch.
1687 keep_args_of: See TestFactory.keep_args_of (comment in __init__)
1689 keep_first_count: Don't expand the first N non-option args. This is used
1690 to preserve e.g. the token 'update' in '[svn] update wc_dir'
1691 (the 'svn' is usually split off before this function is called).
1693 drop_with_arg: list of string tokens that are commandline options with
1694 following argument which we want to drop from the list of args
1695 (e.g. -m message).
1698 wc_dir = self.sbox.wc_dir
1699 url = self.sbox.repo_url
1701 target_supplied = False
1702 pyargs = []
1703 runargs = []
1704 do_chdir = False
1705 targets = []
1706 wc_dirs = []
1708 i = 0
1709 while i < len(args):
1710 arg = args[i]
1712 if arg in drop_with_arg:
1713 # skip this and the next arg
1714 if not arg.startswith('--') and len(arg) > 2:
1715 # it is a concatenated arg like -r123 instead of -r 123
1716 # skip only this one. Do nothing.
1717 i = i
1718 else:
1719 # skip this and the next arg
1720 i += 1
1722 elif arg.startswith('-'):
1723 # keep this option arg verbatim.
1724 pyargs += [ self.str2svntest(arg) ]
1725 runargs += [ arg ]
1726 # does this option expect a non-filename argument?
1727 # take that verbatim as well.
1728 if arg in keep_args_of:
1729 i += 1
1730 if i < len(args):
1731 arg = args[i]
1732 pyargs += [ self.str2svntest(arg) ]
1733 runargs += [ arg ]
1735 elif keep_first_count > 0:
1736 # args still to be taken verbatim.
1737 pyargs += [ self.str2svntest(arg) ]
1738 runargs += [ arg ]
1739 keep_first_count -= 1
1741 elif arg.startswith('^/'):
1742 # this is a ^/url, keep it verbatim.
1743 # if we use "^/", we need to chdir(wc_dir).
1744 do_chdir = True
1745 pyarg = str2py(arg)
1746 targets += [ self.Target(pyarg, arg, len(pyargs), True, None) ]
1747 pyargs += [ pyarg ]
1748 runargs += [ arg ]
1750 else:
1751 # well, then this must be a filename or url, autoexpand it.
1752 target = self.path2svntest(arg, argnr=len(pyargs))
1753 pyargs += [ target.pyarg ]
1754 runargs += [ target.runarg ]
1755 target_supplied = True
1756 targets += [ target ]
1758 i += 1
1760 if not target_supplied and append_wc_dir_if_missing:
1761 # add a simple wc_dir target
1762 self.used_wc_dir = True
1763 wc = self.WorkingCopy('wc_dir', wc_dir, None)
1764 targets += [ self.Target('wc_dir', wc_dir, len(pyargs), False, wc) ]
1765 pyargs += [ 'wc_dir' ]
1766 runargs += [ wc_dir ]
1768 return pyargs, runargs, do_chdir, targets
1770 ###### END of the TestFactory class ######
1774 # Quotes-preserving text wrapping for output
1776 def find_quote_end(text, i):
1777 "In string TEXT, find the end of the qoute that starts at TEXT[i]"
1778 # don't handle """ quotes
1779 quote = text[i]
1780 i += 1
1781 while i < len(text):
1782 if text[i] == '\\':
1783 i += 1
1784 elif text[i] == quote:
1785 return i
1786 i += 1
1787 return len(text) - 1
1790 class MyWrapper(textwrap.TextWrapper):
1791 "A textwrap.TextWrapper that doesn't break a line within quotes."
1792 ### TODO regexes would be nice, maybe?
1793 def _split(self, text):
1794 parts = []
1795 i = 0
1796 start = 0
1797 # This loop will break before and after each space, but keep
1798 # quoted strings in one piece. Example, breaks marked '/':
1799 # /(one,/ /two(blagger),/ /'three three three',)/
1800 while i < len(text):
1801 if text[i] in ['"', "'"]:
1802 # handle """ quotes. (why, actually?)
1803 if text[i:i+3] == '"""':
1804 end = text[i+3:].find('"""')
1805 if end >= 0:
1806 i += end + 2
1807 else:
1808 i = len(text) - 1
1809 else:
1810 # handle normal quotes
1811 i = find_quote_end(text, i)
1812 elif text[i].isspace():
1813 # split off previous section, if any
1814 if start < i:
1815 parts += [text[start:i]]
1816 start = i
1817 # split off this space
1818 parts += [text[i]]
1819 start = i + 1
1821 i += 1
1823 if start < len(text):
1824 parts += [text[start:]]
1825 return parts
1828 def wrap_each_line(str, ii, si, blw):
1829 """Wrap lines to a defined width (<80 chars). Feed the lines single to
1830 MyWrapper, so that it preserves the current line endings already in there.
1831 We only want to insert new wraps, not remove existing newlines."""
1832 wrapper = MyWrapper(77, initial_indent=ii,
1833 subsequent_indent=si)
1835 lines = str.splitlines()
1836 for i in range(0,len(lines)):
1837 if lines[i] != '':
1838 lines[i] = wrapper.fill(lines[i])
1839 return '\n'.join(lines)
1843 # Other miscellaneous helpers
1845 def sh2str(string):
1846 "un-escapes away /x sequences"
1847 if string is None:
1848 return None
1849 return string.decode("string-escape")
1852 def get_quote_style(str):
1853 """find which quote is the outer one, ' or "."""
1854 quote_char = None
1855 at = None
1857 found = str.find("'")
1858 found2 = str.find('"')
1860 # If found == found2, both must be -1, so nothing was found.
1861 if found != found2:
1862 # If a quote was found
1863 if found >= 0 and found2 >= 0:
1864 # If both were found, invalidate the later one
1865 if found < found2:
1866 found2 = -1
1867 else:
1868 found = -1
1869 # See which one remains.
1870 if found >= 0:
1871 at = found + 1
1872 quote_char = "'"
1873 elif found2 >= 0:
1874 at = found2 + 1
1875 quote_char = '"'
1877 return quote_char, at
1879 def split_remove_empty(str, sep):
1880 "do a split, then remove empty elements."
1881 list = str.split(sep)
1882 return filter(lambda item: item and len(item) > 0, list)
1884 def str2py(str):
1885 "returns the string enclosed in quotes, suitable for py scripts."
1886 if str is None:
1887 return "None"
1889 # try to make a nice choice of quoting character
1890 if str.find("'") >= 0:
1891 return '"' + str.encode("string-escape"
1892 ).replace("\\'", "'"
1893 ).replace('"', '\\"') + '"'
1894 else:
1895 return "'" + str.encode("string-escape") + "'"
1897 return str
1900 ### End of file.