Add a hash table for holding packages
[pacman-ng.git] / test / pacman / pmtest.py
blob82dfda6da2f33c1194260e02f8293631d95ca3d5
1 #! /usr/bin/python
3 # Copyright (c) 2006 by Aurelien Foret <orelien@chez.com>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import os
20 import os.path
21 import shutil
22 import stat
23 import time
25 import pmrule
26 import pmdb
27 import pmfile
28 import util
29 from util import vprint
31 class pmtest(object):
32 """Test object
33 """
35 def __init__(self, name, root):
36 self.name = name
37 self.testname = os.path.basename(name).replace('.py', '')
38 self.root = root
39 self.cachepkgs = True
41 def __str__(self):
42 return "name = %s\n" \
43 "testname = %s\n" \
44 "root = %s" % (self.name, self.testname, self.root)
46 def addpkg2db(self, treename, pkg):
47 """
48 """
49 if not treename in self.db:
50 self.db[treename] = pmdb.pmdb(treename, self.root)
51 self.db[treename].pkgs.append(pkg)
53 def addpkg(self, pkg):
54 """
55 """
56 self.localpkgs.append(pkg)
58 def findpkg(self, name, version, allow_local=False):
59 """Find a package object matching the name and version specified in
60 either sync databases or the local package collection. The local database
61 is allowed to match if allow_local is True."""
62 for db in self.db.itervalues():
63 if db.treename == "local" and not allow_local:
64 continue
65 pkg = db.getpkg(name)
66 if pkg and pkg.version == version:
67 return pkg
68 for pkg in self.localpkgs:
69 if pkg.name == name and pkg.version == version:
70 return pkg
72 return None
74 def addrule(self, rulename):
75 """
76 """
77 rule = pmrule.pmrule(rulename)
78 self.rules.append(rule)
80 def load(self):
81 """
82 """
84 # Reset test parameters
85 self.result = {
86 "success": 0,
87 "fail": 0
89 self.args = ""
90 self.retcode = 0
91 self.db = {
92 "local": pmdb.pmdb("local", self.root)
94 self.localpkgs = []
95 self.createlocalpkgs = False
96 self.filesystem = []
98 self.description = ""
99 self.option = {}
101 # Test rules
102 self.rules = []
103 self.files = []
104 self.expectfailure = False
106 if os.path.isfile(self.name):
107 # all tests expect this to be available
108 from pmpkg import pmpkg
109 execfile(self.name)
110 else:
111 raise IOError("file %s does not exist!" % self.name)
113 def generate(self):
117 print "==> Generating test environment"
119 # Cleanup leftover files from a previous test session
120 if os.path.isdir(self.root):
121 shutil.rmtree(self.root)
122 vprint("\t%s" % self.root)
124 # Create directory structure
125 vprint(" Creating directory structure:")
126 dbdir = os.path.join(self.root, util.PM_DBPATH)
127 cachedir = os.path.join(self.root, util.PM_CACHEDIR)
128 syncdir = os.path.join(self.root, util.SYNCREPO)
129 tmpdir = os.path.join(self.root, util.TMPDIR)
130 logdir = os.path.join(self.root, os.path.dirname(util.LOGFILE))
131 etcdir = os.path.join(self.root, os.path.dirname(util.PACCONF))
132 bindir = os.path.join(self.root, "bin")
133 sys_dirs = [dbdir, cachedir, syncdir, tmpdir, logdir, etcdir, bindir]
134 for sys_dir in sys_dirs:
135 if not os.path.isdir(sys_dir):
136 vprint("\t%s" % sys_dir[len(self.root)+1:])
137 os.makedirs(sys_dir, 0755)
138 # Only the dynamically linked binary is needed for fakechroot
139 shutil.copy("/bin/sh", bindir)
141 # Configuration file
142 vprint(" Creating configuration file")
143 util.mkcfgfile(util.PACCONF, self.root, self.option, self.db)
145 # Creating packages
146 vprint(" Creating package archives")
147 for pkg in self.localpkgs:
148 vprint("\t%s" % os.path.join(util.TMPDIR, pkg.filename()))
149 pkg.makepkg(tmpdir)
150 for key, value in self.db.iteritems():
151 if key == "local" and not self.createlocalpkgs:
152 continue
153 for pkg in value.pkgs:
154 vprint("\t%s" % os.path.join(util.PM_CACHEDIR, pkg.filename()))
155 if self.cachepkgs:
156 pkg.makepkg(cachedir)
157 else:
158 pkg.makepkg(os.path.join(syncdir, value.treename))
159 pkg.md5sum = util.getmd5sum(pkg.path)
160 pkg.csize = os.stat(pkg.path)[stat.ST_SIZE]
162 # Populating databases
163 vprint(" Populating databases")
164 for key, value in self.db.iteritems():
165 for pkg in value.pkgs:
166 vprint("\t%s/%s" % (key, pkg.fullname()))
167 if key == "local":
168 pkg.installdate = time.ctime()
169 value.db_write(pkg)
171 # Creating sync database archives
172 vprint(" Creating sync database archives")
173 for key, value in self.db.iteritems():
174 if key == "local":
175 continue
176 vprint("\t" + value.treename)
177 value.gensync()
178 serverpath = os.path.join(syncdir, value.treename)
179 util.mkdir(serverpath)
180 shutil.copy(value.dbfile, serverpath)
182 # Filesystem
183 vprint(" Populating file system")
184 for pkg in self.db["local"].pkgs:
185 vprint("\tinstalling %s" % pkg.fullname())
186 for f in pkg.files:
187 vprint("\t%s" % f)
188 path = os.path.join(self.root, f)
189 util.mkfile(path, f)
190 if os.path.isfile(path):
191 os.utime(path, (355, 355))
192 for f in self.filesystem:
193 vprint("\t%s" % f)
194 path = os.path.join(self.root, f)
195 util.mkfile(path, f)
196 if os.path.isfile(path):
197 os.utime(path, (355, 355))
199 # Done.
200 vprint(" Taking a snapshot of the file system")
201 for roots, dirs, files in os.walk(self.root):
202 for i in files:
203 filename = os.path.join(roots, i)
204 f = pmfile.pmfile(self.root, filename.replace(self.root + "/", ""))
205 self.files.append(f)
206 vprint("\t%s" % f.name)
208 def run(self, pacman):
212 if os.path.isfile(util.PM_LOCK):
213 print "\tERROR: another pacman session is on-going -- skipping"
214 return
216 print "==> Running test"
217 vprint("\tpacman %s" % self.args)
219 cmd = [""]
220 if os.geteuid() != 0:
221 fakeroot = util.which("fakeroot")
222 if not fakeroot:
223 print "WARNING: fakeroot not found!"
224 else:
225 cmd.append("fakeroot")
227 fakechroot = util.which("fakechroot")
228 if fakechroot:
229 cmd.append("fakechroot")
231 if pacman["gdb"]:
232 cmd.append("libtool execute gdb --args")
233 if pacman["valgrind"]:
234 cmd.append("valgrind -q --tool=memcheck --leak-check=full --show-reachable=yes --suppressions=%s/valgrind.supp" % os.getcwd())
235 cmd.append("\"%s\" --config=\"%s\" --root=\"%s\" --dbpath=\"%s\" --cachedir=\"%s\"" \
236 % (pacman["bin"],
237 os.path.join(self.root, util.PACCONF),
238 self.root,
239 os.path.join(self.root, util.PM_DBPATH),
240 os.path.join(self.root, util.PM_CACHEDIR)))
241 if not pacman["manual-confirm"]:
242 cmd.append("--noconfirm")
243 if pacman["debug"]:
244 cmd.append("--debug=%s" % pacman["debug"])
245 cmd.append("%s" % self.args)
246 if not pacman["gdb"] and not pacman["valgrind"] and not pacman["nolog"]:
247 cmd.append(">\"%s\" 2>&1" % os.path.join(self.root, util.LOGFILE))
248 vprint("\trunning: %s" % " ".join(cmd))
250 # Change to the tmp dir before running pacman, so that local package
251 # archives are made available more easily.
252 curdir = os.getcwd()
253 tmpdir = os.path.join(self.root, util.TMPDIR)
254 os.chdir(tmpdir)
256 time_start = time.time()
257 self.retcode = os.system(" ".join(cmd))
258 time_end = time.time()
259 vprint("\ttime elapsed: %ds" % (time_end - time_start))
261 if self.retcode == None:
262 self.retcode = 0
263 else:
264 self.retcode /= 256
265 vprint("\tretcode = %s" % self.retcode)
266 os.chdir(curdir)
268 # Check if the lock is still there
269 if os.path.isfile(util.PM_LOCK):
270 print "\tERROR: %s not removed" % util.PM_LOCK
271 os.unlink(util.PM_LOCK)
272 # Look for a core file
273 if os.path.isfile(os.path.join(self.root, util.TMPDIR, "core")):
274 print "\tERROR: pacman dumped a core file"
276 def check(self):
280 print "==> Checking rules"
282 for i in self.rules:
283 success = i.check(self)
284 if success == 1:
285 msg = " OK "
286 self.result["success"] += 1
287 elif success == 0:
288 msg = "FAIL"
289 self.result["fail"] += 1
290 else:
291 msg = "SKIP"
292 print "\t[%s] %s" % (msg, i)
294 # vim: set ts=4 sw=4 et: