.. _dag_dependencies:

================
DAG Dependencies
================

Often, a parallel workflow is described in terms of a `Directed Acyclic Graph
<http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
for working with graphs is NetworkX_. Here, we will walk through a demo mapping
a NetworkX DAG to task dependencies.

The full script that runs this demo can be found in
:file:`examples/parallel/dagdeps.py`.

Why are DAGs good for task dependencies?
----------------------------------------

The 'G' in DAG is 'Graph'. A graph is a collection of **nodes** and **edges** that connect
the nodes. For our purposes, each node is a task, and each edge is a
dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
direction associated with it: the edge (a,b) means that b depends
on a, whereas the edge (b,a) means that a depends on b. The 'A' is 'Acyclic', meaning that
there must not be any closed loops in the graph. This is important for dependencies,
because if a loop were closed, a task could ultimately depend on itself and would never be
able to run. If your workflow can be described as a DAG, then it is impossible for your
dependencies to cause a deadlock.
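
This acyclicity guarantee can also be checked mechanically. As a small
illustration (plain Python, not part of the demo script), Kahn's algorithm tells
us whether a set of dependency edges admits a valid execution order:

```python
from collections import deque

def has_valid_order(n_tasks, edges):
    """Kahn's algorithm: True iff every task can eventually run,
    i.e. the dependency graph contains no cycle."""
    indegree = [0] * n_tasks
    successors = [[] for _ in range(n_tasks)]
    for a, b in edges:                # edge (a, b): b depends on a
        successors[a].append(b)
        indegree[b] += 1
    # tasks with no dependencies can run right away
    ready = deque(t for t in range(n_tasks) if indegree[t] == 0)
    done = 0
    while ready:
        t = ready.popleft()
        done += 1
        for s in successors[t]:
            indegree[s] -= 1
            if indegree[s] == 0:
                ready.append(s)
    return done == n_tasks            # a cycle leaves some task blocked forever

print(has_valid_order(3, [(0, 1), (1, 2)]))   # True: a simple chain
print(has_valid_order(2, [(0, 1), (1, 0)]))   # False: 0 and 1 deadlock each other
```
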

A Sample DAG
------------

Here, we have a very simple 5-node DAG:

.. figure:: figs/simpledag.*
    :width: 600px

With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
1 and 2; and 4 depends only on 1.

A possible sequence of events for this workflow:

0. Task 0 can run right away
1. 0 finishes, so 1 and 2 can start
2. 1 finishes; 3 is still waiting on 2, but 4 can start right away
3. 2 finishes, and 3 can finally start

Further, taking failures into account, assuming all dependencies are run with the default
`success=True,failure=False`, the following cases would occur for each node's failure:

0. fails: all other tasks fail as Impossible
1. 2 can still succeed, but 3 and 4 become unreachable
2. 3 becomes unreachable, but 4 is unaffected
3. and 4. are terminal, and can have no effect on other nodes
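
These unreachable sets need not be worked out by hand: with the default
success-only dependencies, a failure at any node dooms exactly its descendants
in the graph. A quick check with NetworkX on the sample DAG above:

```python
import networkx as nx

# the 5-node sample DAG from above
G = nx.DiGraph([(0, 1), (0, 2), (1, 3), (2, 3), (1, 4)])

# a failure at `node` makes every task downstream of it unreachable
for node in sorted(G):
    doomed = sorted(nx.descendants(G, node))
    print("%s fails -> unreachable: %s" % (node, doomed))
```

The output reproduces the case list above: a failure at 0 dooms 1-4, a failure
at 1 dooms 3 and 4, a failure at 2 dooms only 3, and 3 and 4 doom nothing.
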

The code to generate the simple DAG:

.. sourcecode:: python

    import networkx as nx

    G = nx.DiGraph()

    # add 5 nodes, labeled 0-4:
    G.add_nodes_from(range(5))
    # 1,2 depend on 0:
    G.add_edge(0,1)
    G.add_edge(0,2)
    # 3 depends on 1,2
    G.add_edge(1,3)
    G.add_edge(2,3)
    # 4 depends on 1
    G.add_edge(1,4)

    # now draw the graph:
    pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
            3 : (0,2), 4 : (2,2)}
    nx.draw(G, pos, edge_color='r')

For demonstration purposes, we have a function that generates a random DAG with a given
number of nodes and edges.

.. literalinclude:: ../../../examples/parallel/dagdeps.py
    :language: python
    :lines: 20-36
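
The included file is not reproduced here, so as a rough sketch (one plausible
approach, not necessarily the exact code in :file:`dagdeps.py`), such a
generator can add random edges one at a time, discarding any edge that would
close a cycle:

```python
import networkx as nx
from random import randint

def random_dag(nodes, edges):
    """Generate a random DAG with `nodes` nodes and `edges` edges."""
    G = nx.DiGraph()
    G.add_nodes_from(range(nodes))
    while edges > 0:
        a = randint(0, nodes - 1)
        b = randint(0, nodes - 1)
        if a == b or G.has_edge(a, b):
            continue                    # skip self-loops and duplicates
        G.add_edge(a, b)
        if nx.is_directed_acyclic_graph(G):
            edges -= 1
        else:
            G.remove_edge(a, b)         # that edge closed a cycle; discard it
    return G
```
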

So first, we start with a graph of 32 nodes, with 128 edges:

.. sourcecode:: ipython

    In [2]: G = random_dag(32,128)

Now, we need to build our dict of jobs corresponding to the nodes on the graph:

.. sourcecode:: ipython

    In [3]: jobs = {}

    # in reality, each job would presumably be different
    # randomwait is just a function that sleeps for a random interval
    In [4]: for node in G:
       ...:     jobs[node] = randomwait

Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs
and linking up the dependencies. Since we don't know a job's msg_id until it is submitted,
and the msg_id is necessary for building dependencies, it is critical that we don't submit
a job before the jobs it depends on. Fortunately, NetworkX provides a
:func:`topological_sort` function which ensures exactly this. It returns an iterable that
guarantees that when you arrive at a node, you have already visited all the nodes
on which it depends:

.. sourcecode:: ipython

    In [5]: rc = Client()

    In [6]: view = rc.load_balanced_view()

    In [7]: results = {}

    In [8]: for node in nx.topological_sort(G):
       ...:     # get list of AsyncResult objects from nodes
       ...:     # leading into this one as dependencies
       ...:     deps = [ results[n] for n in G.predecessors(node) ]
       ...:     # submit and store AsyncResult object
       ...:     with view.temp_flags(after=deps, block=False):
       ...:         results[node] = view.apply_with_flags(jobs[node])
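
The ordering guarantee itself can be verified on its own, without a cluster; a
quick sketch on the five-node sample DAG:

```python
import networkx as nx

G = nx.DiGraph([(0, 1), (0, 2), (1, 3), (2, 3), (1, 4)])

order = list(nx.topological_sort(G))
position = {node: i for i, node in enumerate(order)}

# every edge (a, b) means b depends on a, so a must appear earlier
assert all(position[a] < position[b] for a, b in G.edges())
print(order)
```

Since 0 is the only node with no dependencies, it is always first; the relative
order of the rest may vary, but every task appears after all of its
predecessors.
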
Now that we have submitted all the jobs, we can wait for the results:

.. sourcecode:: ipython

    In [9]: view.wait(results.values())

Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
raised an error if a task failed). But we don't know that the ordering was properly
respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.

These objects store a variety of metadata about each task, including various timestamps.
We can validate that the dependencies were respected by checking that each task was
started after all of its predecessors were completed:

.. literalinclude:: ../../../examples/parallel/dagdeps.py
    :language: python
    :lines: 64-70
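
The included validation comes from the example file; as a self-contained sketch
(the function name and the stand-in metadata objects here are illustrative, not
the exact code in :file:`dagdeps.py`):

```python
import networkx as nx
from types import SimpleNamespace

def validate_tree(G, results):
    """Check that each task started only after all of its
    predecessors had completed."""
    for node in G:
        started = results[node].metadata.started
        for parent in G.predecessors(node):
            finished = results[parent].metadata.completed
            assert started > finished, "task %s started too early" % node

# stand-in for the real results dict: node -> object exposing
# AsyncResult-like .metadata, with plain numbers as timestamps
G = nx.DiGraph([(0, 1), (0, 2), (1, 3), (2, 3), (1, 4)])
times = {0: (0, 1), 1: (2, 3), 2: (2, 4), 3: (5, 6), 4: (4, 5)}
results = {
    n: SimpleNamespace(metadata=SimpleNamespace(started=s, completed=c))
    for n, (s, c) in times.items()
}
validate_tree(G, results)   # raises AssertionError if ordering was violated
```
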

We can also validate the graph visually. By drawing the graph with each node's x-position
as its start time, all arrows must point to the right if dependencies were respected.
For spreading, the y-position will be the runtime of the task, so long tasks
will be at the top, and quick, small tasks will be at the bottom.

.. sourcecode:: ipython

    In [10]: from matplotlib.dates import date2num

    In [11]: from matplotlib.cm import gist_rainbow

    In [12]: pos = {}; colors = {}

    In [13]: for node in G:
       ....:     md = results[node].metadata
       ....:     start = date2num(md.started)
       ....:     runtime = date2num(md.completed) - start
       ....:     pos[node] = (start, runtime)
       ....:     colors[node] = md.engine_id

    In [14]: nx.draw(G, pos, nodelist=list(colors.keys()), node_color=list(colors.values()),
       ....:         cmap=gist_rainbow)

.. figure:: figs/dagdeps.*
    :width: 600px

Time started on x, runtime on y, and color-coded by engine-id (in this case there
were four engines). Edges denote dependencies.


.. _NetworkX: http://networkx.lanl.gov/