2 from testlib
import testutil
, PygrTestProgram
, SkipTest
3 from pygr
.sqlgraph
import SQLTable
, SQLTableNoCache
,\
4 MapView
, GraphView
, DBServerInfo
, import_sqlite
5 from pygr
import logger
class SQLTableCatcher(SQLTable):
    """SQLTable subclass that can be armed to fail on any iteration.

    Assign a true value to ``catchIter`` on an instance to make every call
    to ``generic_iterator`` raise.  Instances that never had ``catchIter``
    assigned behave exactly like a plain SQLTable.
    """

    def generic_iterator(self, *args, **kwargs):
        """Delegate to ``SQLTable.generic_iterator`` unless iteration is forbidden.

        All positional and keyword arguments are forwarded unchanged.

        Raises:
            AssertionError: if ``self.catchIter`` is set to a true value.
        """
        # A missing catchIter attribute means "not armed"; getattr's default
        # replaces the former try/except AttributeError dance.  Raising
        # explicitly (instead of using `assert`) keeps the check alive when
        # Python runs with -O.
        if getattr(self, 'catchIter', False):
            raise AssertionError('this should not iterate!')
        return SQLTable.generic_iterator(self, *args, **kwargs)
15 class SQLTable_Setup(unittest
.TestCase
):
16 tableClass
= SQLTableCatcher
def __init__(self, *args, **kwargs):
    # Initialise the TestCase machinery first, forwarding every argument.
    unittest.TestCase.__init__(self, *args, **kwargs)
    # One DBServerInfo per test-case instance: share conn for all tests.
    server_info = DBServerInfo()
    self.serverInfo = server_info
