.. _dag_dependencies:

================
DAG Dependencies
================

Often, a parallel workflow is described in terms of a `Directed Acyclic Graph
<http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
for working with Graphs is NetworkX_. Here, we will walk through a demo mapping
a NetworkX DAG to task dependencies.

The full script that runs this demo can be found in
:file:`examples/parallel/dagdeps.py`.

Why are DAGs good for task dependencies?
----------------------------------------

The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect
the nodes. For our purposes, each node would be a task, and each edge would be a
dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
direction associated with it. So we can interpret the edge (a,b) as meaning that b depends
on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that
there must not be any closed loops in the graph. This is important for dependencies,
because if a loop were closed, then a task could ultimately depend on itself, and never be
able to run. If your workflow can be described as a DAG, then it is impossible for your
dependencies to cause a deadlock.
|
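To illustrate why the acyclic property matters, the following sketch uses Python's
standard-library :mod:`graphlib` (Python 3.9+) rather than NetworkX. The example
dictionaries and the ``can_be_scheduled`` helper are hypothetical names invented for this
illustration. It shows that a dependency graph with a closed loop cannot be put into any
valid run order:

.. sourcecode:: python

    from graphlib import CycleError, TopologicalSorter

    def can_be_scheduled(deps):
        """Return True if the tasks can be ordered so that every task
        comes after all of its dependencies.

        `deps` maps each task to the collection of tasks it depends on.
        """
        try:
            # static_order() raises CycleError if the graph has a closed loop
            list(TopologicalSorter(deps).static_order())
        except CycleError:
            return False
        return True

    # b depends on a, c depends on b: a valid DAG
    acyclic = {"a": [], "b": ["a"], "c": ["b"]}
    # adding "a depends on c" closes the loop a -> b -> c -> a
    cyclic = {"a": ["c"], "b": ["a"], "c": ["b"]}

    print(can_be_scheduled(acyclic))
    print(can_be_scheduled(cyclic))

With NetworkX, ``nx.is_directed_acyclic_graph(G)`` performs the equivalent check on a
``DiGraph``.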
|
A Sample DAG
------------

Here, we have a very simple 5-node DAG:

.. figure:: figs/simpledag.*
    :width: 600px

With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
1 and 2; and 4 depends only on 1.

A possible sequence of events for this workflow:

0. Task 0 can run right away
1. 0 finishes, so 1,2 can start
2. 1 finishes, 3 is still waiting on 2, but 4 can start right away
3. 2 finishes, and 3 can finally start


Further, taking failures into account, assuming all dependencies are run with the default
``success=True, failure=False``, the following cases would occur for each node's failure
(each item is numbered by the node that fails):

0. fails: all other tasks fail as Impossible
1. 2 can still succeed, but 3,4 are unreachable
2. 3 becomes unreachable, but 4 is unaffected
3. and 4. are terminal, and can have no effect on other nodes
|
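The failure cases above can be reproduced with a small graph traversal. The sketch below
is only an illustration under stated assumptions: it uses a plain dict ``successors`` (a
hypothetical stand-in for the NetworkX graph) and a hypothetical helper
``unreachable_after_failure``, and it assumes the default ``success=True, failure=False``
behaviour, where a task becomes impossible as soon as any one of its dependencies fails.

.. sourcecode:: python

    # edges of the sample DAG: task -> tasks that depend on it
    successors = {0: [1, 2], 1: [3, 4], 2: [3], 3: [], 4: []}

    def unreachable_after_failure(successors, failed):
        """Return the set of tasks that can never run once `failed` fails."""
        unreachable = set()
        stack = list(successors[failed])
        while stack:
            task = stack.pop()
            if task not in unreachable:
                unreachable.add(task)
                # anything downstream of an unreachable task is unreachable too
                stack.extend(successors[task])
        return unreachable

    for node in sorted(successors):
        print(node, sorted(unreachable_after_failure(successors, node)))

For a NetworkX ``DiGraph``, ``nx.descendants(G, node)`` returns the same set of downstream
nodes.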
|
The code to generate the simple DAG:

.. sourcecode:: python

    import networkx as nx

    G = nx.DiGraph()

    # add 5 nodes, labeled 0-4
    # (a bare map() call is lazy on Python 3 and would add nothing):
    G.add_nodes_from(range(5))
    # 1,2 depend on 0:
    G.add_edge(0,1)
    G.add_edge(0,2)
    # 3 depends on 1,2
    G.add_edge(1,3)
    G.add_edge(2,3)
    # 4 depends on 1
    G.add_edge(1,4)

    # now draw the graph:
    pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
            3 : (0,2), 4 : (2,2)}
    nx.draw(G, pos, edge_color='r')


For demonstration purposes, we have a function that generates a random DAG with a given
number of nodes and edges.

.. literalinclude:: ../../../examples/parallel/dagdeps.py
    :language: python
    :lines: 20-36
|
So first, we start with a graph of 32 nodes, with 128 edges:

.. sourcecode:: ipython

    In [2]: G = random_dag(32,128)

Now, we need to build our dict of jobs corresponding to the nodes on the graph:

.. sourcecode:: ipython

    In [3]: jobs = {}

    # in reality, each job would presumably be different
    # randomwait is just a function that sleeps for a random interval
    In [4]: for node in G:
       ...:     jobs[node] = randomwait

Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs,
and linking up the dependencies. Since we don't know a job's msg_id until it is submitted,
which is necessary for building dependencies, it is critical that we don't submit any job
before the jobs it may depend on. Fortunately, NetworkX provides a
:meth:`topological_sort` method which ensures exactly this. It presents an iterable that
guarantees that when you arrive at a node, you have already visited all the nodes
on which it depends:

.. sourcecode:: ipython

    In [5]: rc = Client()
    In [5]: view = rc.load_balanced_view()

    In [6]: results = {}

    In [7]: for node in nx.topological_sort(G):
       ...:     # get list of AsyncResult objects from nodes
       ...:     # leading into this one as dependencies
       ...:     deps = [ results[n] for n in G.predecessors(node) ]
       ...:     # submit and store AsyncResult object
       ...:     with view.temp_flags(after=deps, block=False):
       ...:         results[node] = view.apply_with_flags(jobs[node])
|
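The submission loop above relies only on the ordering guarantee. As a cluster-free
illustration, the sketch below replays it on the five-node sample DAG using the
standard-library :mod:`graphlib` (Python 3.9+) in place of NetworkX. The ``predecessors``
dict and the ``fake_submit`` function are hypothetical stand-ins for ``G.predecessors``
and the real task submission, and the returned dicts merely play the role of AsyncResult
objects.

.. sourcecode:: python

    from graphlib import TopologicalSorter

    # sample DAG: task -> tasks it depends on
    predecessors = {0: [], 1: [0], 2: [0], 3: [1, 2], 4: [1]}

    def fake_submit(node, deps):
        """Stand-in for submitting a task; returns a placeholder 'result'."""
        return {"node": node, "after": [d["node"] for d in deps]}

    results = {}
    submitted = []
    for node in TopologicalSorter(predecessors).static_order():
        # every predecessor has already been submitted, so these
        # lookups cannot raise KeyError
        deps = [results[n] for n in predecessors[node]]
        results[node] = fake_submit(node, deps)
        submitted.append(node)

    print(submitted)

If the nodes were visited in an arbitrary order instead, the ``results[n]`` lookup could
fail for a predecessor that has not been submitted yet.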
|
Now that we have submitted all the jobs, we can wait for the results:

.. sourcecode:: ipython

    In [8]: view.wait(results.values())

Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
raised an error if a task failed). But we don't know that the ordering was properly
respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.

These objects store a variety of metadata about each task, including various timestamps.
We can validate that the dependencies were respected by checking that each task was
started after all of its predecessors were completed:

.. literalinclude:: ../../../examples/parallel/dagdeps.py
    :language: python
    :lines: 64-70
|
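The check described above might look like the following sketch. It does not reproduce the
lines included from :file:`dagdeps.py`; the ``validate_order`` helper, the
``predecessors`` dict and the hand-written timestamps are hypothetical stand-ins for the
graph and for the ``started`` and ``completed`` entries of each task's metadata.

.. sourcecode:: python

    from datetime import datetime, timedelta

    def validate_order(predecessors, started, completed):
        """Return True if every task started after all of its
        predecessors had completed."""
        for node, parents in predecessors.items():
            for parent in parents:
                if started[node] < completed[parent]:
                    return False
        return True

    # sample DAG: task -> tasks it depends on
    predecessors = {0: [], 1: [0], 2: [0], 3: [1, 2], 4: [1]}

    t0 = datetime(2020, 1, 1, 12, 0, 0)
    sec = timedelta(seconds=1)
    # made-up timestamps that follow the sample sequence of events
    started = {0: t0, 1: t0 + 2 * sec, 2: t0 + 2 * sec,
               3: t0 + 6 * sec, 4: t0 + 4 * sec}
    completed = {0: t0 + 1 * sec, 1: t0 + 3 * sec, 2: t0 + 5 * sec,
                 3: t0 + 7 * sec, 4: t0 + 5 * sec}

    print(validate_order(predecessors, started, completed))

    # if task 3 had started before task 2 finished, the check fails
    started_bad = dict(started)
    started_bad[3] = t0 + 4 * sec
    print(validate_order(predecessors, started_bad, completed))
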
|
We can also validate the graph visually. By drawing the graph with each node's x-position
as its start time, all arrows must be pointing to the right if dependencies were respected.
For spreading, the y-position will be the runtime of the task, so long tasks
will be at the top, and quick, small tasks will be at the bottom.

.. sourcecode:: ipython

    In [10]: from matplotlib.dates import date2num

    In [11]: from matplotlib.cm import gist_rainbow

    In [12]: pos = {}; colors = {}

    In [12]: for node in G:
       ....:     md = results[node].metadata
       ....:     start = date2num(md.started)
       ....:     runtime = date2num(md.completed) - start
       ....:     pos[node] = (start, runtime)
       ....:     colors[node] = md.engine_id

    In [13]: nx.draw(G, pos, nodelist=list(colors.keys()), node_color=list(colors.values()),
       ....:     cmap=gist_rainbow)

.. figure:: figs/dagdeps.*
    :width: 600px

    Time started on x, runtime on y, and color-coded by engine-id (in this case there
    were four engines). Edges denote dependencies.


.. _NetworkX: http://networkx.lanl.gov/