Show More
@@ -1,103 +1,106 | |||||
1 | """Example for generating an arbitrary DAG as a dependency map. |
|
1 | """Example for generating an arbitrary DAG as a dependency map. | |
2 |
|
2 | |||
3 | This demo uses networkx to generate the graph. |
|
3 | This demo uses networkx to generate the graph. | |
4 |
|
4 | |||
5 | Authors |
|
5 | Authors | |
6 | ------- |
|
6 | ------- | |
7 | * MinRK |
|
7 | * MinRK | |
8 | """ |
|
8 | """ | |
9 | import networkx as nx |
|
9 | import networkx as nx | |
10 | from random import randint, random |
|
10 | from random import randint, random | |
11 | from IPython.zmq.parallel import client as cmod |
|
11 | from IPython.zmq.parallel import client as cmod | |
12 |
|
12 | |||
13 | def randomwait(): |
|
13 | def randomwait(): | |
14 | import time |
|
14 | import time | |
15 | from random import random |
|
15 | from random import random | |
16 | time.sleep(random()) |
|
16 | time.sleep(random()) | |
17 | return time.time() |
|
17 | return time.time() | |
18 |
|
18 | |||
19 |
|
19 | |||
20 | def random_dag(nodes, edges): |
|
20 | def random_dag(nodes, edges): | |
21 | """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.""" |
|
21 | """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.""" | |
22 | G = nx.DiGraph() |
|
22 | G = nx.DiGraph() | |
23 | for i in range(nodes): |
|
23 | for i in range(nodes): | |
24 | G.add_node(i) |
|
24 | G.add_node(i) | |
25 | while edges > 0: |
|
25 | while edges > 0: | |
26 | a = randint(0,nodes-1) |
|
26 | a = randint(0,nodes-1) | |
27 | b=a |
|
27 | b=a | |
28 | while b==a: |
|
28 | while b==a: | |
29 | b = randint(0,nodes-1) |
|
29 | b = randint(0,nodes-1) | |
30 | G.add_edge(a,b) |
|
30 | G.add_edge(a,b) | |
31 | if nx.is_directed_acyclic_graph(G): |
|
31 | if nx.is_directed_acyclic_graph(G): | |
32 | edges -= 1 |
|
32 | edges -= 1 | |
33 | else: |
|
33 | else: | |
34 | # we closed a loop! |
|
34 | # we closed a loop! | |
35 | G.remove_edge(a,b) |
|
35 | G.remove_edge(a,b) | |
36 | return G |
|
36 | return G | |
37 |
|
37 | |||
38 | def add_children(G, parent, level, n=2): |
|
38 | def add_children(G, parent, level, n=2): | |
39 | """Add children recursively to a binary tree.""" |
|
39 | """Add children recursively to a binary tree.""" | |
40 | if level == 0: |
|
40 | if level == 0: | |
41 | return |
|
41 | return | |
42 | for i in range(n): |
|
42 | for i in range(n): | |
43 | child = parent+str(i) |
|
43 | child = parent+str(i) | |
44 | G.add_node(child) |
|
44 | G.add_node(child) | |
45 | G.add_edge(parent,child) |
|
45 | G.add_edge(parent,child) | |
46 | add_children(G, child, level-1, n) |
|
46 | add_children(G, child, level-1, n) | |
47 |
|
47 | |||
48 | def make_bintree(levels): |
|
48 | def make_bintree(levels): | |
49 | """Make a symmetrical binary tree with @levels""" |
|
49 | """Make a symmetrical binary tree with @levels""" | |
50 | G = nx.DiGraph() |
|
50 | G = nx.DiGraph() | |
51 | root = '0' |
|
51 | root = '0' | |
52 | G.add_node(root) |
|
52 | G.add_node(root) | |
53 | add_children(G, root, levels, 2) |
|
53 | add_children(G, root, levels, 2) | |
54 | return G |
|
54 | return G | |
55 |
|
55 | |||
56 | def submit_jobs(client, G, jobs): |
|
56 | def submit_jobs(client, G, jobs): | |
57 | """Submit jobs via client where G describes the time dependencies.""" |
|
57 | """Submit jobs via client where G describes the time dependencies.""" | |
58 |
|
|
58 | results = {} | |
59 | for node in nx.topological_sort(G): |
|
59 | for node in nx.topological_sort(G): | |
60 |
deps = [ msg_ids[ |
|
60 | deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ] | |
61 |
|
|
61 | results[node] = client.apply(jobs[node], after=deps) | |
62 |
return |
|
62 | return results | |
63 |
|
63 | |||
64 | def validate_tree(G, times): |
|
64 | def validate_tree(G, times): | |
65 | """Validate that jobs executed after their dependencies.""" |
|
65 | """Validate that jobs executed after their dependencies.""" | |
66 | for node in G: |
|
66 | for node in G: | |
67 | t = times[node] |
|
67 | t = times[node] | |
68 | for parent in G.predecessors(node): |
|
68 | for parent in G.predecessors(node): | |
69 | pt = times[parent] |
|
69 | pt = times[parent] | |
70 | assert t > pt, "%s should have happened after %s"%(node, parent) |
|
70 | assert t > pt, "%s should have happened after %s"%(node, parent) | |
71 |
|
71 | |||
72 | def main(nodes, edges): |
|
72 | def main(nodes, edges): | |
73 | """Generate a random graph, submit jobs, then validate that the |
|
73 | """Generate a random graph, submit jobs, then validate that the | |
74 | dependency order was enforced. |
|
74 | dependency order was enforced. | |
75 | Finally, plot the graph, with time on the x-axis, and |
|
75 | Finally, plot the graph, with time on the x-axis, and | |
76 | in-degree on the y (just for spread). All arrows must |
|
76 | in-degree on the y (just for spread). All arrows must | |
77 | point at least slightly to the right if the graph is valid. |
|
77 | point at least slightly to the right if the graph is valid. | |
78 | """ |
|
78 | """ | |
|
79 | print "building DAG" | |||
79 | G = random_dag(nodes, edges) |
|
80 | G = random_dag(nodes, edges) | |
80 | jobs = {} |
|
81 | jobs = {} | |
81 | msg_ids = {} |
|
82 | msg_ids = {} | |
82 | times = {} |
|
83 | times = {} | |
83 | pos = {} |
|
84 | pos = {} | |
84 | for node in G: |
|
85 | for node in G: | |
85 | jobs[node] = randomwait |
|
86 | jobs[node] = randomwait | |
86 |
|
87 | |||
87 | client = cmod.Client('tcp://127.0.0.1:10101') |
|
88 | client = cmod.Client('tcp://127.0.0.1:10101') | |
88 |
|
89 | print "submitting tasks" | ||
89 |
|
|
90 | results = submit_jobs(client, G, jobs) | |
|
91 | print "waiting for results" | |||
90 | client.barrier() |
|
92 | client.barrier() | |
|
93 | print "done" | |||
91 | for node in G: |
|
94 | for node in G: | |
92 |
times[node] = |
|
95 | times[node] = results[node].get() | |
93 | pos[node] = (times[node], G.in_degree(node)+random()) |
|
96 | pos[node] = (times[node], G.in_degree(node)+random()) | |
94 |
|
97 | |||
95 | validate_tree(G, times) |
|
98 | validate_tree(G, times) | |
96 | nx.draw(G, pos) |
|
99 | nx.draw(G, pos) | |
97 | return G,times,msg_ids |
|
100 | return G,times,msg_ids | |
98 |
|
101 | |||
99 | if __name__ == '__main__': |
|
102 | if __name__ == '__main__': | |
100 | import pylab |
|
103 | import pylab | |
101 | main(32,128) |
|
104 | main(32,128) | |
102 | pylab.show() |
|
105 | pylab.show() | |
103 | No newline at end of file |
|
106 |
General Comments 0
You need to be logged in to leave comments.
Login now