Fix the expansion of the $Source$ keyword.
[cvs2svn.git] / svntest / factory.py
blob7c8af7f81d2bc8484cc288073a94e50ff4180e78
2 # factory.py: Automatically generate a (near-)complete new cmdline test
3 # from a series of shell commands.
5 # Subversion is a tool for revision control.
6 # See http://subversion.tigris.org for more information.
8 # ====================================================================
9 # Licensed to the Apache Software Foundation (ASF) under one
10 # or more contributor license agreements. See the NOTICE file
11 # distributed with this work for additional information
12 # regarding copyright ownership. The ASF licenses this file
13 # to you under the Apache License, Version 2.0 (the
14 # "License"); you may not use this file except in compliance
15 # with the License. You may obtain a copy of the License at
17 # http://www.apache.org/licenses/LICENSE-2.0
19 # Unless required by applicable law or agreed to in writing,
20 # software distributed under the License is distributed on an
21 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 # KIND, either express or implied. See the License for the
23 # specific language governing permissions and limitations
24 # under the License.
25 ######################################################################
28 ## HOW TO USE:
30 # (1) Edit the py test script you want to enhance (for example
31 # cmdline/basic_tests.py), add a new test header as usual.
32 # Insert a call to factory.make() into the empty test:
34 # def my_new_test(sbox):
35 # "my new test modifies iota"
36 # svntest.factory.make(sbox, """
37 # echo "foo" > A/D/foo
38 # svn add A/D/foo
39 # svn st
40 # svn ci
41 # """)
43 # (2) Add the test to the tests list at the bottom of the py file.
44 # [...]
45 # some_other_test,
46 # my_new_test,
47 # ]
50 # (3) Run the test, paste the output back into your new test,
51 # replacing the factory call.
53 # $ ./foo_tests.py my_new_test
54 # OR
55 # $ ./foo_tests.py my_new_test > new_test.snippet
56 # OR
57 # $ ./foo_tests.py my_new_test >> basic_tests.py
58 # Then edit (e.g.) basic_tests.py to put the script in the right place.
60 # Ensure that the py script (e.g. basic_tests.py) has these imports,
61 # so that the composed script that you pasted back finds everything
62 # that it uses:
63 # import os, shutil
64 # from svntest import main, wc, actions, verify
66 # Be aware that you have to paste the result back to the .py file.
68 # Be more aware that you have to read every single line and understand
69 # that it makes sense. If the current behaviour is wrong, you need to
70 # make the changes to expect the correct behaviour and XFail() your test.
72 # factory.make() just probes the current situation and writes a test that
73 # PASSES any success AND ANY FAILURE THAT IT FINDS. The resulting script
74 # will never fail anything (if it works correctly), not even a failure.
76 # ### TODO: some sort of intelligent pasting directly into the
77 # right place, like looking for the factory call,
78 # inserting the new test there, back-up-ing the old file.
81 # TROUBLESHOOTING
82 # If the result has a problem somewhere along the middle, you can,
83 # of course, only use the first part of the output result, maybe tweak
84 # something, and continue with another factory.make() at the end of that.
86 # Or you can first do stuff to your sbox and then call factory on it.
87 # Factory will notice if the sbox has already been built and calls
88 # sbox.build() only if you didn't already.
90 # You can also have any number of factory.make() calls scattered
91 # around "real" test code.
93 # Note that you can pass a prev_status and prev_disk to factory, to make
94 # the expected_* trees re-use a pre-existing one in the test, entirely
95 # for code beauty :P (has to match the wc_dir you will be using next).
98 # YOU ARE CORDIALLY INVITED to add/tweak/change to your needs.
99 # If you want to know what's going on, look at the switch()
100 # function of TestFactory below.
103 # DETAILS
104 # =======
106 # The input to factory.make(sbox, input) is not "real" shell-script.
107 # Factory goes at great lengths to try and understand your script, it
108 # parses common shell operations during tests and translates them.
110 # All arguments are tokenized similarly to shell, so if you need a space
111 # in an argument, use quotes.
112 # echo "my content" > A/new_file
113 # Quote char escaping is done like this:
114 # echo "my \\" content" > A/new_file
115 # echo 'my \\' content' > A/new_file
116 # If you use r""" echo 'my \' content' > A/new_file """ (triple quotes
117 # with a leading 'r' character), you don't need to double-escape any
118 # characters.
120 # You can either supply multiple lines, or separate the lines with ';'.
121 # factory.make(sbox, 'echo foo > bar; svn add bar')
122 # factory.make(sbox, 'echo foo > bar\n svn add bar')
123 # factory.make(sbox, r"""
124 # echo "foo\nbar" > bar
125 # svn add bar
126 # """)
129 # WORKING COPY PATHS
130 # - Factory will automatically build sbox.wc_dir if you didn't do so yet.
132 # - If you supply any path or file name, factory will prepend sbox.wc_dir
133 # to it.
134 # echo more >> iota
135 # --> main.file_append(
136 # os.path.join(sbox.wc_dir, 'iota'),
137 # "more")
138 # You can also do so explicitly.
139 # echo more >> wc_dir/iota
140 # --> main.file_append(
141 # os.path.join(sbox.wc_dir, 'iota'),
142 # "more")
144 # Factory implies the sbox.wc_dir if you fail to supply an explicit
145 # working copy dir. If you want to supply one explicitly, you can
146 # choose among these wildcards:
147 # 'wc_dir', 'wcdir', '$WC_DIR', '$WC' -- all expanded to sbox.wc_dir
148 # For example:
149 # 'svn mkdir wc_dir/A/D/X'
150 # But as long as you want to use only the default sbox.wc_dir, you usually
151 # don't need to supply any wc_dir-wildcard:
152 # 'mkdir A/X' creates the directory sbox.wc_dir/A/X
153 # (Factory tries to know which arguments of the commands you supplied
154 # are eligible to be path arguments. If something goes wrong here, try
155 # to fix factory.py to not mistake the arg for something different.
156 # You usually just need to tweak some parameters to args2svntest() to
157 # achieve correct expansion.)
159 # - If you want to use a second (or Nth) working copy, just supply any
160 # working copy wildcard with any made-up suffix, e.g. like this:
161 # 'svn st wc_dir_2' or 'svn info $WC_2'
162 # Factory will detect that you used another wc_dir and will automatically
163 # add a corresponding directory to your sbox. The directory will initially
164 # be nonexistent, so call 'mkdir', 'svn co' or 'cp' before using:
165 # 'cp wc_dir wc_dir_other' -- a copy of the current WC
166 # 'svn co $URL wc_dir_new' -- a clean checkout
167 # 'mkdir wc_dir_empty' -- an empty directory
168 # You can subsequently use any wc-dir wildcard with your suffix added.
170 # cp wc_dir wc_dir_2
171 # echo more >> wc_dir_2/iota
172 # --> wc_dir_2 = sbox.add_wc_path('2')
173 # shutil.copytree(wc_dir, wc_dir_2)
174 # main.file_append(
175 # os.path.join(wc_dir_2, 'iota'),
176 # "more")
179 # URLs
180 # Factory currently knows only one repository, thus only one repos root.
181 # The wildcards you can use for it are:
182 # 'url', '$URL'
183 # A URL is not inserted automatically like wc_dir, you need to supply a
184 # URL wildcard.
185 # Alternatively, you can use '^/' URLs. However, that is in effect a different
186 # test from an explicit entire URL. The test needs to chdir to the working
187 # copy in order to find which URL '^/' should expand to.
188 # (currently, factory will chdir to sbox.wc_dir. It will only chdir
189 # to another working copy if one of the other arguments involved a WC.
190 # ### TODO add a 'cd wc_dir_2' command to select another WC as default.)
191 # Example:
192 # 'svn co $URL Y' -- make a new nested working copy in sbox.wc_dir/Y
193 # 'svn co $URL wc_dir_2' -- create a new separate working copy
194 # 'svn cp ^/A ^/X' -- do a URL copy, creating $URL/X (branch)
197 # SOME EXAMPLES
198 # These commands should work:
200 # - "svn <subcommand> <options>"
201 # Some subcommands are parsed specially, others by a catch-all default
202 # parser (cmd_svn()), see switch().
203 # 'svn commit', 'svn commit --force', 'svn ci wc_dir_2'
204 # 'svn copy url/A url/X'
206 # - "echo contents > file" (replace)
207 # "echo contents >> file" (append)
208 # Calls main.file_write() / main.file_append().
209 # 'echo "froogle" >> A/D/G/rho' -- append to an existing file
210 # 'echo "bar" > A/quux' -- create a new file
211 # 'echo "fool" > wc_dir_2/me' -- manipulate other working copies
213 # - "mkdir <names> ..."
214 # Calls os.makedirs().
215 # You probably want 'svn mkdir' instead, or use 'svn add' after this.
216 # 'mkdir A/D/X' -- create an unversioned directory
217 # 'mkdir wc_dir_5' -- create a new, empty working copy
219 # - "rm <targets>"
220 # Calls main.safe_rmtree().
221 # You probably want to use 'svn delete' instead.
222 # 'rm A/D/G'
223 # 'rm wc_dir_2'
225 # - "mv <source> [<source2> ...] <target>"
226 # Calls shutil.move()
227 # You probably want to use 'svn move' instead.
228 # 'mv iota A/D/' -- move sbox.wc_dir/iota to sbox.wc_dir/A/D/.
230 # - "cp <source> [<source2> ...] <target>"
231 # Do a filesystem copy.
232 # You probably want to use 'svn copy' instead.
233 # 'cp wc_dir wc_dir_copy'
234 # 'cp A/D/G A/X'
236 # IF YOU NEED ANY OTHER COMMANDS:
237 # - first check if it doesn't work already. If not,
238 # - add your desired commands to factory.py! :)
239 # - alternatively, use a number of separate factory calls, doing what
240 # you need done in "real" svntest language in-between.
242 # IF YOU REALLY DON'T GROK THIS:
243 # - ask #svn-dev
244 # - ask dev@
245 # - ask neels
247 import sys, re, os, shutil, bisect, textwrap, shlex
249 import svntest
250 from svntest import main, actions, tree
251 from svntest import Failure
253 if sys.version_info[0] >= 3:
254 # Python >=3.0
255 from io import StringIO
256 else:
257 # Python <3.0
258 from StringIO import StringIO
def make(wc_dir, commands, prev_status=None, prev_disk=None, verbose=True):
    """The Factory Invocation Function.

    This is typically the only function called from outside this file;
    see the top comment in factory.py.

    WC_DIR is handed to TestFactory as its sandbox argument and COMMANDS is
    the pseudo-shell script to translate.  PREV_STATUS and PREV_DISK, when
    given, are passed through so the generated expected_* trees can build
    on pre-existing ones.

    Prints the resulting py script to stdout only when VERBOSE is true
    (previously the flag was accepted but the script was always printed).

    Returns the resulting line-list, whose items pair each pseudo-shell
    input line with its (usually multi-line) py translation.
    """
    fac = TestFactory(wc_dir, prev_status, prev_disk)
    fac.make(commands)
    if verbose:
        fac.print_script()
    return fac.lines
