Merge commit 'remotes/ctb/xmlrpc_patches' into tryme
[pygr.git] / tests / graph_test.py
blobe14e57ad2dfb955985f694497763ee4b320defe2
1 """
2 Test some of the basics underpinning the graph system.
3 """
5 import os, unittest
7 from testlib import testutil, PygrTestProgram, SkipTest
8 from pygr import mapping, graphquery, sqlgraph
10 class Node(object):
11 def __init__(self, id):
12 self.id = id
15 class Query_Test(unittest.TestCase):
16 "Pygr Query tests"
18 def get_node(self, k):
19 try:
20 db = self.nodeDB
21 except AttributeError:
22 return k
23 else:
24 try:
25 return db[k]
26 except KeyError:
27 db[k] = Node(k)
28 return db[k]
29 def node_graph(self, g):
30 try:
31 db = self.nodeDB
32 except AttributeError:
33 return g
34 out = {}
35 for k,e in g.items():
36 k = self.get_node(k)
37 d = out.setdefault(k, {})
38 for dest,edge in e.items():
39 d[self.get_node(dest)] = edge
40 return out
42 def node_list(self, l):
43 try:
44 db = self.nodeDB
45 except AttributeError:
46 return l
47 out = []
48 for k in l:
49 out.append(self.get_node(k))
50 return out
52 def node_result(self, r):
53 try:
54 db = self.nodeDB
55 except AttributeError:
56 return r
57 l = []
58 for d in r:
59 d2 = {}
60 for k,v in d.items():
61 d2[k] = self.get_node(v)
62 l.append(d2)
63 return l
65 def update_graph(self, datagraph):
66 try:
67 g = self.datagraph
68 except AttributeError:
69 return datagraph
70 else:
71 g.update(datagraph)
72 return g
74 def dqcmp(self, datagraph, querygraph, result):
75 datagraph = self.update_graph(self.node_graph(datagraph))
76 l = [ d.copy() for d in graphquery.GraphQuery(datagraph, querygraph) ]
77 assert len(l) == len(result), 'length mismatch'
78 l.sort()
79 result = self.node_result(result)
80 result.sort()
81 for i in range(len(l)):
82 assert l[i] == result[i], 'incorrect result'
84 def test_basicquery_test(self):
85 "Basic query"
86 datagraph = {0: {1: None, 2: None, 3: None},
87 1: {2: None}, 3: {4: None, 5: None},
88 4: {6: None}, 5: {6: None}, 2: {}, 6: {}}
89 querygraph = {0: {1: None, 2: None, 3: None},
90 3:{4: None},1:{},2:{},4:{}}
91 result = [{0: 0, 1: 1, 2: 2, 3: 3, 4: 4},
92 {0: 0, 1: 1, 2: 2, 3: 3, 4: 5},
93 {0: 0, 1: 2, 2: 1, 3: 3, 4: 4},
94 {0: 0, 1: 2, 2: 1, 3: 3, 4: 5}]
96 self.dqcmp(datagraph, querygraph, result)
98 def test_iter(self):
99 'test basic iteration'
100 g = {0: {1: None, 2: None, 3: None},
101 1: {2: None}, 3: {4: None, 5: None},
102 4: {6: None}, 5: {6: None}, 2: {}, 6: {}}
103 datagraph = self.update_graph(self.node_graph(g))
104 l = list(iter(datagraph))
105 l.sort()
106 result = self.node_list([0,1,2,3,4,5,6])
107 result.sort()
108 assert l == result
110 def test_cyclicquery(self):
111 "Cyclic QG against cyclic DG @CTB comment?"
112 datagraph = { 1 :{2:None}, 2:{3:None}, 3:{4:None}, 4:{5:None},
113 5:{2:None}}
114 querygraph = {0:{1:None}, 1:{2:None}, 2:{4:None}, 3:{1:None},
115 4:{3:None}}
116 result = [ {0: 1, 1: 2, 2: 3, 3: 5, 4: 4} ]
117 self.dqcmp(datagraph, querygraph, result)
119 def test_cyclicacyclicquery(self):
120 "Cyclic QG against acyclic DG"
121 datagraph = {0:{1:None}, 1:{3:None}, 5:{3:None}, 4:{5:None},
122 2:{4:None,1:None}, 3:{}}
123 querygraph = {0:{1:None}, 1:{3:None}, 3:{5:None}, 5:{4:None},
124 4:{2:None}, 2:{1:None}}
125 result = []
126 self.dqcmp(datagraph,querygraph,result)
128 def test_symmetricquery_test(self):
129 "Symmetrical QG against symmetrical DG"
130 datagraph = {1:{2:None},2:{3:None,4:None},5:{2:None},3:{},4:{}}
131 querygraph = {0:{1:None},1:{2:None},2:{}}
132 result = [{0: 1, 1: 2, 2: 3}, {0: 1, 1: 2, 2: 4},
133 {0: 5, 1: 2, 2: 3}, {0: 5, 1: 2, 2: 4}]
134 self.dqcmp(datagraph,querygraph,result)
136 def test_filteredquery(self):
137 "Test a filter against a query"
138 datagraph = {0: {1: None, 2: None, 3: None}, 1: {2: None, 3: None},
139 3: {4: None}}
140 querygraph = {0:{1:{'filter':lambda toNode,**kw:toNode == self.get_node(3)}},1:{}}
141 result = [{0: 0, 1: 3},{0: 1, 1: 3}]
142 self.dqcmp(datagraph,querygraph,result)
144 def test_headlessquery(self):
145 "Test a query with no head nodes"
146 datagraph = {0:{1:None},1:{2:None},2:{3:None},3:{4:None},4:{1:None}}
147 querygraph = {0:{1:None},1:{2:None},2:{3:None},3:{0:None}}
148 result = [{0: 1, 1: 2, 2: 3, 3: 4},
149 {0: 2, 1: 3, 2: 4, 3: 1},
150 {0: 3, 1: 4, 2: 1, 3: 2},
151 {0: 4, 1: 1, 2: 2, 3: 3}]
152 self.dqcmp(datagraph,querygraph,result)
154 class Mapping_Test(Query_Test):
155 "Tests mappings"
157 def setUp(self):
158 self.datagraph = mapping.dictGraph()
160 def test_graphdict(self):
161 "Graph dictionary"
162 datagraph = self.datagraph
163 datagraph += self.get_node(1)
164 datagraph[self.get_node(1)] += self.get_node(2)
165 results = {1: {2: None}, 2: {}}
166 results = self.node_graph(results)
167 assert datagraph == results, 'incorrect result'
169 def test_nodedel(self):
170 "Node deletion"
171 datagraph = self.datagraph
172 datagraph += self.get_node(1)
173 datagraph += self.get_node(2)
174 datagraph[self.get_node(2)] += self.get_node(3)
175 datagraph -= self.get_node(1)
176 results = {2: {3: None}, 3: {}}
177 results = self.node_graph(results)
178 assert datagraph == results, 'incorrect result'
180 def test_delraise(self):
181 "Delete raise"
182 datagraph = self.datagraph
183 datagraph += self.get_node(1)
184 datagraph += self.get_node(2)
185 datagraph[self.get_node(2)] += self.get_node(3)
186 try:
187 for i in range(0,2):
188 datagraph -= self.get_node(3)
189 raise ValueError('failed to catch bad node deletion attempt')
190 except KeyError:
191 pass # THIS IS THE CORRECT RESULT
193 def test_setitemraise(self):
194 "Setitemraise"
195 datagraph = self.datagraph
196 datagraph += self.get_node(1)
197 try:
198 datagraph[self.get_node(1)] = self.get_node(2)
199 raise KeyError('failed to catch bad setitem attempt')
200 except ValueError:
201 pass # THIS IS THE CORRECT RESULT
203 def test_graphedges(self):
204 "Graphedges"
205 datagraph = self.datagraph
206 graphvals = {1:{2:None},2:{3:None,4:None},5:{2:None},3:{},4:{}}
207 graphvals = self.node_graph(graphvals)
208 edge_list = [[self.get_node(1), self.get_node(2),None],
209 [self.get_node(2), self.get_node(3),None],
210 [self.get_node(2), self.get_node(4),None],
211 [self.get_node(5), self.get_node(2),None]]
212 for i in graphvals:
213 datagraph += i
214 for n in graphvals[i].keys():
215 datagraph[i] += n
216 edge_results = []
217 for e in datagraph.edges():
218 edge_results.append(e)
219 edge_results.sort()
220 edge_results = [list(t) for t in edge_results]
221 edge_list.sort()
222 #print 'edge_results:',edge_results
223 assert edge_results == edge_list, 'incorrect result'
225 class Graph_Test(Mapping_Test):
226 "Run same tests on mapping.Graph class"
228 def setUp(self):
229 self.datagraph = mapping.Graph()
231 class Graph_DB_Test(Mapping_Test):
232 "test mapping.Graph with sourceDB, targetDB but no edgeDB"
234 def setUp(self):
235 self.nodeDB = {1:Node(1), 2:Node(2)}
236 self.datagraph = mapping.Graph(sourceDB=self.nodeDB,
237 targetDB=self.nodeDB)
238 def test_no_edge_db(self):
239 'test behavior with no edgeDB'
240 self.datagraph += self.nodeDB[1] # add node
241 self.datagraph[self.nodeDB[1]][self.nodeDB[2]] = 3 # add edge
243 assert self.datagraph[self.nodeDB[1]][self.nodeDB[2]] == 3
246 class GraphShelve_Test(Mapping_Test):
247 "Run same tests on mapping.Graph class"
249 def setUp(self):
251 tmp = testutil.TempDir('graphshelve-test')
252 filename = tmp.subfile() # needs a random name each time
253 self.datagraph = mapping.Graph(filename=filename, intKeys=True)
255 def tearDown(self):
256 self.datagraph.close()
259 class GraphShelve_DB_Test(Mapping_Test):
260 "Run same tests on mapping.Graph class"
262 def setUp(self):
263 self.nodeDB = {}
264 tmp = testutil.TempDir('graphshelve-test')
265 filename = tmp.subfile() # needs a random name each time
266 self.datagraph = mapping.Graph(sourceDB=self.nodeDB,
267 targetDB=self.nodeDB,
268 filename=filename, intKeys=True)
270 def tearDown(self):
271 self.datagraph.close()
274 class SQLGraph_Test(Mapping_Test):
275 "Runs the same tests on mapping.SQLGraph class"
276 dbname = 'test.dumbo_foo_test'
278 def setUp(self):
279 if not testutil.mysql_enabled():
280 raise SkipTest, "no MySQL"
282 createOpts = dict(source_id='int', target_id='int', edge_id='int')
283 self.datagraph = sqlgraph.SQLGraph(self.dbname, dropIfExists=True,
284 createTable=createOpts)
286 def tearDown(self):
287 self.datagraph.cursor.execute('drop table if exists %s' % self.dbname)
289 class SQLGraph_DB_Test(Mapping_Test):
290 "Runs the same tests on mapping.SQLGraph class"
291 dbname = 'test.dumbo_foo_test'
293 def setUp(self):
294 if not testutil.mysql_enabled():
295 raise SkipTest, "no MySQL"
297 self.nodeDB = {}
298 createOpts = dict(source_id='int', target_id='int', edge_id='int')
299 self.datagraph = sqlgraph.SQLGraph(self.dbname, dropIfExists=True,
300 createTable=createOpts,
301 sourceDB=self.nodeDB,
302 targetDB=self.nodeDB)
304 def tearDown(self):
305 self.datagraph.cursor.execute('drop table if exists %s' % self.dbname)
307 class SQLiteGraph_Test(testutil.SQLite_Mixin, Mapping_Test):
308 'run same tests on mapping.SQLGraph class using sqlite'
309 def sqlite_load(self):
310 createOpts = dict(source_id='int', target_id='int', edge_id='int')
311 self.datagraph = sqlgraph.SQLGraph('testgraph',
312 serverInfo=self.serverInfo,
313 dropIfExists=True,
314 createTable=createOpts)
316 class SQLiteGraph_DB_Test(testutil.SQLite_Mixin, Mapping_Test):
317 'run same tests on mapping.SQLGraph class using sqlite'
318 def sqlite_load(self):
319 self.nodeDB = {}
320 createOpts = dict(source_id='int', target_id='int', edge_id='int')
321 self.datagraph = sqlgraph.SQLGraph('testgraph',
322 serverInfo=self.serverInfo,
323 dropIfExists=True,
324 createTable=createOpts,
325 sourceDB=self.nodeDB,
326 targetDB=self.nodeDB)
328 # test currently unused, requires access to leelab data
329 ## from pygr import worldbase
330 ## class Splicegraph_Test(unittest.TestCase):
332 ## def setUp(self):
333 ## self.sg = worldbase.Bio.Annotation.ASAP2.Isoform.HUMAN.\
334 ## hg17.splicegraph()
336 ## def exonskip_megatest(self):
337 ## 'perform exon skip query'
338 ## query = {0:{1:None,2:None},1:{2:None},2:{}}
339 ## gq = graphquery.GraphQuery(self.sg, query)
340 ## l = list(gq)
341 ## assert len(l) == 11546, 'test exact size of exonskip set'
343 if __name__ == '__main__':
344 PygrTestProgram(verbosity=2)