##// END OF EJS Templates
%qtconsole implied bind_kernel on engines
%qtconsole implied bind_kernel on engines

File last commit:

r6455:15863dc1
r7313:6a8318d4
Show More
dagdeps.py
120 lines | 3.5 KiB | text/x-python | PythonLexer
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564 """Example for generating an arbitrary DAG as a dependency map.
This demo uses networkx to generate the graph.
Authors
-------
* MinRK
"""
import networkx as nx
from random import randint, random
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython import parallel
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564
def randomwait():
import time
from random import random
time.sleep(random())
return time.time()
def random_dag(nodes, edges):
"""Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
G = nx.DiGraph()
for i in range(nodes):
G.add_node(i)
while edges > 0:
a = randint(0,nodes-1)
b=a
while b==a:
b = randint(0,nodes-1)
G.add_edge(a,b)
if nx.is_directed_acyclic_graph(G):
edges -= 1
else:
# we closed a loop!
G.remove_edge(a,b)
return G
def add_children(G, parent, level, n=2):
"""Add children recursively to a binary tree."""
if level == 0:
return
for i in range(n):
child = parent+str(i)
G.add_node(child)
G.add_edge(parent,child)
add_children(G, child, level-1, n)
def make_bintree(levels):
"""Make a symmetrical binary tree with @levels"""
G = nx.DiGraph()
root = '0'
G.add_node(root)
add_children(G, root, levels, 2)
return G
MinRK
update API after sagedays29...
r3664 def submit_jobs(view, G, jobs):
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564 """Submit jobs via client where G describes the time dependencies."""
MinRK
tweak dagdeps for new AsyncResult objects
r3606 results = {}
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564 for node in nx.topological_sort(G):
MinRK
update API after sagedays29...
r3664 with view.temp_flags(after=[ results[n] for n in G.predecessors(node) ]):
results[node] = view.apply(jobs[node])
MinRK
tweak dagdeps for new AsyncResult objects
r3606 return results
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564
MinRK
updated newparallel examples, moved into docs
r3609 def validate_tree(G, results):
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564 """Validate that jobs executed after their dependencies."""
for node in G:
MinRK
updated newparallel examples, moved into docs
r3609 started = results[node].metadata.started
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564 for parent in G.predecessors(node):
MinRK
updated newparallel examples, moved into docs
r3609 finished = results[parent].metadata.completed
assert started > finished, "%s should have happened after %s"%(node, parent)
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564
def main(nodes, edges):
"""Generate a random graph, submit jobs, then validate that the
dependency order was enforced.
Finally, plot the graph, with time on the x-axis, and
in-degree on the y (just for spread). All arrows must
point at least slightly to the right if the graph is valid.
"""
MinRK
update API after sagedays29...
r3664 from matplotlib import pyplot as plt
MinRK
updated newparallel examples, moved into docs
r3609 from matplotlib.dates import date2num
MinRK
dependency tweaks + dependency/scheduler docs
r3624 from matplotlib.cm import gist_rainbow
Thomas Kluyver
Update print syntax in parallel examples.
r6455 print("building DAG")
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564 G = random_dag(nodes, edges)
jobs = {}
pos = {}
MinRK
dependency tweaks + dependency/scheduler docs
r3624 colors = {}
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564 for node in G:
jobs[node] = randomwait
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 client = parallel.Client()
MinRK
update API after sagedays29...
r3664 view = client.load_balanced_view()
Thomas Kluyver
Update print syntax in parallel examples.
r6455 print("submitting %i tasks with %i dependencies"%(nodes,edges))
MinRK
update API after sagedays29...
r3664 results = submit_jobs(view, G, jobs)
Thomas Kluyver
Update print syntax in parallel examples.
r6455 print("waiting for results")
MinRK
update API after sagedays29...
r3664 view.wait()
Thomas Kluyver
Update print syntax in parallel examples.
r6455 print("done")
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564 for node in G:
MinRK
dependency tweaks + dependency/scheduler docs
r3624 md = results[node].metadata
start = date2num(md.started)
runtime = date2num(md.completed) - start
pos[node] = (start, runtime)
colors[node] = md.engine_id
MinRK
updated newparallel examples, moved into docs
r3609 validate_tree(G, results)
MinRK
Client -> HasTraits, update examples with API tweaks
r3636 nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(), cmap=gist_rainbow,
with_labels=False)
x,y = zip(*pos.values())
xmin,ymin = map(min, (x,y))
xmax,ymax = map(max, (x,y))
xscale = xmax-xmin
yscale = ymax-ymin
MinRK
update API after sagedays29...
r3664 plt.xlim(xmin-xscale*.1,xmax+xscale*.1)
plt.ylim(ymin-yscale*.1,ymax+yscale*.1)
MinRK
updated newparallel examples, moved into docs
r3609 return G,results
MinRK
added py4science demos as examples + NetworkX DAG dependencies
r3564
if __name__ == '__main__':
MinRK
update API after sagedays29...
r3664 from matplotlib import pyplot as plt
MinRK
dependency tweaks + dependency/scheduler docs
r3624 # main(5,10)
main(32,96)
MinRK
update API after sagedays29...
r3664 plt.show()
Thomas Kluyver
Update print syntax in parallel examples.
r6455