|
|
"""Example for generating an arbitrary DAG as a dependency map.
|
|
|
|
|
|
This demo uses networkx to generate the graph.
|
|
|
|
|
|
Authors
|
|
|
-------
|
|
|
* MinRK
|
|
|
"""
|
|
|
import networkx as nx
|
|
|
from random import randint, random
|
|
|
from IPython import parallel
|
|
|
|
|
|
def randomwait():
    """Sleep for a random fraction of a second, then report the wall-clock time.

    The imports are deliberately local to the function body.
    NOTE(review): presumably because this function is handed to
    ``view.apply`` as a job and must not rely on this module's globals
    wherever it ends up running -- confirm before hoisting them.

    Returns
    -------
    float
        The ``time.time()`` timestamp taken right after the sleep.
    """
    import random
    import time

    # random.random() yields a delay in [0.0, 1.0) seconds
    delay = random.random()
    time.sleep(delay)
    return time.time()
|
|
|
|
|
|
|
|
|
def random_dag(nodes, edges):
    """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.

    Parameters
    ----------
    nodes : int
        Number of nodes; they are labelled ``0 .. nodes-1``.
    edges : int
        Number of distinct directed edges the result must contain.

    Returns
    -------
    networkx.DiGraph
        An acyclic graph on `nodes` nodes with exactly `edges` edges
        (no edges at all when ``edges <= 0``).

    Raises
    ------
    ValueError
        If `edges` is larger than ``nodes * (nodes - 1) / 2``, the largest
        number of edges a DAG on `nodes` nodes can hold.  Without this
        check the sampling loop below would never terminate.
    """
    max_edges = nodes * (nodes - 1) // 2
    if edges > max_edges:
        raise ValueError(
            "a DAG with %i nodes can have at most %i edges, %i requested"
            % (nodes, max_edges, edges)
        )

    G = nx.DiGraph()
    G.add_nodes_from(range(nodes))

    remaining = edges
    while remaining > 0:
        a = randint(0, nodes - 1)
        b = a
        while b == a:
            b = randint(0, nodes - 1)
        if G.has_edge(a, b):
            # Re-adding an existing edge is a no-op for the graph, so it
            # must not be counted towards the requested number of edges.
            continue
        G.add_edge(a, b)
        if nx.is_directed_acyclic_graph(G):
            remaining -= 1
        else:
            # we closed a loop!
            G.remove_edge(a, b)
    return G
|
|
|
|
|
|
def add_children(G, parent, level, n=2):
    """Recursively attach `n` children to `parent`, `level` generations deep.

    Each child is labelled with its parent's label followed by the child's
    index, e.g. the children of ``'0'`` are ``'00'``, ``'01'``, ...  An edge
    is added from the parent to every child.

    Parameters
    ----------
    G : graph
        Graph to grow in place; only ``add_node`` and ``add_edge`` are used.
    parent : str
        Label of the node that receives the children.
    level : int
        Number of generations still to add below `parent`.  Zero or a
        negative value adds nothing.
    n : int, optional
        Children per node (2 gives a binary tree).  Labels are built by
        plain string concatenation, so they are only unambiguous for
        ``n <= 10``.

    Returns
    -------
    None
    """
    # `<=` rather than `==`: a negative level must stop the recursion too,
    # otherwise the base case is never reached.
    if level <= 0:
        return
    for i in range(n):
        child = parent + str(i)
        G.add_node(child)
        G.add_edge(parent, child)
        add_children(G, child, level - 1, n)
|
|
|
|
|
|
def make_bintree(levels):
    """Build a symmetric binary tree that is `levels` generations deep.

    The root node is labelled ``'0'``; everything below it is attached by
    ``add_children`` with two children per node.

    Returns
    -------
    networkx.DiGraph
        The tree, containing only the root when `levels` is 0.
    """
    tree = nx.DiGraph()
    tree.add_node('0')
    add_children(tree, '0', levels, 2)
    return tree
