##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
MinRK -
Show More
@@ -1,103 +1,106
1 """Example for generating an arbitrary DAG as a dependency map.
1 """Example for generating an arbitrary DAG as a dependency map.
2
2
3 This demo uses networkx to generate the graph.
3 This demo uses networkx to generate the graph.
4
4
5 Authors
5 Authors
6 -------
6 -------
7 * MinRK
7 * MinRK
8 """
8 """
9 import networkx as nx
9 import networkx as nx
10 from random import randint, random
10 from random import randint, random
11 from IPython.zmq.parallel import client as cmod
11 from IPython.zmq.parallel import client as cmod
12
12
13 def randomwait():
13 def randomwait():
14 import time
14 import time
15 from random import random
15 from random import random
16 time.sleep(random())
16 time.sleep(random())
17 return time.time()
17 return time.time()
18
18
19
19
20 def random_dag(nodes, edges):
20 def random_dag(nodes, edges):
21 """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
21 """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
22 G = nx.DiGraph()
22 G = nx.DiGraph()
23 for i in range(nodes):
23 for i in range(nodes):
24 G.add_node(i)
24 G.add_node(i)
25 while edges > 0:
25 while edges > 0:
26 a = randint(0,nodes-1)
26 a = randint(0,nodes-1)
27 b=a
27 b=a
28 while b==a:
28 while b==a:
29 b = randint(0,nodes-1)
29 b = randint(0,nodes-1)
30 G.add_edge(a,b)
30 G.add_edge(a,b)
31 if nx.is_directed_acyclic_graph(G):
31 if nx.is_directed_acyclic_graph(G):
32 edges -= 1
32 edges -= 1
33 else:
33 else:
34 # we closed a loop!
34 # we closed a loop!
35 G.remove_edge(a,b)
35 G.remove_edge(a,b)
36 return G
36 return G
37
37
38 def add_children(G, parent, level, n=2):
38 def add_children(G, parent, level, n=2):
39 """Add children recursively to a binary tree."""
39 """Add children recursively to a binary tree."""
40 if level == 0:
40 if level == 0:
41 return
41 return
42 for i in range(n):
42 for i in range(n):
43 child = parent+str(i)
43 child = parent+str(i)
44 G.add_node(child)
44 G.add_node(child)
45 G.add_edge(parent,child)
45 G.add_edge(parent,child)
46 add_children(G, child, level-1, n)
46 add_children(G, child, level-1, n)
47
47
48 def make_bintree(levels):
48 def make_bintree(levels):
49 """Make a symmetrical binary tree with @levels"""
49 """Make a symmetrical binary tree with @levels"""
50 G = nx.DiGraph()
50 G = nx.DiGraph()
51 root = '0'
51 root = '0'
52 G.add_node(root)
52 G.add_node(root)
53 add_children(G, root, levels, 2)
53 add_children(G, root, levels, 2)
54 return G
54 return G
55
55
56 def submit_jobs(client, G, jobs):
56 def submit_jobs(client, G, jobs):
57 """Submit jobs via client where G describes the time dependencies."""
57 """Submit jobs via client where G describes the time dependencies."""
58 msg_ids = {}
58 results = {}
59 for node in nx.topological_sort(G):
59 for node in nx.topological_sort(G):
60 deps = [ msg_ids[n] for n in G.predecessors(node) ]
60 deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ]
61 msg_ids[node] = client.apply(jobs[node], after=deps)
61 results[node] = client.apply(jobs[node], after=deps)
62 return msg_ids
62 return results
63
63
64 def validate_tree(G, times):
64 def validate_tree(G, times):
65 """Validate that jobs executed after their dependencies."""
65 """Validate that jobs executed after their dependencies."""
66 for node in G:
66 for node in G:
67 t = times[node]
67 t = times[node]
68 for parent in G.predecessors(node):
68 for parent in G.predecessors(node):
69 pt = times[parent]
69 pt = times[parent]
70 assert t > pt, "%s should have happened after %s"%(node, parent)
70 assert t > pt, "%s should have happened after %s"%(node, parent)
71
71
72 def main(nodes, edges):
72 def main(nodes, edges):
73 """Generate a random graph, submit jobs, then validate that the
73 """Generate a random graph, submit jobs, then validate that the
74 dependency order was enforced.
74 dependency order was enforced.
75 Finally, plot the graph, with time on the x-axis, and
75 Finally, plot the graph, with time on the x-axis, and
76 in-degree on the y (just for spread). All arrows must
76 in-degree on the y (just for spread). All arrows must
77 point at least slightly to the right if the graph is valid.
77 point at least slightly to the right if the graph is valid.
78 """
78 """
79 print "building DAG"
79 G = random_dag(nodes, edges)
80 G = random_dag(nodes, edges)
80 jobs = {}
81 jobs = {}
81 msg_ids = {}
82 msg_ids = {}
82 times = {}
83 times = {}
83 pos = {}
84 pos = {}
84 for node in G:
85 for node in G:
85 jobs[node] = randomwait
86 jobs[node] = randomwait
86
87
87 client = cmod.Client('tcp://127.0.0.1:10101')
88 client = cmod.Client('tcp://127.0.0.1:10101')
88
89 print "submitting tasks"
89 msg_ids = submit_jobs(client, G, jobs)
90 results = submit_jobs(client, G, jobs)
91 print "waiting for results"
90 client.barrier()
92 client.barrier()
93 print "done"
91 for node in G:
94 for node in G:
92 times[node] = client.results[msg_ids[node]]
95 times[node] = results[node].get()
93 pos[node] = (times[node], G.in_degree(node)+random())
96 pos[node] = (times[node], G.in_degree(node)+random())
94
97
95 validate_tree(G, times)
98 validate_tree(G, times)
96 nx.draw(G, pos)
99 nx.draw(G, pos)
97 return G,times,msg_ids
100 return G,times,msg_ids
98
101
99 if __name__ == '__main__':
102 if __name__ == '__main__':
100 import pylab
103 import pylab
101 main(32,128)
104 main(32,128)
102 pylab.show()
105 pylab.show()
103 No newline at end of file
106
General Comments 0
You need to be logged in to leave comments. Login now