##// END OF EJS Templates
DOC : fixed minor error in using topological sort...
Thomas A Caswell -
Show More
@@ -1,177 +1,177 b''
1 .. _dag_dependencies:
1 .. _dag_dependencies:
2
2
3 ================
3 ================
4 DAG Dependencies
4 DAG Dependencies
5 ================
5 ================
6
6
7 Often, parallel workflow is described in terms of a `Directed Acyclic Graph
7 Often, parallel workflow is described in terms of a `Directed Acyclic Graph
8 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
8 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
9 for working with Graphs is NetworkX_. Here, we will walk through a demo mapping
9 for working with Graphs is NetworkX_. Here, we will walk through a demo mapping
10 a nx DAG to task dependencies.
10 a nx DAG to task dependencies.
11
11
12 The full script that runs this demo can be found in
12 The full script that runs this demo can be found in
13 :file:`examples/parallel/dagdeps.py`.
13 :file:`examples/parallel/dagdeps.py`.
14
14
15 Why are DAGs good for task dependencies?
15 Why are DAGs good for task dependencies?
16 ----------------------------------------
16 ----------------------------------------
17
17
18 The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect
18 The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect
19 the nodes. For our purposes, each node would be a task, and each edge would be a
19 the nodes. For our purposes, each node would be a task, and each edge would be a
20 dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
20 dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
21 direction associated with it. So we can interpret the edge (a,b) as meaning that b depends
21 direction associated with it. So we can interpret the edge (a,b) as meaning that b depends
22 on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that
22 on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that
23 there must not be any closed loops in the graph. This is important for dependencies,
23 there must not be any closed loops in the graph. This is important for dependencies,
24 because if a loop were closed, then a task could ultimately depend on itself, and never be
24 because if a loop were closed, then a task could ultimately depend on itself, and never be
25 able to run. If your workflow can be described as a DAG, then it is impossible for your
25 able to run. If your workflow can be described as a DAG, then it is impossible for your
26 dependencies to cause a deadlock.
26 dependencies to cause a deadlock.
27
27
28 A Sample DAG
28 A Sample DAG
29 ------------
29 ------------
30
30
31 Here, we have a very simple 5-node DAG:
31 Here, we have a very simple 5-node DAG:
32
32
33 .. figure:: figs/simpledag.*
33 .. figure:: figs/simpledag.*
34 :width: 600px
34 :width: 600px
35
35
36 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
37 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
37 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
38 1 and 2; and 4 depends only on 1.
38 1 and 2; and 4 depends only on 1.
39
39
40 A possible sequence of events for this workflow:
40 A possible sequence of events for this workflow:
41
41
42 0. Task 0 can run right away
42 0. Task 0 can run right away
43 1. 0 finishes, so 1,2 can start
43 1. 0 finishes, so 1,2 can start
44 2. 1 finishes, 3 is still waiting on 2, but 4 can start right away
44 2. 1 finishes, 3 is still waiting on 2, but 4 can start right away
45 3. 2 finishes, and 3 can finally start
45 3. 2 finishes, and 3 can finally start
46
46
47
47
48 Further, taking failures into account, assuming all dependencies are run with the default
48 Further, taking failures into account, assuming all dependencies are run with the default
49 `success=True,failure=False`, the following cases would occur for each node's failure:
49 `success=True,failure=False`, the following cases would occur for each node's failure:
50
50
51 0. fails: all other tasks fail as Impossible
51 0. fails: all other tasks fail as Impossible
52 1. 2 can still succeed, but 3,4 are unreachable
52 1. 2 can still succeed, but 3,4 are unreachable
53 2. 3 becomes unreachable, but 4 is unaffected
53 2. 3 becomes unreachable, but 4 is unaffected
54 3. and 4. are terminal, and can have no effect on other nodes
54 3. and 4. are terminal, and can have no effect on other nodes
55
55
56 The code to generate the simple DAG:
56 The code to generate the simple DAG:
57
57
58 .. sourcecode:: python
58 .. sourcecode:: python
59
59
60 import networkx as nx
60 import networkx as nx
61
61
62 G = nx.DiGraph()
62 G = nx.DiGraph()
63
63
64 # add 5 nodes, labeled 0-4:
64 # add 5 nodes, labeled 0-4:
65 map(G.add_node, range(5))
65 map(G.add_node, range(5))
66 # 1,2 depend on 0:
66 # 1,2 depend on 0:
67 G.add_edge(0,1)
67 G.add_edge(0,1)
68 G.add_edge(0,2)
68 G.add_edge(0,2)
69 # 3 depends on 1,2
69 # 3 depends on 1,2
70 G.add_edge(1,3)
70 G.add_edge(1,3)
71 G.add_edge(2,3)
71 G.add_edge(2,3)
72 # 4 depends on 1
72 # 4 depends on 1
73 G.add_edge(1,4)
73 G.add_edge(1,4)
74
74
75 # now draw the graph:
75 # now draw the graph:
76 pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
76 pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
77 3 : (0,2), 4 : (2,2)}
77 3 : (0,2), 4 : (2,2)}
78 nx.draw(G, pos, edge_color='r')
78 nx.draw(G, pos, edge_color='r')
79
79
80
80
81 For demonstration purposes, we have a function that generates a random DAG with a given
81 For demonstration purposes, we have a function that generates a random DAG with a given
82 number of nodes and edges.
82 number of nodes and edges.
83
83
84 .. literalinclude:: ../../../examples/parallel/dagdeps.py
84 .. literalinclude:: ../../../examples/parallel/dagdeps.py
85 :language: python
85 :language: python
86 :lines: 20-36
86 :lines: 20-36
87
87
88 So first, we start with a graph of 32 nodes, with 128 edges:
88 So first, we start with a graph of 32 nodes, with 128 edges:
89
89
90 .. sourcecode:: ipython
90 .. sourcecode:: ipython
91
91
92 In [2]: G = random_dag(32,128)
92 In [2]: G = random_dag(32,128)
93
93
94 Now, we need to build our dict of jobs corresponding to the nodes on the graph:
94 Now, we need to build our dict of jobs corresponding to the nodes on the graph:
95
95
96 .. sourcecode:: ipython
96 .. sourcecode:: ipython
97
97
98 In [3]: jobs = {}
98 In [3]: jobs = {}
99
99
100 # in reality, each job would presumably be different
100 # in reality, each job would presumably be different
101 # randomwait is just a function that sleeps for a random interval
101 # randomwait is just a function that sleeps for a random interval
102 In [4]: for node in G:
102 In [4]: for node in G:
103 ...: jobs[node] = randomwait
103 ...: jobs[node] = randomwait
104
104
105 Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs,
105 Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs,
106 and linking up the dependencies. Since we don't know a job's msg_id until it is submitted,
106 and linking up the dependencies. Since we don't know a job's msg_id until it is submitted,
107 which is necessary for building dependencies, it is critical that we don't submit any jobs
107 which is necessary for building dependencies, it is critical that we don't submit any jobs
108 before other jobs it may depend on. Fortunately, NetworkX provides a
108 before other jobs it may depend on. Fortunately, NetworkX provides a
109 :meth:`topological_sort` method which ensures exactly this. It presents an iterable, that
109 :meth:`topological_sort` method which ensures exactly this. It presents an iterable, that
110 guarantees that when you arrive at a node, you have already visited all the nodes it
110 guarantees that when you arrive at a node, you have already visited all the nodes it
111 on which it depends:
111 on which it depends:
112
112
113 .. sourcecode:: ipython
113 .. sourcecode:: ipython
114
114
115 In [5]: rc = Client()
115 In [5]: rc = Client()
116 In [5]: view = rc.load_balanced_view()
116 In [5]: view = rc.load_balanced_view()
117
117
118 In [6]: results = {}
118 In [6]: results = {}
119
119
120 In [7]: for node in G.topological_sort():
120 In [7]: for node in nx.topological_sort(G):
121 ...: # get list of AsyncResult objects from nodes
121 ...: # get list of AsyncResult objects from nodes
122 ...: # leading into this one as dependencies
122 ...: # leading into this one as dependencies
123 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 ...: deps = [ results[n] for n in G.predecessors(node) ]
124 ...: # submit and store AsyncResult object
124 ...: # submit and store AsyncResult object
125 ...: with view.temp_flags(after=deps, block=False):
125 ...: with view.temp_flags(after=deps, block=False):
126 ...: results[node] = view.apply_with_flags(jobs[node])
126 ...: results[node] = view.apply_with_flags(jobs[node])
127
127
128
128
129 Now that we have submitted all the jobs, we can wait for the results:
129 Now that we have submitted all the jobs, we can wait for the results:
130
130
131 .. sourcecode:: ipython
131 .. sourcecode:: ipython
132
132
133 In [8]: view.wait(results.values())
133 In [8]: view.wait(results.values())
134
134
135 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
135 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
136 raised an error if a task failed). But we don't know that the ordering was properly
136 raised an error if a task failed). But we don't know that the ordering was properly
137 respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.
137 respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.
138
138
139 These objects store a variety of metadata about each task, including various timestamps.
139 These objects store a variety of metadata about each task, including various timestamps.
140 We can validate that the dependencies were respected by checking that each task was
140 We can validate that the dependencies were respected by checking that each task was
141 started after all of its predecessors were completed:
141 started after all of its predecessors were completed:
142
142
143 .. literalinclude:: ../../../examples/parallel/dagdeps.py
143 .. literalinclude:: ../../../examples/parallel/dagdeps.py
144 :language: python
144 :language: python
145 :lines: 64-70
145 :lines: 64-70
146
146
147 We can also validate the graph visually. By drawing the graph with each node's x-position
147 We can also validate the graph visually. By drawing the graph with each node's x-position
148 as its start time, all arrows must be pointing to the right if dependencies were respected.
148 as its start time, all arrows must be pointing to the right if dependencies were respected.
149 For spreading, the y-position will be the runtime of the task, so long tasks
149 For spreading, the y-position will be the runtime of the task, so long tasks
150 will be at the top, and quick, small tasks will be at the bottom.
150 will be at the top, and quick, small tasks will be at the bottom.
151
151
152 .. sourcecode:: ipython
152 .. sourcecode:: ipython
153
153
154 In [10]: from matplotlib.dates import date2num
154 In [10]: from matplotlib.dates import date2num
155
155
156 In [11]: from matplotlib.cm import gist_rainbow
156 In [11]: from matplotlib.cm import gist_rainbow
157
157
158 In [12]: pos = {}; colors = {}
158 In [12]: pos = {}; colors = {}
159
159
160 In [12]: for node in G:
160 In [12]: for node in G:
161 ....: md = results[node].metadata
161 ....: md = results[node].metadata
162 ....: start = date2num(md.started)
162 ....: start = date2num(md.started)
163 ....: runtime = date2num(md.completed) - start
163 ....: runtime = date2num(md.completed) - start
164 ....: pos[node] = (start, runtime)
164 ....: pos[node] = (start, runtime)
165 ....: colors[node] = md.engine_id
165 ....: colors[node] = md.engine_id
166
166
167 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
167 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
168 ....: cmap=gist_rainbow)
168 ....: cmap=gist_rainbow)
169
169
170 .. figure:: figs/dagdeps.*
170 .. figure:: figs/dagdeps.*
171 :width: 600px
171 :width: 600px
172
172
173 Time started on x, runtime on y, and color-coded by engine-id (in this case there
173 Time started on x, runtime on y, and color-coded by engine-id (in this case there
174 were four engines). Edges denote dependencies.
174 were four engines). Edges denote dependencies.
175
175
176
176
177 .. _NetworkX: http://networkx.lanl.gov/
177 .. _NetworkX: http://networkx.lanl.gov/
General Comments 0
You need to be logged in to leave comments. Login now