From 9f1a03abe249bf089519eec18dba3867c662621e 2011-04-08 00:38:16 From: MinRK <benjaminrk@gmail.com> Date: 2011-04-08 00:38:16 Subject: [PATCH] tweak dagdeps for new AsyncResult objects --- diff --git a/examples/demo/dag/dagdeps.py b/examples/demo/dag/dagdeps.py index 4988b6b..8291962 100644 --- a/examples/demo/dag/dagdeps.py +++ b/examples/demo/dag/dagdeps.py @@ -55,11 +55,11 @@ def make_bintree(levels): def submit_jobs(client, G, jobs): """Submit jobs via client where G describes the time dependencies.""" - msg_ids = {} + results = {} for node in nx.topological_sort(G): - deps = [ msg_ids[n] for n in G.predecessors(node) ] - msg_ids[node] = client.apply(jobs[node], after=deps) - return msg_ids + deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ] + results[node] = client.apply(jobs[node], after=deps) + return results def validate_tree(G, times): """Validate that jobs executed after their dependencies.""" @@ -76,6 +76,7 @@ def main(nodes, edges): in-degree on the y (just for spread). All arrows must point at least slightly to the right if the graph is valid. """ + print "building DAG" G = random_dag(nodes, edges) jobs = {} msg_ids = {} @@ -85,11 +86,13 @@ def main(nodes, edges): jobs[node] = randomwait client = cmod.Client('tcp://127.0.0.1:10101') - - msg_ids = submit_jobs(client, G, jobs) + print "submitting tasks" + results = submit_jobs(client, G, jobs) + print "waiting for results" client.barrier() + print "done" for node in G: - times[node] = client.results[msg_ids[node]] + times[node] = results[node].get() pos[node] = (times[node], G.in_degree(node)+random()) validate_tree(G, times)