|
|
|
|
|
|
def submit_jobs(view, G, jobs):
    """Submit one job per node of G through `view`, using G's edges as time dependencies.

    Parameters
    ----------
    view : view object
        Must provide ``temp_flags(after=...)`` as a context manager and
        ``apply(callable)``.
    G : networkx.DiGraph
        Dependency graph; an edge ``a -> b`` means `b` is submitted with
        `a`'s result in its ``after`` list.
    jobs : dict
        Maps each node of G to the callable to submit for it.

    Returns
    -------
    dict
        Maps each node to whatever ``view.apply`` returned for its job.
    """
    submitted = {}
    for task in nx.topological_sort(G):
        # Topological order guarantees that every predecessor has already
        # been submitted, so its entry in `submitted` exists.
        prerequisites = [submitted[dep] for dep in G.predecessors(task)]
        with view.temp_flags(after=prerequisites):
            submitted[task] = view.apply(jobs[task])
    return submitted
|
|
|
|
|
|
def validate_tree(G, results):
    """Validate that jobs executed after their dependencies.

    Parameters
    ----------
    G : graph
        Dependency graph; iterated for its nodes and queried with
        ``predecessors(node)``.
    results : mapping
        Maps each node to an object whose ``metadata`` exposes ``started``
        and ``completed`` values that can be compared with ``>``.

    Returns
    -------
    None

    Raises
    ------
    AssertionError
        If any node did not start strictly after one of its predecessors
        completed.  The error is raised explicitly instead of through an
        ``assert`` statement so the check still runs under ``python -O``.
    """
    for node in G:
        started = results[node].metadata.started
        for parent in G.predecessors(node):
            finished = results[parent].metadata.completed
            if not started > finished:
                raise AssertionError(
                    "%s should have happened after %s" % (node, parent)
                )
|
|
|
|
|
|
def main(nodes, edges):
    """Generate a random graph, submit jobs, then validate that the
    dependency order was enforced.

    Finally, plot the graph, with start time on the x-axis, and
    job runtime on the y (just for spread).  All arrows must
    point at least slightly to the right if the graph is valid.

    Parameters
    ----------
    nodes : int
        Number of tasks (graph nodes) to create.
    edges : int
        Number of dependencies (graph edges) between them.

    Returns
    -------
    tuple
        ``(G, results)``: the generated graph and the dict returned by
        ``submit_jobs`` for it.
    """
    from matplotlib import pyplot as plt
    from matplotlib.dates import date2num
    from matplotlib.cm import gist_rainbow

    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("building DAG")
    G = random_dag(nodes, edges)
    # every node runs the same job
    jobs = dict.fromkeys(G, randomwait)

    client = parallel.Client()
    view = client.load_balanced_view()
    print("submitting %i tasks with %i dependencies" % (nodes, edges))
    results = submit_jobs(view, G, jobs)
    print("waiting for results")
    view.wait()
    print("done")

    pos = {}
    colors = {}
    for node in G:
        md = results[node].metadata
        start = date2num(md.started)
        runtime = date2num(md.completed) - start
        pos[node] = (start, runtime)
        # colour each node by the engine that ran it
        colors[node] = md.engine_id
    validate_tree(G, results)

    # `nodelist` is the networkx keyword; passing it keeps node_color lined
    # up with the nodes it belongs to.  Lists, because dict views are not
    # sequences on Python 3.
    nx.draw(G, pos, nodelist=list(colors.keys()),
            node_color=list(colors.values()), cmap=gist_rainbow,
            with_labels=False)

    # pad the axes by 10% of the data range on every side
    x, y = zip(*pos.values())
    xmin, ymin = map(min, (x, y))
    xmax, ymax = map(max, (x, y))
    xscale = xmax - xmin
    yscale = ymax - ymin
    plt.xlim(xmin - xscale * .1, xmax + xscale * .1)
    plt.ylim(ymin - yscale * .1, ymax + yscale * .1)
    return G, results
|
|
|
|
|
|
if __name__ == '__main__':
    import matplotlib.pyplot as plt

    # A smaller run, e.g. main(5, 10), is handy for a quick check.
    main(32, 96)
    # main() only draws onto the current figure; display it here.
    plt.show()
|
|
|
|