Show More
@@ -1,177 +1,177 b'' | |||
|
1 | 1 | .. _dag_dependencies: |
|
2 | 2 | |
|
3 | 3 | ================ |
|
4 | 4 | DAG Dependencies |
|
5 | 5 | ================ |
|
6 | 6 | |
|
7 | 7 | Often, parallel workflow is described in terms of a `Directed Acyclic Graph |
|
8 | 8 | <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library |
|
9 | 9 | for working with Graphs is NetworkX_. Here, we will walk through a demo mapping |
|
10 | 10 | a nx DAG to task dependencies. |
|
11 | 11 | |
|
12 | 12 | The full script that runs this demo can be found in |
|
13 | 13 | :file:`examples/parallel/dagdeps.py`. |
|
14 | 14 | |
|
15 | 15 | Why are DAGs good for task dependencies? |
|
16 | 16 | ---------------------------------------- |
|
17 | 17 | |
|
18 | 18 | The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect |
|
19 | 19 | the nodes. For our purposes, each node would be a task, and each edge would be a |
|
20 | 20 | dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a |
|
21 | 21 | direction associated with it. So we can interpret the edge (a,b) as meaning that b depends |
|
22 | 22 | on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that |
|
23 | 23 | there must not be any closed loops in the graph. This is important for dependencies, |
|
24 | 24 | because if a loop were closed, then a task could ultimately depend on itself, and never be |
|
25 | 25 | able to run. If your workflow can be described as a DAG, then it is impossible for your |
|
26 | 26 | dependencies to cause a deadlock. |
|
27 | 27 | |
|
28 | 28 | A Sample DAG |
|
29 | 29 | ------------ |
|
30 | 30 | |
|
31 | 31 | Here, we have a very simple 5-node DAG: |
|
32 | 32 | |
|
33 | 33 | .. figure:: figs/simpledag.* |
|
34 | 34 | :width: 600px |
|
35 | 35 | |
|
36 | 36 | With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0 |
|
37 | 37 | depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on |
|
38 | 38 | 1 and 2; and 4 depends only on 1. |
|
39 | 39 | |
|
40 | 40 | A possible sequence of events for this workflow: |
|
41 | 41 | |
|
42 | 42 | 0. Task 0 can run right away |
|
43 | 43 | 1. 0 finishes, so 1,2 can start |
|
44 | 44 | 2. 1 finishes, 3 is still waiting on 2, but 4 can start right away |
|
45 | 45 | 3. 2 finishes, and 3 can finally start |
|
46 | 46 | |
|
47 | 47 | |
|
48 | 48 | Further, taking failures into account, assuming all dependencies are run with the default |
|
49 | 49 | `success=True,failure=False`, the following cases would occur for each node's failure: |
|
50 | 50 | |
|
51 | 51 | 0. fails: all other tasks fail as Impossible |
|
52 | 52 | 1. 2 can still succeed, but 3,4 are unreachable |
|
53 | 53 | 2. 3 becomes unreachable, but 4 is unaffected |
|
54 | 54 | 3. and 4. are terminal, and can have no effect on other nodes |
|
55 | 55 | |
|
56 | 56 | The code to generate the simple DAG: |
|
57 | 57 | |
|
58 | 58 | .. sourcecode:: python |
|
59 | 59 | |
|
60 | 60 | import networkx as nx |
|
61 | ||
|
61 | ||
|
62 | 62 | G = nx.DiGraph() |
|
63 | ||
|
63 | ||
|
64 | 64 | # add 5 nodes, labeled 0-4: |
|
65 | 65 | map(G.add_node, range(5)) |
|
66 | 66 | # 1,2 depend on 0: |
|
67 | 67 | G.add_edge(0,1) |
|
68 | 68 | G.add_edge(0,2) |
|
69 | 69 | # 3 depends on 1,2 |
|
70 | 70 | G.add_edge(1,3) |
|
71 | 71 | G.add_edge(2,3) |
|
72 | 72 | # 4 depends on 1 |
|
73 | 73 | G.add_edge(1,4) |
|
74 | ||
|
74 | ||
|
75 | 75 | # now draw the graph: |
|
76 | 76 | pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1), |
|
77 | 77 | 3 : (0,2), 4 : (2,2)} |
|
78 | 78 | nx.draw(G, pos, edge_color='r') |
|
79 | 79 | |
|
80 | 80 | |
|
81 | 81 | For demonstration purposes, we have a function that generates a random DAG with a given |
|
82 | 82 | number of nodes and edges. |
|
83 | 83 | |
|
84 | 84 | .. literalinclude:: ../../../examples/parallel/dagdeps.py |
|
85 | 85 | :language: python |
|
86 | 86 | :lines: 20-36 |
|
87 | 87 | |
|
88 | 88 | So first, we start with a graph of 32 nodes, with 128 edges: |
|
89 | 89 | |
|
90 | 90 | .. sourcecode:: ipython |
|
91 | 91 | |
|
92 | 92 | In [2]: G = random_dag(32,128) |
|
93 | 93 | |
|
94 | 94 | Now, we need to build our dict of jobs corresponding to the nodes on the graph: |
|
95 | 95 | |
|
96 | 96 | .. sourcecode:: ipython |
|
97 | 97 | |
|
98 | 98 | In [3]: jobs = {} |
|
99 | ||
|
99 | ||
|
100 | 100 | # in reality, each job would presumably be different |
|
101 | 101 | # randomwait is just a function that sleeps for a random interval |
|
102 | 102 | In [4]: for node in G: |
|
103 |
...: jobs[node] = randomwait |
|
|
103 | ...: jobs[node] = randomwait | |
|
104 | 104 | |
|
105 | 105 | Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs, |
|
106 | 106 | and linking up the dependencies. Since we don't know a job's msg_id until it is submitted, |
|
107 | 107 | which is necessary for building dependencies, it is critical that we don't submit any jobs |
|
108 | 108 | before other jobs it may depend on. Fortunately, NetworkX provides a |
|
109 | 109 | :meth:`topological_sort` method which ensures exactly this. It presents an iterable, that |
|
110 | 110 | guarantees that when you arrive at a node, you have already visited all the nodes it |
|
111 | 111 | on which it depends: |
|
112 | 112 | |
|
113 | 113 | .. sourcecode:: ipython |
|
114 | 114 | |
|
115 | 115 | In [5]: rc = Client() |
|
116 | 116 | In [5]: view = rc.load_balanced_view() |
|
117 | ||
|
117 | ||
|
118 | 118 | In [6]: results = {} |
|
119 | ||
|
120 |
In [7]: for node in |
|
|
119 | ||
|
120 | In [7]: for node in nx.topological_sort(G): | |
|
121 | 121 | ...: # get list of AsyncResult objects from nodes |
|
122 | 122 | ...: # leading into this one as dependencies |
|
123 | 123 | ...: deps = [ results[n] for n in G.predecessors(node) ] |
|
124 | 124 | ...: # submit and store AsyncResult object |
|
125 | 125 | ...: with view.temp_flags(after=deps, block=False): |
|
126 | 126 | ...: results[node] = view.apply_with_flags(jobs[node]) |
|
127 | 127 | |
|
128 | 128 | |
|
129 | 129 | Now that we have submitted all the jobs, we can wait for the results: |
|
130 | 130 | |
|
131 | 131 | .. sourcecode:: ipython |
|
132 | 132 | |
|
133 | 133 | In [8]: view.wait(results.values()) |
|
134 | 134 | |
|
135 | 135 | Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have |
|
136 | 136 | raised an error if a task failed). But we don't know that the ordering was properly |
|
137 | 137 | respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult. |
|
138 | 138 | |
|
139 | 139 | These objects store a variety of metadata about each task, including various timestamps. |
|
140 | 140 | We can validate that the dependencies were respected by checking that each task was |
|
141 | 141 | started after all of its predecessors were completed: |
|
142 | 142 | |
|
143 | 143 | .. literalinclude:: ../../../examples/parallel/dagdeps.py |
|
144 | 144 | :language: python |
|
145 | 145 | :lines: 64-70 |
|
146 | 146 | |
|
147 | 147 | We can also validate the graph visually. By drawing the graph with each node's x-position |
|
148 | 148 | as its start time, all arrows must be pointing to the right if dependencies were respected. |
|
149 | 149 | For spreading, the y-position will be the runtime of the task, so long tasks |
|
150 | 150 | will be at the top, and quick, small tasks will be at the bottom. |
|
151 | 151 | |
|
152 | 152 | .. sourcecode:: ipython |
|
153 | 153 | |
|
154 | 154 | In [10]: from matplotlib.dates import date2num |
|
155 | ||
|
155 | ||
|
156 | 156 | In [11]: from matplotlib.cm import gist_rainbow |
|
157 | ||
|
157 | ||
|
158 | 158 | In [12]: pos = {}; colors = {} |
|
159 | ||
|
159 | ||
|
160 | 160 | In [12]: for node in G: |
|
161 | 161 | ....: md = results[node].metadata |
|
162 | 162 | ....: start = date2num(md.started) |
|
163 | 163 | ....: runtime = date2num(md.completed) - start |
|
164 | 164 | ....: pos[node] = (start, runtime) |
|
165 | 165 | ....: colors[node] = md.engine_id |
|
166 | ||
|
166 | ||
|
167 | 167 | In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(), |
|
168 | 168 | ....: cmap=gist_rainbow) |
|
169 | 169 | |
|
170 | 170 | .. figure:: figs/dagdeps.* |
|
171 | 171 | :width: 600px |
|
172 | 172 | |
|
173 | 173 | Time started on x, runtime on y, and color-coded by engine-id (in this case there |
|
174 | 174 | were four engines). Edges denote dependencies. |
|
175 | 175 | |
|
176 | 176 | |
|
177 | 177 | .. _NetworkX: http://networkx.lanl.gov/ |
General Comments 0
You need to be logged in to leave comments.
Login now