273 class TestFactory:
274 """This class keeps all state around a factory.make() call."""
276 def __init__(self, sbox, prev_status=None, prev_disk=None):
277 self.sbox = sbox
279 # The input lines and their translations.
280 # Each translation usually has multiple output lines ('\n' characters).
281 self.lines = [] # [ ['in1', 'out1'], ['in2', 'out'], ...
283 # Any expected_status still there from a previous verification
284 self.prev_status = None
285 if prev_status:
286 self.prev_status = [None, prev_status] # svntest.wc.State
288 # Any expected_disk still there from a previous verification
289 self.prev_disk = None
290 if prev_disk:
291 self.prev_disk = [None, prev_disk] # svntest.wc.State
293 # Those command line options that expect an argument following
294 # which is not a path. (don't expand args following these)
295 self.keep_args_of = ['--depth', '--encoding', '-r',
296 '--changelist', '-m', '--message']
298 # A stack of $PWDs, to be able to chdir back after a chdir.
299 self.prevdirs = []
301 # The python variables we want to be declared at the beginning.
302 # These are path variables like "A_D = os.path.join(wc_dir, 'A', 'D')".
303 # The original wc_dir and url vars are not kept here.
304 self.vars = {}
306 # An optimized list kept up-to-date by variable additions
307 self.sorted_vars_by_pathlen = []
309 # Wether we ever used the variables 'wc_dir' and 'url' (tiny tweak)
310 self.used_wc_dir = False
311 self.used_url = False
313 # The alternate working copy directories created that need to be
314 # registered with sbox (are not inside another working copy).
315 self.other_wc_dirs = {}
def make(self, commands):
    """Internal main function; delegates everything except final output.

    Splits COMMANDS on ';' and newlines, translates every non-blank line
    through self.switch(), and fills the first entry of self.lines with the
    variable definitions the translations need.  Returns self.lines.  If
    anything raises, the translations produced so far are printed before
    the exception is re-raised.
    """
    # Reserve the first entry; it receives the variable definitions below.
    self.add_line(None, None)

    header = ""
    if not self.sbox.is_built():
        self.sbox.build()
        header += "sbox.build()\n"

    try:
        # Queue every non-blank input line.
        for raw_line in commands.replace(';', '\n').splitlines():
            if len(raw_line.strip()) > 0:
                self.add_line(raw_line)

        # This is where everything happens.  Only the entries present at
        # this point are translated.
        for entry in list(self.lines):
            if entry[0] is not None:
                entry[1] = self.switch(entry[0])

        # We're done.  Add a final greeting.
        self.add_line(
            None,
            "Remember, this only saves you typing. Doublecheck everything.")

        # Main wc_dir and url variables.
        if self.used_wc_dir:
            header += 'wc_dir = sbox.wc_dir\n'
        if self.used_url:
            header += 'url = sbox.repo_url\n'

        # Registration of additional working copy directories.
        for name in self.get_sorted_other_wc_dir_names():
            header += name + ' = ' + self.other_wc_dirs[name][0] + '\n'

        if len(header) > 0:
            header += '\n'

        # General variable definitions.
        for name in self.get_sorted_var_names():
            header += name + ' = ' + self.vars[name][0] + '\n'

        # Put the definitions into the reserved first entry.
        if len(header) > 0:
            self.lines[0][1] = header

        return self.lines
    except:
        # Show whatever was translated so far, then let the error through.
        for entry in self.lines:
            if entry[1] is not None:
                print(entry[1])
        raise
def print_script(self, stream=sys.stdout):
    """Write the script produced by the preceding make() call to STREAM.

    Lines without a translation are written back as they were in the
    source, preceded by a "don't know how to handle" remark unless the
    line is a comment itself.
    """
    if self.lines is None:
        stream.write(" # empty.\n")
        stream.flush()
        return

    for entry in self.lines:
        source, translation = entry[0], entry[1]
        if translation is None:
            # Fall back to just that line as it was in the source.
            verbatim = source.strip()
            if not verbatim.startswith('#'):
                # For comments, don't say this:
                stream.write(" # don't know how to handle:\n")
            stream.write(" " + verbatim + '\n')
            continue

        if source is not None:
            # Echo the input line as a comment above its translation.
            stream.write(wrap_each_line(source.strip(),
                                        " # ", " # ", True) + '\n')
        stream.write(wrap_each_line(translation, " ", " ", False) + '\n\n')

    stream.flush()
401 # End of public functions.
405 # "Shell" command handlers:
def switch(self, line):
    """Given one input line, delegate to the appropriate sub-function.

    Returns the handler's translation, "" for a line without any tokens,
    or None when the line is not understood (the caller then emits the
    line verbatim).  A bare 'svn' without a subcommand is treated as not
    understood instead of raising IndexError.
    """
    args = shlex.split(line)
    if len(args) < 1:
        return ""
    first = args[0]

    # This is just an if-cascade.  Feel free to change that.

    if first == 'svn':
        if len(args) < 2:
            # No subcommand to dispatch on.
            return None
        second = args[1]

        if second == 'add':
            return self.cmd_svn(args[1:], False, self.keep_args_of)

        if second in ['changelist', 'cl']:
            # Keep the subcommand, plus the changelist name unless
            # --remove is given.
            keep_count = 2
            if '--remove' in args:
                keep_count = 1
            return self.cmd_svn(args[1:], False, self.keep_args_of,
                                keep_count)

        if second in ['status', 'stat', 'st']:
            return self.cmd_svn_status(args[2:])

        if second in ['commit', 'ci']:
            return self.cmd_svn_commit(args[2:])

        if second in ['update', 'up']:
            return self.cmd_svn_update(args[2:])

        if second in ['copy', 'cp',
                      'move', 'mv', 'rename', 'ren']:
            return self.cmd_svn_copy_move(args[1:])

        if second in ['checkout', 'co']:
            return self.cmd_svn_checkout(args[2:])

        if second in ['propset', 'pset', 'ps']:
            return self.cmd_svn(args[1:], False,
                                self.keep_args_of, 3)

        if second in ['delete', 'del', 'remove', 'rm']:
            return self.cmd_svn(args[1:], False,
                                self.keep_args_of + ['--with-revprop'])

        # NOTE that not all commands need to be listed here, since
        # some are already adequately handled by self.cmd_svn().
        # If you find yours is not, add another self.cmd_svn_xxx().
        return self.cmd_svn(args[1:], False, self.keep_args_of)

    if first == 'echo':
        return self.cmd_echo(args[1:])

    if first == 'mkdir':
        return self.cmd_mkdir(args[1:])

    if first == 'rm':
        return self.cmd_rm(args[1:])

    if first == 'mv':
        return self.cmd_mv(args[1:])

    if first == 'cp':
        return self.cmd_cp(args[1:])

    # If all fails, take the line verbatim.
    return None
