2 Test some of the basics underpinning the graph system.
7 from testlib
import testutil
, PygrTestProgram
, SkipTest
8 from pygr
import mapping
, graphquery
, sqlgraph
11 def __init__(self
, id):
15 class Query_Test(unittest
.TestCase
):
18 def get_node(self
, k
):
21 except AttributeError:
29 def node_graph(self
, g
):
32 except AttributeError:
37 d
= out
.setdefault(k
, {})
38 for dest
,edge
in e
.items():
39 d
[self
.get_node(dest
)] = edge
42 def node_list(self
, l
):
45 except AttributeError:
49 out
.append(self
.get_node(k
))
52 def node_result(self
, r
):
55 except AttributeError:
61 d2
[k
] = self
.get_node(v
)
65 def update_graph(self
, datagraph
):
68 except AttributeError:
74 def dqcmp(self
, datagraph
, querygraph
, result
):
75 datagraph
= self
.update_graph(self
.node_graph(datagraph
))
76 l
= [ d
.copy() for d
in graphquery
.GraphQuery(datagraph
, querygraph
) ]
77 assert len(l
) == len(result
), 'length mismatch'
79 result
= self
.node_result(result
)
81 for i
in range(len(l
)):
82 assert l
[i
] == result
[i
], 'incorrect result'
84 def test_basicquery_test(self
):
86 datagraph
= {0: {1: None, 2: None, 3: None},
87 1: {2: None}, 3: {4: None, 5: None},
88 4: {6: None}, 5: {6: None}, 2: {}, 6: {}}
89 querygraph
= {0: {1: None, 2: None, 3: None},
90 3:{4: None},1:{},2:{},4:{}}
91 result
= [{0: 0, 1: 1, 2: 2, 3: 3, 4: 4},
92 {0: 0, 1: 1, 2: 2, 3: 3, 4: 5},
93 {0: 0, 1: 2, 2: 1, 3: 3, 4: 4},
94 {0: 0, 1: 2, 2: 1, 3: 3, 4: 5}]
96 self
.dqcmp(datagraph
, querygraph
, result
)
99 'test basic iteration'
100 g
= {0: {1: None, 2: None, 3: None},
101 1: {2: None}, 3: {4: None, 5: None},
102 4: {6: None}, 5: {6: None}, 2: {}, 6: {}}
103 datagraph
= self
.update_graph(self
.node_graph(g
))
104 l
= list(iter(datagraph
))
106 result
= self
.node_list([0,1,2,3,4,5,6])
110 def test_cyclicquery(self
):
111 "Cyclic QG against cyclic DG @CTB comment?"
112 datagraph
= { 1 :{2:None}, 2:{3:None}, 3:{4:None}, 4:{5:None},
114 querygraph
= {0:{1:None}, 1:{2:None}, 2:{4:None}, 3:{1:None},
116 result
= [ {0: 1, 1: 2, 2: 3, 3: 5, 4: 4} ]
117 self
.dqcmp(datagraph
, querygraph
, result
)
119 def test_cyclicacyclicquery(self
):
120 "Cyclic QG against acyclic DG"
121 datagraph
= {0:{1:None}, 1:{3:None}, 5:{3:None}, 4:{5:None},
122 2:{4:None,1:None}, 3:{}}
123 querygraph
= {0:{1:None}, 1:{3:None}, 3:{5:None}, 5:{4:None},
124 4:{2:None}, 2:{1:None}}
126 self
.dqcmp(datagraph
,querygraph
,result
)
def test_symmetricquery_test(self):
    """Symmetrical QG against symmetrical DG

    The data graph has two sources (1 and 5) that both point at node 2,
    which in turn points at two sinks (3 and 4).  The query graph is the
    plain chain 0 -> 1 -> 2, so one match is expected for every
    source/sink pairing routed through node 2.
    """
    data_edges = {
        1: {2: None},
        2: {3: None, 4: None},
        5: {2: None},
        3: {},
        4: {},
    }
    query_edges = {0: {1: None}, 1: {2: None}, 2: {}}
    # Query node 1 always binds to data node 2; enumerate the bindings of
    # the two outer query nodes, sources first and then sinks.
    expected = [{0: source, 1: 2, 2: sink}
                for source in (1, 5)
                for sink in (3, 4)]
    self.dqcmp(data_edges, query_edges, expected)
136 def test_filteredquery(self
):
137 "Test a filter against a query"
138 datagraph
= {0: {1: None, 2: None, 3: None}, 1: {2: None, 3: None},
140 querygraph
= {0:{1:{'filter':lambda toNode
,**kw
:toNode
== self
.get_node(3)}},1:{}}
141 result
= [{0: 0, 1: 3},{0: 1, 1: 3}]
142 self
.dqcmp(datagraph
,querygraph
,result
)
def test_headlessquery(self):
    """Test a query with no head nodes

    The query graph is the closed loop 0 -> 1 -> 2 -> 3 -> 0, so every
    query node has an incoming edge.  The data graph contains the loop
    1 -> 2 -> 3 -> 4 -> 1 plus node 0 pointing into it; the expected
    matches are the four rotations of the query around that loop.
    """
    data_edges = {
        0: {1: None},
        1: {2: None},
        2: {3: None},
        3: {4: None},
        4: {1: None},
    }
    query_edges = {
        0: {1: None},
        1: {2: None},
        2: {3: None},
        3: {0: None},
    }
    loop = (1, 2, 3, 4)
    size = len(loop)
    # One expected binding per starting offset: query node q maps to the
    # data node q steps further round the loop from that offset.
    expected = [dict((q, loop[(offset + q) % size]) for q in range(size))
                for offset in range(size)]
    self.dqcmp(data_edges, query_edges, expected)
154 class Mapping_Test(Query_Test
):
158 self
.datagraph
= mapping
.dictGraph()
160 def test_graphdict(self
):
162 datagraph
= self
.datagraph
163 datagraph
+= self
.get_node(1)
164 datagraph
[self
.get_node(1)] += self
.get_node(2)
165 results
= {1: {2: None}, 2: {}}
166 results
= self
.node_graph(results
)
167 assert datagraph
== results
, 'incorrect result'
169 def test_nodedel(self
):
171 datagraph
= self
.datagraph
172 datagraph
+= self
.get_node(1)
173 datagraph
+= self
.get_node(2)
174 datagraph
[self
.get_node(2)] += self
.get_node(3)
175 datagraph
-= self
.get_node(1)
176 results
= {2: {3: None}, 3: {}}
177 results
= self
.node_graph(results
)
178 assert datagraph
== results
, 'incorrect result'
180 def test_delraise(self
):
182 datagraph
= self
.datagraph
183 datagraph
+= self
.get_node(1)
184 datagraph
+= self
.get_node(2)
185 datagraph
[self
.get_node(2)] += self
.get_node(3)
188 datagraph
-= self
.get_node(3)
189 raise ValueError('failed to catch bad node deletion attempt')
191 pass # THIS IS THE CORRECT RESULT
193 def test_setitemraise(self
):
195 datagraph
= self
.datagraph
196 datagraph
+= self
.get_node(1)
198 datagraph
[self
.get_node(1)] = self
.get_node(2)
199 raise KeyError('failed to catch bad setitem attempt')
201 pass # THIS IS THE CORRECT RESULT
203 def test_graphedges(self
):
205 datagraph
= self
.datagraph
206 graphvals
= {1:{2:None},2:{3:None,4:None},5:{2:None},3:{},4:{}}
207 graphvals
= self
.node_graph(graphvals
)
208 edge_list
= [[self
.get_node(1), self
.get_node(2),None],
209 [self
.get_node(2), self
.get_node(3),None],
210 [self
.get_node(2), self
.get_node(4),None],
211 [self
.get_node(5), self
.get_node(2),None]]
214 for n
in graphvals
[i
].keys():
217 for e
in datagraph
.edges():
218 edge_results
.append(e
)
220 edge_results
= [list(t
) for t
in edge_results
]
222 #print 'edge_results:',edge_results
223 assert edge_results
== edge_list
, 'incorrect result'
225 class Graph_Test(Mapping_Test
):
226 "Run same tests on mapping.Graph class"
229 self
.datagraph
= mapping
.Graph()
231 class Graph_DB_Test(Mapping_Test
):
232 "test mapping.Graph with sourceDB, targetDB but no edgeDB"
235 self
.nodeDB
= {1:Node(1), 2:Node(2)}
236 self
.datagraph
= mapping
.Graph(sourceDB
=self
.nodeDB
,
237 targetDB
=self
.nodeDB
)
238 def test_no_edge_db(self
):
239 'test behavior with no edgeDB'
240 self
.datagraph
+= self
.nodeDB
[1] # add node
241 self
.datagraph
[self
.nodeDB
[1]][self
.nodeDB
[2]] = 3 # add edge
243 assert self
.datagraph
[self
.nodeDB
[1]][self
.nodeDB
[2]] == 3
246 class GraphShelve_Test(Mapping_Test
):
247 "Run same tests on mapping.Graph class"
251 tmp
= testutil
.TempDir('graphshelve-test')
252 filename
= tmp
.subfile() # needs a random name each time
253 self
.datagraph
= mapping
.Graph(filename
=filename
, intKeys
=True)
256 self
.datagraph
.close()
259 class GraphShelve_DB_Test(Mapping_Test
):
260 "Run same tests on mapping.Graph class"
264 tmp
= testutil
.TempDir('graphshelve-test')
265 filename
= tmp
.subfile() # needs a random name each time
266 self
.datagraph
= mapping
.Graph(sourceDB
=self
.nodeDB
,
267 targetDB
=self
.nodeDB
,
268 filename
=filename
, intKeys
=True)
271 self
.datagraph
.close()
274 class SQLGraph_Test(Mapping_Test
):
275 "Runs the same tests on mapping.SQLGraph class"
276 dbname
= 'test.dumbo_foo_test'
279 if not testutil
.mysql_enabled():
280 raise SkipTest
, "no MySQL"
282 createOpts
= dict(source_id
='int', target_id
='int', edge_id
='int')
283 self
.datagraph
= sqlgraph
.SQLGraph(self
.dbname
, dropIfExists
=True,
284 createTable
=createOpts
)
287 self
.datagraph
.cursor
.execute('drop table if exists %s' % self
.dbname
)
289 class SQLGraph_DB_Test(Mapping_Test
):
290 "Runs the same tests on mapping.SQLGraph class"
291 dbname
= 'test.dumbo_foo_test'
294 if not testutil
.mysql_enabled():
295 raise SkipTest
, "no MySQL"
298 createOpts
= dict(source_id
='int', target_id
='int', edge_id
='int')
299 self
.datagraph
= sqlgraph
.SQLGraph(self
.dbname
, dropIfExists
=True,
300 createTable
=createOpts
,
301 sourceDB
=self
.nodeDB
,
302 targetDB
=self
.nodeDB
)
305 self
.datagraph
.cursor
.execute('drop table if exists %s' % self
.dbname
)
307 class SQLiteGraph_Test(testutil
.SQLite_Mixin
, Mapping_Test
):
308 'run same tests on mapping.SQLGraph class using sqlite'
309 def sqlite_load(self
):
310 createOpts
= dict(source_id
='int', target_id
='int', edge_id
='int')
311 self
.datagraph
= sqlgraph
.SQLGraph('testgraph',
312 serverInfo
=self
.serverInfo
,
314 createTable
=createOpts
)
316 class SQLiteGraph_DB_Test(testutil
.SQLite_Mixin
, Mapping_Test
):
317 'run same tests on mapping.SQLGraph class using sqlite'
318 def sqlite_load(self
):
320 createOpts
= dict(source_id
='int', target_id
='int', edge_id
='int')
321 self
.datagraph
= sqlgraph
.SQLGraph('testgraph',
322 serverInfo
=self
.serverInfo
,
324 createTable
=createOpts
,
325 sourceDB
=self
.nodeDB
,
326 targetDB
=self
.nodeDB
)
328 # test currently unused, requires access to leelab data
329 ## from pygr import worldbase
330 ## class Splicegraph_Test(unittest.TestCase):
333 ## self.sg = worldbase.Bio.Annotation.ASAP2.Isoform.HUMAN.\
334 ## hg17.splicegraph()
336 ## def exonskip_megatest(self):
337 ## 'perform exon skip query'
338 ## query = {0:{1:None,2:None},1:{2:None},2:{}}
339 ## gq = graphquery.GraphQuery(self.sg, query)
341 ## assert len(l) == 11546, 'test exact size of exonskip set'
# Script entry point: only runs when the file is executed directly, not when
# it is imported.  PygrTestProgram comes from testlib (presumably a
# unittest-style test program -- confirm there); verbosity=2 is passed as-is.
if __name__ == '__main__':
    PygrTestProgram(**{'verbosity': 2})