2 from testlib
import testutil
, PygrTestProgram
, SkipTest
3 from pygr
.sqlgraph
import SQLTable
, SQLTableNoCache
,\
4 MapView
, GraphView
, DBServerInfo
, import_sqlite
5 from pygr
import logger
7 class SQLTable_Setup(unittest
.TestCase
):
9 def __init__(self
, *args
, **kwargs
):
10 unittest
.TestCase
.__init
__(self
, *args
, **kwargs
)
11 self
.serverInfo
= DBServerInfo() # share conn for all tests
14 self
.load_data(writeable
=self
.writeable
)
16 raise SkipTest('missing MySQLdb module?')
17 def load_data(self
, tableName
='test.sqltable_test', writeable
=False):
18 'create 3 tables and load 9 rows for our tests'
19 self
.tableName
= tableName
20 self
.joinTable1
= joinTable1
= tableName
+ '1'
21 self
.joinTable2
= joinTable2
= tableName
+ '2'
23 CREATE TABLE %s (primary_id INTEGER PRIMARY KEY %%(AUTO_INCREMENT)s, seq_id TEXT, start INTEGER, stop INTEGER)
25 self
.db
= self
.tableClass(tableName
, dropIfExists
=True,
26 serverInfo
=self
.serverInfo
,
27 createTable
=createTable
,
29 self
.sourceDB
= self
.tableClass(joinTable1
, serverInfo
=self
.serverInfo
,
30 dropIfExists
=True, createTable
="""\
31 CREATE TABLE %s (my_id INTEGER PRIMARY KEY,
34 self
.targetDB
= self
.tableClass(joinTable2
, serverInfo
=self
.serverInfo
,
35 dropIfExists
=True, createTable
="""\
36 CREATE TABLE %s (third_id INTEGER PRIMARY KEY,
40 INSERT INTO %s (seq_id, start, stop) VALUES ('seq1', 0, 10)
41 INSERT INTO %s (seq_id, start, stop) VALUES ('seq2', 5, 15)
42 INSERT INTO %s VALUES (2,'seq2')
43 INSERT INTO %s VALUES (3,'seq3')
44 INSERT INTO %s VALUES (4,'seq4')
45 INSERT INTO %s VALUES (7, 'seq2')
46 INSERT INTO %s VALUES (99, 'seq3')
47 INSERT INTO %s VALUES (6, 'seq4')
48 INSERT INTO %s VALUES (8, 'seq4')
49 """ % tuple(([tableName
]*2) + ([joinTable1
]*3) + ([joinTable2
]*4))
50 for line
in sql
.strip().splitlines(): # insert our test data
51 self
.db
.cursor
.execute(line
.strip())
53 self
.db
.cursor
.execute('drop table if exists %s' % self
.tableName
)
54 self
.db
.cursor
.execute('drop table if exists %s' % self
.joinTable1
)
55 self
.db
.cursor
.execute('drop table if exists %s' % self
.joinTable2
)
56 self
.serverInfo
.close()
58 class SQLTable_Test(SQLTable_Setup
):
59 writeable
= False # read-only database interface
65 assert len(self
.db
) == len(self
.db
.keys())
66 def test_contains(self
):
69 assert 'foo' not in self
.db
70 def test_has_key(self
):
71 assert self
.db
.has_key(1)
72 assert self
.db
.has_key(2)
73 assert not self
.db
.has_key('foo')
75 assert self
.db
.get('foo') is None
76 assert self
.db
.get(1) == self
.db
[1]
77 assert self
.db
.get(2) == self
.db
[2]
79 i
= [ k
for (k
,v
) in self
.db
.items() ]
82 def test_iterkeys(self
):
85 ik
= list(self
.db
.iterkeys())
88 def test_itervalues(self
):
91 iv
= list(self
.db
.itervalues())
94 def test_itervalues_long(self
):
95 """test iterator isolation from queries run inside iterator loop """
96 sql
= 'insert into %s (start) values (1)' % self
.tableName
97 for i
in range(40000): # insert 40000 rows
98 self
.db
.cursor
.execute(sql
)
100 for o
in self
.db
.itervalues():
101 status
= 99 in self
.db
# make it do a query inside iterator loop
103 kv
= [o
.id for o
in self
.db
.values()]
104 assert len(kv
) == len(iv
)
106 def test_iteritems(self
):
109 ii
= list(self
.db
.iteritems())
112 def test_readonly(self
):
113 'test error handling of write attempts to read-only DB'
115 self
.db
.new(seq_id
='freddy', start
=3000, stop
=4500)
116 raise AssertionError('failed to trap attempt to write to db')
122 raise AssertionError('failed to trap attempt to write to db')
127 raise AssertionError('failed to trap attempt to write to db')
131 ### @CTB need to test write access
132 def test_mapview(self
):
133 'test MapView of SQL join'
134 m
= MapView(self
.sourceDB
, self
.targetDB
,"""\
135 SELECT t2.third_id FROM %s t1, %s t2
136 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
137 """ % (self
.joinTable1
,self
.joinTable2
), serverInfo
=self
.serverInfo
)
138 assert m
[self
.sourceDB
[2]] == self
.targetDB
[7]
139 assert m
[self
.sourceDB
[3]] == self
.targetDB
[99]
140 assert self
.sourceDB
[2] in m
142 d
= m
[self
.sourceDB
[4]]
143 raise AssertionError('failed to trap non-unique mapping')
148 raise AssertionError('failed to trap non-invertible mapping')
151 self
.sourceDB
.cursor
.execute("INSERT INTO %s VALUES (5,'seq78')"
152 % self
.sourceDB
.name
)
153 assert len(self
.sourceDB
) == 4
157 correct
= [self
.sourceDB
[2],self
.sourceDB
[3]]
160 def test_mapview_inverse(self
):
161 'test inverse MapView of SQL join'
162 m
= MapView(self
.sourceDB
, self
.targetDB
,"""\
163 SELECT t2.third_id FROM %s t1, %s t2
164 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
165 """ % (self
.joinTable1
,self
.joinTable2
), serverInfo
=self
.serverInfo
,
167 SELECT t1.my_id FROM %s t1, %s t2
168 WHERE t2.third_id=%%s and t1.other_id=t2.other_id
169 """ % (self
.joinTable1
,self
.joinTable2
))
170 r
= ~m
# get the inverse
171 assert self
.sourceDB
[2] == r
[self
.targetDB
[7]]
172 assert self
.sourceDB
[3] == r
[self
.targetDB
[99]]
173 assert self
.targetDB
[7] in r
175 m
= ~r
# get the inverse of the inverse!
176 assert m
[self
.sourceDB
[2]] == self
.targetDB
[7]
177 assert m
[self
.sourceDB
[3]] == self
.targetDB
[99]
178 assert self
.sourceDB
[2] in m
180 d
= m
[self
.sourceDB
[4]]
181 raise AssertionError('failed to trap non-unique mapping')
184 def test_graphview(self
):
185 'test GraphView of SQL join'
186 m
= GraphView(self
.sourceDB
, self
.targetDB
,"""\
187 SELECT t2.third_id FROM %s t1, %s t2
188 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
189 """ % (self
.joinTable1
,self
.joinTable2
), serverInfo
=self
.serverInfo
)
190 d
= m
[self
.sourceDB
[4]]
192 assert self
.targetDB
[6] in d
and self
.targetDB
[8] in d
193 assert self
.sourceDB
[2] in m
195 def test_graphview_inverse(self
):
196 'test inverse GraphView of SQL join'
197 m
= GraphView(self
.sourceDB
, self
.targetDB
,"""\
198 SELECT t2.third_id FROM %s t1, %s t2
199 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
200 """ % (self
.joinTable1
,self
.joinTable2
), serverInfo
=self
.serverInfo
,
202 SELECT t1.my_id FROM %s t1, %s t2
203 WHERE t2.third_id=%%s and t1.other_id=t2.other_id
204 """ % (self
.joinTable1
,self
.joinTable2
))
205 r
= ~m
# get the inverse
206 assert self
.sourceDB
[2] in r
[self
.targetDB
[7]]
207 assert self
.sourceDB
[3] in r
[self
.targetDB
[99]]
208 assert self
.targetDB
[7] in r
209 d
= r
[self
.targetDB
[6]]
211 assert self
.sourceDB
[4] in d
213 m
= ~r
# get inverse of the inverse!
214 d
= m
[self
.sourceDB
[4]]
216 assert self
.targetDB
[6] in d
and self
.targetDB
[8] in d
217 assert self
.sourceDB
[2] in m
class SQLiteBase(testutil.SQLite_Mixin):
    'shared base for the sqlite variants of the SQLTable tests'
    # NOTE(review): presumably testutil.SQLite_Mixin calls sqlite_load()
    # from its own setup code -- confirm against testutil.

    def sqlite_load(self):
        'load the test fixture tables under the name sqltable_test'
        # load_data() is expected to come from the concrete test class this
        # base is combined with; writeable is that class's own setting.
        self.load_data('sqltable_test', writeable=self.writeable)
223 class SQLiteTable_Test(SQLiteBase
, SQLTable_Test
):
226 ## class SQLitePickle_Test(SQLiteTable_Test):
228 ## """Pickle / unpickle our serverInfo before trying to use it """
229 ## SQLiteTable_Test.setUp(self)
230 ## self.serverInfo.close()
232 ## s = pickle.dumps(self.serverInfo)
233 ## del self.serverInfo
234 ## self.serverInfo = pickle.loads(s)
235 ## self.db = self.tableClass(self.tableName, serverInfo=self.serverInfo)
236 ## self.sourceDB = self.tableClass(self.joinTable1,
237 ## serverInfo=self.serverInfo)
238 ## self.targetDB = self.tableClass(self.joinTable2,
239 ## serverInfo=self.serverInfo)
class SQLTable_NoCache_Test(SQLTable_Test):
    'rerun the SQLTable_Test cases with SQLTableNoCache as the table class'
    tableClass = SQLTableNoCache
class SQLiteTable_NoCache_Test(SQLiteTable_Test):
    'rerun the SQLiteTable_Test cases with SQLTableNoCache as the table class'
    tableClass = SQLTableNoCache
247 class SQLTableRW_Test(SQLTable_Setup
):
248 'test write operations'
251 'test row creation with auto inc ID'
253 o
= self
.db
.new(seq_id
='freddy', start
=3000, stop
=4500)
254 assert len(self
.db
) == n
+ 1
255 t
= self
.tableClass(self
.tableName
,
256 serverInfo
=self
.serverInfo
) # requery the db
258 assert result
.seq_id
== 'freddy' and result
.start
==3000 \
259 and result
.stop
==4500
261 'check row creation with specified ID'
263 o
= self
.db
.new(id=99, seq_id
='jeff', start
=3000, stop
=4500)
264 assert len(self
.db
) == n
+ 1
266 t
= self
.tableClass(self
.tableName
,
267 serverInfo
=self
.serverInfo
) # requery the db
269 assert result
.seq_id
== 'jeff' and result
.start
==3000 \
270 and result
.stop
==4500
272 'test changing an attr value'
274 assert o
.seq_id
== 'seq2'
275 o
.seq_id
= 'newval' # overwrite this attribute
276 assert o
.seq_id
== 'newval' # check cached value
277 t
= self
.tableClass(self
.tableName
,
278 serverInfo
=self
.serverInfo
) # requery the db
280 assert result
.seq_id
== 'newval'
281 def test_delitem(self
):
282 'test deletion of a row'
285 assert len(self
.db
) == n
- 1
288 raise AssertionError('old ID still exists!')
291 def test_setitem(self
):
292 'test assigning new ID to existing object'
293 o
= self
.db
.new(id=17, seq_id
='bob', start
=2000, stop
=2500)
298 raise AssertionError('old ID still exists!')
301 t
= self
.tableClass(self
.tableName
,
302 serverInfo
=self
.serverInfo
) # requery the db
304 assert result
.seq_id
== 'bob' and result
.start
==2000 \
305 and result
.stop
==2500
308 raise AssertionError('old ID still exists!')
313 class SQLiteTableRW_Test(SQLiteBase
, SQLTableRW_Test
):
class SQLTableRW_NoCache_Test(SQLTableRW_Test):
    'rerun the SQLTableRW_Test cases with SQLTableNoCache as the table class'
    tableClass = SQLTableNoCache
class SQLiteTableRW_NoCache_Test(SQLiteTableRW_Test):
    'rerun the SQLiteTableRW_Test cases with SQLTableNoCache as the table class'
    tableClass = SQLTableNoCache
322 class Ensembl_Test(unittest
.TestCase
):
325 # test will be skipped if mysql module or ensembldb server unavailable
327 logger
.debug('accessing ensembldb.ensembl.org')
328 conn
= DBServerInfo(host
='ensembldb.ensembl.org', user
='anonymous',
331 translationDB
= SQLTable('homo_sapiens_core_47_36i.translation',
333 exonDB
= SQLTable('homo_sapiens_core_47_36i.exon', serverInfo
=conn
)
334 except ImportError,e
:
337 sql_statement
= '''SELECT t3.exon_id FROM
338 homo_sapiens_core_47_36i.translation AS tr,
339 homo_sapiens_core_47_36i.exon_transcript AS t1,
340 homo_sapiens_core_47_36i.exon_transcript AS t2,
341 homo_sapiens_core_47_36i.exon_transcript AS t3 WHERE tr.translation_id = %s
342 AND tr.transcript_id = t1.transcript_id AND t1.transcript_id =
343 t2.transcript_id AND t2.transcript_id = t3.transcript_id AND t1.exon_id =
344 tr.start_exon_id AND t2.exon_id = tr.end_exon_id AND t3.rank >= t1.rank AND
345 t3.rank <= t2.rank ORDER BY t3.rank
347 self
.translationExons
= GraphView(translationDB
, exonDB
,
348 sql_statement
, serverInfo
=conn
)
349 self
.translation
= translationDB
[15121]
351 def test_orderBy(self
):
352 "Ensemble access, test order by"
353 'test issue 53: ensure that the ORDER BY results are correct'
354 exons
= self
.translationExons
[self
.translation
] # do the query
355 result
= [e
.id for e
in exons
]
356 correct
= [95160,95020,95035,95050,95059,95069,95081,95088,95101,
358 self
.assertEqual(result
, correct
) # make sure the exact order matches
# Run the test program (verbose output) only when executed as a script.
if __name__ == '__main__':
    PygrTestProgram(verbosity=2)