def cmd_svn_standard_run(self, pyargs, runargs, do_chdir, wc):
    """The generic invocation of svn, helper function.

    Runs svn with RUNARGS and returns a script snippet that expects exactly
    the outcome that was observed, whether that was success or failure.
    PYARGS are the argument expressions appended to the generated call.
    """
    pychdir = self.chdir(do_chdir, wc)

    code, out, err = main.run_svn("Maybe", *runargs)

    if code == 0 and len(err) < 1:
        # Write a test that expects success.
        captured = out
        varname = "expected_stdout"
        verify_args = "expected_stdout, [], 0"
    else:
        # Write a test that expects failure.
        captured = err
        varname = "expected_stderr"
        verify_args = "[], expected_stderr, " + str(code)

    pylist = self.strlist2py(captured)
    if len(captured) > 1:
        # More than one line is compared as unordered output.
        pylist = "verify.UnorderedOutput(" + pylist + ")"

    py = varname + " = " + pylist + "\n\n"
    py += pychdir
    py += "actions.run_and_verify_svn2('OUTPUT', " + verify_args

    if len(pyargs) > 0:
        py += ", " + ", ".join(pyargs)
    py += ")\n"
    py += self.chdir_back(do_chdir)
    return py
def cmd_svn(self, svnargs, append_wc_dir_if_missing=False,
            keep_args_of=None, keep_first_count=1,
            drop_with_arg=None):
    """Handle all svn calls not handled by more specific functions.

    SVNARGS is the svn command line without the leading 'svn'.  The other
    parameters are passed straight to self.args2svntest().  KEEP_ARGS_OF
    and DROP_WITH_ARG default to a fresh empty list on every call, so a
    list mutated downstream cannot leak into later calls.

    Returns the script snippet from self.cmd_svn_standard_run().
    """
    if keep_args_of is None:
        keep_args_of = []
    if drop_with_arg is None:
        drop_with_arg = []

    pyargs, runargs, do_chdir, targets = self.args2svntest(
        svnargs, append_wc_dir_if_missing, keep_args_of,
        keep_first_count, drop_with_arg)

    return self.cmd_svn_standard_run(pyargs, runargs, do_chdir,
                                     self.get_first_wc(targets))
def cmd_svn_status(self, status_args):
    """Run svn status, look what happened and write the script for it.

    Every target that belongs to a working copy gets an expected_status
    block followed by the matching verification call; other targets are
    only mentioned in a comment.
    """
    pyargs, runargs, do_chdir, targets = self.args2svntest(
        status_args, True, self.keep_args_of, 0)

    # '-q' selects the quiet status probe and the quiet verifier.
    quiet = '-q' in status_args
    if quiet:
        verify_call = "actions.run_and_verify_status("
    else:
        verify_call = "actions.run_and_verify_unquiet_status("

    pieces = []
    for target in targets:
        if not target.wc:
            pieces.append('# SKIPPING NON-WC ' + target.runarg + '\n')
            continue
        pieces.append(self.get_current_status(target.wc, quiet))
        pieces.append(verify_call + target.wc.py + ", expected_status)\n")
    return "".join(pieces)
def cmd_svn_commit(self, commit_args):
    """Run svn commit, look what happened and write the script for it.

    COMMIT_ARGS is the command line after 'svn ci'.  Any -m/--message
    option is dropped; the commit is always run with '-m', 'log msg'.
    Returns a snippet that expects the observed outcome: the committed
    output and resulting status on success, or the stderr lines on
    failure.
    """
    # These are the options that are followed by something that should not
    # be parsed as a filename in the WC.
    commit_arg_opts = [
        "--depth",
        "--with-revprop",
        "--changelist",
        # "-F", "--file", these take a file argument, don't list here.
        # "-m", "--message", treated separately
    ]

    pyargs, runargs, do_chdir, targets = self.args2svntest(
        commit_args, True, commit_arg_opts, 0, ['-m', '--message'])

    wc = self.get_first_wc(targets)
    pychdir = self.chdir(do_chdir, wc)

    code, output, err = main.run_svn("Maybe", 'ci',
                                     '-m', 'log msg',
                                     *runargs)

    if code == 0 and len(err) < 1:
        # Write a test that expects success.
        output = actions.process_output_for_commit(output)
        actual_out = tree.build_tree_from_commit(output)
        py = ("expected_output = " +
              self.tree2py(actual_out, wc) + "\n\n")

        pystatus = self.get_current_status(wc)
        py += pystatus

        py += pychdir
        py += ("actions.run_and_verify_commit(" + wc.py + ", " +
               "expected_output, expected_status, " +
               "None")
    else:
        # Write a test that expects error.
        py = "expected_error = " + self.strlist2py(err) + "\n\n"
        py += pychdir
        py += ("actions.run_and_verify_commit(" + wc.py + ", " +
               "None, None, expected_error")

    if len(pyargs) > 0:
        py += ', ' + ', '.join(pyargs)
    py += ")"

    # chdir_back() emits a statement of its own.  It must start on a new
    # line, otherwise the generated script reads ")os.chdir(orig_dir)".
    pyback = self.chdir_back(do_chdir)
    if pyback:
        py += "\n" + pyback
    return py
def cmd_svn_update(self, update_args):
    """Run svn update, look what happened and write the script for it.

    UPDATE_ARGS is the command line after 'svn up'.  Returns a snippet
    that expects the observed outcome: output, disk and status trees on
    success, or the stderr lines on failure.
    """
    pyargs, runargs, do_chdir, targets = self.args2svntest(
        update_args, True, self.keep_args_of, 0)

    wc = self.get_first_wc(targets)
    pychdir = self.chdir(do_chdir, wc)

    code, output, err = main.run_svn('Maybe', 'up', *runargs)

    if code == 0 and len(err) < 1:
        # Write a test that expects success.
        actual_out = svntest.wc.State.from_checkout(output).old_tree()
        py = ("expected_output = " +
              self.tree2py(actual_out, wc) + "\n\n")

        pydisk = self.get_current_disk(wc)
        py += pydisk

        pystatus = self.get_current_status(wc)
        py += pystatus

        py += pychdir
        py += ("actions.run_and_verify_update(" + wc.py + ", " +
               "expected_output, expected_disk, expected_status, " +
               "None, None, None, None, None, False")
    else:
        # Write a test that expects error.
        py = "expected_error = " + self.strlist2py(err) + "\n\n"
        py += pychdir
        py += ("actions.run_and_verify_update(" + wc.py + ", None, None, " +
               "None, expected_error, None, None, None, None, False")

    if len(pyargs) > 0:
        py += ', ' + ', '.join(pyargs)
    py += ")"

    # chdir_back() emits a statement of its own.  It must start on a new
    # line, otherwise the generated script reads ")os.chdir(orig_dir)".
    pyback = self.chdir_back(do_chdir)
    if pyback:
        py += "\n" + pyback
    return py
