dagdeps.py
120 lines
| 3.5 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""Example for generating an arbitrary DAG as a dependency map.

This demo uses networkx to generate the graph.

Authors
-------
* MinRK
"""
import networkx as nx | ||||
from random import randint, random | ||||
MinRK
|
r3666 | from IPython import parallel | ||
MinRK
|
r3564 | |||
def randomwait():
    """Sleep for a random interval in [0, 1) seconds, then return the wall-clock time.

    The imports are done inside the body, presumably so the function is
    self-contained when it is shipped to an engine via ``view.apply``.

    Returns
    -------
    float
        ``time.time()`` taken right after the sleep finishes.
    """
    import random as _random
    import time as _time

    delay = _random.random()
    _time.sleep(delay)
    finished_at = _time.time()
    return finished_at
def random_dag(nodes, edges):
    """Generate a random Directed Acyclic Graph (DAG) with a given number of nodes and edges.

    Nodes are the integers ``0 .. nodes-1``.  Candidate edges are drawn at
    random between two distinct nodes. A candidate is rejected when it is
    already in the graph (re-adding it would not grow the graph) or when it
    would close a cycle. The returned graph therefore has exactly ``edges``
    distinct edges.

    Parameters
    ----------
    nodes : int
        Number of nodes in the graph.
    edges : int
        Number of distinct directed edges to add.  Values <= 0 give a graph
        with no edges.

    Returns
    -------
    networkx.DiGraph
        The generated acyclic graph.

    Raises
    ------
    ValueError
        If ``edges`` is larger than ``nodes*(nodes-1)/2``, the most edges any
        DAG on ``nodes`` nodes can hold. The sampling loop could never finish
        in that case.
    """
    n = max(nodes, 0)
    max_edges = n * (n - 1) // 2
    if edges > max_edges:
        raise ValueError("cannot place %i edges in a DAG with %i nodes "
                         "(at most %i are possible)" % (edges, nodes, max_edges))
    G = nx.DiGraph()
    G.add_nodes_from(range(nodes))
    while edges > 0:
        a = randint(0, nodes - 1)
        b = a
        while b == a:
            b = randint(0, nodes - 1)
        if G.has_edge(a, b):
            # Already present: add_edge would be a no-op, so this draw must
            # not count towards the requested number of edges.
            continue
        G.add_edge(a, b)
        if nx.is_directed_acyclic_graph(G):
            edges -= 1
        else:
            # we closed a loop!
            G.remove_edge(a, b)
    return G
def add_children(G, parent, level, n=2):
    """Recursively hang ``n`` children under ``parent``, ``level`` generations deep.

    A child's name is its parent's name with the child's index (as a string)
    appended. For example, parent ``'0'`` gets children ``'00'``, ``'01'``, ...
    ``parent`` is therefore expected to be a str. Nodes and edges are added
    depth-first, each child before its own subtree.

    ``level`` should be non-negative. The recursion stops only when it reaches
    exactly 0, so a negative value never terminates.
    """
    if level != 0:
        for suffix in map(str, range(n)):
            kid = parent + suffix
            G.add_node(kid)
            G.add_edge(parent, kid)
            add_children(G, kid, level - 1, n)
def make_bintree(levels):
    """Return a symmetric binary tree ``levels`` generations deep as a ``networkx.DiGraph``.

    The root is the string node ``'0'``. Edges point from parent to child,
    and every node name is its parent's name plus a ``'0'`` or ``'1'`` suffix.
    """
    root = '0'
    tree = nx.DiGraph()
    tree.add_node(root)
    add_children(tree, root, levels, n=2)
    return tree
MinRK
|
def submit_jobs(view, G, jobs):
    """Submit one job per node of ``G``, walking the graph in topological order.

    Parameters
    ----------
    view :
        View used for submission (``main`` passes
        ``client.load_balanced_view()``).
    G : networkx.DiGraph
        Dependency graph. Each node's job is submitted with ``after=`` set to
        the results of its predecessors (presumably so it waits for them;
        ``validate_tree`` checks this afterwards).
    jobs : dict
        Maps each node to the callable to run for it.

    Returns
    -------
    dict
        Maps each node to whatever ``view.apply`` returned for it.
    """
    submitted = {}
    for node in nx.topological_sort(G):
        # Topological order guarantees every predecessor was submitted already.
        parents = [submitted[p] for p in G.predecessors(node)]
        with view.temp_flags(after=parents):
            submitted[node] = view.apply(jobs[node])
    return submitted
MinRK
|
r3564 | |||
MinRK
|
def validate_tree(G, results):
    """Validate that jobs executed after their dependencies.

    For every node, check that its job started strictly after each of its
    predecessors' jobs completed.

    Parameters
    ----------
    G :
        Dependency graph (e.g. ``networkx.DiGraph``). It is iterated for its
        nodes and queried with ``G.predecessors(node)``.
    results : dict
        Maps each node to a result object exposing ``.metadata.started`` and
        ``.metadata.completed`` timestamps.

    Raises
    ------
    AssertionError
        If a job did not start strictly after one of its dependencies
        completed. This is raised explicitly rather than with ``assert``, so
        the check still runs under ``python -O``.
    """
    for node in G:
        started = results[node].metadata.started
        for parent in G.predecessors(node):
            finished = results[parent].metadata.completed
            if not (started > finished):
                raise AssertionError(
                    "%s should have happened after %s" % (node, parent))
MinRK
|
r3564 | |||
def main(nodes, edges):
    """Generate a random graph, submit jobs, then validate that the
    dependency order was enforced.

    Finally, plot the graph, with each job's start time on the x-axis and its
    runtime on the y-axis (just for spread), colored by the engine that ran
    it. All arrows must point at least slightly to the right if the graph
    is valid.

    Parameters
    ----------
    nodes : int
        Number of tasks (graph nodes).
    edges : int
        Number of dependencies (graph edges) requested from ``random_dag``.

    Returns
    -------
    tuple
        ``(G, results)``: the dependency graph and the dict mapping each node
        to its submitted result.
    """
    from matplotlib import pyplot as plt
    from matplotlib.dates import date2num
    from matplotlib.cm import gist_rainbow
    print("building DAG")
    G = random_dag(nodes, edges)
    jobs = {node: randomwait for node in G}
    client = parallel.Client()
    view = client.load_balanced_view()
    # Report what was actually built rather than what was requested.
    print("submitting %i tasks with %i dependencies"
          % (G.number_of_nodes(), G.number_of_edges()))
    results = submit_jobs(view, G, jobs)
    print("waiting for results")
    view.wait()
    print("done")

    # x = start time, y = runtime (both in matplotlib date units, i.e. days)
    pos = {}
    colors = {}
    for node in G:
        md = results[node].metadata
        start = date2num(md.started)
        runtime = date2num(md.completed) - start
        pos[node] = (start, runtime)
        colors[node] = md.engine_id

    validate_tree(G, results)

    # networkx's keyword is ``nodelist`` (``node_list`` is not a documented
    # parameter). Pass concrete lists so node_color lines up with nodelist.
    nodelist = list(colors)
    nx.draw(G, pos, nodelist=nodelist,
            node_color=[colors[node] for node in nodelist],
            cmap=gist_rainbow, with_labels=False)

    if pos:  # an empty graph has no extent to pad
        x, y = zip(*pos.values())
        xmin, ymin = map(min, (x, y))
        xmax, ymax = map(max, (x, y))
        xscale = xmax - xmin
        yscale = ymax - ymin
        # pad each axis by 10% of the data range on both sides
        plt.xlim(xmin - xscale * .1, xmax + xscale * .1)
        plt.ylim(ymin - yscale * .1, ymax + yscale * .1)

    return G, results
MinRK
|
r3564 | |||
if __name__ == '__main__':
    # main() imports pyplot only locally, so bind it here for show().
    import matplotlib.pyplot as plt
    # For a smaller, quicker run try main(5, 10).
    main(nodes=32, edges=96)
    plt.show()
Thomas Kluyver
|
r6455 | |||