22 self
.load_data(writeable
=self
.writeable
)
24 raise SkipTest('missing MySQLdb module?')
25 def load_data(self
, tableName
='test.sqltable_test', writeable
=False):
26 'create 3 tables and load 9 rows for our tests'
27 self
.tableName
= tableName
28 self
.joinTable1
= joinTable1
= tableName
+ '1'
29 self
.joinTable2
= joinTable2
= tableName
+ '2'
31 CREATE TABLE %s (primary_id INTEGER PRIMARY KEY %%(AUTO_INCREMENT)s, seq_id TEXT, start INTEGER, stop INTEGER)
33 self
.db
= self
.tableClass(tableName
, dropIfExists
=True,
34 serverInfo
=self
.serverInfo
,
35 createTable
=createTable
,
37 self
.sourceDB
= self
.tableClass(joinTable1
, serverInfo
=self
.serverInfo
,
38 dropIfExists
=True, createTable
="""\
39 CREATE TABLE %s (my_id INTEGER PRIMARY KEY,
42 self
.targetDB
= self
.tableClass(joinTable2
, serverInfo
=self
.serverInfo
,
43 dropIfExists
=True, createTable
="""\
44 CREATE TABLE %s (third_id INTEGER PRIMARY KEY,
48 INSERT INTO %s (seq_id, start, stop) VALUES ('seq1', 0, 10)
49 INSERT INTO %s (seq_id, start, stop) VALUES ('seq2', 5, 15)
50 INSERT INTO %s VALUES (2,'seq2')
51 INSERT INTO %s VALUES (3,'seq3')
52 INSERT INTO %s VALUES (4,'seq4')
53 INSERT INTO %s VALUES (7, 'seq2')
54 INSERT INTO %s VALUES (99, 'seq3')
55 INSERT INTO %s VALUES (6, 'seq4')
56 INSERT INTO %s VALUES (8, 'seq4')
57 """ % tuple(([tableName
]*2) + ([joinTable1
]*3) + ([joinTable2
]*4))
58 for line
in sql
.strip().splitlines(): # insert our test data
59 self
.db
.cursor
.execute(line
.strip())
61 self
.db
.cursor
.execute('drop table if exists %s' % self
.tableName
)
62 self
.db
.cursor
.execute('drop table if exists %s' % self
.joinTable1
)
63 self
.db
.cursor
.execute('drop table if exists %s' % self
.joinTable2
)
64 self
.serverInfo
.close()
66 class SQLTable_Test(SQLTable_Setup
):
67 writeable
= False # read-only database interface
73 assert len(self
.db
) == len(self
.db
.keys())
74 def test_contains(self
):
77 assert 'foo' not in self
.db
78 def test_has_key(self
):
79 assert self
.db
.has_key(1)
80 assert self
.db
.has_key(2)
81 assert not self
.db
.has_key('foo')
83 assert self
.db
.get('foo') is None
84 assert self
.db
.get(1) == self
.db
[1]
85 assert self
.db
.get(2) == self
.db
[2]
87 i
= [ k
for (k
,v
) in self
.db
.items() ]
90 def test_iterkeys(self
):
93 ik
= list(self
.db
.iterkeys())
96 def test_itervalues(self
):
99 iv
= list(self
.db
.itervalues())
102 def test_itervalues_long(self
):
103 """test iterator isolation from queries run inside iterator loop """
104 sql
= 'insert into %s (start) values (1)' % self
.tableName
105 for i
in range(40000): # insert 40000 rows
106 self
.db
.cursor
.execute(sql
)
108 print 'begin itervalues()'
109 for o
in self
.db
.itervalues():
110 status
= 99 in self
.db
# make it do a query inside iterator loop
112 print 'begin values()'
113 kv
= [o
.id for o
in self
.db
.values()]
114 assert len(kv
) == len(iv
)
117 def test_iteritems(self
):
120 ii
= list(self
.db
.iteritems())
123 def test_readonly(self
):
124 'test error handling of write attempts to read-only DB'
126 self
.db
.new(seq_id
='freddy', start
=3000, stop
=4500)
127 raise AssertionError('failed to trap attempt to write to db')
133 raise AssertionError('failed to trap attempt to write to db')
138 raise AssertionError('failed to trap attempt to write to db')
142 ### @CTB need to test write access
143 def test_mapview(self
):
144 'test MapView of SQL join'
145 m
= MapView(self
.sourceDB
, self
.targetDB
,"""\
146 SELECT t2.third_id FROM %s t1, %s t2
147 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
148 """ % (self
.joinTable1
,self
.joinTable2
), serverInfo
=self
.serverInfo
)
149 assert m
[self
.sourceDB
[2]] == self
.targetDB
[7]
150 assert m
[self
.sourceDB
[3]] == self
.targetDB
[99]
151 assert self
.sourceDB
[2] in m
153 d
= m
[self
.sourceDB
[4]]
154 raise AssertionError('failed to trap non-unique mapping')
159 raise AssertionError('failed to trap non-invertible mapping')
162 self
.sourceDB
.cursor
.execute("INSERT INTO %s VALUES (5,'seq78')"
163 % self
.sourceDB
.name
)
164 assert len(self
.sourceDB
) == 4
168 correct
= [self
.sourceDB
[2],self
.sourceDB
[3]]
171 def test_mapview_inverse(self
):
172 'test inverse MapView of SQL join'
173 self
.sourceDB
.catchIter
= self
.targetDB
.catchIter
= True
174 m
= MapView(self
.sourceDB
, self
.targetDB
,"""\
175 SELECT t2.third_id FROM %s t1, %s t2
176 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
177 """ % (self
.joinTable1
,self
.joinTable2
), serverInfo
=self
.serverInfo
,
179 SELECT t1.my_id FROM %s t1, %s t2
180 WHERE t2.third_id=%%s and t1.other_id=t2.other_id
181 """ % (self
.joinTable1
,self
.joinTable2
))
182 r
= ~m
# get the inverse
183 assert self
.sourceDB
[2] == r
[self
.targetDB
[7]]
184 assert self
.sourceDB
[3] == r
[self
.targetDB
[99]]
185 assert self
.targetDB
[7] in r
187 m
= ~r
# get the inverse of the inverse!
188 assert m
[self
.sourceDB
[2]] == self
.targetDB
[7]
189 assert m
[self
.sourceDB
[3]] == self
.targetDB
[99]
190 assert self
.sourceDB
[2] in m
192 d
= m
[self
.sourceDB
[4]]
193 raise AssertionError('failed to trap non-unique mapping')
196 def test_graphview(self
):
197 'test GraphView of SQL join'
198 m
= GraphView(self
.sourceDB
, self
.targetDB
,"""\
199 SELECT t2.third_id FROM %s t1, %s t2
200 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
201 """ % (self
.joinTable1
,self
.joinTable2
), serverInfo
=self
.serverInfo
)
202 d
= m
[self
.sourceDB
[4]]
204 assert self
.targetDB
[6] in d
and self
.targetDB
[8] in d
205 assert self
.sourceDB
[2] in m
207 self
.sourceDB
.cursor
.execute("INSERT INTO %s VALUES (5,'seq78')"
208 % self
.sourceDB
.name
)
209 assert len(self
.sourceDB
) == 4
213 correct
= [self
.sourceDB
[2],self
.sourceDB
[3],self
.sourceDB
[4]]
217 def test_graphview_inverse(self
):
218 'test inverse GraphView of SQL join'
219 m
= GraphView(self
.sourceDB
, self
.targetDB
,"""\
220 SELECT t2.third_id FROM %s t1, %s t2
221 WHERE t1.my_id=%%s and t1.other_id=t2.other_id
222 """ % (self
.joinTable1
,self
.joinTable2
), serverInfo
=self
.serverInfo
,
224 SELECT t1.my_id FROM %s t1, %s t2
225 WHERE t2.third_id=%%s and t1.other_id=t2.other_id
226 """ % (self
.joinTable1
,self
.joinTable2
))
227 r
= ~m
# get the inverse
228 assert self
.sourceDB
[2] in r
[self
.targetDB
[7]]
229 assert self
.sourceDB
[3] in r
[self
.targetDB
[99]]
230 assert self
.targetDB
[7] in r
231 d
= r
[self
.targetDB
[6]]
233 assert self
.sourceDB
[4] in d
235 m
= ~r
# get inverse of the inverse!
236 d
= m
[self
.sourceDB
[4]]
238 assert self
.targetDB
[6] in d
and self
.targetDB
[8] in d
239 assert self
.sourceDB
[2] in m
class SQLiteBase(testutil.SQLite_Mixin):
    # Mixin that loads the test tables under the unqualified name
    # 'sqltable_test'.
    # NOTE(review): presumably testutil.SQLite_Mixin invokes sqlite_load()
    # during its own setup -- confirm against testlib.

    def sqlite_load(self):
        # Forward the concrete test class's `writeable` flag to load_data().
        table_name = 'sqltable_test'
        self.load_data(table_name, writeable=self.writeable)
245 class SQLiteTable_Test(SQLiteBase
, SQLTable_Test
):
248 ## class SQLitePickle_Test(SQLiteTable_Test):
250 ## """Pickle / unpickle our serverInfo before trying to use it """
251 ## SQLiteTable_Test.setUp(self)
252 ## self.serverInfo.close()
254 ## s = pickle.dumps(self.serverInfo)
255 ## del self.serverInfo
256 ## self.serverInfo = pickle.loads(s)
257 ## self.db = self.tableClass(self.tableName, serverInfo=self.serverInfo)
258 ## self.sourceDB = self.tableClass(self.joinTable1,
259 ## serverInfo=self.serverInfo)
260 ## self.targetDB = self.tableClass(self.joinTable2,
261 ## serverInfo=self.serverInfo)
# Re-runs every test inherited from SQLTable_Test, but builds the tables
# with SQLTableNoCache instead of the default table class.
class SQLTable_NoCache_Test(SQLTable_Test):
    tableClass = SQLTableNoCache
# Re-runs every test inherited from SQLiteTable_Test, but builds the tables
# with SQLTableNoCache instead of the default table class.
class SQLiteTable_NoCache_Test(SQLiteTable_Test):
    tableClass = SQLTableNoCache
269 class SQLTableRW_Test(SQLTable_Setup
):
270 'test write operations'
273 'test row creation with auto inc ID'
275 o
= self
.db
.new(seq_id
='freddy', start
=3000, stop
=4500)
276 assert len(self
.db
) == n
+ 1
277 t
= self
.tableClass(self
.tableName
,
278 serverInfo
=self
.serverInfo
) # requery the db
280 assert result
.seq_id
== 'freddy' and result
.start
==3000 \
281 and result
.stop
==4500
283 'check row creation with specified ID'
285 o
= self
.db
.new(id=99, seq_id
='jeff', start
=3000, stop
=4500)
286 assert len(self
.db
) == n
+ 1
288 t
= self
.tableClass(self
.tableName
,
289 serverInfo
=self
.serverInfo
) # requery the db
291 assert result
.seq_id
== 'jeff' and result
.start
==3000 \
292 and result
.stop
==4500
294 'test changing an attr value'
296 assert o
.seq_id
== 'seq2'
297 o
.seq_id
= 'newval' # overwrite this attribute
298 assert o
.seq_id
== 'newval' # check cached value
299 t
= self
.tableClass(self
.tableName
,
300 serverInfo
=self
.serverInfo
) # requery the db
302 assert result
.seq_id
== 'newval'
303 def test_delitem(self
):
304 'test deletion of a row'
307 assert len(self
.db
) == n
- 1
310 raise AssertionError('old ID still exists!')
313 def test_setitem(self
):
314 'test assigning new ID to existing object'
315 o
= self
.db
.new(id=17, seq_id
='bob', start
=2000, stop
=2500)
320 raise AssertionError('old ID still exists!')
323 t
= self
.tableClass(self
.tableName
,
324 serverInfo
=self
.serverInfo
) # requery the db
326 assert result
.seq_id
== 'bob' and result
.start
==2000 \
327 and result
.stop
==2500
330 raise AssertionError('old ID still exists!')
335 class SQLiteTableRW_Test(SQLiteBase
, SQLTableRW_Test
):
# Re-runs the write-operation tests inherited from SQLTableRW_Test, but
# builds the tables with SQLTableNoCache instead of the default table class.
class SQLTableRW_NoCache_Test(SQLTableRW_Test):
    tableClass = SQLTableNoCache
# Re-runs the write-operation tests inherited from SQLiteTableRW_Test, but
# builds the tables with SQLTableNoCache instead of the default table class.
class SQLiteTableRW_NoCache_Test(SQLiteTableRW_Test):
    tableClass = SQLTableNoCache
344 class Ensembl_Test(unittest
.TestCase
):
347 # test will be skipped if mysql module or ensembldb server unavailable
349 logger
.debug('accessing ensembldb.ensembl.org')
350 conn
= DBServerInfo(host
='ensembldb.ensembl.org', user
='anonymous',
353 translationDB
= SQLTableCatcher('homo_sapiens_core_47_36i.translation',
355 translationDB
.catchIter
= True # should not iter in this test!
356 exonDB
= SQLTable('homo_sapiens_core_47_36i.exon', serverInfo
=conn
)
357 except ImportError,e
:
360 sql_statement
= '''SELECT t3.exon_id FROM
361 homo_sapiens_core_47_36i.translation AS tr,
362 homo_sapiens_core_47_36i.exon_transcript AS t1,
363 homo_sapiens_core_47_36i.exon_transcript AS t2,
364 homo_sapiens_core_47_36i.exon_transcript AS t3 WHERE tr.translation_id = %s
365 AND tr.transcript_id = t1.transcript_id AND t1.transcript_id =
366 t2.transcript_id AND t2.transcript_id = t3.transcript_id AND t1.exon_id =
367 tr.start_exon_id AND t2.exon_id = tr.end_exon_id AND t3.rank >= t1.rank AND
368 t3.rank <= t2.rank ORDER BY t3.rank
370 print 'creating GraphView...'
371 self
.translationExons
= GraphView(translationDB
, exonDB
,
372 sql_statement
, serverInfo
=conn
)
373 print 'getting translation...'
374 self
.translation
= translationDB
[15121]
376 def test_orderBy(self
):
377 "Ensemble access, test order by"
378 'test issue 53: ensure that the ORDER BY results are correct'
380 exons
= self
.translationExons
[self
.translation
] # do the query
382 result
= [e
.id for e
in exons
]
383 correct
= [95160,95020,95035,95050,95059,95069,95081,95088,95101,
385 self
.assertEqual(result
, correct
) # make sure the exact order matches
# Script entry point: hand control to testlib's PygrTestProgram with
# verbose (level 2) output when this file is executed directly.
if __name__ == '__main__':
    PygrTestProgram(verbosity=2)