def cmd_svn_checkout(self, checkout_args):
    """Run svn checkout, look what happened and write the script for it.

    CHECKOUT_ARGS is the command line after 'svn co'.  It must contain at
    least two targets, a URL followed by a directory; otherwise Failure is
    raised.  Returns a snippet that expects the observed outcome.
    """
    pyargs, runargs, do_chdir, targets = self.args2svntest(
        checkout_args, True, self.keep_args_of, 0)

    # Sort out the targets.  We need one URL and one dir, in that order.
    if len(targets) < 2:
        raise Failure("Sorry, I'm currently enforcing two targets for svn " +
                      "checkout. If you want to supply less, remove this " +
                      "check and implement whatever seems appropriate.")

    # We need these separate for the call to run_and_verify_checkout()
    # that's composed in the output script.  The later argument is removed
    # first so that the index of the earlier one stays valid.
    wc_arg = targets[1]
    del pyargs[wc_arg.argnr]
    del runargs[wc_arg.argnr]
    url_arg = targets[0]
    del pyargs[url_arg.argnr]
    del runargs[url_arg.argnr]

    wc = wc_arg.wc

    pychdir = self.chdir(do_chdir, wc)

    if '--force' in runargs:
        self.really_safe_rmtree(wc_arg.runarg)

    code, output, err = main.run_svn('Maybe', 'co',
                                     url_arg.runarg, wc_arg.runarg,
                                     *runargs)

    py = ""

    if code == 0 and len(err) < 1:
        # Write a test that expects success.
        actual_out = tree.build_tree_from_checkout(output)
        pyout = ("expected_output = " +
                 self.tree2py(actual_out, wc) + "\n\n")
        py += pyout

        pydisk = self.get_current_disk(wc)
        py += pydisk

        py += pychdir

        py += ("actions.run_and_verify_checkout(" +
               url_arg.pyarg + ", " + wc_arg.pyarg +
               ", expected_output, expected_disk, None, None, None, None")
    else:
        # Write a test that expects failure.
        pylist = self.strlist2py(err)
        if len(err) <= 1:
            py += "expected_stderr = " + pylist + "\n\n"
        else:
            py += ("expected_stderr = verify.UnorderedOutput(" +
                   pylist + ")\n\n")
        py += pychdir
        py += ("actions.run_and_verify_svn2('OUTPUT', " +
               "[], expected_stderr, " + str(code) +
               ", " + url_arg.pyarg + ", " + wc_arg.pyarg)

    # Append the remaining args.
    if len(pyargs) > 0:
        py += ', ' + ', '.join(pyargs)
    py += ")"

    # chdir_back() emits a statement of its own.  It must start on a new
    # line, otherwise the generated script reads ")os.chdir(orig_dir)".
    pyback = self.chdir_back(do_chdir)
    if pyback:
        py += "\n" + pyback
    return py
def cmd_svn_copy_move(self, args):
    """Run svn copy or move, look what happened and write the script for it.

    When there are exactly two targets and the second one is a URL, the
    operation needs a log message; '-m', 'copy log' is appended unless a
    message option is already present.
    """
    pyargs, runargs, do_chdir, targets = self.args2svntest(
        args, False, self.keep_args_of, 1)

    needs_message = len(targets) == 2 and targets[1].is_url
    if needs_message:
        has_message = any(word.startswith('-m') or word == '--message'
                          for word in runargs)
        if not has_message:
            # Add one (in place), then redo the py arguments from the
            # extended run arguments.
            runargs.extend(['-m', 'copy log'])
            pyargs = [self.str2svntest(word) for word in runargs]

    first_wc = self.get_first_wc(targets)
    return self.cmd_svn_standard_run(pyargs, runargs, do_chdir, first_wc)
def cmd_echo(self, echo_args):
    """Write a string to a file and write the script for it.

    ECHO_ARGS must contain a '>' (replace) or '>>' (append) redirection,
    either glued to the file name or followed by it as the last argument.
    Everything before the redirection, joined by single spaces, is the
    content.  Raises Failure when the arguments are not understood.
    """
    target_arg = None
    replace = True
    contents = None
    last_index = len(echo_args) - 1

    for pos, word in enumerate(echo_args):
        if not word.startswith('>'):
            continue

        if len(word) > 1:
            if word[1] == '>':
                # It's a '>>'.
                replace = False
                word = word[2:]
            else:
                word = word[1:]
            if len(word) > 0:
                # The file name was glued to the redirection.
                target_arg = word

        if target_arg is None:
            # The file name must be the next argument, and it must be
            # the only argument left in the list.
            if pos + 1 != last_index:
                raise Failure("don't understand: echo " + " ".join(echo_args))
            target_arg = echo_args[pos + 1]
        elif pos != last_index:
            # Already got the target; no more arguments should exist.
            raise Failure("don't understand: echo " + " ".join(echo_args))

        contents = " ".join(echo_args[:pos])

    if target_arg is None:
        raise Failure("echo needs a '>' pipe to a file name: echo " +
                      " ".join(echo_args))

    target = self.path2svntest(target_arg)

    if replace:
        main.file_write(target.runarg, contents)
        writer = "main.file_write("
    else:
        main.file_append(target.runarg, contents)
        writer = "main.file_append("

    return writer + target.pyarg + ", " + self.str2svntest(contents) + ")"
def cmd_mkdir(self, mkdir_args):
    """Make new directories and write the script for it.

    Every mkdir is treated as 'mkdir -p'; arguments starting with '-' are
    ignored.
    """
    statements = []
    for arg in mkdir_args:
        if arg.startswith('-'):
            continue
        target = self.path2svntest(arg)
        # Don't check for not being a url,
        # maybe it's desired by the test or something.
        os.makedirs(target.runarg)
        statements.append("os.makedirs(" + target.pyarg + ")\n")
    return "".join(statements)
def cmd_rm(self, rm_args):
    """Remove directory trees and write the script for it.

    Every removal is treated as 'rm -rf'; arguments starting with '-' are
    ignored.
    """
    statements = []
    for arg in rm_args:
        if arg.startswith('-'):
            continue
        target = self.path2svntest(arg)
        self.really_safe_rmtree(target.runarg)
        statements.append("main.safe_rmtree(" + target.pyarg + ")\n")
    return "".join(statements)
def cmd_mv(self, mv_args):
    """Move things in the filesystem and write the script for it.

    Arguments starting with '-' are ignored.  The last remaining argument
    is the target; all the ones before it are sources.
    """
    paths = [self.path2svntest(arg)
             for arg in mv_args if not arg.startswith('-')]
    sources = paths[:-1]
    target = paths[-1] if paths else None

    statements = []
    for source in sources:
        statements.append(
            "shutil.move(" + source.pyarg + ", " + target.pyarg + ")\n")
        shutil.move(source.runarg, target.runarg)
    return "".join(statements)
def cmd_cp(self, mv_args):
    """Copy in the filesystem and write the script for it.

    Arguments starting with '-' are ignored.  The last remaining argument
    is the target; all the ones before it are sources.  Raises Failure if
    there is no target, if the target already exists, or if a source is
    neither a directory nor a file.
    """
    paths = [self.path2svntest(arg)
             for arg in mv_args if not arg.startswith('-')]
    sources = paths[:-1]
    target = paths[-1] if paths else None

    if not target:
        raise Failure("cp needs a source and a target 'cp wc_dir wc_dir_2'")

    statements = []
    for source in sources:
        if os.path.exists(target.runarg):
            raise Failure("cp target exists, remove first: " + target.pyarg)

        # Directories are copied as trees, plain files with copy2.
        if os.path.isdir(source.runarg):
            copy_func, copy_name = shutil.copytree, "shutil.copytree("
        elif os.path.isfile(source.runarg):
            copy_func, copy_name = shutil.copy2, "shutil.copy2("
        else:
            raise Failure("cp copy source does not exist: " + source.pyarg)

        copy_func(source.runarg, target.runarg)
        statements.append(
            copy_name + source.pyarg + ", " + target.pyarg + ")\n")
    return "".join(statements)
860 # End of "shell" command handling functions.
864 # Internal helpers:
class WorkingCopy:
    """Defines the list of info we need around a working copy."""

    def __init__(self, py, realpath, suffix):
        # py: the text used for this working copy in generated script code
        # realpath: the path used when actually running commands
        # suffix: stored as given
        self.py, self.realpath, self.suffix = py, realpath, suffix
class Target:
    """Defines the list of info we need around a command line supplied target."""

    def __init__(self, pyarg, runarg, argnr, is_url=False, wc=None):
        # pyarg: the argument as written into the generated script
        # runarg: the argument as passed to the command actually run
        # argnr: index of this target in the argument lists
        self.pyarg, self.runarg, self.argnr = pyarg, runarg, argnr
        # is_url: whether the target is a URL; wc: its working copy, if any
        self.is_url, self.wc = is_url, wc
def add_line(self, args, translation=None):
    """Append a new [input, translation] pair to self.lines."""
    pair = [args, translation]
    self.lines.append(pair)
