dagdeps.py
109 lines
| 3.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""Example for generating an arbitrary DAG as a dependency map.

This demo uses networkx to generate the graph.

Authors
-------
* MinRK
"""
import networkx as nx | |||
from random import randint, random | |||
from IPython.zmq.parallel import client as cmod | |||
def randomwait():
    """Pause for a random fraction of a second, then report the time.

    The modules are imported inside the function body instead of relying on
    module-level names (presumably so the function stays self-contained when
    it is handed to ``client.apply`` as a job -- confirm against the client).

    Returns the ``time.time()`` stamp taken right after the sleep.
    """
    import random as _random
    import time as _time

    delay = _random.random()  # uniform in [0.0, 1.0)
    _time.sleep(delay)
    return _time.time()
def random_dag(nodes, edges):
    """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.

    Parameters
    ----------
    nodes : int
        Number of nodes; they are labelled ``0 .. nodes-1``.
    edges : int
        Number of distinct directed edges the graph should contain.

    Returns
    -------
    networkx.DiGraph
        An acyclic graph holding exactly `edges` edges (none if `edges <= 0`).

    Raises
    ------
    ValueError
        If `edges` is larger than ``nodes * (nodes - 1) // 2``, the most
        edges a DAG on that many nodes can hold.  Without this check the
        rejection-sampling loop below could never terminate.
    """
    usable_nodes = max(nodes, 0)
    max_edges = usable_nodes * (usable_nodes - 1) // 2
    if edges > max_edges:
        raise ValueError(
            "a DAG with %i nodes can hold at most %i edges, %i requested"
            % (nodes, max_edges, edges)
        )

    G = nx.DiGraph()
    G.add_nodes_from(range(nodes))
    while edges > 0:
        a = randint(0, nodes - 1)
        b = a
        while b == a:
            b = randint(0, nodes - 1)
        if G.has_edge(a, b):
            # Re-adding an existing edge is a no-op in networkx, so it must
            # not be counted towards the requested number of edges.
            continue
        G.add_edge(a, b)
        if nx.is_directed_acyclic_graph(G):
            edges -= 1
        else:
            # we closed a loop!
            G.remove_edge(a, b)
    return G
def add_children(G, parent, level, n=2):
    """Recursively hang `n` children under `parent`, `level` generations deep.

    Each child is named by appending its index (``'0'``, ``'1'``, ...) to the
    parent's name, is added to `G` as a node, and is linked with an edge
    ``parent -> child`` before its own subtree is built.  With the default
    ``n=2`` this grows a binary tree.  Nothing is added when `level` is 0.
    """
    if level != 0:
        for suffix in map(str, range(n)):
            kid = parent + suffix
            G.add_node(kid)
            G.add_edge(parent, kid)
            # descend depth-first: finish this child's subtree before the next sibling
            add_children(G, kid, level - 1, n)
def make_bintree(levels):
    """Build a symmetrical binary tree that is `levels` generations deep.

    The tree is a ``networkx.DiGraph`` rooted at the node ``'0'``; every
    node below the root is created by ``add_children`` with two children
    per parent.  Returns the graph.
    """
    tree = nx.DiGraph()
    tree.add_node('0')
    add_children(tree, '0', levels, 2)
    return tree
def submit_jobs(client, G, jobs):
    """Submit every job through `client`, wiring up the dependencies in `G`.

    Nodes are visited in topological order, so by the time a node is
    submitted the results of all of its predecessors already exist; they
    are passed to ``client.apply`` as the ``after`` argument.

    Returns a dict mapping each node to what ``client.apply`` returned for it.
    """
    submitted = {}
    for task in nx.topological_sort(G):
        upstream = []
        for dep in G.predecessors(task):
            upstream.append(submitted[dep])
        submitted[task] = client.apply(jobs[task], after=upstream)
    return submitted
MinRK
|
r3564 | ||
MinRK
|
def validate_tree(G, results):
    """Validate that jobs executed after their dependencies.

    For every node in `G`, the ``metadata.started`` stamp of its result must
    be strictly greater than the ``metadata.completed`` stamp of the result
    of each of its predecessors.

    Raises
    ------
    AssertionError
        On the first node found to have started no later than one of its
        predecessors finished.  The exception is raised explicitly instead
        of through an ``assert`` statement so the check is not stripped
        when Python runs with ``-O``.
    """
    for node in G:
        started = results[node].metadata.started
        for parent in G.predecessors(node):
            finished = results[parent].metadata.completed
            if not (started > finished):
                raise AssertionError(
                    "%s should have happened after %s" % (node, parent)
                )
MinRK
|
r3564 | ||
def main(nodes, edges):
    """Generate a random graph, submit jobs, then validate that the
    dependency order was enforced.

    Finally, plot the graph, with start time on the x-axis, and
    runtime on the y (just for spread).  Nodes are coloured by the
    ``engine_id`` recorded in each result's metadata.  All arrows must
    point at least slightly to the right if the graph is valid.

    Parameters
    ----------
    nodes : int
        Number of tasks (graph nodes) to create.
    edges : int
        Number of dependencies (graph edges) to create.

    Returns
    -------
    tuple
        ``(G, results)``: the generated graph and the dict of per-node
        results returned by ``submit_jobs``.
    """
    from matplotlib.dates import date2num
    from matplotlib.cm import gist_rainbow

    # Single-argument print(...) emits the same text on Python 2 and 3.
    print("building DAG")
    G = random_dag(nodes, edges)
    jobs = {}
    pos = {}
    colors = {}
    for node in G:
        jobs[node] = randomwait

    client = cmod.Client()
    print("submitting %i tasks with %i dependencies" % (nodes, edges))
    results = submit_jobs(client, G, jobs)
    print("waiting for results")
    client.barrier()
    print("done")
    for node in G:
        md = results[node].metadata
        start = date2num(md.started)
        runtime = date2num(md.completed) - start
        pos[node] = (start, runtime)
        colors[node] = md.engine_id
    validate_tree(G, results)

    # networkx spells the keyword ``nodelist``; build the colour sequence from
    # the same list so every node is guaranteed to get its own engine colour.
    nodelist = list(colors)
    node_color = [colors[node] for node in nodelist]
    nx.draw(G, pos, nodelist=nodelist, node_color=node_color, cmap=gist_rainbow)
    return G, results
MinRK
|
r3564 | ||
if __name__ == '__main__':
    import pylab

    # Run the demo with 32 tasks and 96 dependencies, then display the figure.
    # A smaller alternative run would be main(5, 10).
    main(nodes=32, edges=96)
    pylab.show()