##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
MinRK -
Show More
@@ -55,11 +55,11 def make_bintree(levels):
55 55
56 56 def submit_jobs(client, G, jobs):
57 57 """Submit jobs via client where G describes the time dependencies."""
58 msg_ids = {}
58 results = {}
59 59 for node in nx.topological_sort(G):
60 deps = [ msg_ids[n] for n in G.predecessors(node) ]
61 msg_ids[node] = client.apply(jobs[node], after=deps)
62 return msg_ids
60 deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ]
61 results[node] = client.apply(jobs[node], after=deps)
62 return results
63 63
64 64 def validate_tree(G, times):
65 65 """Validate that jobs executed after their dependencies."""
@@ -76,6 +76,7 def main(nodes, edges):
76 76 in-degree on the y (just for spread). All arrows must
77 77 point at least slightly to the right if the graph is valid.
78 78 """
79 print "building DAG"
79 80 G = random_dag(nodes, edges)
80 81 jobs = {}
81 82 msg_ids = {}
@@ -85,11 +86,13 def main(nodes, edges):
85 86 jobs[node] = randomwait
86 87
87 88 client = cmod.Client('tcp://127.0.0.1:10101')
88
89 msg_ids = submit_jobs(client, G, jobs)
89 print "submitting tasks"
90 results = submit_jobs(client, G, jobs)
91 print "waiting for results"
90 92 client.barrier()
93 print "done"
91 94 for node in G:
92 times[node] = client.results[msg_ids[node]]
95 times[node] = results[node].get()
93 96 pos[node] = (times[node], G.in_degree(node)+random())
94 97
95 98 validate_tree(G, times)
General Comments 0
You need to be logged in to leave comments. Login now