def really_safe_rmtree(self, dir):
    """Remove DIR via main.safe_rmtree(), refusing paths outside the sandbox.

    Raises Failure when DIR does not contain 'svn-test-work'.
    """
    # Safety catch.  We don't want to remove outside the sandbox.
    if 'svn-test-work' not in dir:
        raise Failure("Tried to remove path outside working area: " + dir)
    main.safe_rmtree(dir)
def get_current_disk(self, wc):
    """Probe the given working copy and write an expected_disk for it.

    The current disk state is compared with the previously remembered one
    (or a fresh greek tree) and written as tweaks on top of that.
    """
    current = svntest.wc.State.from_wc(wc.realpath, False, True)
    current.wc_dir = wc.realpath

    make_py, previous = self.get_prev_disk(wc)

    # The tests currently compare SVNTreeNode trees, so let's do that too.
    current_tree = current.old_tree()
    previous_tree = previous.old_tree()

    # Find out the tweaks.
    tweaks = self.diff_trees(previous_tree, current_tree, wc)
    if tweaks != 'Purge':
        tweaks = self.optimize_tweaks(tweaks, current_tree, wc)
    else:
        make_py = ''

    self.remember_disk(wc, current)

    pydisk = make_py + self.tweaks2py(tweaks, "expected_disk", wc)
    if len(pydisk) > 0:
        pydisk += '\n'
    return pydisk
def get_prev_disk(self, wc):
    """Retrieve the last used expected_disk tree, if any.

    Returns (make_py, disk).  make_py is the script line that creates a
    fresh expected_disk, or "" when a remembered disk is re-used.
    """
    remembered = self.prev_disk
    # If a disk was supplied via __init__(), remembered[0] is None, in
    # which case we always use it, not checking the WC.
    if remembered is not None and remembered[0] in [None, wc.realpath]:
        return "", remembered[1]

    # Nothing usable is remembered: start from a fresh greek tree.
    disk = svntest.main.greek_state.copy()
    disk.wc_dir = wc.realpath
    self.remember_disk(wc, disk)
    return "expected_disk = svntest.main.greek_state.copy()\n", disk
def remember_disk(self, wc, actual):
    """Remember the current disk tree of WC for future reference."""
    remembered = [wc.realpath, actual]
    self.prev_disk = remembered
def get_current_status(self, wc, quiet=True):
    """Probe the given working copy and write an expected_status for it.

    Runs 'svn status -v -u' (plus '-q' when QUIET) on the working copy and
    writes the result as tweaks on top of the previous expected_status.
    Raises Failure if svn status fails.
    """
    svn_args = ['status', '-v', '-u']
    if quiet:
        svn_args.append('-q')
    svn_args.append(wc.realpath)
    code, output, err = main.run_svn(None, *svn_args)

    if code != 0 or len(err) > 0:
        raise Failure("Hmm. `svn status' failed. What now.")

    make_py, previous = self.get_prev_status(wc)

    current = svntest.wc.State.from_status(output)

    # The tests currently compare SVNTreeNode trees, so let's do that too.
    previous_tree = previous.old_tree()
    current_tree = current.old_tree()

    # Get the tweaks.
    tweaks = self.diff_trees(previous_tree, current_tree, wc)

    if tweaks != 'Purge':
        tweaks = self.optimize_tweaks(tweaks, current_tree, wc)
    else:
        # The tree is empty (happens with invalid WC dirs).
        make_py = "expected_status = wc.State(" + wc.py + ", {})\n"
        tweaks = []

    self.remember_status(wc, current)

    pystatus = make_py + self.tweaks2py(tweaks, "expected_status", wc)
    if len(pystatus) > 0:
        pystatus += '\n'

    return pystatus
def get_prev_status(self, wc):
    """Retrieve the last used expected_status tree, if any.

    Returns (make_py, prev_status).  make_py is the script line that
    creates a fresh expected_status, or "" when the remembered one is
    re-used.
    """
    make_py = ""
    prev_status = None

    # Re-use any previous status if we are still in the same WC dir.
    # If a status was supplied via __init__(), self.prev_status[0] is set
    # to None, in which case we always use it, not checking the WC.
    if self.prev_status is None or \
       not self.prev_status[0] in [None, wc.realpath]:
        # There is no or no matching previous status.  Make a new one.
        try:
            # If it's really a WC, use its base revision.
            base_rev = actions.get_wc_base_rev(wc.realpath)
        except Exception:
            # Else, just use zero.  Whatever.  (Deliberately best-effort,
            # but KeyboardInterrupt and SystemExit are no longer swallowed.)
            base_rev = 0
        prev_status = actions.get_virginal_state(wc.realpath, base_rev)
        make_py += ("expected_status = actions.get_virginal_state(" +
                    wc.py + ", " + str(base_rev) + ")\n")
    else:
        # We will re-use the previous expected_status.
        prev_status = self.prev_status[1]
        # No need to make_py anything.

    return make_py, prev_status
def remember_status(self, wc, actual_status):
    """Record ACTUAL_STATUS as the latest known status tree of WC.

    The pair [wc.realpath, actual_status] is stored in self.prev_status.
    """
    snapshot = [wc.realpath, actual_status]
    self.prev_status = snapshot
def chdir(self, do_chdir, wc):
    """Enter WC's directory, remembering the current one on the dir stack.

    Returns the script text performing the same chdir, or "" (and does
    nothing) when DO_CHDIR is false.
    """
    if not do_chdir:
        return ""

    here = os.getcwd()
    self.prevdirs.append(here)
    os.chdir(wc.realpath)

    script_lines = [
        "orig_dir = os.getcwd() # Need to chdir because of '^/' args\n",
        "os.chdir(" + wc.py + ")\n",
    ]
    return "".join(script_lines)
def chdir_back(self, do_chdir):
    """Return to the directory on top of the dir stack, popping it.

    Returns the script text doing the same, or "" (and does nothing)
    when DO_CHDIR is false.
    """
    if do_chdir:
        # If this fails, there's a missing chdir() call.
        previous = self.prevdirs.pop()
        os.chdir(previous)
        return "os.chdir(orig_dir)\n"
    return ""
def get_sorted_vars_by_pathlen(self):
    """Return [pathlen, name, runpath] entries for all known variables.

    Entries are taken from self.vars and self.other_wc_dirs (whose
    values are [py, runpath] pairs) and returned in ascending order.
    The result is intended to be stored in self.sorted_vars_by_pathlen.
    """
    entries = []
    for table in (self.vars, self.other_wc_dirs):
        for name in table:
            runpath = table[name][1]
            entries.append([len(runpath), name, runpath])
    entries.sort()
    return entries
def get_sorted_var_names(self):
    """Return the names in self.vars in the order they should be declared.

    Names not starting with 'url_' come first, followed by the 'url_'
    names; each group is ordered case-insensitively, with the exact name
    as tie-breaker.  This is used by TestFactory.make().
    """
    def sort_key(name):
        return [name.lower(), name]

    path_names = [n for n in self.vars if not n.startswith('url_')]
    url_names = [n for n in self.vars if n.startswith('url_')]
    return sorted(path_names, key=sort_key) + sorted(url_names, key=sort_key)
def get_sorted_other_wc_dir_names(self):
    """Return the names in self.other_wc_dirs in declaration order.

    Names are ordered case-insensitively, with the exact name as
    tie-breaker.  This is used by TestFactory.make().
    """
    return sorted(self.other_wc_dirs,
                  key=lambda name: [name.lower(), name])
def str2svntest(self, str):
    """Like str2py(), but replaces any known paths with variable names.

    STR is quoted with str2py(); every occurrence of a known run path
    (from self.sorted_vars_by_pathlen, then sbox.wc_dir and
    sbox.repo_url) is replaced by an expression concatenating the
    matching variable.  Returns "None" for a None input.  As a side
    effect, self.used_wc_dir / self.used_url are set once the result
    mentions 'wc_dir' / 'url'.
    """
    if str is None:
        return "None"

    text = str2py(str)
    quote = text[0]

    def substitute(text, path, name):
        # Close the string literal, splice in the variable, re-open it.
        return text.replace(path, quote + " + " + name + " + " + quote)

    # We want longer paths first, so that a path is not clobbered by the
    # replacement of one of its prefixes.
    for var in reversed(self.sorted_vars_by_pathlen):
        text = substitute(text, var[2], var[1])

    text = substitute(text, self.sbox.wc_dir, 'wc_dir')
    text = substitute(text, self.sbox.repo_url, 'url')

    # Now remove the null-string concatenations left over at the ends:
    #   '' + url_A_C + ''
    text = text.replace("'' + ", '').replace(" + ''", '')
    #   "" + url_A_C + ""
    text = text.replace('"" + ', "").replace(' + ""', "")

    # Tiny tweak: don't declare wc_dir and url if they never appear.
    if not self.used_wc_dir:
        # The pattern must be a raw string: in a plain literal '\b' is a
        # backspace character, not a regex word boundary, and the search
        # would never match.
        self.used_wc_dir = (re.search(r'\bwc_dir\b', text) is not None)
    if not self.used_url:
        self.used_url = text.find('url') >= 0

    return text
