"""Example for generating an arbitrary DAG as a dependency map. This demo uses networkx to generate the graph. Authors ------- * MinRK """ import networkx as nx from random import randint, random from IPython.zmq.parallel import client as cmod def randomwait(): import time from random import random time.sleep(random()) return time.time() def random_dag(nodes, edges): """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.""" G = nx.DiGraph() for i in range(nodes): G.add_node(i) while edges > 0: a = randint(0,nodes-1) b=a while b==a: b = randint(0,nodes-1) G.add_edge(a,b) if nx.is_directed_acyclic_graph(G): edges -= 1 else: # we closed a loop! G.remove_edge(a,b) return G def add_children(G, parent, level, n=2): """Add children recursively to a binary tree.""" if level == 0: return for i in range(n): child = parent+str(i) G.add_node(child) G.add_edge(parent,child) add_children(G, child, level-1, n) def make_bintree(levels): """Make a symmetrical binary tree with @levels""" G = nx.DiGraph() root = '0' G.add_node(root) add_children(G, root, levels, 2) return G def submit_jobs(client, G, jobs): """Submit jobs via client where G describes the time dependencies.""" results = {} for node in nx.topological_sort(G): deps = [ results[n] for n in G.predecessors(node) ] results[node] = client.apply(jobs[node], after=deps) return results def validate_tree(G, results): """Validate that jobs executed after their dependencies.""" for node in G: started = results[node].metadata.started for parent in G.predecessors(node): finished = results[parent].metadata.completed assert started > finished, "%s should have happened after %s"%(node, parent) def main(nodes, edges): """Generate a random graph, submit jobs, then validate that the dependency order was enforced. Finally, plot the graph, with time on the x-axis, and in-degree on the y (just for spread). All arrows must point at least slightly to the right if the graph is valid. """ import pylab from matplotlib.dates import date2num from matplotlib.cm import gist_rainbow print "building DAG" G = random_dag(nodes, edges) jobs = {} pos = {} colors = {} for node in G: jobs[node] = randomwait client = cmod.Client() print "submitting %i tasks with %i dependencies"%(nodes,edges) results = submit_jobs(client, G, jobs) print "waiting for results" client.barrier() print "done" for node in G: md = results[node].metadata start = date2num(md.started) runtime = date2num(md.completed) - start pos[node] = (start, runtime) colors[node] = md.engine_id validate_tree(G, results) nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(), cmap=gist_rainbow, with_labels=False) x,y = zip(*pos.values()) xmin,ymin = map(min, (x,y)) xmax,ymax = map(max, (x,y)) xscale = xmax-xmin yscale = ymax-ymin pylab.xlim(xmin-xscale*.1,xmax+xscale*.1) pylab.ylim(ymin-yscale*.1,ymax+yscale*.1) return G,results if __name__ == '__main__': import pylab # main(5,10) main(32,96) pylab.show()