added test for GraphView inverse and improved MapView inverse test
[pygr.git] / tests / sqltable_test.py
blobcefd1bf71df52f4e03d8872071f432276a0e83a9
1 import os, unittest
2 from testlib import testutil, PygrTestProgram, SkipTest
3 from pygr.sqlgraph import SQLTable, SQLTableNoCache,\
4 MapView, GraphView, DBServerInfo, import_sqlite
5 from pygr import logger
7 class SQLTable_Setup(unittest.TestCase):
8 tableClass = SQLTable
9 def __init__(self, *args, **kwargs):
10 unittest.TestCase.__init__(self, *args, **kwargs)
11 self.serverInfo = DBServerInfo() # share conn for all tests
12 def setUp(self):
13 try:
14 self.load_data(writeable=self.writeable)
15 except ImportError:
16 raise SkipTest('missing MySQLdb module?')
17 def load_data(self, tableName='test.sqltable_test', writeable=False):
18 'create 3 tables and load 9 rows for our tests'
19 self.tableName = tableName
20 self.joinTable1 = joinTable1 = tableName + '1'
21 self.joinTable2 = joinTable2 = tableName + '2'
22 createTable = """\
23 CREATE TABLE %s (primary_id INTEGER PRIMARY KEY %%(AUTO_INCREMENT)s, seq_id TEXT, start INTEGER, stop INTEGER)
24 """ % tableName
25 self.db = self.tableClass(tableName, dropIfExists=True,
26 serverInfo=self.serverInfo,
27 createTable=createTable,
28 writeable=writeable)
29 self.sourceDB = self.tableClass(joinTable1, serverInfo=self.serverInfo,
30 dropIfExists=True, createTable="""\
31 CREATE TABLE %s (my_id INTEGER PRIMARY KEY,
32 other_id VARCHAR(16))
33 """ % joinTable1)
34 self.targetDB = self.tableClass(joinTable2, serverInfo=self.serverInfo,
35 dropIfExists=True, createTable="""\
36 CREATE TABLE %s (third_id INTEGER PRIMARY KEY,
37 other_id VARCHAR(16))
38 """ % joinTable2)
39 sql = """
40 INSERT INTO %s (seq_id, start, stop) VALUES ('seq1', 0, 10)
41 INSERT INTO %s (seq_id, start, stop) VALUES ('seq2', 5, 15)
42 INSERT INTO %s VALUES (2,'seq2')
43 INSERT INTO %s VALUES (3,'seq3')
44 INSERT INTO %s VALUES (4,'seq4')
45 INSERT INTO %s VALUES (7, 'seq2')
46 INSERT INTO %s VALUES (99, 'seq3')
47 INSERT INTO %s VALUES (6, 'seq4')
48 INSERT INTO %s VALUES (8, 'seq4')
49 """ % tuple(([tableName]*2) + ([joinTable1]*3) + ([joinTable2]*4))
50 for line in sql.strip().splitlines(): # insert our test data
51 self.db.cursor.execute(line.strip())
52 def tearDown(self):
53 self.db.cursor.execute('drop table if exists %s' % self.tableName)
54 self.db.cursor.execute('drop table if exists %s' % self.joinTable1)
55 self.db.cursor.execute('drop table if exists %s' % self.joinTable2)
56 self.serverInfo.close()
58 class SQLTable_Test(SQLTable_Setup):
59 writeable = False # read-only database interface
60 def test_keys(self):
61 k = self.db.keys()
62 k.sort()
63 assert k == [1, 2]
64 def test_len(self):
65 assert len(self.db) == len(self.db.keys())
66 def test_contains(self):
67 assert 1 in self.db
68 assert 2 in self.db
69 assert 'foo' not in self.db
70 def test_has_key(self):
71 assert self.db.has_key(1)
72 assert self.db.has_key(2)
73 assert not self.db.has_key('foo')
74 def test_get(self):
75 assert self.db.get('foo') is None
76 assert self.db.get(1) == self.db[1]
77 assert self.db.get(2) == self.db[2]
78 def test_items(self):
79 i = [ k for (k,v) in self.db.items() ]
80 i.sort()
81 assert i == [1, 2]
82 def test_iterkeys(self):
83 kk = self.db.keys()
84 kk.sort()
85 ik = list(self.db.iterkeys())
86 ik.sort()
87 assert kk == ik
88 def test_itervalues(self):
89 kv = self.db.values()
90 kv.sort()
91 iv = list(self.db.itervalues())
92 iv.sort()
93 assert kv == iv
94 def test_itervalues_long(self):
95 """test iterator isolation from queries run inside iterator loop """
96 sql = 'insert into %s (start) values (1)' % self.tableName
97 for i in range(40000): # insert 40000 rows
98 self.db.cursor.execute(sql)
99 iv = []
100 for o in self.db.itervalues():
101 status = 99 in self.db # make it do a query inside iterator loop
102 iv.append(o.id)
103 kv = [o.id for o in self.db.values()]
104 assert len(kv) == len(iv)
105 assert kv == iv
106 def test_iteritems(self):
107 ki = self.db.items()
108 ki.sort()
109 ii = list(self.db.iteritems())
110 ii.sort()
111 assert ki == ii
112 def test_readonly(self):
113 'test error handling of write attempts to read-only DB'
114 try:
115 self.db.new(seq_id='freddy', start=3000, stop=4500)
116 raise AssertionError('failed to trap attempt to write to db')
117 except ValueError:
118 pass
119 o = self.db[1]
120 try:
121 self.db[33] = o
122 raise AssertionError('failed to trap attempt to write to db')
123 except ValueError:
124 pass
125 try:
126 del self.db[2]
127 raise AssertionError('failed to trap attempt to write to db')
128 except ValueError:
129 pass
131 ### @CTB need to test write access
132 def test_mapview(self):
133 'test MapView of SQL join'
134 m = MapView(self.sourceDB, self.targetDB,"""\
135 SELECT t2.third_id FROM %s t1, %s t2
136 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
137 """ % (self.joinTable1,self.joinTable2), serverInfo=self.serverInfo)
138 assert m[self.sourceDB[2]] == self.targetDB[7]
139 assert m[self.sourceDB[3]] == self.targetDB[99]
140 assert self.sourceDB[2] in m
141 try:
142 d = m[self.sourceDB[4]]
143 raise AssertionError('failed to trap non-unique mapping')
144 except KeyError:
145 pass
146 try:
147 r = ~m
148 raise AssertionError('failed to trap non-invertible mapping')
149 except ValueError:
150 pass
151 self.sourceDB.cursor.execute("INSERT INTO %s VALUES (5,'seq78')"
152 % self.sourceDB.name)
153 assert len(self.sourceDB) == 4
154 assert len(m) == 2
155 l = m.keys()
156 l.sort()
157 correct = [self.sourceDB[2],self.sourceDB[3]]
158 correct.sort()
159 assert l == correct
160 def test_mapview_inverse(self):
161 'test inverse MapView of SQL join'
162 m = MapView(self.sourceDB, self.targetDB,"""\
163 SELECT t2.third_id FROM %s t1, %s t2
164 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
165 """ % (self.joinTable1,self.joinTable2), serverInfo=self.serverInfo,
166 inverseSQL="""\
167 SELECT t1.my_id FROM %s t1, %s t2
168 WHERE t2.third_id=%%s and t1.other_id=t2.other_id
169 """ % (self.joinTable1,self.joinTable2))
170 r = ~m # get the inverse
171 assert self.sourceDB[2] == r[self.targetDB[7]]
172 assert self.sourceDB[3] == r[self.targetDB[99]]
173 assert self.targetDB[7] in r
175 m = ~r # get the inverse of the inverse!
176 assert m[self.sourceDB[2]] == self.targetDB[7]
177 assert m[self.sourceDB[3]] == self.targetDB[99]
178 assert self.sourceDB[2] in m
179 try:
180 d = m[self.sourceDB[4]]
181 raise AssertionError('failed to trap non-unique mapping')
182 except KeyError:
183 pass
184 def test_graphview(self):
185 'test GraphView of SQL join'
186 m = GraphView(self.sourceDB, self.targetDB,"""\
187 SELECT t2.third_id FROM %s t1, %s t2
188 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
189 """ % (self.joinTable1,self.joinTable2), serverInfo=self.serverInfo)
190 d = m[self.sourceDB[4]]
191 assert len(d) == 2
192 assert self.targetDB[6] in d and self.targetDB[8] in d
193 assert self.sourceDB[2] in m
195 def test_graphview_inverse(self):
196 'test inverse GraphView of SQL join'
197 m = GraphView(self.sourceDB, self.targetDB,"""\
198 SELECT t2.third_id FROM %s t1, %s t2
199 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
200 """ % (self.joinTable1,self.joinTable2), serverInfo=self.serverInfo,
201 inverseSQL="""\
202 SELECT t1.my_id FROM %s t1, %s t2
203 WHERE t2.third_id=%%s and t1.other_id=t2.other_id
204 """ % (self.joinTable1,self.joinTable2))
205 r = ~m # get the inverse
206 assert self.sourceDB[2] in r[self.targetDB[7]]
207 assert self.sourceDB[3] in r[self.targetDB[99]]
208 assert self.targetDB[7] in r
209 d = r[self.targetDB[6]]
210 assert len(d) == 1
211 assert self.sourceDB[4] in d
213 m = ~r # get inverse of the inverse!
214 d = m[self.sourceDB[4]]
215 assert len(d) == 2
216 assert self.targetDB[6] in d and self.targetDB[8] in d
217 assert self.sourceDB[2] in m
219 class SQLiteBase(testutil.SQLite_Mixin):
220 def sqlite_load(self):
221 self.load_data('sqltable_test', writeable=self.writeable)
223 class SQLiteTable_Test(SQLiteBase, SQLTable_Test):
224 pass
226 ## class SQLitePickle_Test(SQLiteTable_Test):
227 ## def setUp(self):
228 ## """Pickle / unpickle our serverInfo before trying to use it """
229 ## SQLiteTable_Test.setUp(self)
230 ## self.serverInfo.close()
231 ## import pickle
232 ## s = pickle.dumps(self.serverInfo)
233 ## del self.serverInfo
234 ## self.serverInfo = pickle.loads(s)
235 ## self.db = self.tableClass(self.tableName, serverInfo=self.serverInfo)
236 ## self.sourceDB = self.tableClass(self.joinTable1,
237 ## serverInfo=self.serverInfo)
238 ## self.targetDB = self.tableClass(self.joinTable2,
239 ## serverInfo=self.serverInfo)
241 class SQLTable_NoCache_Test(SQLTable_Test):
242 tableClass = SQLTableNoCache
244 class SQLiteTable_NoCache_Test(SQLiteTable_Test):
245 tableClass = SQLTableNoCache
247 class SQLTableRW_Test(SQLTable_Setup):
248 'test write operations'
249 writeable = True
250 def test_new(self):
251 'test row creation with auto inc ID'
252 n = len(self.db)
253 o = self.db.new(seq_id='freddy', start=3000, stop=4500)
254 assert len(self.db) == n + 1
255 t = self.tableClass(self.tableName,
256 serverInfo=self.serverInfo) # requery the db
257 result = t[o.id]
258 assert result.seq_id == 'freddy' and result.start==3000 \
259 and result.stop==4500
260 def test_new2(self):
261 'check row creation with specified ID'
262 n = len(self.db)
263 o = self.db.new(id=99, seq_id='jeff', start=3000, stop=4500)
264 assert len(self.db) == n + 1
265 assert o.id == 99
266 t = self.tableClass(self.tableName,
267 serverInfo=self.serverInfo) # requery the db
268 result = t[99]
269 assert result.seq_id == 'jeff' and result.start==3000 \
270 and result.stop==4500
271 def test_attr(self):
272 'test changing an attr value'
273 o = self.db[2]
274 assert o.seq_id == 'seq2'
275 o.seq_id = 'newval' # overwrite this attribute
276 assert o.seq_id == 'newval' # check cached value
277 t = self.tableClass(self.tableName,
278 serverInfo=self.serverInfo) # requery the db
279 result = t[2]
280 assert result.seq_id == 'newval'
281 def test_delitem(self):
282 'test deletion of a row'
283 n = len(self.db)
284 del self.db[1]
285 assert len(self.db) == n - 1
286 try:
287 result = self.db[1]
288 raise AssertionError('old ID still exists!')
289 except KeyError:
290 pass
291 def test_setitem(self):
292 'test assigning new ID to existing object'
293 o = self.db.new(id=17, seq_id='bob', start=2000, stop=2500)
294 self.db[13] = o
295 assert o.id == 13
296 try:
297 result = self.db[17]
298 raise AssertionError('old ID still exists!')
299 except KeyError:
300 pass
301 t = self.tableClass(self.tableName,
302 serverInfo=self.serverInfo) # requery the db
303 result = t[13]
304 assert result.seq_id == 'bob' and result.start==2000 \
305 and result.stop==2500
306 try:
307 result = t[17]
308 raise AssertionError('old ID still exists!')
309 except KeyError:
310 pass
313 class SQLiteTableRW_Test(SQLiteBase, SQLTableRW_Test):
314 pass
316 class SQLTableRW_NoCache_Test(SQLTableRW_Test):
317 tableClass = SQLTableNoCache
319 class SQLiteTableRW_NoCache_Test(SQLiteTableRW_Test):
320 tableClass = SQLTableNoCache
322 class Ensembl_Test(unittest.TestCase):
324 def setUp(self):
325 # test will be skipped if mysql module or ensembldb server unavailable
327 logger.debug('accessing ensembldb.ensembl.org')
328 conn = DBServerInfo(host='ensembldb.ensembl.org', user='anonymous',
329 passwd='')
330 try:
331 translationDB = SQLTable('homo_sapiens_core_47_36i.translation',
332 serverInfo=conn)
333 exonDB = SQLTable('homo_sapiens_core_47_36i.exon', serverInfo=conn)
334 except ImportError,e:
335 raise SkipTest(e)
337 sql_statement = '''SELECT t3.exon_id FROM
338 homo_sapiens_core_47_36i.translation AS tr,
339 homo_sapiens_core_47_36i.exon_transcript AS t1,
340 homo_sapiens_core_47_36i.exon_transcript AS t2,
341 homo_sapiens_core_47_36i.exon_transcript AS t3 WHERE tr.translation_id = %s
342 AND tr.transcript_id = t1.transcript_id AND t1.transcript_id =
343 t2.transcript_id AND t2.transcript_id = t3.transcript_id AND t1.exon_id =
344 tr.start_exon_id AND t2.exon_id = tr.end_exon_id AND t3.rank >= t1.rank AND
345 t3.rank <= t2.rank ORDER BY t3.rank
347 self.translationExons = GraphView(translationDB, exonDB,
348 sql_statement, serverInfo=conn)
349 self.translation = translationDB[15121]
351 def test_orderBy(self):
352 "Ensemble access, test order by"
353 'test issue 53: ensure that the ORDER BY results are correct'
354 exons = self.translationExons[self.translation] # do the query
355 result = [e.id for e in exons]
356 correct = [95160,95020,95035,95050,95059,95069,95081,95088,95101,
357 95110,95172]
358 self.assertEqual(result, correct) # make sure the exact order matches
361 if __name__ == '__main__':
362 PygrTestProgram(verbosity=2)