def strlist2py(self, list):
    """Return script text for a list literal equal to the string list LIST.

    Each element is rendered with self.str2svntest().  None yields
    "None", an empty list "[]", a single element a one-line literal, and
    anything longer one element per line.
    """
    if list is None:
        return "None"

    count = len(list)
    if count < 1:
        return "[]"
    if count == 1:
        return "[" + self.str2svntest(list[0]) + "]"

    rows = [" " + self.str2svntest(line) + ",\n" for line in list]
    return "[\n" + "".join(rows) + "]"
def get_node_path(self, node, wc):
    """Return NODE's printable path, relative to working copy WC if possible.

    A leading wc.realpath (with or without a trailing path separator) is
    stripped; any other path is returned unchanged.
    """
    path = node.get_printable_path()
    root = wc.realpath
    # Try the longer prefix first so the separator is stripped as well.
    for prefix in (root + os.sep, root):
        if path.startswith(prefix):
            return path[len(prefix):]
    return path
def node2py(self, node, wc, prepend="", drop_empties=True):
    """Return what NODE.print_script() writes for working copy WC.

    The output is meant to be a line like 'A/C' : Item({ ... }) for
    wc.State composition.  PREPEND and DROP_EMPTIES are passed through
    to print_script() unchanged.
    """
    out = StringIO()
    node.print_script(out, wc.realpath, prepend, drop_empties)
    return out.getvalue()
def tree2py(self, node, wc):
    """Return the wc.State definition script for the tree NODE within WC.

    The text is whatever tree.dump_tree_script() writes, e.g.:

      svntest.wc.State(wc_dir, {
        'A/mu'      : Item(verb='Sending'),
        'A/D/G/rho' : Item(verb='Sending'),
      })
    """
    out = StringIO()
    tree.dump_tree_script(node, stream=out, subtree=wc.realpath,
                          wc_varname=wc.py)
    return out.getvalue()
def diff_trees(self, left, right, wc):
    """Compose the internal list of tweaks that turns LEFT into RIGHT.

    LEFT and RIGHT are SVNTreeNode instances in the working copy WC.
    If RIGHT has no children, the string 'Purge' is returned instead of
    a list.
    """
    if right.children:
        return self._diff_trees(left, right, wc)
    return 'Purge'
def _diff_trees(self, left, right, wc):
    """Recursive worker of self.diff_trees(); call that one instead.

    Returns a list of tweaks, each being one of
      ['Change', [path], [[name, value], ...]]
      ['Remove', [path, ...]]
      ['Add', [[path, node], ...]]
    """
    def changed_items(old, new):
        # Entries of OLD that vanished (value None) or changed, followed
        # by the entries that exist only in NEW.
        delta = []
        for key in old:
            if key not in new:
                delta.append([key, None])
            elif old[key] != new[key]:
                delta.append([key, new[key]])
        for key in new:
            if key not in old:
                delta.append([key, new[key]])
        return delta

    tweaks = []
    node_path = self.get_node_path(left, wc)

    # Attributes of this very node: contents, then props, then atts.
    mods = []
    if ((left.contents is None) != (right.contents is None)
            or left.contents != right.contents):
        mods.append(["contents", right.contents])
    mods.extend(changed_items(left.props, right.props))
    mods.extend(changed_items(left.atts, right.atts))
    if mods:
        tweaks.append(['Change', [node_path], mods])

    left_kids = left.children
    right_kids = right.children

    # Children that exist only on the left were removed.
    if left_kids is not None:
        for old_child in left_kids:
            counterpart = None
            if right_kids is not None:
                counterpart = tree.get_child(right, old_child.name)
            if counterpart is None:
                gone = old_child.recurse(
                    lambda n: self.get_node_path(n, wc))
                tweaks.append(['Remove', gone])

    # Children that exist only on the right were added; children present
    # on both sides are compared recursively.
    if right_kids is not None:
        for new_child in right_kids:
            counterpart = None
            if left_kids is not None:
                counterpart = tree.get_child(left, new_child.name)
            if counterpart is None:
                added = new_child.recurse(
                    lambda n: [self.get_node_path(n, wc), n])
                tweaks.append(['Add', added])
            else:
                tweaks += self._diff_trees(counterpart, new_child, wc)

    return tweaks
def optimize_tweaks(self, tweaks, actual_tree, wc):
    """Given an internal list of tweaks, make them optimal by common sense.

    All 'Remove' tweaks are merged into one, as are all 'Add' tweaks.  A
    "name=value" change that most nodes of the actual tree carry anyway
    is turned into one path-less 'Change' tweak (meaning "all nodes"),
    with explicit per-node changes for the nodes that differ.  Finally,
    changes with identical modifications or identical paths are merged.

    Returns removals + additions + changes; 'Purge' is passed through.
    """
    if tweaks == 'Purge':
        return tweaks

    subtree = actual_tree.find_node(wc.realpath)
    if not subtree:
        subtree = actual_tree

    remove_paths = []
    additions = []
    changes = []

    for tweak in tweaks:
        if tweak[0] == 'Remove':
            remove_paths += tweak[1]
        elif tweak[0] == 'Add':
            additions += tweak[1]
        else:
            changes += [tweak]

    # combine removals
    removal = []
    if len(remove_paths) > 0:
        removal = [['Remove', remove_paths]]

    # combine additions
    addition = []
    if len(additions) > 0:
        addition = [['Add', additions]]

    # find those changes that should be done on all nodes at once.
    def remove_mod(mod):
        for change in changes:
            if mod in change[2]:
                change[2].remove(mod)

    seen = []
    tweak_all = []
    # NOTE: `changes' may grow while this loop runs (see below); entries
    # appended here are visited by this same loop.
    for change in changes:
        tweak = change[2]
        for mod in tweak:
            if mod in seen:
                continue
            seen += [mod]

            # here we see each single "name=value" tweak in mod.
            # Check if the actual tree had this anyway all the way through.
            name = mod[0]
            val = mod[1]

            def check_node(node):
                if ((name == 'contents' and node.contents == val)
                        or
                        (node.props and (name in node.props)
                         and node.props[name] == val)
                        or
                        (node.atts and (name in node.atts)
                         and node.atts[name] == val)):
                    # has this same thing set. count on the left.
                    return [node, None]
                return [None, node]

            results = subtree.recurse(check_node)
            have = []
            havent = []
            for result in results:
                if result[0]:
                    have += [result[0]]
                else:
                    havent += [result[1]]

            if havent == []:
                # ok, then, remove all tweaks that are like this, then
                # add a generic tweak.
                remove_mod(mod)
                tweak_all += [mod]
            elif len(havent) < len(have) * 3:  # this is "an empirical factor"
                remove_mod(mod)
                tweak_all += [mod]
                # record the *other* nodes' actual item, overwritten above
                for node in havent:
                    name = mod[0]
                    if name == 'contents':
                        value = node.contents
                    elif name in node.props:
                        value = node.props[name]
                    elif name in node.atts:
                        value = node.atts[name]
                    else:
                        continue
                    changes += [['Change',
                                 [self.get_node_path(node, wc)],
                                 [[name, value]]
                                 ]
                                ]

    # combine those paths that have exactly the same changes
    i = 0
    while i < len(changes):
        # find other changes that are identical
        j = i + 1
        while j < len(changes):
            if changes[i][2] == changes[j][2]:
                changes[i][1] += changes[j][1]
                del changes[j]
            else:
                j += 1
        i += 1

    # combine those changes that have exactly the same paths
    i = 0
    while i < len(changes):
        # find other paths that are identical
        j = i + 1
        while j < len(changes):
            if changes[i][1] == changes[j][1]:
                changes[i][2] += changes[j][2]
                del changes[j]
            else:
                j += 1
        i += 1

    if tweak_all != []:
        # An empty path list stands for "all nodes".
        changes = [['Change', [], tweak_all]] + changes

    return removal + addition + changes
