2 # factory.py: Automatically generate a (near-)complete new cmdline test
3 # from a series of shell commands.
5 # Subversion is a tool for revision control.
6 # See http://subversion.tigris.org for more information.
8 # ====================================================================
9 # Licensed to the Apache Software Foundation (ASF) under one
10 # or more contributor license agreements. See the NOTICE file
11 # distributed with this work for additional information
12 # regarding copyright ownership. The ASF licenses this file
13 # to you under the Apache License, Version 2.0 (the
14 # "License"); you may not use this file except in compliance
15 # with the License. You may obtain a copy of the License at
17 # http://www.apache.org/licenses/LICENSE-2.0
19 # Unless required by applicable law or agreed to in writing,
20 # software distributed under the License is distributed on an
21 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 # KIND, either express or implied. See the License for the
23 # specific language governing permissions and limitations
25 ######################################################################
30 # (1) Edit the py test script you want to enhance (for example
31 # cmdline/basic_tests.py), add a new test header as usual.
32 # Insert a call to factory.make() into the empty test:
34 # def my_new_test(sbox):
35 # "my new test modifies iota"
36 # svntest.factory.make(sbox, """
37 # echo "foo" > A/D/foo
43 # (2) Add the test to the tests list at the bottom of the py file.
50 # (3) Run the test, paste the output back into your new test,
51 # replacing the factory call.
53 # $ ./foo_tests.py my_new_test
55 # $ ./foo_tests.py my_new_test > new_test.snippet
57 # $ ./foo_tests.py my_new_test >> basic_tests.py
58 # Then edit (e.g.) basic_tests.py to put the script in the right place.
60 # Ensure that the py script (e.g. basic_tests.py) has these imports,
61 # so that the composed script that you pasted back finds everything
64 # from svntest import main, wc, actions, verify
66 # Be aware that you have to paste the result back to the .py file.
68 # Be more aware that you have to read every single line and understand
69 # that it makes sense. If the current behaviour is wrong, you need to
70 # make the changes to expect the correct behaviour and XFail() your test.
72 # factory.make() just probes the current situation and writes a test that
73 # PASSES any success AND ANY FAILURE THAT IT FINDS. The resulting script
74 # will never fail anything (if it works correctly), not even a failure.
76 # ### TODO: some sort of intelligent pasting directly into the
77 # right place, like looking for the factory call,
78 # inserting the new test there, back-up-ing the old file.
82 # If the result has a problem somewhere along the middle, you can,
83 # of course, only use the first part of the output result, maybe tweak
84 # something, and continue with another factory.make() at the end of that.
86 # Or you can first do stuff to your sbox and then call factory on it.
87 # Factory will notice if the sbox has already been built and calls
88 # sbox.build() only if you didn't already.
90 # You can also have any number of factory.make() calls scattered
91 # around "real" test code.
93 # Note that you can pass a prev_status and prev_disk to factory, to make
94 # the expected_* trees re-use a pre-existing one in the test, entirely
95 # for code beauty :P (has to match the wc_dir you will be using next).
98 # YOU ARE CORDIALLY INVITED to add/tweak/change to your needs.
99 # If you want to know what's going on, look at the switch()
100 # funtion of TestFactory below.
106 # The input to factory.make(sbox, input) is not "real" shell-script.
107 # Factory goes at great lengths to try and understand your script, it
108 # parses common shell operations during tests and translates them.
110 # All arguments are tokenized similarly to shell, so if you need a space
111 # in an argument, use quotes.
112 # echo "my content" > A/new_file
113 # Quote char escaping is done like this:
114 # echo "my \\" content" > A/new_file
115 # echo 'my \\' content' > A/new_file
116 # If you use r""" echo 'my \' content' > A/new_file """ (triple quotes
117 # with a leading 'r' character), you don't need to double-escape any
120 # You can either supply multiple lines, or separate the lines with ';'.
121 # factory.make(sbox, 'echo foo > bar; svn add bar')
122 # factory.make(sbox, 'echo foo > bar\n svn add bar')
123 # factory.make(sbox, r"""
124 # echo "foo\nbar" > bar
130 # - Factory will automatically build sbox.wc_dir if you didn't do so yet.
132 # - If you supply any path or file name, factory will prepend sbox.wc_dir
135 # --> main.file_append(
136 # os.path.join(sbox.wc_dir, 'iota'),
138 # You can also do so explicitly.
139 # echo more >> wc_dir/iota
140 # --> main.file_append(
141 # os.path.join(sbox.wc_dir, 'iota'),
144 # Factory implies the sbox.wc_dir if you fail to supply an explicit
145 # working copy dir. If you want to supply one explicitly, you can
146 # choose among these wildcards:
147 # 'wc_dir', 'wcdir', '$WC_DIR', '$WC' -- all expanded to sbox.wc_dir
149 # 'svn mkdir wc_dir/A/D/X'
150 # But as long as you want to use only the default sbox.wc_dir, you usually
151 # don't need to supply any wc_dir-wildcard:
152 # 'mkdir A/X' creates the directory sbox.wc_dir/A/X
153 # (Factory tries to know which arguments of the commands you supplied
154 # are eligible to be path arguments. If something goes wrong here, try
155 # to fix factory.py to not mistake the arg for something different.
156 # You usually just need to tweak some parameters to args2svntest() to
157 # achieve correct expansion.)
159 # - If you want to use a second (or Nth) working copy, just supply any
160 # working copy wildcard with any made-up suffix, e.g. like this:
161 # 'svn st wc_dir_2' or 'svn info $WC_2'
162 # Factory will detect that you used another wc_dir and will automatically
163 # add a corresponding directory to your sbox. The directory will initially
164 # be nonexistent, so call 'mkdir', 'svn co' or 'cp' before using:
165 # 'cp wc_dir wc_dir_other' -- a copy of the current WC
166 # 'svn co $URL wc_dir_new' -- a clean checkout
167 # 'mkdir wc_dir_empty' -- an empty directory
168 # You can subsequently use any wc-dir wildcard with your suffix added.
171 # echo more >> wc_dir_2/iota
172 # --> wc_dir_2 = sbox.add_wc_path('2')
173 # shutil.copytrees(wc_dir, wc_dir_2)
175 # os.path.join(wc_dir_2, 'iota'),
180 # Factory currently knows only one repository, thus only one repos root.
181 # The wildcards you can use for it are:
183 # A URL is not inserted automatically like wc_dir, you need to supply a
185 # Alternatively, you can use '^/' URLs. However, that is in effect a different
186 # test from an explicit entire URL. The test needs to chdir to the working
187 # copy in order find which URL '^/' should expand to.
188 # (currently, factory will chdir to sbox.wc_dir. It will only chdir
189 # to another working copy if one of the other arguments involved a WC.
190 # ### TODO add a 'cd wc_dir_2' command to select another WC as default.)
192 # 'svn co $URL Y' -- make a new nested working copy in sbox.wc_dir/Y
193 # 'svn co $URL wc_dir_2' -- create a new separate working copy
194 # 'svn cp ^/A ^/X' -- do a URL copy, creating $URL/X (branch)
198 # These commands should work:
200 # - "svn <subcommand> <options>"
201 # Some subcommands are parsed specially, others by a catch-all default
202 # parser (cmd_svn()), see switch().
203 # 'svn commit', 'svn commit --force', 'svn ci wc_dir_2'
204 # 'svn copy url/A url/X'
206 # - "echo contents > file" (replace)
207 # "echo contents >> file" (append)
208 # Calls main.file_write() / main.file_append().
209 # 'echo "froogle" >> A/D/G/rho' -- append to an existing file
210 # 'echo "bar" > A/quux' -- create a new file
211 # 'echo "fool" > wc_dir_2/me' -- manipulate other working copies
213 # - "mkdir <names> ..."
214 # Calls os.makedirs().
215 # You probably want 'svn mkdir' instead, or use 'svn add' after this.
216 # 'mkdir A/D/X' -- create an unversioned directory
217 # 'mkdir wc_dir_5' -- create a new, empty working copy
220 # Calls main.safe_rmtree().
221 # You probably want to use 'svn delete' instead.
225 # - "mv <source> [<source2> ...] <target>"
226 # Calls shutil.move()
227 # You probably want to use 'svn move' instead.
228 # 'mv iota A/D/' -- move sbox.wc_dir/iota to sbox.wc_dir/A/D/.
230 # - "cp <source> [<source2> ...] <target>"
231 # Do a filesystem copy.
232 # You probably want to use 'svn copy' instead.
233 # 'cp wc_dir wc_dir_copy'
236 # IF YOU NEED ANY OTHER COMMANDS:
237 # - first check if it doesn't work already. If not,
238 # - add your desired commands to factory.py! :)
239 # - alternatively, use a number of separate factory calls, doing what
240 # you need done in "real" svntest language in-between.
242 # IF YOU REALLY DON'T GROK THIS:
247 import sys
, re
, os
, shutil
, bisect
, textwrap
, shlex
250 from svntest
import main
, actions
, tree
251 from svntest
import Failure
253 if sys
.version_info
[0] >= 3:
255 from io
import StringIO
258 from cStringIO
import StringIO
260 def make(wc_dir
, commands
, prev_status
=None, prev_disk
=None, verbose
=True):
261 """The Factory Invocation Function. This is typically the only one
262 called from outside this file. See top comment in factory.py.
263 Prints the resulting py script to stdout when verbose is True and
264 returns the resulting line-list containing items as:
265 [ ['pseudo-shell input line #1', ' translation\n to\n py #1'], ...]"""
266 fac
= TestFactory(wc_dir
, prev_status
, prev_disk
)
274 """This class keeps all state around a factory.make() call."""
276 def __init__(self
, sbox
, prev_status
=None, prev_disk
=None):
279 # The input lines and their translations.
280 # Each translation usually has multiple output lines ('\n' characters).
281 self
.lines
= [] # [ ['in1', 'out1'], ['in2', 'out'], ...
283 # Any expected_status still there from a previous verification
284 self
.prev_status
= None
286 self
.prev_status
= [None, prev_status
] # svntest.wc.State
288 # Any expected_disk still there from a previous verification
289 self
.prev_disk
= None
291 reparented_prev_disk
= svntest
.wc
.State(prev_disk
.wc_dir
, {});
292 reparented_prev_disk
.add_state(sbox
.wc_dir
, prev_disk
);
293 self
.prev_disk
= [None, reparented_prev_disk
]
295 # Those command line options that expect an argument following
296 # which is not a path. (don't expand args following these)
297 self
.keep_args_of
= ['--depth', '--encoding', '-r',
298 '--changelist', '-m', '--message']
300 # A stack of $PWDs, to be able to chdir back after a chdir.
303 # The python variables we want to be declared at the beginning.
304 # These are path variables like "A_D = os.path.join(wc_dir, 'A', 'D')".
305 # The original wc_dir and url vars are not kept here.
308 # An optimized list kept up-to-date by variable additions
309 self
.sorted_vars_by_pathlen
= []
311 # Wether we ever used the variables 'wc_dir' and 'url' (tiny tweak)
312 self
.used_wc_dir
= False
313 self
.used_url
= False
315 # The alternate working copy directories created that need to be
316 # registered with sbox (are not inside another working copy).
317 self
.other_wc_dirs
= {}
320 def make(self
, commands
):
321 "internal main function, delegates everything except final output."
323 # keep a spacer for init
324 self
.add_line(None, None)
327 if not self
.sbox
.is_built():
329 init
+= "sbox.build()\n"
334 input_lines
= commands
.replace(';','\n').splitlines()
335 for str in input_lines
:
336 if len(str.strip()) > 0:
339 for i
in range(len(self
.lines
)):
340 if self
.lines
[i
][0] is not None:
341 # This is where everything happens:
342 self
.lines
[i
][1] = self
.switch(self
.lines
[i
][0])
344 # We're done. Add a final greeting.
347 "Remember, this only saves you typing. Doublecheck everything.")
349 # -- Insert variable defs in the first line --
350 # main wc_dir and url
352 init
+= 'wc_dir = sbox.wc_dir\n'
354 init
+= 'url = sbox.repo_url\n'
356 # registration of new WC dirs
357 sorted_names
= self
.get_sorted_other_wc_dir_names()
358 for name
in sorted_names
:
359 init
+= name
+ ' = ' + self
.other_wc_dirs
[name
][0] + '\n'
364 # general variable definitions
365 sorted_names
= self
.get_sorted_var_names()
366 for name
in sorted_names
:
367 init
+= name
+ ' = ' + self
.vars[name
][0] + '\n'
369 # Insert at the first line, being the spacer from above
371 self
.lines
[0][1] = init
373 # This usually goes to make() below (outside this class)
376 for line
in self
.lines
:
377 if line
[1] is not None:
382 def print_script(self
, stream
=sys
.stdout
):
383 "Output the resulting script of the preceding make() call"
384 if self
.lines
is not None:
385 for line
in self
.lines
:
387 # fall back to just that line as it was in the source
388 stripped
= line
[0].strip()
389 if not stripped
.startswith('#'):
390 # for comments, don't say this:
391 stream
.write(" # don't know how to handle:\n")
392 stream
.write(" " + line
[0].strip() + '\n')
394 if line
[0] is not None:
395 stream
.write( wrap_each_line(line
[0].strip(),
396 " # ", " # ", True) + '\n')
397 stream
.write(wrap_each_line(line
[1], " ", " ", False) + '\n\n')
399 stream
.write(" # empty.\n")
403 # End of public functions.
407 # "Shell" command handlers:
409 def switch(self
, line
):
410 "Given one input line, delegates to the appropriate sub-functions."
411 args
= shlex
.split(line
)
416 # This is just an if-cascade. Feel free to change that.
422 return self
.cmd_svn(args
[1:], False, self
.keep_args_of
)
424 if second
in ['changelist', 'cl']:
426 if '--remove' in args
:
428 return self
.cmd_svn(args
[1:], False, self
.keep_args_of
, keep_count
)
430 if second
in ['status','stat','st']:
431 return self
.cmd_svn_status(args
[2:])
433 if second
in ['commit','ci']:
434 return self
.cmd_svn_commit(args
[2:])
436 if second
in ['update','up']:
437 return self
.cmd_svn_update(args
[2:])
439 if second
in ['switch','sw']:
440 return self
.cmd_svn_switch(args
[2:])
442 if second
in ['copy', 'cp',
443 'move', 'mv', 'rename', 'ren']:
444 return self
.cmd_svn_copy_move(args
[1:])
446 if second
in ['checkout', 'co']:
447 return self
.cmd_svn_checkout(args
[2:])
449 if second
in ['propset','pset','ps']:
450 return self
.cmd_svn(args
[1:], False,
451 self
.keep_args_of
, 3)
453 if second
in ['delete','del','remove', 'rm']:
454 return self
.cmd_svn(args
[1:], False,
455 self
.keep_args_of
+ ['--with-revprop'])
457 # NOTE that not all commands need to be listed here, since
458 # some are already adequately handled by self.cmd_svn().
459 # If you find yours is not, add another self.cmd_svn_xxx().
460 return self
.cmd_svn(args
[1:], False, self
.keep_args_of
)
463 return self
.cmd_echo(args
[1:])
466 return self
.cmd_mkdir(args
[1:])
469 return self
.cmd_rm(args
[1:])
472 return self
.cmd_mv(args
[1:])
475 return self
.cmd_cp(args
[1:])
477 # if all fails, take the line verbatim
481 def cmd_svn_standard_run(self
, pyargs
, runargs
, do_chdir
, wc
):
482 "The generic invocation of svn, helper function."
483 pychdir
= self
.chdir(do_chdir
, wc
)
485 code
, out
, err
= main
.run_svn("Maybe", *runargs
)
487 if code
== 0 and len(err
) < 1:
488 # write a test that expects success
489 pylist
= self
.strlist2py(out
)
491 py
= "expected_stdout = " + pylist
+ "\n\n"
493 py
= "expected_stdout = verify.UnorderedOutput(" + pylist
+ ")\n\n"
495 py
+= "actions.run_and_verify_svn2('OUTPUT', expected_stdout, [], 0"
497 # write a test that expects failure
498 pylist
= self
.strlist2py(err
)
500 py
= "expected_stderr = " + pylist
+ "\n\n"
502 py
= "expected_stderr = verify.UnorderedOutput(" + pylist
+ ")\n\n"
504 py
+= ("actions.run_and_verify_svn2('OUTPUT', " +
505 "[], expected_stderr, " + str(code
))
508 py
+= ", " + ", ".join(pyargs
)
510 py
+= self
.chdir_back(do_chdir
)
514 def cmd_svn(self
, svnargs
, append_wc_dir_if_missing
= False,
515 keep_args_of
= [], keep_first_count
= 1,
517 "Handles all svn calls not handled by more specific functions."
519 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(svnargs
,
520 append_wc_dir_if_missing
, keep_args_of
,
521 keep_first_count
, drop_with_arg
)
523 return self
.cmd_svn_standard_run(pyargs
, runargs
, do_chdir
,
524 self
.get_first_wc(targets
))
527 def cmd_svn_status(self
, status_args
):
528 "Runs svn status, looks what happened and writes the script for it."
529 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
530 status_args
, True, self
.keep_args_of
, 0)
534 for target
in targets
:
536 py
+= '# SKIPPING NON-WC ' + target
.runarg
+ '\n'
539 if '-q' in status_args
:
540 pystatus
= self
.get_current_status(target
.wc
, True)
542 "actions.run_and_verify_status(" + target
.wc
.py
+
543 ", expected_status)\n")
545 pystatus
= self
.get_current_status(target
.wc
, False)
547 "actions.run_and_verify_unquiet_status(" + target
.wc
.py
+
548 ", expected_status)\n")
552 def cmd_svn_commit(self
, commit_args
):
553 "Runs svn commit, looks what happened and writes the script for it."
554 # these are the options that are followed by something that should not
555 # be parsed as a filename in the WC.
560 # "-F", "--file", these take a file argument, don't list here.
561 # "-m", "--message", treated separately
564 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
565 commit_args
, True, commit_arg_opts
, 0, ['-m', '--message'])
567 wc
= self
.get_first_wc(targets
)
568 pychdir
= self
.chdir(do_chdir
, wc
)
570 code
, output
, err
= main
.run_svn("Maybe", 'ci',
574 if code
== 0 and len(err
) < 1:
575 # write a test that expects success
577 output
= actions
.process_output_for_commit(output
)
578 actual_out
= tree
.build_tree_from_commit(output
)
579 py
= ("expected_output = " +
580 self
.tree2py(actual_out
, wc
) + "\n\n")
582 pystatus
= self
.get_current_status(wc
)
586 py
+= ("actions.run_and_verify_commit(" + wc
.py
+ ", " +
587 "expected_output, expected_status, " +
590 # write a test that expects error
591 py
= "expected_error = " + self
.strlist2py(err
) + "\n\n"
593 py
+= ("actions.run_and_verify_commit(" + wc
.py
+ ", " +
594 "None, None, expected_error")
597 py
+= ', ' + ', '.join(pyargs
)
599 py
+= self
.chdir_back(do_chdir
)
603 def cmd_svn_update(self
, update_args
):
604 "Runs svn update, looks what happened and writes the script for it."
606 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
607 update_args
, True, self
.keep_args_of
, 0)
609 wc
= self
.get_first_wc(targets
)
610 pychdir
= self
.chdir(do_chdir
, wc
)
612 code
, output
, err
= main
.run_svn('Maybe', 'up', *runargs
)
614 if code
== 0 and len(err
) < 1:
615 # write a test that expects success
617 actual_out
= svntest
.wc
.State
.from_checkout(output
).old_tree()
618 py
= ("expected_output = " +
619 self
.tree2py(actual_out
, wc
) + "\n\n")
621 pydisk
= self
.get_current_disk(wc
)
624 pystatus
= self
.get_current_status(wc
)
628 py
+= ("actions.run_and_verify_update(" + wc
.py
+ ", " +
629 "expected_output, expected_disk, expected_status, " +
630 "None, None, None, None, None, False")
632 # write a test that expects error
633 py
= "expected_error = " + self
.strlist2py(err
) + "\n\n"
635 py
+= ("actions.run_and_verify_update(" + wc
.py
+ ", None, None, " +
636 "None, expected_error, None, None, None, None, False")
639 py
+= ', ' + ', '.join(pyargs
)
641 py
+= self
.chdir_back(do_chdir
)
645 def cmd_svn_switch(self
, switch_args
):
646 "Runs svn switch, looks what happened and writes the script for it."
648 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
649 switch_args
, True, self
.keep_args_of
, 0)
651 # Sort out the targets. We need one URL and one wc node, in that order.
653 raise Failure("Sorry, I'm currently enforcing two targets for svn " +
654 "switch. If you want to supply less, remove this " +
655 "check and implement whatever seems appropriate.")
658 del pyargs
[wc_arg
.argnr
]
659 del runargs
[wc_arg
.argnr
]
661 del pyargs
[url_arg
.argnr
]
662 del runargs
[url_arg
.argnr
]
666 raise Failure("Unexpected argument ordering to factory's 'svn switch'?")
668 pychdir
= self
.chdir(do_chdir
, wc
)
670 #if '--force' in runargs:
671 # self.really_safe_rmtree(wc_arg.runarg)
673 code
, output
, err
= main
.run_svn('Maybe', 'sw',
674 url_arg
.runarg
, wc_arg
.runarg
,
679 if code
== 0 and len(err
) < 1:
680 # write a test that expects success
682 actual_out
= tree
.build_tree_from_checkout(output
)
683 py
= ("expected_output = " +
684 self
.tree2py(actual_out
, wc
) + "\n\n")
686 pydisk
= self
.get_current_disk(wc
)
689 pystatus
= self
.get_current_status(wc
)
693 py
+= ("actions.run_and_verify_switch(" + wc
.py
+ ", " +
694 wc_arg
.pyarg
+ ", " + url_arg
.pyarg
+ ", " +
695 "expected_output, expected_disk, expected_status, " +
696 "None, None, None, None, None, False")
698 # write a test that expects error
699 py
= "expected_error = " + self
.strlist2py(err
) + "\n\n"
701 py
+= ("actions.run_and_verify_switch(" + wc
.py
+ ", " +
702 wc_arg
.pyarg
+ ", " + url_arg
.pyarg
+ ", " +
703 "None, None, None, expected_error, None, None, None, None, False")
706 py
+= ', ' + ', '.join(pyargs
)
708 py
+= self
.chdir_back(do_chdir
)
713 def cmd_svn_checkout(self
, checkout_args
):
714 "Runs svn checkout, looks what happened and writes the script for it."
716 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(
717 checkout_args
, True, self
.keep_args_of
, 0)
719 # Sort out the targets. We need one URL and one dir, in that order.
721 raise Failure("Sorry, I'm currently enforcing two targets for svn " +
722 "checkout. If you want to supply less, remove this " +
723 "check and implement whatever seems appropriate.")
724 # We need this separate for the call to run_and_verify_checkout()
725 # that's composed in the output script.
727 del pyargs
[wc_arg
.argnr
]
728 del runargs
[wc_arg
.argnr
]
730 del pyargs
[url_arg
.argnr
]
731 del runargs
[url_arg
.argnr
]
735 pychdir
= self
.chdir(do_chdir
, wc
)
737 #if '--force' in runargs:
738 # self.really_safe_rmtree(wc_arg.runarg)
740 code
, output
, err
= main
.run_svn('Maybe', 'co',
741 url_arg
.runarg
, wc_arg
.runarg
,
746 if code
== 0 and len(err
) < 1:
747 # write a test that expects success
749 actual_out
= tree
.build_tree_from_checkout(output
)
750 pyout
= ("expected_output = " +
751 self
.tree2py(actual_out
, wc
) + "\n\n")
754 pydisk
= self
.get_current_disk(wc
)
759 py
+= ("actions.run_and_verify_checkout(" +
760 url_arg
.pyarg
+ ", " + wc_arg
.pyarg
+
761 ", expected_output, expected_disk, None, None, None, None")
763 # write a test that expects failure
764 pylist
= self
.strlist2py(err
)
766 py
+= "expected_stderr = " + pylist
+ "\n\n"
768 py
+= "expected_stderr = verify.UnorderedOutput(" + pylist
+ ")\n\n"
770 py
+= ("actions.run_and_verify_svn2('OUTPUT', " +
771 "[], expected_stderr, " + str(code
) +
772 ", " + url_arg
.pyarg
+ ", " + wc_arg
.pyarg
)
774 # Append the remaining args
776 py
+= ', ' + ', '.join(pyargs
)
778 py
+= self
.chdir_back(do_chdir
)
782 def cmd_svn_copy_move(self
, args
):
783 "Runs svn copy or move, looks what happened and writes the script for it."
785 pyargs
, runargs
, do_chdir
, targets
= self
.args2svntest(args
,
786 False, self
.keep_args_of
, 1)
788 if len(targets
) == 2 and targets
[1].is_url
:
789 # The second argument is a URL.
790 # This needs a log message. Is one supplied?
793 if arg
.startswith('-m') or arg
== '--message':
798 runargs
+= [ '-m', 'copy log' ]
801 pyargs
+= [ self
.str2svntest(arg
) ]
803 return self
.cmd_svn_standard_run(pyargs
, runargs
, do_chdir
,
804 self
.get_first_wc(targets
))
807 def cmd_echo(self
, echo_args
):
808 "Writes a string to a file and writes the script for it."
813 for i
in range(len(echo_args
)):
815 if arg
.startswith('>'):
826 if target_arg
is None:
827 # we need an index (i+1) to exist, and
828 # we need (i+1) to be the only existing index left in the list.
829 if i
+1 != len(echo_args
)-1:
830 raise Failure("don't understand: echo " + " ".join(echo_args
))
831 target_arg
= echo_args
[i
+1]
833 # already got the target. no more indexes should exist.
834 if i
!= len(echo_args
)-1:
835 raise Failure("don't understand: echo " + " ".join(echo_args
))
837 contents
= " ".join(echo_args
[:i
]) + '\n'
839 if target_arg
is None:
840 raise Failure("echo needs a '>' pipe to a file name: echo " +
843 target
= self
.path2svntest(target_arg
)
846 main
.file_write(target
.runarg
, contents
)
847 py
= "main.file_write("
849 main
.file_append(target
.runarg
, contents
)
850 py
= "main.file_append("
851 py
+= target
.pyarg
+ ", " + self
.str2svntest(contents
) + ")"
856 def cmd_mkdir(self
, mkdir_args
):
857 "Makes a new directory and writes the script for it."
858 # treat all mkdirs as -p, ignore all -options.
860 for arg
in mkdir_args
:
861 if not arg
.startswith('-'):
862 target
= self
.path2svntest(arg
)
863 # don't check for not being a url,
864 # maybe it's desired by the test or something.
865 os
.makedirs(target
.runarg
)
866 out
+= "os.makedirs(" + target
.pyarg
+ ")\n"
870 def cmd_rm(self
, rm_args
):
871 "Removes a directory tree and writes the script for it."
872 # treat all removes as -rf, ignore all -options.
875 if not arg
.startswith('-'):
876 target
= self
.path2svntest(arg
)
877 if os
.path
.isfile(target
.runarg
):
878 os
.remove(target
.runarg
)
879 out
+= "os.remove(" + target
.pyarg
+ ")\n"
881 self
.really_safe_rmtree(target
.runarg
)
882 out
+= "main.safe_rmtree(" + target
.pyarg
+ ")\n"
886 def cmd_mv(self
, mv_args
):
887 "Moves things in the filesystem and writes the script for it."
888 # ignore all -options.
893 if not arg
.startswith('-'):
894 if target
is not None:
896 target
= self
.path2svntest(arg
)
899 for source
in sources
:
900 out
+= "shutil.move(" + source
.pyarg
+ ", " + target
.pyarg
+ ")\n"
901 shutil
.move(source
.runarg
, target
.runarg
)
906 def cmd_cp(self
, mv_args
):
907 "Copies in the filesystem and writes the script for it."
908 # ignore all -options.
913 if not arg
.startswith('-'):
914 if target
is not None:
916 target
= self
.path2svntest(arg
)
919 raise Failure("cp needs a source and a target 'cp wc_dir wc_dir_2'")
922 for source
in sources
:
923 if os
.path
.exists(target
.runarg
):
924 raise Failure("cp target exists, remove first: " + target
.pyarg
)
925 if os
.path
.isdir(source
.runarg
):
926 shutil
.copytree(source
.runarg
, target
.runarg
)
927 out
+= "shutil.copytree(" + source
.pyarg
+ ", " + target
.pyarg
+ ")\n"
928 elif os
.path
.isfile(source
.runarg
):
929 shutil
.copy2(source
.runarg
, target
.runarg
)
930 out
+= "shutil.copy2(" + source
.pyarg
+ ", " + target
.pyarg
+ ")\n"
932 raise Failure("cp copy source does not exist: " + source
.pyarg
)
937 # End of "shell" command handling functions.
945 "Defines the list of info we need around a working copy."
946 def __init__(self
, py
, realpath
, suffix
):
948 self
.realpath
= realpath
953 "Defines the list of info we need around a command line supplied target."
954 def __init__(self
, pyarg
, runarg
, argnr
, is_url
=False, wc
=None):
962 def add_line(self
, args
, translation
=None):
963 "Definition of how to add a new in/out line pair to LINES."
964 self
.lines
+= [ [args
, translation
] ]
967 def really_safe_rmtree(self
, dir):
968 # Safety catch. We don't want to remove outside the sandbox.
969 if dir.find('svn-test-work') < 0:
970 raise Failure("Tried to remove path outside working area: " + dir)
971 main
.safe_rmtree(dir)
974 def get_current_disk(self
, wc
):
975 "Probes the given working copy and writes an expected_disk for it."
976 actual_disk
= svntest
.wc
.State
.from_wc(wc
.realpath
, False, True)
977 actual_disk
.wc_dir
= wc
.realpath
979 make_py
, prev_disk
= self
.get_prev_disk(wc
)
981 # The tests currently compare SVNTreeNode trees, so let's do that too.
982 actual_disk_tree
= actual_disk
.old_tree()
983 prev_disk_tree
= prev_disk
.old_tree()
985 # find out the tweaks
986 tweaks
= self
.diff_trees(prev_disk_tree
, actual_disk_tree
, wc
)
987 if tweaks
== 'Purge':
990 tweaks
= self
.optimize_tweaks(tweaks
, actual_disk_tree
, wc
)
992 self
.remember_disk(wc
, actual_disk
)
994 pydisk
= make_py
+ self
.tweaks2py(tweaks
, "expected_disk", wc
)
999 def get_prev_disk(self
, wc
):
1000 "Retrieves the last used expected_disk tree if any."
1002 # If a disk was supplied via __init__(), self.prev_disk[0] is set
1003 # to None, in which case we always use it, not checking WC.
1004 if self
.prev_disk
is None or \
1005 not self
.prev_disk
[0] in [None, wc
.realpath
]:
1006 disk
= svntest
.main
.greek_state
.copy()
1007 disk
.wc_dir
= wc
.realpath
1008 self
.remember_disk(wc
, disk
)
1009 make_py
= "expected_disk = svntest.main.greek_state.copy()\n"
1011 disk
= self
.prev_disk
[1]
1012 return make_py
, disk
1014 def remember_disk(self
, wc
, actual
):
1015 "Remembers the current disk tree for future reference."
1016 self
.prev_disk
= [wc
.realpath
, actual
]
1019 def get_current_status(self
, wc
, quiet
=True):
1020 "Probes the given working copy and writes an expected_status for it."
1022 code
, output
, err
= main
.run_svn(None, 'status', '-v', '-u', '-q',
1025 code
, output
, err
= main
.run_svn(None, 'status', '-v', '-u',
1027 if code
!= 0 or len(err
) > 0:
1028 raise Failure("Hmm. `svn status' failed. What now.")
1030 make_py
, prev_status
= self
.get_prev_status(wc
)
1032 actual_status
= svntest
.wc
.State
.from_status(output
)
1034 # The tests currently compare SVNTreeNode trees, so let's do that too.
1035 prev_status_tree
= prev_status
.old_tree()
1036 actual_status_tree
= actual_status
.old_tree()
1039 tweaks
= self
.diff_trees(prev_status_tree
, actual_status_tree
, wc
)
1041 if tweaks
== 'Purge':
1042 # The tree is empty (happens with invalid WC dirs)
1043 make_py
= "expected_status = wc.State(" + wc
.py
+ ", {})\n"
1046 tweaks
= self
.optimize_tweaks(tweaks
, actual_status_tree
, wc
)
1048 self
.remember_status(wc
, actual_status
)
1050 pystatus
= make_py
+ self
.tweaks2py(tweaks
, "expected_status", wc
)
1051 if len(pystatus
) > 0:
1056 def get_prev_status(self
, wc
):
1057 "Retrieves the last used expected_status tree if any."
1061 # re-use any previous status if we are still in the same WC dir.
1062 # If a status was supplied via __init__(), self.prev_status[0] is set
1063 # to None, in which case we always use it, not checking WC.
1064 if self
.prev_status
is None or \
1065 not self
.prev_status
[0] in [None, wc
.realpath
]:
1066 # There is no or no matching previous status. Make new one.
1068 # If it's really a WC, use its base revision
1069 base_rev
= actions
.get_wc_base_rev(wc
.realpath
)
1071 # Else, just use zero. Whatever.
1073 prev_status
= actions
.get_virginal_state(wc
.realpath
, base_rev
)
1074 make_py
+= ("expected_status = actions.get_virginal_state(" +
1075 wc
.py
+ ", " + str(base_rev
) + ")\n")
1077 # We will re-use the previous expected_status.
1078 prev_status
= self
.prev_status
[1]
1079 # no need to make_py anything
1081 return make_py
, prev_status
1083 def remember_status(self
, wc
, actual_status
):
1084 "Remembers the current status tree for future reference."
1085 self
.prev_status
= [wc
.realpath
, actual_status
]
1088 def chdir(self
, do_chdir
, wc
):
1089 "Pushes the current dir onto the dir stack, does an os.chdir()."
1092 self
.prevdirs
.append(os
.getcwd())
1093 os
.chdir(wc
.realpath
)
1094 py
= ("orig_dir = os.getcwd() # Need to chdir because of '^/' args\n" +
1095 "os.chdir(" + wc
.py
+ ")\n")
1098 def chdir_back(self
, do_chdir
):
1099 "Does os.chdir() back to the directory popped from the dir stack's top."
1102 # If this fails, there's a missing chdir() call:
1103 os
.chdir(self
.prevdirs
.pop())
1104 return "os.chdir(orig_dir)\n"
1107 def get_sorted_vars_by_pathlen(self
):
1108 """Compose a listing of variable names to be expanded in script output.
1109 This is intended to be stored in self.sorted_vars_by_pathlen."""
1112 for dict in [self
.vars, self
.other_wc_dirs
]:
1114 runpath
= dict[name
][1]
1117 strlen
= len(runpath
)
1118 item
= [strlen
, name
, runpath
]
1119 bisect
.insort(lst
, item
)
1124 def get_sorted_var_names(self
):
1125 """Compose a listing of variable names to be declared.
1126 This is used by TestFactory.make()."""
1129 for name
in self
.vars:
1130 if name
.startswith('url_'):
1131 bisect
.insort(urls
, [name
.lower(), name
])
1133 bisect
.insort(paths
, [name
.lower(), name
])
1142 def get_sorted_other_wc_dir_names(self
):
1143 """Compose a listing of working copies to be declared with sbox.
1144 This is used by TestFactory.make()."""
1146 for name
in self
.other_wc_dirs
:
1147 bisect
.insort(list, [name
.lower(), name
])
1154 def str2svntest(self
, str):
1155 "Like str2py(), but replaces any known paths with variable names."
1162 def replace(str, path
, name
, quote
):
1163 return str.replace(path
, quote
+ " + " + name
+ " + " + quote
)
1165 # We want longer paths first.
1166 for var
in reversed(self
.sorted_vars_by_pathlen
):
1169 str = replace(str, path
, name
, quote
)
1171 str = replace(str, self
.sbox
.wc_dir
, 'wc_dir', quote
)
1172 str = replace(str, self
.sbox
.repo_url
, 'url', quote
)
1174 # now remove trailing null-str adds:
1176 str = str.replace("'' + ",'').replace(" + ''",'')
1178 str = str.replace('"" + ',"").replace(' + ""',"")
1180 # just a stupid check. tiny tweak. (don't declare wc_dir and url
1181 # if they never appear)
1182 if not self
.used_wc_dir
:
1183 self
.used_wc_dir
= (re
.search('\bwc_dir\b', str) is not None)
1184 if not self
.used_url
:
1185 self
.used_url
= str.find('url') >= 0
1190 def strlist2py(self
, list):
1191 "Given a list of strings, composes a py script that produces the same."
1197 return "[" + self
.str2svntest(list[0]) + "]"
1201 py
+= " " + self
.str2svntest(line
) + ",\n"
1206 def get_node_path(self
, node
, wc
):
1207 "Tries to return the node path relative to the given working copy."
1208 path
= node
.get_printable_path()
1209 if path
.startswith(wc
.realpath
+ os
.sep
):
1210 path
= path
[len(wc
.realpath
+ os
.sep
):]
1211 elif path
.startswith(wc
.realpath
):
1212 path
= path
[len(wc
.realpath
):]
1216 def node2py(self
, node
, wc
, prepend
="", drop_empties
=True):
1217 "Creates a line like 'A/C' : Item({ ... }) for wc.State composition."
1219 node
.print_script(buf
, wc
.realpath
, prepend
, drop_empties
)
1220 return buf
.getvalue()
1223 def tree2py(self
, node
, wc
):
1224 "Writes the wc.State definition for the given SVNTreeNode in given WC."
1225 # svntest.wc.State(wc_dir, {
1226 # 'A/mu' : Item(verb='Sending'),
1227 # 'A/D/G/rho' : Item(verb='Sending'),
1230 tree
.dump_tree_script(node
, stream
=buf
, subtree
=wc
.realpath
,
1232 return buf
.getvalue()
1235 def diff_trees(self
, left
, right
, wc
):
1236 """Compares the two trees given by the SVNTreeNode instances LEFT and
1237 RIGHT in the given working copy and composes an internal list of
1238 tweaks necessary to make LEFT into RIGHT."""
1239 if not right
.children
:
1241 return self
._diff
_trees
(left
, right
, wc
)
1243 def _diff_trees(self
, left
, right
, wc
):
1244 "Used by self.diff_trees(). No need to call this. See there."
1245 # all tweaks collected
1248 # the current tweak in composition
1249 path
= self
.get_node_path(left
, wc
)
1253 if ((left
.contents
is None) != (right
.contents
is None)) or \
1254 (left
.contents
!= right
.contents
):
1255 tweak
+= [ ["contents", right
.contents
] ]
1257 for key
in left
.props
:
1258 if key
not in right
.props
:
1259 tweak
+= [ [key
, None] ]
1260 elif left
.props
[key
] != right
.props
[key
]:
1261 tweak
+= [ [key
, right
.props
[key
]] ]
1263 for key
in right
.props
:
1264 if key
not in left
.props
:
1265 tweak
+= [ [key
, right
.props
[key
]] ]
1267 for key
in left
.atts
:
1268 if key
not in right
.atts
:
1269 tweak
+= [ [key
, None] ]
1270 elif left
.atts
[key
] != right
.atts
[key
]:
1271 tweak
+= [ [key
, right
.atts
[key
]] ]
1273 for key
in right
.atts
:
1274 if key
not in left
.atts
:
1275 tweak
+= [ [key
, right
.atts
[key
]] ]
1278 changetweak
= [ 'Change', [path
], tweak
]
1279 tweaks
+= [changetweak
]
1281 if left
.children
is not None:
1282 for leftchild
in left
.children
:
1284 if right
.children
is not None:
1285 rightchild
= tree
.get_child(right
, leftchild
.name
)
1286 if rightchild
is None:
1287 paths
= leftchild
.recurse(lambda n
: self
.get_node_path(n
, wc
))
1288 removetweak
= [ 'Remove', paths
]
1289 tweaks
+= [removetweak
]
1291 if right
.children
is not None:
1292 for rightchild
in right
.children
:
1294 if left
.children
is not None:
1295 leftchild
= tree
.get_child(left
, rightchild
.name
)
1296 if leftchild
is None:
1297 paths_and_nodes
= rightchild
.recurse(
1298 lambda n
: [ self
.get_node_path(n
, wc
), n
] )
1299 addtweak
= [ 'Add', paths_and_nodes
]
1300 tweaks
+= [addtweak
]
1302 tweaks
+= self
._diff
_trees
(leftchild
, rightchild
, wc
)
1307 def optimize_tweaks(self
, tweaks
, actual_tree
, wc
):
1308 "Given an internal list of tweaks, make them optimal by common sense."
1309 if tweaks
== 'Purge':
1312 subtree
= actual_tree
.find_node(wc
.realpath
)
1314 subtree
= actual_tree
1320 for tweak
in tweaks
:
1321 if tweak
[0] == 'Remove':
1322 remove_paths
+= tweak
[1]
1323 elif tweak
[0] == 'Add':
1324 additions
+= tweak
[1]
1330 if len(remove_paths
) > 0:
1331 removal
= [ [ 'Remove', remove_paths
] ]
1335 if len(additions
) > 0:
1336 addition
= [ [ 'Add', additions
] ]
1338 # find those changes that should be done on all nodes at once.
1339 def remove_mod(mod
):
1340 for change
in changes
:
1341 if mod
in change
[2]:
1342 change
[2].remove(mod
)
1346 for change
in changes
:
1353 # here we see each single "name=value" tweak in mod.
1354 # Check if the actual tree had this anyway all the way through.
1358 if name
== 'contents' and val
is None:
1361 def check_node(node
):
1363 (name
== 'contents' and node
.contents
== val
)
1365 (node
.props
and (name
in node
.props
) and node
.props
[name
] == val
)
1367 (node
.atts
and (name
in node
.atts
) and node
.atts
[name
] == val
)):
1368 # has this same thing set. count on the left.
1371 results
= subtree
.recurse(check_node
)
1374 for result
in results
:
1378 havent
+= [result
[1]]
1381 # ok, then, remove all tweaks that are like this, then
1382 # add a generic tweak.
1385 elif len(havent
) < len(have
) * 3: # this is "an empirical factor"
1388 # record the *other* nodes' actual item, overwritten above
1391 if name
== 'contents':
1392 value
= node
.contents
1393 elif name
in node
.props
:
1394 value
= node
.props
[name
]
1395 elif name
in node
.atts
:
1396 value
= node
.atts
[name
]
1399 changes
+= [ ['Change',
1400 [self
.get_node_path(node
, wc
)],
1405 # combine those paths that have exactly the same changes
1408 while i
< len(changes
):
1409 # find other changes that are identical
1411 while j
< len(changes
):
1412 if changes
[i
][2] == changes
[j
][2]:
1413 changes
[i
][1] += changes
[j
][1]
1419 # combine those changes that have exactly the same paths
1422 while i
< len(changes
):
1423 # find other paths that are identical
1425 while j
< len(changes
):
1426 if changes
[i
][1] == changes
[j
][1]:
1427 changes
[i
][2] += changes
[j
][2]
1434 changes
= [ ['Change', [], tweak_all
] ] + changes
1436 return removal
+ addition
+ changes
1439 def tweaks2py(self
, tweaks
, var_name
, wc
):
1440 "Given an internal list of tweaks, write the tweak script for it."
1445 if tweaks
== 'Purge':
1446 return var_name
+ " = wc.State(" + wc
.py
+ ", {})\n"
1448 for tweak
in tweaks
:
1449 if tweak
[0] == 'Remove':
1450 py
+= var_name
+ ".remove("
1452 py
+= self
.str2svntest(paths
[0])
1453 for path
in paths
[1:]:
1454 py
+= ", " + self
.str2svntest(path
)
1457 elif tweak
[0] == 'Add':
1458 # add({'A/D/H/zeta' : Item(status=' ', wc_rev=9), ...})
1459 py
+= var_name
+ ".add({"
1464 py
+= self
.node2py(node
, wc
, "\n ", False)
1471 py
+= var_name
+ ".tweak("
1473 py
+= self
.str2svntest(path
) + ", "
1475 return mod
[0] + "=" + self
.str2svntest(mod
[1])
1476 py
+= mod2py(mods
[0])
1477 for mod
in mods
[1:]:
1478 py
+= ", " + mod2py(mod
)
1483 def path2svntest(self
, path
, argnr
=None, do_remove_on_new_wc_path
=True):
1484 """Given an input argument, do one hell of a path expansion on it.
1485 ARGNR is simply inserted into the resulting Target.
1486 Returns a self.Target instance.
1488 wc
= self
.WorkingCopy('wc_dir', self
.sbox
.wc_dir
, None)
1489 url
= self
.sbox
.repo_url
# do we need multiple URLs too??
1492 if path
.find('/') < 0 and path
.find('\\') >= 0:
1497 # If you add to these, make sure you add longer ones first, to
1498 # avoid e.g. '$WC_DIR' matching '$WC' first.
1499 wc_dir_wildcards
= ['wc_dir', 'wcdir', '$WC_DIR', '$WC']
1500 url_wildcards
= ['url', '$URL']
1502 first
= path
.split(pathsep
, 1)[0]
1503 if first
in wc_dir_wildcards
:
1504 path
= path
[len(first
):]
1505 elif first
in url_wildcards
:
1506 path
= path
[len(first
):]
1509 for url_scheme
in ['^/', 'file:/', 'http:/', 'svn:/', 'svn+ssh:/']:
1510 if path
.startswith(url_scheme
):
1513 pyarg
= self
.str2svntest(path
)
1515 return self
.Target(pyarg
, runarg
, argnr
, is_url
, None)
1517 for wc_dir_wildcard
in wc_dir_wildcards
:
1518 if first
.startswith(wc_dir_wildcard
):
1519 # The first path element starts with "wc_dir" (or similar),
1520 # but it has more attached to it. Like "wc_dir.2" or "wc_dir_other"
1521 # Record a new wc dir name.
1523 # try to figure out a nice suffix to pass to sbox.
1524 # (it will create a new dir called sbox.wc_dir + '.' + suffix)
1526 if first
[len(wc_dir_wildcard
)] in ['.','-','_']:
1527 # it's a separator already, don't duplicate the dot. (warm&fuzzy)
1528 suffix
= first
[len(wc_dir_wildcard
) + 1:]
1530 suffix
= first
[len(wc_dir_wildcard
):]
1533 raise Failure("no suffix supplied to other-wc_dir arg")
1535 # Streamline the var name
1536 suffix
= suffix
.replace('.','_').replace('-','_')
1537 other_wc_dir_varname
= 'wc_dir_' + suffix
1539 path
= path
[len(first
):]
1541 real_path
= self
.get_other_wc_real_path(other_wc_dir_varname
,
1543 do_remove_on_new_wc_path
)
1545 wc
= self
.WorkingCopy(other_wc_dir_varname
,
1547 # found a match, no need to loop further, but still process
1551 if len(path
) < 1 or path
== pathsep
:
1553 self
.used_url
= True
1558 if wc
.suffix
is None:
1559 self
.used_wc_dir
= True
1561 runarg
= wc
.realpath
1563 pathelements
= split_remove_empty(path
, pathsep
)
1565 # make a new variable, if necessary
1567 pyarg
, runarg
= self
.ensure_url_var(pathelements
)
1570 pyarg
, runarg
= self
.ensure_path_var(wc
, pathelements
)
1572 return self
.Target(pyarg
, runarg
, argnr
, is_url
, wc
)
1575 def get_other_wc_real_path(self
, varname
, suffix
, do_remove
):
1576 "Create or retrieve the path of an alternate working copy."
1577 if varname
in self
.other_wc_dirs
:
1578 return self
.other_wc_dirs
[varname
][1]
1580 # see if there is a wc already in the sbox
1581 path
= self
.sbox
.wc_dir
+ '.' + suffix
1582 if path
in self
.sbox
.test_paths
:
1583 py
= "sbox.wc_dir + '." + suffix
+ "'"
1585 # else, we must still create one.
1586 path
= self
.sbox
.add_wc_path(suffix
, do_remove
)
1587 py
= "sbox.add_wc_path(" + str2py(suffix
)
1589 py
+= ", remove=False"
1593 self
.other_wc_dirs
[varname
] = [py
, path
]
1594 self
.sorted_vars_by_pathlen
= self
.get_sorted_vars_by_pathlen()
1598 def define_var(self
, name
, value
):
1599 "Add a variable definition, don't allow redefinitions."
1600 # see if we already have this var
1601 if name
in self
.vars:
1602 if self
.vars[name
] != value
:
1603 raise Failure("Variable name collision. Hm, fix factory.py?")
1604 # ok, it's recorded correctly. Nothing needs to happen.
1607 # a new variable needs to be recorded
1608 self
.vars[name
] = value
1609 # update the sorted list of vars for substitution by str2svntest()
1610 self
.sorted_vars_by_pathlen
= self
.get_sorted_vars_by_pathlen()
1613 def ensure_path_var(self
, wc
, pathelements
):
1614 "Given a path in a working copy, make sure we have a variable for it."
1616 # special case: if a path is '.', simply use wc_dir.
1617 if pathelements
== ['.']:
1618 return wc
.py
, wc
.realpath
1620 name
= "_".join(pathelements
)
1622 if wc
.suffix
is not None:
1623 # This is an "other" working copy (not the default).
1624 # The suffix of the wc_dir variable serves as the prefix:
1625 # wc_dir_other ==> other_A_D = os.path.join(wc_dir_other, 'A', 'D')
1626 name
= wc
.suffix
+ "_" + name
1627 if name
[0].isdigit():
1630 self
.used_wc_dir
= True
1632 py
= 'os.path.join(' + wc
.py
1633 if len(pathelements
) > 0:
1634 py
+= ", '" + "', '".join(pathelements
) + "'"
1637 wc_dir_real_path
= wc
.realpath
1638 run
= os
.path
.join(wc_dir_real_path
, *pathelements
)
1641 self
.define_var(name
, value
)
1646 def ensure_url_var(self
, pathelements
):
1647 "Given a path in the test repository, ensure we have a url var for it."
1648 name
= "url_" + "_".join(pathelements
)
1650 joined
= "/" + "/".join(pathelements
)
1653 if len(pathelements
) > 0:
1654 py
+= " + " + str2py(joined
)
1655 self
.used_url
= True
1657 run
= self
.sbox
.repo_url
+ joined
1660 self
.define_var(name
, value
)
1665 def get_first_wc(self
, target_list
):
1666 """In a list of Target instances, find the first one that is in a
1667 working copy and return that WorkingCopy. Default to sbox.wc_dir.
1668 This is useful if we need a working copy for a '^/' URL."""
1669 for target
in target_list
:
1672 return self
.WorkingCopy('wc_dir', self
.sbox
.wc_dir
, None)
1675 def args2svntest(self
, args
, append_wc_dir_if_missing
= False,
1676 keep_args_of
= [], keep_first_count
= 1,
1677 drop_with_arg
= []):
1678 """Tries to be extremely intelligent at parsing command line arguments.
1679 It needs to know which args are file targets that should be in a
1680 working copy. File targets are magically expanded.
1682 args: list of string tokens as passed to factory.make(), e.g.
1683 ['svn', 'commit', '--force', 'wc_dir2']
1685 append_wc_dir_if_missing: It's a switch.
1687 keep_args_of: See TestFactory.keep_args_of (comment in __init__)
1689 keep_first_count: Don't expand the first N non-option args. This is used
1690 to preserve e.g. the token 'update' in '[svn] update wc_dir'
1691 (the 'svn' is usually split off before this function is called).
1693 drop_with_arg: list of string tokens that are commandline options with
1694 following argument which we want to drop from the list of args
1698 wc_dir
= self
.sbox
.wc_dir
1699 url
= self
.sbox
.repo_url
1701 target_supplied
= False
1709 while i
< len(args
):
1712 if arg
in drop_with_arg
:
1713 # skip this and the next arg
1714 if not arg
.startswith('--') and len(arg
) > 2:
1715 # it is a concatenated arg like -r123 instead of -r 123
1716 # skip only this one. Do nothing.
1719 # skip this and the next arg
1722 elif arg
.startswith('-'):
1723 # keep this option arg verbatim.
1724 pyargs
+= [ self
.str2svntest(arg
) ]
1726 # does this option expect a non-filename argument?
1727 # take that verbatim as well.
1728 if arg
in keep_args_of
:
1732 pyargs
+= [ self
.str2svntest(arg
) ]
1735 elif keep_first_count
> 0:
1736 # args still to be taken verbatim.
1737 pyargs
+= [ self
.str2svntest(arg
) ]
1739 keep_first_count
-= 1
1741 elif arg
.startswith('^/'):
1742 # this is a ^/url, keep it verbatim.
1743 # if we use "^/", we need to chdir(wc_dir).
1746 targets
+= [ self
.Target(pyarg
, arg
, len(pyargs
), True, None) ]
1751 # well, then this must be a filename or url, autoexpand it.
1752 target
= self
.path2svntest(arg
, argnr
=len(pyargs
))
1753 pyargs
+= [ target
.pyarg
]
1754 runargs
+= [ target
.runarg
]
1755 target_supplied
= True
1756 targets
+= [ target
]
1760 if not target_supplied
and append_wc_dir_if_missing
:
1761 # add a simple wc_dir target
1762 self
.used_wc_dir
= True
1763 wc
= self
.WorkingCopy('wc_dir', wc_dir
, None)
1764 targets
+= [ self
.Target('wc_dir', wc_dir
, len(pyargs
), False, wc
) ]
1765 pyargs
+= [ 'wc_dir' ]
1766 runargs
+= [ wc_dir
]
1768 return pyargs
, runargs
, do_chdir
, targets
1770 ###### END of the TestFactory class ######
1774 # Quotes-preserving text wrapping for output
1776 def find_quote_end(text
, i
):
1777 "In string TEXT, find the end of the qoute that starts at TEXT[i]"
1778 # don't handle """ quotes
1781 while i
< len(text
):
1784 elif text
[i
] == quote
:
1787 return len(text
) - 1
1790 class MyWrapper(textwrap
.TextWrapper
):
1791 "A textwrap.TextWrapper that doesn't break a line within quotes."
1792 ### TODO regexes would be nice, maybe?
1793 def _split(self
, text
):
1797 # This loop will break before and after each space, but keep
1798 # quoted strings in one piece. Example, breaks marked '/':
1799 # /(one,/ /two(blagger),/ /'three three three',)/
1800 while i
< len(text
):
1801 if text
[i
] in ['"', "'"]:
1802 # handle """ quotes. (why, actually?)
1803 if text
[i
:i
+3] == '"""':
1804 end
= text
[i
+3:].find('"""')
1810 # handle normal quotes
1811 i
= find_quote_end(text
, i
)
1812 elif text
[i
].isspace():
1813 # split off previous section, if any
1815 parts
+= [text
[start
:i
]]
1817 # split off this space
1823 if start
< len(text
):
1824 parts
+= [text
[start
:]]
1828 def wrap_each_line(str, ii
, si
, blw
):
1829 """Wrap lines to a defined width (<80 chars). Feed the lines single to
1830 MyWrapper, so that it preserves the current line endings already in there.
1831 We only want to insert new wraps, not remove existing newlines."""
1832 wrapper
= MyWrapper(77, initial_indent
=ii
,
1833 subsequent_indent
=si
)
1835 lines
= str.splitlines()
1836 for i
in range(0,len(lines
)):
1838 lines
[i
] = wrapper
.fill(lines
[i
])
1839 return '\n'.join(lines
)
1843 # Other miscellaneous helpers
1846 "un-escapes away /x sequences"
1849 return string
.decode("string-escape")
1852 def get_quote_style(str):
1853 """find which quote is the outer one, ' or "."""
1857 found
= str.find("'")
1858 found2
= str.find('"')
1860 # If found == found2, both must be -1, so nothing was found.
1862 # If a quote was found
1863 if found
>= 0 and found2
>= 0:
1864 # If both were found, invalidate the later one
1869 # See which one remains.
1877 return quote_char
, at
1879 def split_remove_empty(str, sep
):
1880 "do a split, then remove empty elements."
1881 list = str.split(sep
)
1882 return filter(lambda item
: item
and len(item
) > 0, list)
1885 "returns the string enclosed in quotes, suitable for py scripts."
1889 # try to make a nice choice of quoting character
1890 if str.find("'") >= 0:
1891 return '"' + str.encode("string-escape"
1892 ).replace("\\'", "'"
1893 ).replace('"', '\\"') + '"'
1895 return "'" + str.encode("string-escape") + "'"