"""Example for generating an arbitrary DAG as a dependency map. This demo uses networkx to generate the graph. Authors ------- * MinRK """ import networkx as nx from random import randint, random from IPython.zmq.parallel import client as cmod def randomwait(): import time from random import random time.sleep(random()) return time.time() def random_dag(nodes, edges): """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.""" G = nx.DiGraph() for i in range(nodes): G.add_node(i) while edges > 0: a = randint(0,nodes-1) b=a while b==a: b = randint(0,nodes-1) G.add_edge(a,b) if nx.is_directed_acyclic_graph(G): edges -= 1 else: # we closed a loop! G.remove_edge(a,b) return G def add_children(G, parent, level, n=2): """Add children recursively to a binary tree.""" if level == 0: return for i in range(n): child = parent+str(i) G.add_node(child) G.add_edge(parent,child) add_children(G, child, level-1, n) def make_bintree(levels): """Make a symmetrical binary tree with @levels""" G = nx.DiGraph() root = '0' G.add_node(root) add_children(G, root, levels, 2) return G def submit_jobs(client, G, jobs): """Submit jobs via client where G describes the time dependencies.""" results = {} for node in nx.topological_sort(G): deps = [ results[n].msg_ids[0] for n in G.predecessors(node) ] results[node] = client.apply(jobs[node], after=deps) return results def validate_tree(G, results): """Validate that jobs executed after their dependencies.""" for node in G: started = results[node].metadata.started for parent in G.predecessors(node): finished = results[parent].metadata.completed assert started > finished, "%s should have happened after %s"%(node, parent) def main(nodes, edges): """Generate a random graph, submit jobs, then validate that the dependency order was enforced. Finally, plot the graph, with time on the x-axis, and in-degree on the y (just for spread). All arrows must point at least slightly to the right if the graph is valid. """ from matplotlib.dates import date2num print "building DAG" G = random_dag(nodes, edges) jobs = {} pos = {} for node in G: jobs[node] = randomwait client = cmod.Client() print "submitting tasks" results = submit_jobs(client, G, jobs) print "waiting for results" client.barrier() print "done" for node in G: # times[node] = results[node].get() t = date2num(results[node].metadata.started) pos[node] = (t, G.in_degree(node)+random()) validate_tree(G, results) nx.draw(G, pos) return G,results if __name__ == '__main__': import pylab main(32,128) pylab.show()