def tweaks2py(self, tweaks, var_name, wc):
    """Return the script text applying the internal TWEAKS to VAR_NAME.

    'Remove' tweaks become VAR_NAME.remove(...), 'Add' tweaks
    VAR_NAME.add({...}) and all others VAR_NAME.tweak(...).  None yields
    "" and 'Purge' yields a fresh, empty wc.State assignment.
    """
    if tweaks is None:
        return ""
    if tweaks == 'Purge':
        return var_name + " = wc.State(" + wc.py + ", {})\n"

    chunks = []
    for tweak in tweaks:
        kind = tweak[0]

        if kind == 'Remove':
            paths = tweak[1]
            rendered = [self.str2svntest(paths[0])]
            rendered += [self.str2svntest(path) for path in paths[1:]]
            chunks.append(var_name + ".remove(" + ", ".join(rendered) + ")\n")

        elif kind == 'Add':
            # add({'A/D/H/zeta' : Item(status='  ', wc_rev=9), ...})
            items = [self.node2py(add[1], wc, "\n ", False)
                     for add in tweak[1]]
            chunks.append(var_name + ".add({" + "".join(items) + "\n})\n")

        else:
            paths = tweak[1]
            mods = tweak[2]
            if mods != []:
                call_args = [self.str2svntest(path) for path in paths]
                call_args += [mod[0] + "=" + self.str2svntest(mod[1])
                              for mod in mods]
                chunks.append(var_name + ".tweak(" + ", ".join(call_args)
                              + ")\n")

    return "".join(chunks)
def path2svntest(self, path, argnr=None, do_remove_on_new_wc_path=True):
    """Given an input argument, do one hell of a path expansion on it.

    PATH may start with a working copy wildcard ('wc_dir', 'wcdir',
    '$WC_DIR', '$WC'), a repository wildcard ('url', '$URL'), a URL
    scheme (kept as it is), or a wildcard with a suffix attached such as
    'wc_dir.2' or 'wc_dir_other', which names an additional working copy.
    Anything else is taken as a path inside the default working copy.

    ARGNR is simply inserted into the resulting Target.
    DO_REMOVE_ON_NEW_WC_PATH is handed to get_other_wc_real_path() when
    an additional working copy is referenced.

    Returns a self.Target instance.  Raises Failure if an additional
    working copy is referenced without a usable suffix.
    """
    wc = self.WorkingCopy('wc_dir', self.sbox.wc_dir, None)
    url = self.sbox.repo_url  # do we need multiple URLs too??

    pathsep = '/'
    if path.find('/') < 0 and path.find('\\') >= 0:
        pathsep = '\\'

    is_url = False

    # If you add to these, make sure you add longer ones first, to
    # avoid e.g. '$WC_DIR' matching '$WC' first.
    wc_dir_wildcards = ['wc_dir', 'wcdir', '$WC_DIR', '$WC']
    url_wildcards = ['url', '$URL']

    first = path.split(pathsep, 1)[0]
    if first in wc_dir_wildcards:
        path = path[len(first):]
    elif first in url_wildcards:
        path = path[len(first):]
        is_url = True
    else:
        for url_scheme in ['^/', 'file:/', 'http:/', 'svn:/', 'svn+ssh:/']:
            if path.startswith(url_scheme):
                is_url = True
                # keep it as it is
                pyarg = self.str2svntest(path)
                runarg = path
                return self.Target(pyarg, runarg, argnr, is_url, None)

        for wc_dir_wildcard in wc_dir_wildcards:
            if first.startswith(wc_dir_wildcard):
                # The first path element starts with "wc_dir" (or similar),
                # but it has more attached to it. Like "wc_dir.2" or
                # "wc_dir_other".  Record a new wc dir name.

                # try to figure out a nice suffix to pass to sbox.
                # (it will create a new dir called sbox.wc_dir + '.' + suffix)
                suffix = ''
                if first[len(wc_dir_wildcard)] in ['.', '-', '_']:
                    # it's a separator already, don't duplicate the dot.
                    suffix = first[len(wc_dir_wildcard) + 1:]
                if len(suffix) < 1:
                    suffix = first[len(wc_dir_wildcard):]

                if len(suffix) < 1:
                    raise Failure("no suffix supplied to other-wc_dir arg")

                # Streamline the var name
                suffix = suffix.replace('.', '_').replace('-', '_')
                other_wc_dir_varname = 'wc_dir_' + suffix

                path = path[len(first):]

                real_path = self.get_other_wc_real_path(
                    other_wc_dir_varname, suffix, do_remove_on_new_wc_path)

                wc = self.WorkingCopy(other_wc_dir_varname,
                                      real_path, suffix)
                # found a match, no need to loop further, but still process
                # the path further.
                break

    if len(path) < 1 or path == pathsep:
        # The argument names the repository or working copy root itself.
        if is_url:
            self.used_url = True
            pyarg = 'url'
            runarg = url
            wc = None
        else:
            if wc.suffix is None:
                self.used_wc_dir = True
            pyarg = wc.py
            runarg = wc.realpath
    else:
        pathelements = split_remove_empty(path, pathsep)

        # make a new variable, if necessary
        if is_url:
            pyarg, runarg = self.ensure_url_var(pathelements)
            wc = None
        else:
            pyarg, runarg = self.ensure_path_var(wc, pathelements)

    return self.Target(pyarg, runarg, argnr, is_url, wc)
def get_other_wc_real_path(self, varname, suffix, do_remove):
    """Return the path of the alternate working copy VARNAME, creating it
    via sbox.add_wc_path(SUFFIX, DO_REMOVE) on first use.

    New working copies are recorded in self.other_wc_dirs as a
    [py, path] pair, and self.sorted_vars_by_pathlen is refreshed.
    """
    if varname in self.other_wc_dirs:
        return self.other_wc_dirs[varname][1]

    # Not known yet, so create one.
    path = self.sbox.add_wc_path(suffix, do_remove)

    call_args = str2py(suffix)
    if not do_remove:
        call_args += ", remove=False"
    value = ["sbox.add_wc_path(" + call_args + ')', path]

    self.other_wc_dirs[varname] = value
    self.sorted_vars_by_pathlen = self.get_sorted_vars_by_pathlen()
    return path
def define_var(self, name, value):
    """Record the variable NAME with VALUE; redefinitions are not allowed.

    Defining NAME again with an equal VALUE is a no-op; a different
    VALUE raises Failure.  For a new variable, the list used by
    str2svntest() for substitution is refreshed.
    """
    if name not in self.vars:
        # a new variable needs to be recorded
        self.vars[name] = value
        # update the sorted list of vars for substitution by str2svntest()
        self.sorted_vars_by_pathlen = self.get_sorted_vars_by_pathlen()
        return

    # we already have this var; it must have been recorded identically.
    if self.vars[name] != value:
        raise Failure("Variable name collision. Hm, fix factory.py?")
def ensure_path_var(self, wc, pathelements):
    """Make sure a variable exists for the path PATHELEMENTS inside WC.

    Returns (variable name, real path).  The variable is defined via
    self.define_var() as an os.path.join() expression on WC's variable.
    """
    name = "_".join(pathelements)

    if wc.suffix is None:
        self.used_wc_dir = True
    else:
        # This is an "other" working copy (not the default).
        # The suffix of the wc_dir variable serves as the prefix:
        # wc_dir_other ==> other_A_D = os.path.join(wc_dir_other, 'A', 'D')
        name = wc.suffix + "_" + name
        if name[0].isdigit():
            # A variable name must not start with a digit.
            name = "_" + name

    quoted = "".join(", '" + element + "'" for element in pathelements)
    py = 'os.path.join(' + wc.py + quoted + ')'

    run = os.path.join(wc.realpath, *pathelements)

    self.define_var(name, [py, run])
    return name, run
def ensure_url_var(self, pathelements):
    """Make sure a url variable exists for PATHELEMENTS in the test repos.

    Returns (variable name, real URL).  The variable is defined via
    self.define_var() as an expression based on the 'url' variable.
    """
    joined = "/" + "/".join(pathelements)
    name = "url_" + "_".join(pathelements)

    py = 'url'
    if len(pathelements) > 0:
        py = py + " + " + str2py(joined)
    # The expression always refers to 'url', so it must be declared.
    self.used_url = True

    run = self.sbox.repo_url + joined

    self.define_var(name, [py, run])
    return name, run
def get_first_wc(self, target_list):
    """Return the WorkingCopy of the first Target in TARGET_LIST that has
    one, defaulting to a WorkingCopy for sbox.wc_dir.

    This is useful if we need a working copy for a '^/' URL.
    """
    found = next((target.wc for target in target_list if target.wc), None)
    if found is not None:
        return found
    return self.WorkingCopy('wc_dir', self.sbox.wc_dir, None)
def args2svntest(self, args, append_wc_dir_if_missing = False,
                 keep_args_of = [], keep_first_count = 1,
                 drop_with_arg = []):
    """Tries to be extremely intelligent at parsing command line arguments.
    It needs to know which args are file targets that should be in a
    working copy. File targets are magically expanded.

    args: list of string tokens as passed to factory.make(), e.g.
          ['svn', 'commit', '--force', 'wc_dir2']

    append_wc_dir_if_missing: if true and no target was found among ARGS,
          the default wc_dir is appended as target.

    keep_args_of: options whose following argument is taken verbatim
          instead of being expanded as a target.

    keep_first_count: Don't expand the first N non-option args. This is used
          to preserve e.g. the token 'update' in '[svn] update wc_dir'
          (the 'svn' is usually split off before this function is called).

    drop_with_arg: list of string tokens that are commandline options with
          following argument which we want to drop from the list of args
          (e.g. -m message).

    Returns (pyargs, runargs, do_chdir, targets).
    """
    wc_dir = self.sbox.wc_dir

    pyargs = []
    runargs = []
    targets = []
    do_chdir = False
    target_supplied = False

    def keep_verbatim(token):
        pyargs.append(self.str2svntest(token))
        runargs.append(token)

    exhausted = object()
    remaining = iter(args)
    for arg in remaining:
        if arg in drop_with_arg:
            # Drop the option.  Unless it is a concatenated short option
            # like -r123 (instead of -r 123), drop its argument as well.
            if arg.startswith('--') or len(arg) <= 2:
                next(remaining, None)

        elif arg.startswith('-'):
            # keep this option arg verbatim.
            keep_verbatim(arg)
            # does this option expect a non-filename argument?
            # take that verbatim as well.
            if arg in keep_args_of:
                option_value = next(remaining, exhausted)
                if option_value is not exhausted:
                    keep_verbatim(option_value)

        elif keep_first_count > 0:
            # args still to be taken verbatim.
            keep_verbatim(arg)
            keep_first_count -= 1

        elif arg.startswith('^/'):
            # this is a ^/url, keep it verbatim.
            # if we use "^/", we need to chdir(wc_dir).
            do_chdir = True
            pyarg = str2py(arg)
            targets.append(self.Target(pyarg, arg, len(pyargs), True, None))
            pyargs.append(pyarg)
            runargs.append(arg)

        else:
            # well, then this must be a filename or url, autoexpand it.
            target = self.path2svntest(arg, argnr=len(pyargs))
            pyargs.append(target.pyarg)
            runargs.append(target.runarg)
            target_supplied = True
            targets.append(target)

    if append_wc_dir_if_missing and not target_supplied:
        # add a simple wc_dir target
        self.used_wc_dir = True
        wc = self.WorkingCopy('wc_dir', wc_dir, None)
        targets.append(self.Target('wc_dir', wc_dir, len(pyargs), False, wc))
        pyargs.append('wc_dir')
        runargs.append(wc_dir)

    return pyargs, runargs, do_chdir, targets
1676 ###### END of the TestFactory class ######
1680 # Quotes-preserving text wrapping for output
def find_quote_end(text, i):
    """Return the index in TEXT where the quote opened at TEXT[i] ends.

    A backslash makes the character after it be skipped.  If the quote
    is never closed, the index of TEXT's last character is returned.
    Triple-quoted strings are not handled.
    """
    quote = text[i]
    pos = i + 1
    limit = len(text)
    while pos < limit:
        char = text[pos]
        if char == '\\':
            # skip the backslash and the character it escapes
            pos += 2
            continue
        if char == quote:
            return pos
        pos += 1
    return limit - 1
class MyWrapper(textwrap.TextWrapper):
    """A textwrap.TextWrapper that doesn't break a line within quotes."""

    ### TODO regexes would be nice, maybe?
    def _split(self, text):
        """Split TEXT into chunks at whitespace, keeping quotes intact.

        Every whitespace character becomes a chunk of its own, while
        quoted strings (', " and triple double-quotes) stay in one piece
        with whatever they are attached to.  Example, breaks marked '/':
          /(one,/ /two(blagger),/ /'three three three',)/
        """
        parts = []
        i = 0
        start = 0
        while i < len(text):
            if text[i] in ['"', "'"]:
                if text[i:i+3] == '"""':
                    # triple-quoted string: look for the closing delimiter
                    end = text[i+3:].find('"""')
                    if end >= 0:
                        # Move to the LAST character of the closing
                        # delimiter (3 for the opening one, END for the
                        # content, 2 more within the closing one).
                        # Stopping any earlier would make the closing
                        # delimiter be read as a new opening one.
                        i += end + 5
                    else:
                        # unterminated: the rest of the text is one piece
                        i = len(text) - 1
                else:
                    # handle normal quotes
                    i = find_quote_end(text, i)
            elif text[i].isspace():
                # split off previous section, if any
                if start < i:
                    parts += [text[start:i]]
                    start = i
                # split off this space
                parts += [text[i]]
                start = i + 1

            i += 1

        if start < len(text):
            parts += [text[start:]]
        return parts
def wrap_each_line(str, ii, si, blw):
    """Wrap each line of STR on its own with a MyWrapper of width 77.

    II is the initial indent and SI the subsequent indent handed to the
    wrapper.  Lines are fed to the wrapper one by one so that the line
    breaks already present are preserved: we only want to insert new
    wraps, not remove existing newlines.  Empty lines are kept as they
    are.  BLW is not used.
    """
    wrapper = MyWrapper(77, initial_indent=ii,
                        subsequent_indent=si)
    wrapped = [wrapper.fill(line) if line != '' else line
               for line in str.splitlines()]
    return '\n'.join(wrapped)
1749 # Other miscellaneous helpers
def sh2str(string):
    """Return STRING with backslash escape sequences (\\n, \\x41, ...) resolved.

    None is passed through unchanged.
    """
    if string is None:
        return None
    try:
        return string.decode("string-escape")
    except (AttributeError, LookupError):
        # Python 3: str has no decode() and the "string-escape" codec is
        # gone.  Round-trip through bytes and "unicode_escape" instead;
        # 'backslashreplace' keeps characters outside latin-1 intact.
        # Unlike "string-escape", this also resolves \uXXXX sequences.
        if isinstance(string, bytes):
            raw = string
        else:
            raw = string.encode("latin-1", "backslashreplace")
        return raw.decode("unicode_escape")
def get_quote_style(str):
    """Find which quote character, ' or ", occurs first in STR.

    Returns (quote_char, at), AT being the index just after that first
    quote, or (None, None) if STR contains no quote at all.
    """
    single = str.find("'")
    double = str.find('"')

    if single < 0 and double < 0:
        return None, None

    # The earlier one wins; a quote that was not found never wins.
    if double < 0 or 0 <= single < double:
        return "'", single + 1
    return '"', double + 1
def split_remove_empty(str, sep):
    """Split STR at SEP and return the list of non-empty elements.

    A real list is returned on every Python version: callers take its
    len() and iterate over it more than once, which the lazy iterator
    returned by filter() on Python 3 does not support.
    """
    return [item for item in str.split(sep) if item]
def str2py(str):
    """Return STR enclosed in quotes and escaped, suitable for py scripts.

    Single quotes are used unless STR itself contains a single quote, in
    which case double quotes are used.  None yields "None".
    """
    if str is None:
        return "None"

    try:
        escaped = str.encode("string-escape")
    except LookupError:
        # Python 3 has no "string-escape" codec.  "unicode_escape" treats
        # backslashes and control characters alike, but leaves single
        # quotes alone, so escape those separately to get the same text.
        escaped = str.encode("unicode_escape").decode("ascii")
        escaped = escaped.replace("'", "\\'")

    # try to make a nice choice of quoting character
    if "'" in str:
        # Inside double quotes, single quotes need no escaping, but
        # double quotes do.
        return '"' + escaped.replace("\\'", "'").replace('"', '\\"') + '"'
    return "'" + escaped + "'"
1806 ### End of file.