##// END OF EJS Templates
add message tracking to client, add/improve tests
add message tracking to client, add/improve tests

File last commit:

r3636:154798bf
r3654:62f8971b
Show More
dagdeps.py
118 lines | 3.4 KiB | text/x-python | PythonLexer
"""Example for generating an arbitrary DAG as a dependency map.
This demo uses networkx to generate the graph.
Authors
-------
* MinRK
"""
import networkx as nx
from random import randint, random
from IPython.zmq.parallel import client as cmod
def randomwait():
import time
from random import random
time.sleep(random())
return time.time()
def random_dag(nodes, edges):
"""Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges."""
G = nx.DiGraph()
for i in range(nodes):
G.add_node(i)
while edges > 0:
a = randint(0,nodes-1)
b=a
while b==a:
b = randint(0,nodes-1)
G.add_edge(a,b)
if nx.is_directed_acyclic_graph(G):
edges -= 1
else:
# we closed a loop!
G.remove_edge(a,b)
return G
def add_children(G, parent, level, n=2):
"""Add children recursively to a binary tree."""
if level == 0:
return
for i in range(n):
child = parent+str(i)
G.add_node(child)
G.add_edge(parent,child)
add_children(G, child, level-1, n)
def make_bintree(levels):
"""Make a symmetrical binary tree with @levels"""
G = nx.DiGraph()
root = '0'
G.add_node(root)
add_children(G, root, levels, 2)
return G
def submit_jobs(client, G, jobs):
"""Submit jobs via client where G describes the time dependencies."""
results = {}
for node in nx.topological_sort(G):
deps = [ results[n] for n in G.predecessors(node) ]
results[node] = client.apply(jobs[node], after=deps)
return results
def validate_tree(G, results):
"""Validate that jobs executed after their dependencies."""
for node in G:
started = results[node].metadata.started
for parent in G.predecessors(node):
finished = results[parent].metadata.completed
assert started > finished, "%s should have happened after %s"%(node, parent)
def main(nodes, edges):
"""Generate a random graph, submit jobs, then validate that the
dependency order was enforced.
Finally, plot the graph, with time on the x-axis, and
in-degree on the y (just for spread). All arrows must
point at least slightly to the right if the graph is valid.
"""
import pylab
from matplotlib.dates import date2num
from matplotlib.cm import gist_rainbow
print "building DAG"
G = random_dag(nodes, edges)
jobs = {}
pos = {}
colors = {}
for node in G:
jobs[node] = randomwait
client = cmod.Client()
print "submitting %i tasks with %i dependencies"%(nodes,edges)
results = submit_jobs(client, G, jobs)
print "waiting for results"
client.barrier()
print "done"
for node in G:
md = results[node].metadata
start = date2num(md.started)
runtime = date2num(md.completed) - start
pos[node] = (start, runtime)
colors[node] = md.engine_id
validate_tree(G, results)
nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(), cmap=gist_rainbow,
with_labels=False)
x,y = zip(*pos.values())
xmin,ymin = map(min, (x,y))
xmax,ymax = map(max, (x,y))
xscale = xmax-xmin
yscale = ymax-ymin
pylab.xlim(xmin-xscale*.1,xmax+xscale*.1)
pylab.ylim(ymin-yscale*.1,ymax+yscale*.1)
return G,results
if __name__ == '__main__':
import pylab
# main(5,10)
main(32,96)
pylab.show()