##// END OF EJS Templates
update parallel docs with some changes from scipy tutorial...
MinRK -
Show More
@@ -0,0 +1,61 b''
1 """An example for handling results in a way that AsyncMapResult doesn't provide
2
3 Specifically, out-of-order results with some special handing of metadata.
4
5 This just submits a bunch of jobs, waits on the results, and prints the stdout
6 and results of each as they finish.
7
8 Authors
9 -------
10 * MinRK
11 """
12 import time
13 import random
14
15 from IPython import parallel
16
17 # create client & views
18 rc = parallel.Client()
19 dv = rc[:]
20 v = rc.load_balanced_view()
21
22
23 # scatter 'id', so id=0,1,2 on engines 0,1,2
24 dv.scatter('id', rc.ids, flatten=True)
25 print dv['id']
26
27
28 def sleep_here(count, t):
29 """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
30 import time,sys
31 print "hi from engine %i" % id
32 sys.stdout.flush()
33 time.sleep(t)
34 return count,t
35
36 amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)
37
38 pending = set(amr.msg_ids)
39 while pending:
40 try:
41 rc.wait(pending, 1e-3)
42 except parallel.TimeoutError:
43 # ignore timeouterrors, since they only mean that at least one isn't done
44 pass
45 # finished is the set of msg_ids that are complete
46 finished = pending.difference(rc.outstanding)
47 # update pending to exclude those that just finished
48 pending = pending.difference(finished)
49 for msg_id in finished:
50 # we know these are done, so don't worry about blocking
51 ar = rc.get_result(msg_id)
52 print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
53 print "with stdout:"
54 print ' ' + ar.stdout.replace('\n', '\n ').rstrip()
55 print "and results:"
56
57 # note that each job in a map always returns a list of length chunksize
58 # even if chunksize == 1
59 for (count,t) in ar.result:
60 print " item %i: slept for %.2fs" % (count, t)
61
@@ -0,0 +1,83 b''
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
2
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
4
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
6 and you will see the print statements as they arrive, notably not waiting for the results
7 to finish.
8
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 and easily filter by message type.
11
12 Authors
13 -------
14 * MinRK
15 """
16
17 import os
18 import sys
19 import json
20 import zmq
21
22 from IPython.zmq.session import Session
23 from IPython.parallel.util import disambiguate_url
24 from IPython.utils.py3compat import str_to_bytes
25 from IPython.utils.path import get_security_file
26
27 def main(connection_file):
28 """watch iopub channel, and print messages"""
29
30 ctx = zmq.Context.instance()
31
32 with open(connection_file) as f:
33 cfg = json.loads(f.read())
34
35 location = cfg['location']
36 reg_url = cfg['url']
37 session = Session(key=str_to_bytes(cfg['exec_key']))
38
39 query = ctx.socket(zmq.DEALER)
40 query.connect(disambiguate_url(cfg['url'], location))
41 session.send(query, "connection_request")
42 idents,msg = session.recv(query, mode=0)
43 c = msg['content']
44 iopub_url = disambiguate_url(c['iopub'], location)
45 sub = ctx.socket(zmq.SUB)
46 # This will subscribe to all messages:
47 sub.setsockopt(zmq.SUBSCRIBE, b'')
48 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
49 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
50 # to everything from engine 1, but there is no way to subscribe to
51 # just stdout from everyone.
52 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
53 # engine 1's stderr and engine 2's stdout:
54 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
55 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
56 sub.connect(iopub_url)
57 while True:
58 try:
59 idents,msg = session.recv(sub, mode=0)
60 except KeyboardInterrupt:
61 return
62 # ident always length 1 here
63 topic = idents[0]
64 if msg['msg_type'] == 'stream':
65 # stdout/stderr
66 # stream names are in msg['content']['name'], if you want to handle
67 # them differently
68 print "%s: %s" % (topic, msg['content']['data'])
69 elif msg['msg_type'] == 'pyerr':
70 # Python traceback
71 c = msg['content']
72 print topic + ':'
73 for line in c['traceback']:
74 # indent lines
75 print ' ' + line
76
77 if __name__ == '__main__':
78 if len(sys.argv) > 1:
79 cf = sys.argv[1]
80 else:
81 # This gets the security file for the default profile:
82 cf = get_security_file('ipcontroller-client.json')
83 main(cf) No newline at end of file
@@ -0,0 +1,52 b''
1 """Example of iteration through AsyncMapResult, without waiting for all results
2
3 Authors
4 -------
5 * MinRK
6 """
7 import time
8
9 from IPython import parallel
10
11 # create client & view
12 rc = parallel.Client()
13 dv = rc[:]
14 v = rc.load_balanced_view()
15
16 # scatter 'id', so id=0,1,2 on engines 0,1,2
17 dv.scatter('id', rc.ids, flatten=True)
18 print "Engine IDs: ", dv['id']
19
20 # create a Reference to `id`. This will be a different value on each engine
21 ref = parallel.Reference('id')
22 print "sleeping for `id` seconds on each engine"
23 tic = time.time()
24 ar = dv.apply(time.sleep, ref)
25 for i,r in enumerate(ar):
26 print "%i: %.3f"%(i, time.time()-tic)
27
28 def sleep_here(t):
29 import time
30 time.sleep(t)
31 return id,t
32
33 # one call per task
34 print "running with one call per task"
35 amr = v.map(sleep_here, [.01*t for t in range(100)])
36 tic = time.time()
37 for i,r in enumerate(amr):
38 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
39
40 print "running with four calls per task"
41 # with chunksize, we can have four calls per task
42 amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
43 tic = time.time()
44 for i,r in enumerate(amr):
45 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
46
47 print "running with two calls per task, with unordered results"
48 # We can even iterate through faster results first, with ordered=False
49 amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
50 tic = time.time()
51 for i,r in enumerate(amr):
52 print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
1 NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
@@ -1,173 +1,177 b''
1 .. _dag_dependencies:
1 .. _dag_dependencies:
2
2
3 ================
3 ================
4 DAG Dependencies
4 DAG Dependencies
5 ================
5 ================
6
6
7 Often, parallel workflow is described in terms of a `Directed Acyclic Graph
7 Often, parallel workflow is described in terms of a `Directed Acyclic Graph
8 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
8 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
9 for working with Graphs is NetworkX_. Here, we will walk through a demo mapping
9 for working with Graphs is NetworkX_. Here, we will walk through a demo mapping
10 a nx DAG to task dependencies.
10 a nx DAG to task dependencies.
11
11
12 The full script that runs this demo can be found in
12 The full script that runs this demo can be found in
13 :file:`docs/examples/parallel/dagdeps.py`.
13 :file:`docs/examples/parallel/dagdeps.py`.
14
14
15 Why are DAGs good for task dependencies?
15 Why are DAGs good for task dependencies?
16 ----------------------------------------
16 ----------------------------------------
17
17
18 The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect
18 The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect
19 the nodes. For our purposes, each node would be a task, and each edge would be a
19 the nodes. For our purposes, each node would be a task, and each edge would be a
20 dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
20 dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
21 direction associated with it. So we can interpret the edge (a,b) as meaning that b depends
21 direction associated with it. So we can interpret the edge (a,b) as meaning that b depends
22 on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that
22 on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that
23 there must not be any closed loops in the graph. This is important for dependencies,
23 there must not be any closed loops in the graph. This is important for dependencies,
24 because if a loop were closed, then a task could ultimately depend on itself, and never be
24 because if a loop were closed, then a task could ultimately depend on itself, and never be
25 able to run. If your workflow can be described as a DAG, then it is impossible for your
25 able to run. If your workflow can be described as a DAG, then it is impossible for your
26 dependencies to cause a deadlock.
26 dependencies to cause a deadlock.
27
27
28 A Sample DAG
28 A Sample DAG
29 ------------
29 ------------
30
30
31 Here, we have a very simple 5-node DAG:
31 Here, we have a very simple 5-node DAG:
32
32
33 .. figure:: figs/ simpledag.*
33 .. figure:: figs/simpledag.*
34 :width: 600px
34
35
35 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
37 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
37 1 and 2; and 4 depends only on 1.
38 1 and 2; and 4 depends only on 1.
38
39
39 A possible sequence of events for this workflow:
40 A possible sequence of events for this workflow:
40
41
41 0. Task 0 can run right away
42 0. Task 0 can run right away
42 1. 0 finishes, so 1,2 can start
43 1. 0 finishes, so 1,2 can start
43 2. 1 finishes, 3 is still waiting on 2, but 4 can start right away
44 2. 1 finishes, 3 is still waiting on 2, but 4 can start right away
44 3. 2 finishes, and 3 can finally start
45 3. 2 finishes, and 3 can finally start
45
46
46
47
47 Further, taking failures into account, assuming all dependencies are run with the default
48 Further, taking failures into account, assuming all dependencies are run with the default
48 `success=True,failure=False`, the following cases would occur for each node's failure:
49 `success=True,failure=False`, the following cases would occur for each node's failure:
49
50
50 0. fails: all other tasks fail as Impossible
51 0. fails: all other tasks fail as Impossible
51 1. 2 can still succeed, but 3,4 are unreachable
52 1. 2 can still succeed, but 3,4 are unreachable
52 2. 3 becomes unreachable, but 4 is unaffected
53 2. 3 becomes unreachable, but 4 is unaffected
53 3. and 4. are terminal, and can have no effect on other nodes
54 3. and 4. are terminal, and can have no effect on other nodes
54
55
55 The code to generate the simple DAG:
56 The code to generate the simple DAG:
56
57
57 .. sourcecode:: python
58 .. sourcecode:: python
58
59
59 import networkx as nx
60 import networkx as nx
60
61
61 G = nx.DiGraph()
62 G = nx.DiGraph()
62
63
63 # add 5 nodes, labeled 0-4:
64 # add 5 nodes, labeled 0-4:
64 map(G.add_node, range(5))
65 map(G.add_node, range(5))
65 # 1,2 depend on 0:
66 # 1,2 depend on 0:
66 G.add_edge(0,1)
67 G.add_edge(0,1)
67 G.add_edge(0,2)
68 G.add_edge(0,2)
68 # 3 depends on 1,2
69 # 3 depends on 1,2
69 G.add_edge(1,3)
70 G.add_edge(1,3)
70 G.add_edge(2,3)
71 G.add_edge(2,3)
71 # 4 depends on 1
72 # 4 depends on 1
72 G.add_edge(1,4)
73 G.add_edge(1,4)
73
74
74 # now draw the graph:
75 # now draw the graph:
75 pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
76 pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
76 3 : (0,2), 4 : (2,2)}
77 3 : (0,2), 4 : (2,2)}
77 nx.draw(G, pos, edge_color='r')
78 nx.draw(G, pos, edge_color='r')
78
79
79
80
80 For demonstration purposes, we have a function that generates a random DAG with a given
81 For demonstration purposes, we have a function that generates a random DAG with a given
81 number of nodes and edges.
82 number of nodes and edges.
82
83
83 .. literalinclude:: ../../examples/parallel/dagdeps.py
84 .. literalinclude:: ../../examples/parallel/dagdeps.py
84 :language: python
85 :language: python
85 :lines: 20-36
86 :lines: 20-36
86
87
87 So first, we start with a graph of 32 nodes, with 128 edges:
88 So first, we start with a graph of 32 nodes, with 128 edges:
88
89
89 .. sourcecode:: ipython
90 .. sourcecode:: ipython
90
91
91 In [2]: G = random_dag(32,128)
92 In [2]: G = random_dag(32,128)
92
93
93 Now, we need to build our dict of jobs corresponding to the nodes on the graph:
94 Now, we need to build our dict of jobs corresponding to the nodes on the graph:
94
95
95 .. sourcecode:: ipython
96 .. sourcecode:: ipython
96
97
97 In [3]: jobs = {}
98 In [3]: jobs = {}
98
99
99 # in reality, each job would presumably be different
100 # in reality, each job would presumably be different
100 # randomwait is just a function that sleeps for a random interval
101 # randomwait is just a function that sleeps for a random interval
101 In [4]: for node in G:
102 In [4]: for node in G:
102 ...: jobs[node] = randomwait
103 ...: jobs[node] = randomwait
103
104
104 Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs,
105 Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs,
105 and linking up the dependencies. Since we don't know a job's msg_id until it is submitted,
106 and linking up the dependencies. Since we don't know a job's msg_id until it is submitted,
106 which is necessary for building dependencies, it is critical that we don't submit any jobs
107 which is necessary for building dependencies, it is critical that we don't submit any jobs
107 before other jobs it may depend on. Fortunately, NetworkX provides a
108 before other jobs it may depend on. Fortunately, NetworkX provides a
108 :meth:`topological_sort` method which ensures exactly this. It presents an iterable, that
109 :meth:`topological_sort` method which ensures exactly this. It presents an iterable, that
109 guarantees that when you arrive at a node, you have already visited all the nodes it
110 guarantees that when you arrive at a node, you have already visited all the nodes it
110 on which it depends:
111 on which it depends:
111
112
112 .. sourcecode:: ipython
113 .. sourcecode:: ipython
113
114
114 In [5]: rc = Client()
115 In [5]: rc = Client()
115 In [5]: view = rc.load_balanced_view()
116 In [5]: view = rc.load_balanced_view()
116
117
117 In [6]: results = {}
118 In [6]: results = {}
118
119
119 In [7]: for node in G.topological_sort():
120 In [7]: for node in G.topological_sort():
120 ...: # get list of AsyncResult objects from nodes
121 ...: # get list of AsyncResult objects from nodes
121 ...: # leading into this one as dependencies
122 ...: # leading into this one as dependencies
122 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 ...: # submit and store AsyncResult object
124 ...: # submit and store AsyncResult object
124 ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False)
125 ...: with view.temp_flags(after=deps, block=False):
126 ...: results[node] = view.apply_with_flags(jobs[node])
127
125
128
126 Now that we have submitted all the jobs, we can wait for the results:
129 Now that we have submitted all the jobs, we can wait for the results:
127
130
128 .. sourcecode:: ipython
131 .. sourcecode:: ipython
129
132
130 In [8]: view.wait(results.values())
133 In [8]: view.wait(results.values())
131
134
132 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
135 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
133 raised an error if a task failed). But we don't know that the ordering was properly
136 raised an error if a task failed). But we don't know that the ordering was properly
134 respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.
137 respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.
135
138
136 These objects store a variety of metadata about each task, including various timestamps.
139 These objects store a variety of metadata about each task, including various timestamps.
137 We can validate that the dependencies were respected by checking that each task was
140 We can validate that the dependencies were respected by checking that each task was
138 started after all of its predecessors were completed:
141 started after all of its predecessors were completed:
139
142
140 .. literalinclude:: ../../examples/parallel/dagdeps.py
143 .. literalinclude:: ../../examples/parallel/dagdeps.py
141 :language: python
144 :language: python
142 :lines: 64-70
145 :lines: 64-70
143
146
144 We can also validate the graph visually. By drawing the graph with each node's x-position
147 We can also validate the graph visually. By drawing the graph with each node's x-position
145 as its start time, all arrows must be pointing to the right if dependencies were respected.
148 as its start time, all arrows must be pointing to the right if dependencies were respected.
146 For spreading, the y-position will be the runtime of the task, so long tasks
149 For spreading, the y-position will be the runtime of the task, so long tasks
147 will be at the top, and quick, small tasks will be at the bottom.
150 will be at the top, and quick, small tasks will be at the bottom.
148
151
149 .. sourcecode:: ipython
152 .. sourcecode:: ipython
150
153
151 In [10]: from matplotlib.dates import date2num
154 In [10]: from matplotlib.dates import date2num
152
155
153 In [11]: from matplotlib.cm import gist_rainbow
156 In [11]: from matplotlib.cm import gist_rainbow
154
157
155 In [12]: pos = {}; colors = {}
158 In [12]: pos = {}; colors = {}
156
159
157 In [12]: for node in G:
160 In [12]: for node in G:
158 ...: md = results[node].metadata
161 ....: md = results[node].metadata
159 ...: start = date2num(md.started)
162 ....: start = date2num(md.started)
160 ...: runtime = date2num(md.completed) - start
163 ....: runtime = date2num(md.completed) - start
161 ...: pos[node] = (start, runtime)
164 ....: pos[node] = (start, runtime)
162 ...: colors[node] = md.engine_id
165 ....: colors[node] = md.engine_id
163
166
164 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
167 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
165 ...: cmap=gist_rainbow)
168 ....: cmap=gist_rainbow)
166
169
167 .. figure:: figs/ dagdeps.*
170 .. figure:: figs/dagdeps.*
171 :width: 600px
168
172
169 Time started on x, runtime on y, and color-coded by engine-id (in this case there
173 Time started on x, runtime on y, and color-coded by engine-id (in this case there
170 were four engines). Edges denote dependencies.
174 were four engines). Edges denote dependencies.
171
175
172
176
173 .. _NetworkX: http://networkx.lanl.gov/
177 .. _NetworkX: http://networkx.lanl.gov/
@@ -1,263 +1,295 b''
1 .. _parallel_overview:
1 .. _parallel_overview:
2
2
3 ============================
3 ============================
4 Overview and getting started
4 Overview and getting started
5 ============================
5 ============================
6
6
7 Introduction
7 Introduction
8 ============
8 ============
9
9
10 This section gives an overview of IPython's sophisticated and powerful
10 This section gives an overview of IPython's sophisticated and powerful
11 architecture for parallel and distributed computing. This architecture
11 architecture for parallel and distributed computing. This architecture
12 abstracts out parallelism in a very general way, which enables IPython to
12 abstracts out parallelism in a very general way, which enables IPython to
13 support many different styles of parallelism including:
13 support many different styles of parallelism including:
14
14
15 * Single program, multiple data (SPMD) parallelism.
15 * Single program, multiple data (SPMD) parallelism.
16 * Multiple program, multiple data (MPMD) parallelism.
16 * Multiple program, multiple data (MPMD) parallelism.
17 * Message passing using MPI.
17 * Message passing using MPI.
18 * Task farming.
18 * Task farming.
19 * Data parallel.
19 * Data parallel.
20 * Combinations of these approaches.
20 * Combinations of these approaches.
21 * Custom user defined approaches.
21 * Custom user defined approaches.
22
22
23 Most importantly, IPython enables all types of parallel applications to
23 Most importantly, IPython enables all types of parallel applications to
24 be developed, executed, debugged and monitored *interactively*. Hence,
24 be developed, executed, debugged and monitored *interactively*. Hence,
25 the ``I`` in IPython. The following are some example usage cases for IPython:
25 the ``I`` in IPython. The following are some example usage cases for IPython:
26
26
27 * Quickly parallelize algorithms that are embarrassingly parallel
27 * Quickly parallelize algorithms that are embarrassingly parallel
28 using a number of simple approaches. Many simple things can be
28 using a number of simple approaches. Many simple things can be
29 parallelized interactively in one or two lines of code.
29 parallelized interactively in one or two lines of code.
30
30
31 * Steer traditional MPI applications on a supercomputer from an
31 * Steer traditional MPI applications on a supercomputer from an
32 IPython session on your laptop.
32 IPython session on your laptop.
33
33
34 * Analyze and visualize large datasets (that could be remote and/or
34 * Analyze and visualize large datasets (that could be remote and/or
35 distributed) interactively using IPython and tools like
35 distributed) interactively using IPython and tools like
36 matplotlib/TVTK.
36 matplotlib/TVTK.
37
37
38 * Develop, test and debug new parallel algorithms
38 * Develop, test and debug new parallel algorithms
39 (that may use MPI) interactively.
39 (that may use MPI) interactively.
40
40
41 * Tie together multiple MPI jobs running on different systems into
41 * Tie together multiple MPI jobs running on different systems into
42 one giant distributed and parallel system.
42 one giant distributed and parallel system.
43
43
44 * Start a parallel job on your cluster and then have a remote
44 * Start a parallel job on your cluster and then have a remote
45 collaborator connect to it and pull back data into their
45 collaborator connect to it and pull back data into their
46 local IPython session for plotting and analysis.
46 local IPython session for plotting and analysis.
47
47
48 * Run a set of tasks on a set of CPUs using dynamic load balancing.
48 * Run a set of tasks on a set of CPUs using dynamic load balancing.
49
49
50 .. tip::
50 .. tip::
51
51
52 At the SciPy 2011 conference in Austin, Min Ragan-Kelley presented a
52 At the SciPy 2011 conference in Austin, Min Ragan-Kelley presented a
53 complete 4-hour tutorial on the use of these features, and all the materials
53 complete 4-hour tutorial on the use of these features, and all the materials
54 for the tutorial are now `available online`__. That tutorial provides an
54 for the tutorial are now `available online`__. That tutorial provides an
55 excellent, hands-on oriented complement to the reference documentation
55 excellent, hands-on oriented complement to the reference documentation
56 presented here.
56 presented here.
57
57
58 .. __: http://minrk.github.com/scipy-tutorial-2011
58 .. __: http://minrk.github.com/scipy-tutorial-2011
59
59
60 Architecture overview
60 Architecture overview
61 =====================
61 =====================
62
62
63 .. figure:: figs/wideView.png
64 :width: 300px
65
66
63 The IPython architecture consists of four components:
67 The IPython architecture consists of four components:
64
68
65 * The IPython engine.
69 * The IPython engine.
66 * The IPython hub.
70 * The IPython hub.
67 * The IPython schedulers.
71 * The IPython schedulers.
68 * The controller client.
72 * The controller client.
69
73
70 These components live in the :mod:`IPython.parallel` package and are
74 These components live in the :mod:`IPython.parallel` package and are
71 installed with IPython. They do, however, have additional dependencies
75 installed with IPython. They do, however, have additional dependencies
72 that must be installed. For more information, see our
76 that must be installed. For more information, see our
73 :ref:`installation documentation <install_index>`.
77 :ref:`installation documentation <install_index>`.
74
78
75 .. TODO: include zmq in install_index
79 .. TODO: include zmq in install_index
76
80
77 IPython engine
81 IPython engine
78 ---------------
82 ---------------
79
83
80 The IPython engine is a Python instance that takes Python commands over a
84 The IPython engine is a Python instance that takes Python commands over a
81 network connection. Eventually, the IPython engine will be a full IPython
85 network connection. Eventually, the IPython engine will be a full IPython
82 interpreter, but for now, it is a regular Python interpreter. The engine
86 interpreter, but for now, it is a regular Python interpreter. The engine
83 can also handle incoming and outgoing Python objects sent over a network
87 can also handle incoming and outgoing Python objects sent over a network
84 connection. When multiple engines are started, parallel and distributed
88 connection. When multiple engines are started, parallel and distributed
85 computing becomes possible. An important feature of an IPython engine is
89 computing becomes possible. An important feature of an IPython engine is
86 that it blocks while user code is being executed. Read on for how the
90 that it blocks while user code is being executed. Read on for how the
87 IPython controller solves this problem to expose a clean asynchronous API
91 IPython controller solves this problem to expose a clean asynchronous API
88 to the user.
92 to the user.
89
93
90 IPython controller
94 IPython controller
91 ------------------
95 ------------------
92
96
93 The IPython controller processes provide an interface for working with a set of engines.
97 The IPython controller processes provide an interface for working with a set of engines.
94 At a general level, the controller is a collection of processes to which IPython engines
98 At a general level, the controller is a collection of processes to which IPython engines
95 and clients can connect. The controller is composed of a :class:`Hub` and a collection of
99 and clients can connect. The controller is composed of a :class:`Hub` and a collection of
96 :class:`Schedulers`. These Schedulers are typically run in separate processes but on the
100 :class:`Schedulers`. These Schedulers are typically run in separate processes but on the
97 same machine as the Hub, but can be run anywhere from local threads or on remote machines.
101 same machine as the Hub, but can be run anywhere from local threads or on remote machines.
98
102
99 The controller also provides a single point of contact for users who wish to
103 The controller also provides a single point of contact for users who wish to
100 utilize the engines connected to the controller. There are different ways of
104 utilize the engines connected to the controller. There are different ways of
101 working with a controller. In IPython, all of these models are implemented via
105 working with a controller. In IPython, all of these models are implemented via
102 the client's :meth:`.View.apply` method, with various arguments, or
106 the :meth:`.View.apply` method, after
103 constructing :class:`.View` objects to represent subsets of engines. The two
107 constructing :class:`.View` objects to represent subsets of engines. The two
104 primary models for interacting with engines are:
108 primary models for interacting with engines are:
105
109
106 * A **Direct** interface, where engines are addressed explicitly.
110 * A **Direct** interface, where engines are addressed explicitly.
107 * A **LoadBalanced** interface, where the Scheduler is trusted with assigning work to
111 * A **LoadBalanced** interface, where the Scheduler is trusted with assigning work to
108 appropriate engines.
112 appropriate engines.
109
113
110 Advanced users can readily extend the View models to enable other
114 Advanced users can readily extend the View models to enable other
111 styles of parallelism.
115 styles of parallelism.
112
116
113 .. note::
117 .. note::
114
118
115 A single controller and set of engines can be used with multiple models
119 A single controller and set of engines can be used with multiple models
116 simultaneously. This opens the door for lots of interesting things.
120 simultaneously. This opens the door for lots of interesting things.
117
121
118
122
119 The Hub
123 The Hub
120 *******
124 *******
121
125
122 The center of an IPython cluster is the Hub. This is the process that keeps
126 The center of an IPython cluster is the Hub. This is the process that keeps
123 track of engine connections, schedulers, clients, as well as all task requests and
127 track of engine connections, schedulers, clients, as well as all task requests and
124 results. The primary role of the Hub is to facilitate queries of the cluster state, and
128 results. The primary role of the Hub is to facilitate queries of the cluster state, and
125 minimize the necessary information required to establish the many connections involved in
129 minimize the necessary information required to establish the many connections involved in
126 connecting new clients and engines.
130 connecting new clients and engines.
127
131
128
132
129 Schedulers
133 Schedulers
130 **********
134 **********
131
135
132 All actions that can be performed on the engine go through a Scheduler. While the engines
136 All actions that can be performed on the engine go through a Scheduler. While the engines
133 themselves block when user code is run, the schedulers hide that from the user to provide
137 themselves block when user code is run, the schedulers hide that from the user to provide
134 a fully asynchronous interface to a set of engines.
138 a fully asynchronous interface to a set of engines.
135
139
136
140
137 IPython client and views
141 IPython client and views
138 ------------------------
142 ------------------------
139
143
140 There is one primary object, the :class:`~.parallel.Client`, for connecting to a cluster.
144 There is one primary object, the :class:`~.parallel.Client`, for connecting to a cluster.
141 For each execution model, there is a corresponding :class:`~.parallel.View`. These views
145 For each execution model, there is a corresponding :class:`~.parallel.View`. These views
142 allow users to interact with a set of engines through the interface. Here are the two default
146 allow users to interact with a set of engines through the interface. Here are the two default
143 views:
147 views:
144
148
145 * The :class:`DirectView` class for explicit addressing.
149 * The :class:`DirectView` class for explicit addressing.
146 * The :class:`LoadBalancedView` class for destination-agnostic scheduling.
150 * The :class:`LoadBalancedView` class for destination-agnostic scheduling.
147
151
148 Security
152 Security
149 --------
153 --------
150
154
151 IPython uses ZeroMQ for networking, which has provided many advantages, but
155 IPython uses ZeroMQ for networking, which has provided many advantages, but
152 one of the setbacks is its utter lack of security [ZeroMQ]_. By default, no IPython
156 one of the setbacks is its utter lack of security [ZeroMQ]_. By default, no IPython
153 connections are encrypted, but open ports only listen on localhost. The only
157 connections are encrypted, but open ports only listen on localhost. The only
154 source of security for IPython is via ssh-tunnel. IPython supports both shell
158 source of security for IPython is via ssh-tunnel. IPython supports both shell
155 (`openssh`) and `paramiko` based tunnels for connections. There is a key necessary
159 (`openssh`) and `paramiko` based tunnels for connections. There is a key necessary
156 to submit requests, but due to the lack of encryption, it does not provide
160 to submit requests, but due to the lack of encryption, it does not provide
157 significant security if loopback traffic is compromised.
161 significant security if loopback traffic is compromised.
158
162
159 In our architecture, the controller is the only process that listens on
163 In our architecture, the controller is the only process that listens on
160 network ports, and is thus the main point of vulnerability. The standard model
164 network ports, and is thus the main point of vulnerability. The standard model
161 for secure connections is to designate that the controller listen on
165 for secure connections is to designate that the controller listen on
162 localhost, and use ssh-tunnels to connect clients and/or
166 localhost, and use ssh-tunnels to connect clients and/or
163 engines.
167 engines.
164
168
165 To connect and authenticate to the controller an engine or client needs
169 To connect and authenticate to the controller an engine or client needs
166 some information that the controller has stored in a JSON file.
170 some information that the controller has stored in a JSON file.
167 Thus, the JSON files need to be copied to a location where
171 Thus, the JSON files need to be copied to a location where
168 the clients and engines can find them. Typically, this is the
172 the clients and engines can find them. Typically, this is the
169 :file:`~/.ipython/profile_default/security` directory on the host where the
173 :file:`~/.ipython/profile_default/security` directory on the host where the
170 client/engine is running (which could be a different host than the controller).
174 client/engine is running (which could be a different host than the controller).
171 Once the JSON files are copied over, everything should work fine.
175 Once the JSON files are copied over, everything should work fine.
172
176
173 Currently, there are two JSON files that the controller creates:
177 Currently, there are two JSON files that the controller creates:
174
178
175 ipcontroller-engine.json
179 ipcontroller-engine.json
176 This JSON file has the information necessary for an engine to connect
180 This JSON file has the information necessary for an engine to connect
177 to a controller.
181 to a controller.
178
182
179 ipcontroller-client.json
183 ipcontroller-client.json
180 The client's connection information. This may not differ from the engine's,
184 The client's connection information. This may not differ from the engine's,
181 but since the controller may listen on different ports for clients and
185 but since the controller may listen on different ports for clients and
182 engines, it is stored separately.
186 engines, it is stored separately.
183
187
188 ipcontroller-client.json will look something like this, under default localhost
189 circumstances:
190
191 .. sourcecode:: python
192
193 {
194 "url":"tcp:\/\/127.0.0.1:54424",
195 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
196 "ssh":"",
197 "location":"10.19.1.135"
198 }
199
200 If, however, you are running the controller on a work node on a cluster, you will likely
201 need to use ssh tunnels to connect clients from your laptop to it. You will also
202 probably need to instruct the controller to listen for engines coming from other work nodes
203 on the cluster. An example of ipcontroller-client.json, as created by::
204
205 $> ipcontroller --ip=0.0.0.0 --ssh=login.mycluster.com
206
207
208 .. sourcecode:: python
209
210 {
211 "url":"tcp:\/\/*:54424",
212 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
213 "ssh":"login.mycluster.com",
214 "location":"10.0.0.2"
215 }
184 More details of how these JSON files are used are given below.
216 More details of how these JSON files are used are given below.
185
217
186 A detailed description of the security model and its implementation in IPython
218 A detailed description of the security model and its implementation in IPython
187 can be found :ref:`here <parallelsecurity>`.
219 can be found :ref:`here <parallelsecurity>`.
188
220
189 .. warning::
221 .. warning::
190
222
191 Even at its most secure, the Controller listens on ports on localhost, and
223 Even at its most secure, the Controller listens on ports on localhost, and
192 every time you make a tunnel, you open a localhost port on the connecting
224 every time you make a tunnel, you open a localhost port on the connecting
193 machine that points to the Controller. If localhost on the Controller's
225 machine that points to the Controller. If localhost on the Controller's
194 machine, or the machine of any client or engine, is untrusted, then your
226 machine, or the machine of any client or engine, is untrusted, then your
195 Controller is insecure. There is no way around this with ZeroMQ.
227 Controller is insecure. There is no way around this with ZeroMQ.
196
228
197
229
198
230
199 Getting Started
231 Getting Started
200 ===============
232 ===============
201
233
202 To use IPython for parallel computing, you need to start one instance of the
234 To use IPython for parallel computing, you need to start one instance of the
203 controller and one or more instances of the engine. Initially, it is best to
235 controller and one or more instances of the engine. Initially, it is best to
204 simply start a controller and engines on a single host using the
236 simply start a controller and engines on a single host using the
205 :command:`ipcluster` command. To start a controller and 4 engines on your
237 :command:`ipcluster` command. To start a controller and 4 engines on your
206 localhost, just do::
238 localhost, just do::
207
239
208 $ ipcluster start -n 4
240 $ ipcluster start -n 4
209
241
210 More details about starting the IPython controller and engines can be found
242 More details about starting the IPython controller and engines can be found
211 :ref:`here <parallel_process>`
243 :ref:`here <parallel_process>`
212
244
213 Once you have started the IPython controller and one or more engines, you
245 Once you have started the IPython controller and one or more engines, you
214 are ready to use the engines to do something useful. To make sure
246 are ready to use the engines to do something useful. To make sure
215 everything is working correctly, try the following commands:
247 everything is working correctly, try the following commands:
216
248
217 .. sourcecode:: ipython
249 .. sourcecode:: ipython
218
250
219 In [1]: from IPython.parallel import Client
251 In [1]: from IPython.parallel import Client
220
252
221 In [2]: c = Client()
253 In [2]: c = Client()
222
254
223 In [4]: c.ids
255 In [4]: c.ids
224 Out[4]: set([0, 1, 2, 3])
256 Out[4]: set([0, 1, 2, 3])
225
257
226 In [5]: c[:].apply_sync(lambda : "Hello, World")
258 In [5]: c[:].apply_sync(lambda : "Hello, World")
227 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
259 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
228
260
229
261
230 When a client is created with no arguments, the client tries to find the corresponding JSON file
262 When a client is created with no arguments, the client tries to find the corresponding JSON file
231 in the local `~/.ipython/profile_default/security` directory. Or if you specified a profile,
263 in the local `~/.ipython/profile_default/security` directory. Or if you specified a profile,
232 you can use that with the Client. This should cover most cases:
264 you can use that with the Client. This should cover most cases:
233
265
234 .. sourcecode:: ipython
266 .. sourcecode:: ipython
235
267
236 In [2]: c = Client(profile='myprofile')
268 In [2]: c = Client(profile='myprofile')
237
269
238 If you have put the JSON file in a different location or it has a different name, create the
270 If you have put the JSON file in a different location or it has a different name, create the
239 client like this:
271 client like this:
240
272
241 .. sourcecode:: ipython
273 .. sourcecode:: ipython
242
274
243 In [2]: c = Client('/path/to/my/ipcontroller-client.json')
275 In [2]: c = Client('/path/to/my/ipcontroller-client.json')
244
276
245 Remember, a client needs to be able to see the Hub's ports to connect. So if they are on a
277 Remember, a client needs to be able to see the Hub's ports to connect. So if they are on a
246 different machine, you may need to use an ssh server to tunnel access to that machine,
278 different machine, you may need to use an ssh server to tunnel access to that machine,
247 then you would connect to it with:
279 then you would connect to it with:
248
280
249 .. sourcecode:: ipython
281 .. sourcecode:: ipython
250
282
251 In [2]: c = Client(sshserver='myhub.example.com')
283 In [2]: c = Client('/path/to/my/ipcontroller-client.json', sshserver='me@myhub.example.com')
252
284
253 Where 'myhub.example.com' is the url or IP address of the machine on
285 Where 'myhub.example.com' is the url or IP address of the machine on
254 which the Hub process is running (or another machine that has direct access to the Hub's ports).
286 which the Hub process is running (or another machine that has direct access to the Hub's ports).
255
287
256 The SSH server may already be specified in ipcontroller-client.json, if the controller was
288 The SSH server may already be specified in ipcontroller-client.json, if the controller was
257 instructed at its launch time.
289 instructed at its launch time.
258
290
259 You are now ready to learn more about the :ref:`Direct
291 You are now ready to learn more about the :ref:`Direct
260 <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
292 <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
261 controller.
293 controller.
262
294
263 .. [ZeroMQ] ZeroMQ. http://www.zeromq.org
295 .. [ZeroMQ] ZeroMQ. http://www.zeromq.org
@@ -1,847 +1,841 b''
1 .. _parallel_multiengine:
1 .. _parallel_multiengine:
2
2
3 ==========================
3 ==========================
4 IPython's Direct interface
4 IPython's Direct interface
5 ==========================
5 ==========================
6
6
7 The direct, or multiengine, interface represents one possible way of working with a set of
7 The direct, or multiengine, interface represents one possible way of working with a set of
8 IPython engines. The basic idea behind the multiengine interface is that the
8 IPython engines. The basic idea behind the multiengine interface is that the
9 capabilities of each engine are directly and explicitly exposed to the user.
9 capabilities of each engine are directly and explicitly exposed to the user.
10 Thus, in the multiengine interface, each engine is given an id that is used to
10 Thus, in the multiengine interface, each engine is given an id that is used to
11 identify the engine and give it work to do. This interface is very intuitive
11 identify the engine and give it work to do. This interface is very intuitive
12 and is designed with interactive usage in mind, and is the best place for
12 and is designed with interactive usage in mind, and is the best place for
13 new users of IPython to begin.
13 new users of IPython to begin.
14
14
15 Starting the IPython controller and engines
15 Starting the IPython controller and engines
16 ===========================================
16 ===========================================
17
17
18 To follow along with this tutorial, you will need to start the IPython
18 To follow along with this tutorial, you will need to start the IPython
19 controller and four IPython engines. The simplest way of doing this is to use
19 controller and four IPython engines. The simplest way of doing this is to use
20 the :command:`ipcluster` command::
20 the :command:`ipcluster` command::
21
21
22 $ ipcluster start -n 4
22 $ ipcluster start -n 4
23
23
24 For more detailed information about starting the controller and engines, see
24 For more detailed information about starting the controller and engines, see
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26
26
27 Creating a ``Client`` instance
27 Creating a ``DirectView`` instance
28 ==============================
28 ==================================
29
29
30 The first step is to import the IPython :mod:`IPython.parallel`
30 The first step is to import the IPython :mod:`IPython.parallel`
31 module and then create a :class:`.Client` instance:
31 module and then create a :class:`.Client` instance:
32
32
33 .. sourcecode:: ipython
33 .. sourcecode:: ipython
34
34
35 In [1]: from IPython.parallel import Client
35 In [1]: from IPython.parallel import Client
36
36
37 In [2]: rc = Client()
37 In [2]: rc = Client()
38
38
39 This form assumes that the default connection information (stored in
39 This form assumes that the default connection information (stored in
40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/profile_default/security`) is
40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/profile_default/security`) is
41 accurate. If the controller was started on a remote machine, you must copy that connection
41 accurate. If the controller was started on a remote machine, you must copy that connection
42 file to the client machine, or enter its contents as arguments to the Client constructor:
42 file to the client machine, or enter its contents as arguments to the Client constructor:
43
43
44 .. sourcecode:: ipython
44 .. sourcecode:: ipython
45
45
46 # If you have copied the json connector file from the controller:
46 # If you have copied the json connector file from the controller:
47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
48 # or to connect with a specific profile you have set up:
48 # or to connect with a specific profile you have set up:
49 In [3]: rc = Client(profile='mpi')
49 In [3]: rc = Client(profile='mpi')
50
50
51
51
52 To make sure there are engines connected to the controller, users can get a list
52 To make sure there are engines connected to the controller, users can get a list
53 of engine ids:
53 of engine ids:
54
54
55 .. sourcecode:: ipython
55 .. sourcecode:: ipython
56
56
57 In [3]: rc.ids
57 In [3]: rc.ids
58 Out[3]: [0, 1, 2, 3]
58 Out[3]: [0, 1, 2, 3]
59
59
60 Here we see that there are four engines ready to do work for us.
60 Here we see that there are four engines ready to do work for us.
61
61
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 constructed via list-access to the client:
63 constructed via list-access to the client:
64
64
65 .. sourcecode:: ipython
65 .. sourcecode:: ipython
66
66
67 In [4]: dview = rc[:] # use all engines
67 In [4]: dview = rc[:] # use all engines
68
68
69 .. seealso::
69 .. seealso::
70
70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72
72
73
73
74 Quick and easy parallelism
74 Quick and easy parallelism
75 ==========================
75 ==========================
76
76
77 In many cases, you simply want to apply a Python function to a sequence of
77 In many cases, you simply want to apply a Python function to a sequence of
78 objects, but *in parallel*. The client interface provides a simple way
78 objects, but *in parallel*. The client interface provides a simple way
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
80
80
81 Parallel map
81 Parallel map
82 ------------
82 ------------
83
83
84 Python's builtin :func:`map` function allows a function to be applied to a
84 Python's builtin :func:`map` function allows a function to be applied to a
85 sequence element-by-element. This type of code is typically trivial to
85 sequence element-by-element. This type of code is typically trivial to
86 parallelize. In fact, since IPython's interface is all about functions anyway,
86 parallelize. In fact, since IPython's interface is all about functions anyway,
87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
88 DirectView's :meth:`map` method:
88 DirectView's :meth:`map` method:
89
89
90 .. sourcecode:: ipython
90 .. sourcecode:: ipython
91
91
92 In [62]: serial_result = map(lambda x:x**10, range(32))
92 In [62]: serial_result = map(lambda x:x**10, range(32))
93
93
94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
95
95
96 In [67]: serial_result==parallel_result
96 In [67]: serial_result==parallel_result
97 Out[67]: True
97 Out[67]: True
98
98
99
99
100 .. note::
100 .. note::
101
101
102 The :class:`DirectView`'s version of :meth:`map` does
102 The :class:`DirectView`'s version of :meth:`map` does
103 not do dynamic load balancing. For a load balanced version, use a
103 not do dynamic load balancing. For a load balanced version, use a
104 :class:`LoadBalancedView`.
104 :class:`LoadBalancedView`.
105
105
106 .. seealso::
106 .. seealso::
107
107
108 :meth:`map` is implemented via :class:`ParallelFunction`.
108 :meth:`map` is implemented via :class:`ParallelFunction`.
109
109
110 Remote function decorators
110 Remote function decorators
111 --------------------------
111 --------------------------
112
112
113 Remote functions are just like normal functions, but when they are called,
113 Remote functions are just like normal functions, but when they are called,
114 they execute on one or more engines, rather than locally. IPython provides
114 they execute on one or more engines, rather than locally. IPython provides
115 two decorators:
115 two decorators:
116
116
117 .. sourcecode:: ipython
117 .. sourcecode:: ipython
118
118
119 In [10]: @dview.remote(block=True)
119 In [10]: @dview.remote(block=True)
120 ...: def getpid():
120 ....: def getpid():
121 ...: import os
121 ....: import os
122 ...: return os.getpid()
122 ....: return os.getpid()
123 ...:
123 ....:
124
124
125 In [11]: getpid()
125 In [11]: getpid()
126 Out[11]: [12345, 12346, 12347, 12348]
126 Out[11]: [12345, 12346, 12347, 12348]
127
127
128 The ``@parallel`` decorator creates parallel functions that break up element-wise
128 The ``@parallel`` decorator creates parallel functions that break up element-wise
129 operations and distribute them, reconstructing the result.
129 operations and distribute them, reconstructing the result.
130
130
131 .. sourcecode:: ipython
131 .. sourcecode:: ipython
132
132
133 In [12]: import numpy as np
133 In [12]: import numpy as np
134
134
135 In [13]: A = np.random.random((64,48))
135 In [13]: A = np.random.random((64,48))
136
136
137 In [14]: @dview.parallel(block=True)
137 In [14]: @dview.parallel(block=True)
138 ...: def pmul(A,B):
138 ....: def pmul(A,B):
139 ...: return A*B
139 ....: return A*B
140
140
141 In [15]: C_local = A*A
141 In [15]: C_local = A*A
142
142
143 In [16]: C_remote = pmul(A,A)
143 In [16]: C_remote = pmul(A,A)
144
144
145 In [17]: (C_local == C_remote).all()
145 In [17]: (C_local == C_remote).all()
146 Out[17]: True
146 Out[17]: True
147
147
148 .. seealso::
148 .. seealso::
149
149
150 See the docstrings for the :func:`parallel` and :func:`remote` decorators for
150 See the docstrings for the :func:`parallel` and :func:`remote` decorators for
151 options.
151 options.
152
152
153 Calling Python functions
153 Calling Python functions
154 ========================
154 ========================
155
155
156 The most basic type of operation that can be performed on the engines is to
156 The most basic type of operation that can be performed on the engines is to
157 execute Python code or call Python functions. Executing Python code can be
157 execute Python code or call Python functions. Executing Python code can be
158 done in blocking or non-blocking mode (non-blocking is default) using the
158 done in blocking or non-blocking mode (non-blocking is default) using the
159 :meth:`.View.execute` method, and calling functions can be done via the
159 :meth:`.View.execute` method, and calling functions can be done via the
160 :meth:`.View.apply` method.
160 :meth:`.View.apply` method.
161
161
162 apply
162 apply
163 -----
163 -----
164
164
165 The main method for doing remote execution (in fact, all methods that
165 The main method for doing remote execution (in fact, all methods that
166 communicate with the engines are built on top of it), is :meth:`View.apply`.
166 communicate with the engines are built on top of it), is :meth:`View.apply`.
167
167
168 We strive to provide the cleanest interface we can, so `apply` has the following
168 We strive to provide the cleanest interface we can, so `apply` has the following
169 signature:
169 signature:
170
170
171 .. sourcecode:: python
171 .. sourcecode:: python
172
172
173 view.apply(f, *args, **kwargs)
173 view.apply(f, *args, **kwargs)
174
174
175 There are various ways to call functions with IPython, and these flags are set as
175 There are various ways to call functions with IPython, and these flags are set as
176 attributes of the View. The ``DirectView`` has just two of these flags:
176 attributes of the View. The ``DirectView`` has just two of these flags:
177
177
178 dv.block : bool
178 dv.block : bool
179 whether to wait for the result, or return an :class:`AsyncResult` object
179 whether to wait for the result, or return an :class:`AsyncResult` object
180 immediately
180 immediately
181 dv.track : bool
181 dv.track : bool
182 whether to instruct pyzmq to track when zeromq is done sending the message.
182 whether to instruct pyzmq to track when zeromq is done sending the message.
183 This is primarily useful for non-copying sends of numpy arrays that you plan to
183 This is primarily useful for non-copying sends of numpy arrays that you plan to
184 edit in-place. You need to know when it becomes safe to edit the buffer
184 edit in-place. You need to know when it becomes safe to edit the buffer
185 without corrupting the message.
185 without corrupting the message.
186 dv.targets : int, list of ints
187 which targets this view is associated with.
186
188
187
189
188 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
190 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
189
191
190 .. sourcecode:: ipython
192 .. sourcecode:: ipython
191
193
192 In [4]: view = rc[1:3]
194 In [4]: view = rc[1:3]
193 Out[4]: <DirectView [1, 2]>
195 Out[4]: <DirectView [1, 2]>
194
196
195 In [5]: view.apply<tab>
197 In [5]: view.apply<tab>
196 view.apply view.apply_async view.apply_sync
198 view.apply view.apply_async view.apply_sync
197
199
198 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
200 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
199
201
200 Blocking execution
202 Blocking execution
201 ------------------
203 ------------------
202
204
203 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
205 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
204 these examples) submits the command to the controller, which places the
206 these examples) submits the command to the controller, which places the
205 command in the engines' queues for execution. The :meth:`apply` call then
207 command in the engines' queues for execution. The :meth:`apply` call then
206 blocks until the engines are done executing the command:
208 blocks until the engines are done executing the command:
207
209
208 .. sourcecode:: ipython
210 .. sourcecode:: ipython
209
211
210 In [2]: dview = rc[:] # A DirectView of all engines
212 In [2]: dview = rc[:] # A DirectView of all engines
211 In [3]: dview.block=True
213 In [3]: dview.block=True
212 In [4]: dview['a'] = 5
214 In [4]: dview['a'] = 5
213
215
214 In [5]: dview['b'] = 10
216 In [5]: dview['b'] = 10
215
217
216 In [6]: dview.apply(lambda x: a+b+x, 27)
218 In [6]: dview.apply(lambda x: a+b+x, 27)
217 Out[6]: [42, 42, 42, 42]
219 Out[6]: [42, 42, 42, 42]
218
220
219 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
221 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
220 method:
222 method:
221
223
222 In [7]: dview.block=False
224 In [7]: dview.block=False
223
225
224 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
226 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
225 Out[8]: [42, 42, 42, 42]
227 Out[8]: [42, 42, 42, 42]
226
228
227 Python commands can be executed as strings on specific engines by using a View's ``execute``
229 Python commands can be executed as strings on specific engines by using a View's ``execute``
228 method:
230 method:
229
231
230 .. sourcecode:: ipython
232 .. sourcecode:: ipython
231
233
232 In [6]: rc[::2].execute('c=a+b')
234 In [6]: rc[::2].execute('c=a+b')
233
235
234 In [7]: rc[1::2].execute('c=a-b')
236 In [7]: rc[1::2].execute('c=a-b')
235
237
236 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
238 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
237 Out[8]: [15, -5, 15, -5]
239 Out[8]: [15, -5, 15, -5]
238
240
239
241
240 Non-blocking execution
242 Non-blocking execution
241 ----------------------
243 ----------------------
242
244
243 In non-blocking mode, :meth:`apply` submits the command to be executed and
245 In non-blocking mode, :meth:`apply` submits the command to be executed and
244 then returns a :class:`AsyncResult` object immediately. The
246 then returns a :class:`AsyncResult` object immediately. The
245 :class:`AsyncResult` object gives you a way of getting a result at a later
247 :class:`AsyncResult` object gives you a way of getting a result at a later
246 time through its :meth:`get` method.
248 time through its :meth:`get` method.
247
249
248 .. Note::
250 .. Note::
249
251
250 The :class:`AsyncResult` object provides a superset of the interface in
252 The :class:`AsyncResult` object provides a superset of the interface in
251 :py:class:`multiprocessing.pool.AsyncResult`. See the
253 :py:class:`multiprocessing.pool.AsyncResult`. See the
252 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
254 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
253 for more.
255 for more.
254
256
255
257
256 This allows you to quickly submit long running commands without blocking your
258 This allows you to quickly submit long running commands without blocking your
257 local Python/IPython session:
259 local Python/IPython session:
258
260
259 .. sourcecode:: ipython
261 .. sourcecode:: ipython
260
262
261 # define our function
263 # define our function
262 In [6]: def wait(t):
264 In [6]: def wait(t):
263 ...: import time
265 ....: import time
264 ...: tic = time.time()
266 ....: tic = time.time()
265 ...: time.sleep(t)
267 ....: time.sleep(t)
266 ...: return time.time()-tic
268 ....: return time.time()-tic
267
269
268 # In non-blocking mode
270 # In non-blocking mode
269 In [7]: ar = dview.apply_async(wait, 2)
271 In [7]: ar = dview.apply_async(wait, 2)
270
272
271 # Now block for the result
273 # Now block for the result
272 In [8]: ar.get()
274 In [8]: ar.get()
273 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
275 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
274
276
275 # Again in non-blocking mode
277 # Again in non-blocking mode
276 In [9]: ar = dview.apply_async(wait, 10)
278 In [9]: ar = dview.apply_async(wait, 10)
277
279
278 # Poll to see if the result is ready
280 # Poll to see if the result is ready
279 In [10]: ar.ready()
281 In [10]: ar.ready()
280 Out[10]: False
282 Out[10]: False
281
283
282 # ask for the result, but wait a maximum of 1 second:
284 # ask for the result, but wait a maximum of 1 second:
283 In [45]: ar.get(1)
285 In [45]: ar.get(1)
284 ---------------------------------------------------------------------------
286 ---------------------------------------------------------------------------
285 TimeoutError Traceback (most recent call last)
287 TimeoutError Traceback (most recent call last)
286 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
288 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
287 ----> 1 ar.get(1)
289 ----> 1 ar.get(1)
288
290
289 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
291 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
290 62 raise self._exception
292 62 raise self._exception
291 63 else:
293 63 else:
292 ---> 64 raise error.TimeoutError("Result not ready.")
294 ---> 64 raise error.TimeoutError("Result not ready.")
293 65
295 65
294 66 def ready(self):
296 66 def ready(self):
295
297
296 TimeoutError: Result not ready.
298 TimeoutError: Result not ready.
297
299
298 .. Note::
300 .. Note::
299
301
300 Note the import inside the function. This is a common model, to ensure
302 Note the import inside the function. This is a common model, to ensure
301 that the appropriate modules are imported where the task is run. You can
303 that the appropriate modules are imported where the task is run. You can
302 also manually import modules into the engine(s) namespace(s) via
304 also manually import modules into the engine(s) namespace(s) via
303 :meth:`view.execute('import numpy')`.
305 :meth:`view.execute('import numpy')`.
304
306
305 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
307 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
306 are done. For this, there is the method :meth:`wait`. This method takes a
308 are done. For this, there is the method :meth:`wait`. This method takes a
307 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
309 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
308 and blocks until all of the associated results are ready:
310 and blocks until all of the associated results are ready:
309
311
310 .. sourcecode:: ipython
312 .. sourcecode:: ipython
311
313
312 In [72]: dview.block=False
314 In [72]: dview.block=False
313
315
314 # A trivial list of AsyncResults objects
316 # A trivial list of AsyncResults objects
315 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
317 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
316
318
317 # Wait until all of them are done
319 # Wait until all of them are done
318 In [74]: dview.wait(pr_list)
320 In [74]: dview.wait(pr_list)
319
321
320 # Then, their results are ready using get() or the `.r` attribute
322 # Then, their results are ready using get() or the `.r` attribute
321 In [75]: pr_list[0].get()
323 In [75]: pr_list[0].get()
322 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
324 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
323
325
324
326
325
327
326 The ``block`` and ``targets`` keyword arguments and attributes
328 The ``block`` and ``targets`` keyword arguments and attributes
327 --------------------------------------------------------------
329 --------------------------------------------------------------
328
330
329 Most DirectView methods (excluding :meth:`apply` and :meth:`map`) accept ``block`` and
331 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
330 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
332 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
331 blocking mode and which engines the command is applied to. The :class:`View` class also has
333 blocking mode and which engines the command is applied to. The :class:`View` class also has
332 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
334 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
333 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
335 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
334
336
335 * If no keyword argument is provided, the instance attributes are used.
337 * If no keyword argument is provided, the instance attributes are used.
336 * Keyword arguments, if provided, override the instance attributes for
338 * Keyword arguments, if provided, override the instance attributes for
337 the duration of a single call.
339 the duration of a single call.
338
340
339 The following examples demonstrate how to use the instance attributes:
341 The following examples demonstrate how to use the instance attributes:
340
342
341 .. sourcecode:: ipython
343 .. sourcecode:: ipython
342
344
343 In [16]: dview.targets = [0,2]
345 In [16]: dview.targets = [0,2]
344
346
345 In [17]: dview.block = False
347 In [17]: dview.block = False
346
348
347 In [18]: ar = dview.apply(lambda : 10)
349 In [18]: ar = dview.apply(lambda : 10)
348
350
349 In [19]: ar.get()
351 In [19]: ar.get()
350 Out[19]: [10, 10]
352 Out[19]: [10, 10]
351
353
352 In [16]: dview.targets = v.client.ids # all engines (4)
354 In [16]: dview.targets = v.client.ids # all engines (4)
353
355
354 In [21]: dview.block = True
356 In [21]: dview.block = True
355
357
356 In [22]: dview.apply(lambda : 42)
358 In [22]: dview.apply(lambda : 42)
357 Out[22]: [42, 42, 42, 42]
359 Out[22]: [42, 42, 42, 42]
358
360
359 The :attr:`block` and :attr:`targets` instance attributes of the
361 The :attr:`block` and :attr:`targets` instance attributes of the
360 :class:`.DirectView` also determine the behavior of the parallel magic commands.
362 :class:`.DirectView` also determine the behavior of the parallel magic commands.
361
363
362 Parallel magic commands
364 Parallel magic commands
363 -----------------------
365 -----------------------
364
366
365 .. warning::
366
367 The magics have not been changed to work with the zeromq system. The
368 magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
369
370 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
367 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
371 that make it more pleasant to execute Python commands on the engines
368 that make it more pleasant to execute Python commands on the engines
372 interactively. These are simply shortcuts to :meth:`execute` and
369 interactively. These are simply shortcuts to :meth:`execute` and
373 :meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
370 :meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
374 Python command on the engines specified by the :attr:`targets` attribute of the
371 Python command on the engines specified by the :attr:`targets` attribute of the
375 :class:`DirectView` instance:
372 :class:`DirectView` instance:
376
373
377 .. sourcecode:: ipython
374 .. sourcecode:: ipython
378
375
379 # load the parallel magic extension:
380 In [21]: %load_ext parallelmagic
381
382 # Create a DirectView for all targets
376 # Create a DirectView for all targets
383 In [22]: dv = rc[:]
377 In [22]: dv = rc[:]
384
378
385 # Make this DirectView active for parallel magic commands
379 # Make this DirectView active for parallel magic commands
386 In [23]: dv.activate()
380 In [23]: dv.activate()
387
381
388 In [24]: dv.block=True
382 In [24]: dv.block=True
389
383
390 In [25]: import numpy
384 # import numpy here and everywhere
391
385 In [25]: with dv.sync_imports():
392 In [26]: %px import numpy
386 ....: import numpy
393 Parallel execution on engines: [0, 1, 2, 3]
387 importing numpy on engine(s)
394
388
395 In [27]: %px a = numpy.random.rand(2,2)
389 In [27]: %px a = numpy.random.rand(2,2)
396 Parallel execution on engines: [0, 1, 2, 3]
390 Parallel execution on engines: [0, 1, 2, 3]
397
391
398 In [28]: %px ev = numpy.linalg.eigvals(a)
392 In [28]: %px ev = numpy.linalg.eigvals(a)
399 Parallel execution on engines: [0, 1, 2, 3]
393 Parallel execution on engines: [0, 1, 2, 3]
400
394
401 In [28]: dv['ev']
395 In [28]: dv['ev']
402 Out[28]: [ array([ 1.09522024, -0.09645227]),
396 Out[28]: [ array([ 1.09522024, -0.09645227]),
403 array([ 1.21435496, -0.35546712]),
397 ....: array([ 1.21435496, -0.35546712]),
404 array([ 0.72180653, 0.07133042]),
398 ....: array([ 0.72180653, 0.07133042]),
405 array([ 1.46384341e+00, 1.04353244e-04])
399 ....: array([ 1.46384341, 1.04353244e-04])
406 ]
400 ....: ]
407
401
408 The ``%result`` magic gets the most recent result, or takes an argument
402 The ``%result`` magic gets the most recent result, or takes an argument
409 specifying the index of the result to be requested. It is simply a shortcut to the
403 specifying the index of the result to be requested. It is simply a shortcut to the
410 :meth:`get_result` method:
404 :meth:`get_result` method:
411
405
412 .. sourcecode:: ipython
406 .. sourcecode:: ipython
413
407
414 In [29]: dv.apply_async(lambda : ev)
408 In [29]: dv.apply_async(lambda : ev)
415
409
416 In [30]: %result
410 In [30]: %result
417 Out[30]: [ [ 1.28167017 0.14197338],
411 Out[30]: [ [ 1.28167017 0.14197338],
418 [-0.14093616 1.27877273],
412 ....: [-0.14093616 1.27877273],
419 [-0.37023573 1.06779409],
413 ....: [-0.37023573 1.06779409],
420 [ 0.83664764 -0.25602658] ]
414 ....: [ 0.83664764 -0.25602658] ]
421
415
422 The ``%autopx`` magic switches to a mode where everything you type is executed
416 The ``%autopx`` magic switches to a mode where everything you type is executed
423 on the engines given by the :attr:`targets` attribute:
417 on the engines given by the :attr:`targets` attribute:
424
418
425 .. sourcecode:: ipython
419 .. sourcecode:: ipython
426
420
427 In [30]: dv.block=False
421 In [30]: dv.block=False
428
422
429 In [31]: %autopx
423 In [31]: %autopx
430 Auto Parallel Enabled
424 Auto Parallel Enabled
431 Type %autopx to disable
425 Type %autopx to disable
432
426
433 In [32]: max_evals = []
427 In [32]: max_evals = []
434 <IPython.parallel.AsyncResult object at 0x17b8a70>
428 <IPython.parallel.AsyncResult object at 0x17b8a70>
435
429
436 In [33]: for i in range(100):
430 In [33]: for i in range(100):
437 ....: a = numpy.random.rand(10,10)
431 ....: a = numpy.random.rand(10,10)
438 ....: a = a+a.transpose()
432 ....: a = a+a.transpose()
439 ....: evals = numpy.linalg.eigvals(a)
433 ....: evals = numpy.linalg.eigvals(a)
440 ....: max_evals.append(evals[0].real)
434 ....: max_evals.append(evals[0].real)
441 ....:
435 ....:
442 ....:
436 ....:
443 <IPython.parallel.AsyncResult object at 0x17af8f0>
437 <IPython.parallel.AsyncResult object at 0x17af8f0>
444
438
445 In [34]: %autopx
439 In [34]: %autopx
446 Auto Parallel Disabled
440 Auto Parallel Disabled
447
441
448 In [35]: dv.block=True
442 In [35]: dv.block=True
449
443
450 In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
444 In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
451 Parallel execution on engines: [0, 1, 2, 3]
445 Parallel execution on engines: [0, 1, 2, 3]
452
446
453 In [37]: dv['ans']
447 In [37]: dv['ans']
454 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
448 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
455 'Average max eigenvalue is: 10.2076902286',
449 ....: 'Average max eigenvalue is: 10.2076902286',
456 'Average max eigenvalue is: 10.1891484655',
450 ....: 'Average max eigenvalue is: 10.1891484655',
457 'Average max eigenvalue is: 10.1158837784',]
451 ....: 'Average max eigenvalue is: 10.1158837784',]
458
452
459
453
460 Moving Python objects around
454 Moving Python objects around
461 ============================
455 ============================
462
456
463 In addition to calling functions and executing code on engines, you can
457 In addition to calling functions and executing code on engines, you can
464 transfer Python objects to and from your IPython session and the engines. In
458 transfer Python objects to and from your IPython session and the engines. In
465 IPython, these operations are called :meth:`push` (sending an object to the
459 IPython, these operations are called :meth:`push` (sending an object to the
466 engines) and :meth:`pull` (getting an object from the engines).
460 engines) and :meth:`pull` (getting an object from the engines).
467
461
468 Basic push and pull
462 Basic push and pull
469 -------------------
463 -------------------
470
464
471 Here are some examples of how you use :meth:`push` and :meth:`pull`:
465 Here are some examples of how you use :meth:`push` and :meth:`pull`:
472
466
473 .. sourcecode:: ipython
467 .. sourcecode:: ipython
474
468
475 In [38]: dview.push(dict(a=1.03234,b=3453))
469 In [38]: dview.push(dict(a=1.03234,b=3453))
476 Out[38]: [None,None,None,None]
470 Out[38]: [None,None,None,None]
477
471
478 In [39]: dview.pull('a')
472 In [39]: dview.pull('a')
479 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
473 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
480
474
481 In [40]: dview.pull('b', targets=0)
475 In [40]: dview.pull('b', targets=0)
482 Out[40]: 3453
476 Out[40]: 3453
483
477
484 In [41]: dview.pull(('a','b'))
478 In [41]: dview.pull(('a','b'))
485 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
479 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
486
480
487 In [43]: dview.push(dict(c='speed'))
481 In [43]: dview.push(dict(c='speed'))
488 Out[43]: [None,None,None,None]
482 Out[43]: [None,None,None,None]
489
483
490 In non-blocking mode :meth:`push` and :meth:`pull` also return
484 In non-blocking mode :meth:`push` and :meth:`pull` also return
491 :class:`AsyncResult` objects:
485 :class:`AsyncResult` objects:
492
486
493 .. sourcecode:: ipython
487 .. sourcecode:: ipython
494
488
495 In [48]: ar = dview.pull('a', block=False)
489 In [48]: ar = dview.pull('a', block=False)
496
490
497 In [49]: ar.get()
491 In [49]: ar.get()
498 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
492 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
499
493
500
494
501 Dictionary interface
495 Dictionary interface
502 --------------------
496 --------------------
503
497
504 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
498 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
505 dictionary-style access by key and methods such as :meth:`get` and
499 dictionary-style access by key and methods such as :meth:`get` and
506 :meth:`update` for convenience. This makes the remote namespaces of the engines
500 :meth:`update` for convenience. This makes the remote namespaces of the engines
507 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
501 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
508
502
509 .. sourcecode:: ipython
503 .. sourcecode:: ipython
510
504
511 In [51]: dview['a']=['foo','bar']
505 In [51]: dview['a']=['foo','bar']
512
506
513 In [52]: dview['a']
507 In [52]: dview['a']
514 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
508 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
515
509
516 Scatter and gather
510 Scatter and gather
517 ------------------
511 ------------------
518
512
519 Sometimes it is useful to partition a sequence and push the partitions to
513 Sometimes it is useful to partition a sequence and push the partitions to
520 different engines. In MPI language, this is known as scatter/gather and we
514 different engines. In MPI language, this is known as scatter/gather and we
521 follow that terminology. However, it is important to remember that in
515 follow that terminology. However, it is important to remember that in
522 IPython's :class:`Client` class, :meth:`scatter` is from the
516 IPython's :class:`Client` class, :meth:`scatter` is from the
523 interactive IPython session to the engines and :meth:`gather` is from the
517 interactive IPython session to the engines and :meth:`gather` is from the
524 engines back to the interactive IPython session. For scatter/gather operations
518 engines back to the interactive IPython session. For scatter/gather operations
525 between engines, MPI should be used:
519 between engines, MPI, pyzmq, or some other direct interconnect should be used.
526
520
527 .. sourcecode:: ipython
521 .. sourcecode:: ipython
528
522
529 In [58]: dview.scatter('a',range(16))
523 In [58]: dview.scatter('a',range(16))
530 Out[58]: [None,None,None,None]
524 Out[58]: [None,None,None,None]
531
525
532 In [59]: dview['a']
526 In [59]: dview['a']
533 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
527 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
534
528
535 In [60]: dview.gather('a')
529 In [60]: dview.gather('a')
536 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
530 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
537
531
538 Other things to look at
532 Other things to look at
539 =======================
533 =======================
540
534
541 How to do parallel list comprehensions
535 How to do parallel list comprehensions
542 --------------------------------------
536 --------------------------------------
543
537
544 In many cases list comprehensions are nicer than using the map function. While
538 In many cases list comprehensions are nicer than using the map function. While
545 we don't have fully parallel list comprehensions, it is simple to get the
539 we don't have fully parallel list comprehensions, it is simple to get the
546 basic effect using :meth:`scatter` and :meth:`gather`:
540 basic effect using :meth:`scatter` and :meth:`gather`:
547
541
548 .. sourcecode:: ipython
542 .. sourcecode:: ipython
549
543
550 In [66]: dview.scatter('x',range(64))
544 In [66]: dview.scatter('x',range(64))
551
545
552 In [67]: %px y = [i**10 for i in x]
546 In [67]: %px y = [i**10 for i in x]
553 Parallel execution on engines: [0, 1, 2, 3]
547 Parallel execution on engines: [0, 1, 2, 3]
554 Out[67]:
548 Out[67]:
555
549
556 In [68]: y = dview.gather('y')
550 In [68]: y = dview.gather('y')
557
551
558 In [69]: print y
552 In [69]: print y
559 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
553 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
560
554
561 Remote imports
555 Remote imports
562 --------------
556 --------------
563
557
564 Sometimes you will want to import packages both in your interactive session
558 Sometimes you will want to import packages both in your interactive session
565 and on your remote engines. This can be done with the :class:`ContextManager`
559 and on your remote engines. This can be done with the :class:`ContextManager`
566 created by a DirectView's :meth:`sync_imports` method:
560 created by a DirectView's :meth:`sync_imports` method:
567
561
568 .. sourcecode:: ipython
562 .. sourcecode:: ipython
569
563
570 In [69]: with dview.sync_imports():
564 In [69]: with dview.sync_imports():
571 ...: import numpy
565 ....: import numpy
572 importing numpy on engine(s)
566 importing numpy on engine(s)
573
567
574 Any imports made inside the block will also be performed on the view's engines.
568 Any imports made inside the block will also be performed on the view's engines.
575 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
569 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
576 whether the local imports should also be performed. However, support for `local=False`
570 whether the local imports should also be performed. However, support for `local=False`
577 has not been implemented, so only packages that can be imported locally will work
571 has not been implemented, so only packages that can be imported locally will work
578 this way.
572 this way.
579
573
580 You can also specify imports via the ``@require`` decorator. This is a decorator
574 You can also specify imports via the ``@require`` decorator. This is a decorator
581 designed for use in Dependencies, but can be used to handle remote imports as well.
575 designed for use in Dependencies, but can be used to handle remote imports as well.
582 Modules or module names passed to ``@require`` will be imported before the decorated
576 Modules or module names passed to ``@require`` will be imported before the decorated
583 function is called. If they cannot be imported, the decorated function will never
577 function is called. If they cannot be imported, the decorated function will never
584 execute, and will fail with an UnmetDependencyError.
578 execute, and will fail with an UnmetDependencyError.
585
579
586 .. sourcecode:: ipython
580 .. sourcecode:: ipython
587
581
588 In [69]: from IPython.parallel import require
582 In [69]: from IPython.parallel import require
589
583
590 In [70]: @require('re')
584 In [70]: @require('re')
591 ...: def findall(pat, x):
585 ....: def findall(pat, x):
592 ...: # re is guaranteed to be available
586 ....: # re is guaranteed to be available
593 ...: return re.findall(pat, x)
587 ....: return re.findall(pat, x)
594
588
595 # you can also pass modules themselves, that you already have locally:
589 # you can also pass modules themselves, that you already have locally:
596 In [71]: @require(time)
590 In [71]: @require(time)
597 ...: def wait(t):
591 ....: def wait(t):
598 ...: time.sleep(t)
592 ....: time.sleep(t)
599 ...: return t
593 ....: return t
600
594
601 .. _parallel_exceptions:
595 .. _parallel_exceptions:
602
596
603 Parallel exceptions
597 Parallel exceptions
604 -------------------
598 -------------------
605
599
606 In the multiengine interface, parallel commands can raise Python exceptions,
600 In the multiengine interface, parallel commands can raise Python exceptions,
607 just like serial commands. But, it is a little subtle, because a single
601 just like serial commands. But, it is a little subtle, because a single
608 parallel command can actually raise multiple exceptions (one for each engine
602 parallel command can actually raise multiple exceptions (one for each engine
609 the command was run on). To express this idea, we have a
603 the command was run on). To express this idea, we have a
610 :exc:`CompositeError` exception class that will be raised in most cases. The
604 :exc:`CompositeError` exception class that will be raised in most cases. The
611 :exc:`CompositeError` class is a special type of exception that wraps one or
605 :exc:`CompositeError` class is a special type of exception that wraps one or
612 more other types of exceptions. Here is how it works:
606 more other types of exceptions. Here is how it works:
613
607
614 .. sourcecode:: ipython
608 .. sourcecode:: ipython
615
609
616 In [76]: dview.block=True
610 In [76]: dview.block=True
617
611
618 In [77]: dview.execute('1/0')
612 In [77]: dview.execute('1/0')
619 ---------------------------------------------------------------------------
613 ---------------------------------------------------------------------------
620 CompositeError Traceback (most recent call last)
614 CompositeError Traceback (most recent call last)
621 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
615 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
622 ----> 1 dview.execute('1/0')
616 ----> 1 dview.execute('1/0')
623
617
624 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
618 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
625 591 default: self.block
619 591 default: self.block
626 592 """
620 592 """
627 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
621 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
628 594
622 594
629 595 def run(self, filename, targets=None, block=None):
623 595 def run(self, filename, targets=None, block=None):
630
624
631 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
625 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
632
626
633 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
627 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
634 55 def sync_results(f, self, *args, **kwargs):
628 55 def sync_results(f, self, *args, **kwargs):
635 56 """sync relevant results from self.client to our results attribute."""
629 56 """sync relevant results from self.client to our results attribute."""
636 ---> 57 ret = f(self, *args, **kwargs)
630 ---> 57 ret = f(self, *args, **kwargs)
637 58 delta = self.outstanding.difference(self.client.outstanding)
631 58 delta = self.outstanding.difference(self.client.outstanding)
638 59 completed = self.outstanding.intersection(delta)
632 59 completed = self.outstanding.intersection(delta)
639
633
640 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
634 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
641
635
642 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
636 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
643 44 n_previous = len(self.client.history)
637 44 n_previous = len(self.client.history)
644 45 try:
638 45 try:
645 ---> 46 ret = f(self, *args, **kwargs)
639 ---> 46 ret = f(self, *args, **kwargs)
646 47 finally:
640 47 finally:
647 48 nmsgs = len(self.client.history) - n_previous
641 48 nmsgs = len(self.client.history) - n_previous
648
642
649 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
643 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
650 529 if block:
644 529 if block:
651 530 try:
645 530 try:
652 --> 531 return ar.get()
646 --> 531 return ar.get()
653 532 except KeyboardInterrupt:
647 532 except KeyboardInterrupt:
654 533 pass
648 533 pass
655
649
656 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
650 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
657 101 return self._result
651 101 return self._result
658 102 else:
652 102 else:
659 --> 103 raise self._exception
653 --> 103 raise self._exception
660 104 else:
654 104 else:
661 105 raise error.TimeoutError("Result not ready.")
655 105 raise error.TimeoutError("Result not ready.")
662
656
663 CompositeError: one or more exceptions from call to method: _execute
657 CompositeError: one or more exceptions from call to method: _execute
664 [0:apply]: ZeroDivisionError: integer division or modulo by zero
658 [0:apply]: ZeroDivisionError: integer division or modulo by zero
665 [1:apply]: ZeroDivisionError: integer division or modulo by zero
659 [1:apply]: ZeroDivisionError: integer division or modulo by zero
666 [2:apply]: ZeroDivisionError: integer division or modulo by zero
660 [2:apply]: ZeroDivisionError: integer division or modulo by zero
667 [3:apply]: ZeroDivisionError: integer division or modulo by zero
661 [3:apply]: ZeroDivisionError: integer division or modulo by zero
668
662
669 Notice how the error message printed when :exc:`CompositeError` is raised has
663 Notice how the error message printed when :exc:`CompositeError` is raised has
670 information about the individual exceptions that were raised on each engine.
664 information about the individual exceptions that were raised on each engine.
671 If you want, you can even raise one of these original exceptions:
665 If you want, you can even raise one of these original exceptions:
672
666
673 .. sourcecode:: ipython
667 .. sourcecode:: ipython
674
668
675 In [80]: try:
669 In [80]: try:
676 ....: dview.execute('1/0')
670 ....: dview.execute('1/0')
677 ....: except parallel.error.CompositeError, e:
671 ....: except parallel.error.CompositeError, e:
678 ....: e.raise_exception()
672 ....: e.raise_exception()
679 ....:
673 ....:
680 ....:
674 ....:
681 ---------------------------------------------------------------------------
675 ---------------------------------------------------------------------------
682 RemoteError Traceback (most recent call last)
676 RemoteError Traceback (most recent call last)
683 /home/user/<ipython-input-17-8597e7e39858> in <module>()
677 /home/user/<ipython-input-17-8597e7e39858> in <module>()
684 2 dview.execute('1/0')
678 2 dview.execute('1/0')
685 3 except CompositeError as e:
679 3 except CompositeError as e:
686 ----> 4 e.raise_exception()
680 ----> 4 e.raise_exception()
687
681
688 /path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
682 /path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
689 266 raise IndexError("an exception with index %i does not exist"%excid)
683 266 raise IndexError("an exception with index %i does not exist"%excid)
690 267 else:
684 267 else:
691 --> 268 raise RemoteError(en, ev, etb, ei)
685 --> 268 raise RemoteError(en, ev, etb, ei)
692 269
686 269
693 270
687 270
694
688
695 RemoteError: ZeroDivisionError(integer division or modulo by zero)
689 RemoteError: ZeroDivisionError(integer division or modulo by zero)
696 Traceback (most recent call last):
690 Traceback (most recent call last):
697 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
691 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
698 exec code in working,working
692 exec code in working,working
699 File "<string>", line 1, in <module>
693 File "<string>", line 1, in <module>
700 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
694 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
701 exec code in globals()
695 exec code in globals()
702 File "<string>", line 1, in <module>
696 File "<string>", line 1, in <module>
703 ZeroDivisionError: integer division or modulo by zero
697 ZeroDivisionError: integer division or modulo by zero
704
698
705 If you are working in IPython, you can simply type ``%debug`` after one of
699 If you are working in IPython, you can simply type ``%debug`` after one of
706 these :exc:`CompositeError` exceptions is raised, and inspect the exception
700 these :exc:`CompositeError` exceptions is raised, and inspect the exception
707 instance:
701 instance:
708
702
709 .. sourcecode:: ipython
703 .. sourcecode:: ipython
710
704
711 In [81]: dview.execute('1/0')
705 In [81]: dview.execute('1/0')
712 ---------------------------------------------------------------------------
706 ---------------------------------------------------------------------------
713 CompositeError Traceback (most recent call last)
707 CompositeError Traceback (most recent call last)
714 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
708 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
715 ----> 1 dview.execute('1/0')
709 ----> 1 dview.execute('1/0')
716
710
717 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
711 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
718 591 default: self.block
712 591 default: self.block
719 592 """
713 592 """
720 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
714 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
721 594
715 594
722 595 def run(self, filename, targets=None, block=None):
716 595 def run(self, filename, targets=None, block=None):
723
717
724 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
718 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
725
719
726 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
720 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
727 55 def sync_results(f, self, *args, **kwargs):
721 55 def sync_results(f, self, *args, **kwargs):
728 56 """sync relevant results from self.client to our results attribute."""
722 56 """sync relevant results from self.client to our results attribute."""
729 ---> 57 ret = f(self, *args, **kwargs)
723 ---> 57 ret = f(self, *args, **kwargs)
730 58 delta = self.outstanding.difference(self.client.outstanding)
724 58 delta = self.outstanding.difference(self.client.outstanding)
731 59 completed = self.outstanding.intersection(delta)
725 59 completed = self.outstanding.intersection(delta)
732
726
733 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
727 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
734
728
735 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
729 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
736 44 n_previous = len(self.client.history)
730 44 n_previous = len(self.client.history)
737 45 try:
731 45 try:
738 ---> 46 ret = f(self, *args, **kwargs)
732 ---> 46 ret = f(self, *args, **kwargs)
739 47 finally:
733 47 finally:
740 48 nmsgs = len(self.client.history) - n_previous
734 48 nmsgs = len(self.client.history) - n_previous
741
735
742 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
736 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
743 529 if block:
737 529 if block:
744 530 try:
738 530 try:
745 --> 531 return ar.get()
739 --> 531 return ar.get()
746 532 except KeyboardInterrupt:
740 532 except KeyboardInterrupt:
747 533 pass
741 533 pass
748
742
749 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
743 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
750 101 return self._result
744 101 return self._result
751 102 else:
745 102 else:
752 --> 103 raise self._exception
746 --> 103 raise self._exception
753 104 else:
747 104 else:
754 105 raise error.TimeoutError("Result not ready.")
748 105 raise error.TimeoutError("Result not ready.")
755
749
756 CompositeError: one or more exceptions from call to method: _execute
750 CompositeError: one or more exceptions from call to method: _execute
757 [0:apply]: ZeroDivisionError: integer division or modulo by zero
751 [0:apply]: ZeroDivisionError: integer division or modulo by zero
758 [1:apply]: ZeroDivisionError: integer division or modulo by zero
752 [1:apply]: ZeroDivisionError: integer division or modulo by zero
759 [2:apply]: ZeroDivisionError: integer division or modulo by zero
753 [2:apply]: ZeroDivisionError: integer division or modulo by zero
760 [3:apply]: ZeroDivisionError: integer division or modulo by zero
754 [3:apply]: ZeroDivisionError: integer division or modulo by zero
761
755
762 In [82]: %debug
756 In [82]: %debug
763 > /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
757 > /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
764 102 else:
758 102 else:
765 --> 103 raise self._exception
759 --> 103 raise self._exception
766 104 else:
760 104 else:
767
761
768 # With the debugger running, self._exception is the exceptions instance. We can tab complete
762 # With the debugger running, self._exception is the exceptions instance. We can tab complete
769 # on it and see the extra methods that are available.
763 # on it and see the extra methods that are available.
770 ipdb> self._exception.<tab>
764 ipdb> self._exception.<tab>
771 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
765 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
772 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
766 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
773 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
767 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
774 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
768 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
775 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
769 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
776 ipdb> self._exception.print_tracebacks()
770 ipdb> self._exception.print_tracebacks()
777 [0:apply]:
771 [0:apply]:
778 Traceback (most recent call last):
772 Traceback (most recent call last):
779 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
773 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
780 exec code in working,working
774 exec code in working,working
781 File "<string>", line 1, in <module>
775 File "<string>", line 1, in <module>
782 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
776 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
783 exec code in globals()
777 exec code in globals()
784 File "<string>", line 1, in <module>
778 File "<string>", line 1, in <module>
785 ZeroDivisionError: integer division or modulo by zero
779 ZeroDivisionError: integer division or modulo by zero
786
780
787
781
788 [1:apply]:
782 [1:apply]:
789 Traceback (most recent call last):
783 Traceback (most recent call last):
790 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
784 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
791 exec code in working,working
785 exec code in working,working
792 File "<string>", line 1, in <module>
786 File "<string>", line 1, in <module>
793 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
787 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
794 exec code in globals()
788 exec code in globals()
795 File "<string>", line 1, in <module>
789 File "<string>", line 1, in <module>
796 ZeroDivisionError: integer division or modulo by zero
790 ZeroDivisionError: integer division or modulo by zero
797
791
798
792
799 [2:apply]:
793 [2:apply]:
800 Traceback (most recent call last):
794 Traceback (most recent call last):
801 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
795 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
802 exec code in working,working
796 exec code in working,working
803 File "<string>", line 1, in <module>
797 File "<string>", line 1, in <module>
804 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
798 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
805 exec code in globals()
799 exec code in globals()
806 File "<string>", line 1, in <module>
800 File "<string>", line 1, in <module>
807 ZeroDivisionError: integer division or modulo by zero
801 ZeroDivisionError: integer division or modulo by zero
808
802
809
803
810 [3:apply]:
804 [3:apply]:
811 Traceback (most recent call last):
805 Traceback (most recent call last):
812 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
806 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
813 exec code in working,working
807 exec code in working,working
814 File "<string>", line 1, in <module>
808 File "<string>", line 1, in <module>
815 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
809 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
816 exec code in globals()
810 exec code in globals()
817 File "<string>", line 1, in <module>
811 File "<string>", line 1, in <module>
818 ZeroDivisionError: integer division or modulo by zero
812 ZeroDivisionError: integer division or modulo by zero
819
813
820
814
821 All of this same error handling magic even works in non-blocking mode:
815 All of this same error handling magic even works in non-blocking mode:
822
816
823 .. sourcecode:: ipython
817 .. sourcecode:: ipython
824
818
825 In [83]: dview.block=False
819 In [83]: dview.block=False
826
820
827 In [84]: ar = dview.execute('1/0')
821 In [84]: ar = dview.execute('1/0')
828
822
829 In [85]: ar.get()
823 In [85]: ar.get()
830 ---------------------------------------------------------------------------
824 ---------------------------------------------------------------------------
831 CompositeError Traceback (most recent call last)
825 CompositeError Traceback (most recent call last)
832 /home/user/<ipython-input-21-8531eb3d26fb> in <module>()
826 /home/user/<ipython-input-21-8531eb3d26fb> in <module>()
833 ----> 1 ar.get()
827 ----> 1 ar.get()
834
828
835 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
829 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
836 101 return self._result
830 101 return self._result
837 102 else:
831 102 else:
838 --> 103 raise self._exception
832 --> 103 raise self._exception
839 104 else:
833 104 else:
840 105 raise error.TimeoutError("Result not ready.")
834 105 raise error.TimeoutError("Result not ready.")
841
835
842 CompositeError: one or more exceptions from call to method: _execute
836 CompositeError: one or more exceptions from call to method: _execute
843 [0:apply]: ZeroDivisionError: integer division or modulo by zero
837 [0:apply]: ZeroDivisionError: integer division or modulo by zero
844 [1:apply]: ZeroDivisionError: integer division or modulo by zero
838 [1:apply]: ZeroDivisionError: integer division or modulo by zero
845 [2:apply]: ZeroDivisionError: integer division or modulo by zero
839 [2:apply]: ZeroDivisionError: integer division or modulo by zero
846 [3:apply]: ZeroDivisionError: integer division or modulo by zero
840 [3:apply]: ZeroDivisionError: integer division or modulo by zero
847
841
@@ -1,449 +1,462 b''
1 .. _parallel_task:
1 .. _parallel_task:
2
2
3 ==========================
3 ==========================
4 The IPython task interface
4 The IPython task interface
5 ==========================
5 ==========================
6
6
7 The task interface to the cluster presents the engines as a fault tolerant,
7 The task interface to the cluster presents the engines as a fault tolerant,
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 the task interface the user has no direct access to individual engines. By
9 the task interface the user has no direct access to individual engines. By
10 allowing the IPython scheduler to assign work, this interface is simultaneously
10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 simpler and more powerful.
11 simpler and more powerful.
12
12
13 Best of all, the user can use both of these interfaces running at the same time
13 Best of all, the user can use both of these interfaces running at the same time
14 to take advantage of their respective strengths. When the user can break up
14 to take advantage of their respective strengths. When the user can break up
15 their work into segments that do not depend on previous execution, the
15 their work into segments that do not depend on previous execution, the
16 task interface is ideal. But it also has more power and flexibility, allowing
16 task interface is ideal. But it also has more power and flexibility, allowing
17 the user to guide the distribution of jobs, without having to assign tasks to
17 the user to guide the distribution of jobs, without having to assign tasks to
18 engines explicitly.
18 engines explicitly.
19
19
20 Starting the IPython controller and engines
20 Starting the IPython controller and engines
21 ===========================================
21 ===========================================
22
22
23 To follow along with this tutorial, you will need to start the IPython
23 To follow along with this tutorial, you will need to start the IPython
24 controller and four IPython engines. The simplest way of doing this is to use
24 controller and four IPython engines. The simplest way of doing this is to use
25 the :command:`ipcluster` command::
25 the :command:`ipcluster` command::
26
26
27 $ ipcluster start -n 4
27 $ ipcluster start -n 4
28
28
29 For more detailed information about starting the controller and engines, see
29 For more detailed information about starting the controller and engines, see
30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
31
31
32 Creating a ``Client`` instance
32 Creating a ``LoadBalancedView`` instance
33 ==============================
33 ========================================
34
34
35 The first step is to import the IPython :mod:`IPython.parallel`
35 The first step is to import the IPython :mod:`IPython.parallel`
36 module and then create a :class:`.Client` instance, and we will also be using
36 module and then create a :class:`.Client` instance, and we will also be using
37 a :class:`LoadBalancedView`, here called `lview`:
37 a :class:`LoadBalancedView`, here called `lview`:
38
38
39 .. sourcecode:: ipython
39 .. sourcecode:: ipython
40
40
41 In [1]: from IPython.parallel import Client
41 In [1]: from IPython.parallel import Client
42
42
43 In [2]: rc = Client()
43 In [2]: rc = Client()
44
44
45
45
46 This form assumes that the controller was started on localhost with default
46 This form assumes that the controller was started on localhost with default
47 configuration. If not, the location of the controller must be given as an
47 configuration. If not, the location of the controller must be given as an
48 argument to the constructor:
48 argument to the constructor:
49
49
50 .. sourcecode:: ipython
50 .. sourcecode:: ipython
51
51
52 # for a visible LAN controller listening on an external port:
52 # for a visible LAN controller listening on an external port:
53 In [2]: rc = Client('tcp://192.168.1.16:10101')
53 In [2]: rc = Client('tcp://192.168.1.16:10101')
54 # or to connect with a specific profile you have set up:
54 # or to connect with a specific profile you have set up:
55 In [3]: rc = Client(profile='mpi')
55 In [3]: rc = Client(profile='mpi')
56
56
57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 be constructed via the client's :meth:`load_balanced_view` method:
58 be constructed via the client's :meth:`load_balanced_view` method:
59
59
60 .. sourcecode:: ipython
60 .. sourcecode:: ipython
61
61
62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
63
63
64 .. seealso::
64 .. seealso::
65
65
66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
67
67
68
68
69 Quick and easy parallelism
69 Quick and easy parallelism
70 ==========================
70 ==========================
71
71
72 In many cases, you simply want to apply a Python function to a sequence of
72 In many cases, you simply want to apply a Python function to a sequence of
73 objects, but *in parallel*. Like the multiengine interface, these can be
73 objects, but *in parallel*. Like the multiengine interface, these can be
74 implemented via the task interface. The exact same tools can perform these
74 implemented via the task interface. The exact same tools can perform these
75 actions in load-balanced ways as well as multiplexed ways: a parallel version
75 actions in load-balanced ways as well as multiplexed ways: a parallel version
76 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
76 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
77 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
77 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
78 execution time per item varies significantly, you should use the versions in
78 execution time per item varies significantly, you should use the versions in
79 the task interface.
79 the task interface.
80
80
81 Parallel map
81 Parallel map
82 ------------
82 ------------
83
83
84 To load-balance :meth:`map`, simply use a LoadBalancedView:
84 To load-balance :meth:`map`, simply use a LoadBalancedView:
85
85
86 .. sourcecode:: ipython
86 .. sourcecode:: ipython
87
87
88 In [62]: lview.block = True
88 In [62]: lview.block = True
89
89
90 In [63]: serial_result = map(lambda x:x**10, range(32))
90 In [63]: serial_result = map(lambda x:x**10, range(32))
91
91
92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
93
93
94 In [65]: serial_result==parallel_result
94 In [65]: serial_result==parallel_result
95 Out[65]: True
95 Out[65]: True
96
96
97 Parallel function decorator
97 Parallel function decorator
98 ---------------------------
98 ---------------------------
99
99
100 Parallel functions are just like normal functions, but they can be called on
100 Parallel functions are just like normal functions, but they can be called on
101 sequences and *in parallel*. The multiengine interface provides a decorator
101 sequences and *in parallel*. The multiengine interface provides a decorator
102 that turns any Python function into a parallel function:
102 that turns any Python function into a parallel function:
103
103
104 .. sourcecode:: ipython
104 .. sourcecode:: ipython
105
105
106 In [10]: @lview.parallel()
106 In [10]: @lview.parallel()
107 ....: def f(x):
107 ....: def f(x):
108 ....: return 10.0*x**4
108 ....: return 10.0*x**4
109 ....:
109 ....:
110
110
111 In [11]: f.map(range(32)) # this is done in parallel
111 In [11]: f.map(range(32)) # this is done in parallel
112 Out[11]: [0.0,10.0,160.0,...]
112 Out[11]: [0.0,10.0,160.0,...]
113
113
114 .. _parallel_taskmap:
114 .. _parallel_taskmap:
115
115
116 The AsyncMapResult
116 Map results are iterable!
117 ==================
117 -------------------------
118
118
119 When you call ``lview.map_async(f, sequence)``, or just :meth:`map` with `block=True`, then
119 When an AsyncResult object actually maps multiple results (e.g. the :class:`~AsyncMapResult`
120 what you get in return will be an :class:`~AsyncMapResult` object. These are similar to
120 object), you can iterate through them, and act on the results as they arrive:
121 AsyncResult objects, but with one key difference
121
122 .. literalinclude:: ../../examples/parallel/itermapresult.py
123 :language: python
124 :lines: 9-34
125
126 .. seealso::
127
128 When AsyncResult or the AsyncMapResult don't provide what you need (for instance,
129 handling individual results as they arrive, but with metadata), you can always
130 just split the original result's ``msg_ids`` attribute, and handle them as you like.
131
132 For an example of this, see :file:`docs/examples/parallel/customresult.py`
133
122
134
123 .. _parallel_dependencies:
135 .. _parallel_dependencies:
124
136
125 Dependencies
137 Dependencies
126 ============
138 ============
127
139
128 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
140 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
129 may want to associate some kind of `Dependency` that describes when, where, or whether
141 may want to associate some kind of `Dependency` that describes when, where, or whether
130 a task can be run. In IPython, we provide two types of dependencies:
142 a task can be run. In IPython, we provide two types of dependencies:
131 `Functional Dependencies`_ and `Graph Dependencies`_
143 `Functional Dependencies`_ and `Graph Dependencies`_
132
144
133 .. note::
145 .. note::
134
146
135 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
147 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
136 and you will see errors or warnings if you try to use dependencies with the pure
148 and you will see errors or warnings if you try to use dependencies with the pure
137 scheduler.
149 scheduler.
138
150
139 Functional Dependencies
151 Functional Dependencies
140 -----------------------
152 -----------------------
141
153
142 Functional dependencies are used to determine whether a given engine is capable of running
154 Functional dependencies are used to determine whether a given engine is capable of running
143 a particular task. This is implemented via a special :class:`Exception` class,
155 a particular task. This is implemented via a special :class:`Exception` class,
144 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
156 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
145 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
157 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
146 the error up to the client like any other error, catches the error, and submits the task
158 the error up to the client like any other error, catches the error, and submits the task
147 to a different engine. This will repeat indefinitely, and a task will never be submitted
159 to a different engine. This will repeat indefinitely, and a task will never be submitted
148 to a given engine a second time.
160 to a given engine a second time.
149
161
150 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
162 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
151 some decorators for facilitating this behavior.
163 some decorators for facilitating this behavior.
152
164
153 There are two decorators and a class used for functional dependencies:
165 There are two decorators and a class used for functional dependencies:
154
166
155 .. sourcecode:: ipython
167 .. sourcecode:: ipython
156
168
157 In [9]: from IPython.parallel import depend, require, dependent
169 In [9]: from IPython.parallel import depend, require, dependent
158
170
159 @require
171 @require
160 ********
172 ********
161
173
162 The simplest sort of dependency is requiring that a Python module is available. The
174 The simplest sort of dependency is requiring that a Python module is available. The
163 ``@require`` decorator lets you define a function that will only run on engines where names
175 ``@require`` decorator lets you define a function that will only run on engines where names
164 you specify are importable:
176 you specify are importable:
165
177
166 .. sourcecode:: ipython
178 .. sourcecode:: ipython
167
179
168 In [10]: @require('numpy', 'zmq')
180 In [10]: @require('numpy', 'zmq')
169 ...: def myfunc():
181 ....: def myfunc():
170 ...: return dostuff()
182 ....: return dostuff()
171
183
172 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
184 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
173 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
185 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
174
186
175 @depend
187 @depend
176 *******
188 *******
177
189
178 The ``@depend`` decorator lets you decorate any function with any *other* function to
190 The ``@depend`` decorator lets you decorate any function with any *other* function to
179 evaluate the dependency. The dependency function will be called at the start of the task,
191 evaluate the dependency. The dependency function will be called at the start of the task,
180 and if it returns ``False``, then the dependency will be considered unmet, and the task
192 and if it returns ``False``, then the dependency will be considered unmet, and the task
181 will be assigned to another engine. If the dependency returns *anything other than
193 will be assigned to another engine. If the dependency returns *anything other than
182 ``False``*, the rest of the task will continue.
194 ``False``*, the rest of the task will continue.
183
195
184 .. sourcecode:: ipython
196 .. sourcecode:: ipython
185
197
186 In [10]: def platform_specific(plat):
198 In [10]: def platform_specific(plat):
187 ...: import sys
199 ....: import sys
188 ...: return sys.platform == plat
200 ....: return sys.platform == plat
189
201
190 In [11]: @depend(platform_specific, 'darwin')
202 In [11]: @depend(platform_specific, 'darwin')
191 ...: def mactask():
203 ....: def mactask():
192 ...: do_mac_stuff()
204 ....: do_mac_stuff()
193
205
194 In [12]: @depend(platform_specific, 'nt')
206 In [12]: @depend(platform_specific, 'nt')
195 ...: def wintask():
207 ....: def wintask():
196 ...: do_windows_stuff()
208 ....: do_windows_stuff()
197
209
198 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
210 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
199 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
211 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
200 signature.
212 signature.
201
213
202 dependents
214 dependents
203 **********
215 **********
204
216
205 You don't have to use the decorators on your tasks. If, for instance, you want
217 You don't have to use the decorators on your tasks. If, for instance, you want
206 to run tasks with a single function but varying dependencies, you can directly construct
218 to run tasks with a single function but varying dependencies, you can directly construct
207 the :class:`dependent` object that the decorators use:
219 the :class:`dependent` object that the decorators use:
208
220
209 .. sourcecode:: ipython
221 .. sourcecode:: ipython
210
222
211 In [13]: def mytask(*args):
223 In [13]: def mytask(*args):
212 ...: dostuff()
224 ....: dostuff()
213
225
214 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
226 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
215 # this is the same as decorating the declaration of mytask with @depend
227 # this is the same as decorating the declaration of mytask with @depend
216 # but you can do it again:
228 # but you can do it again:
217
229
218 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
230 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
219
231
220 # in general:
232 # in general:
221 In [16]: t = dependent(f, g, *dargs, **dkwargs)
233 In [16]: t = dependent(f, g, *dargs, **dkwargs)
222
234
223 # is equivalent to:
235 # is equivalent to:
224 In [17]: @depend(g, *dargs, **dkwargs)
236 In [17]: @depend(g, *dargs, **dkwargs)
225 ...: def t(a,b,c):
237 ....: def t(a,b,c):
226 ...: # contents of f
238 ....: # contents of f
227
239
228 Graph Dependencies
240 Graph Dependencies
229 ------------------
241 ------------------
230
242
231 Sometimes you want to restrict the time and/or location to run a given task as a function
243 Sometimes you want to restrict the time and/or location to run a given task as a function
232 of the time and/or location of other tasks. This is implemented via a subclass of
244 of the time and/or location of other tasks. This is implemented via a subclass of
233 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
245 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
234 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
246 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
235 has been met.
247 has been met.
236
248
237 The switches we provide for interpreting whether a given dependency set has been met:
249 The switches we provide for interpreting whether a given dependency set has been met:
238
250
239 any|all
251 any|all
240 Whether the dependency is considered met if *any* of the dependencies are done, or
252 Whether the dependency is considered met if *any* of the dependencies are done, or
241 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
253 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
242 boolean attribute, which defaults to ``True``.
254 boolean attribute, which defaults to ``True``.
243
255
244 success [default: True]
256 success [default: True]
245 Whether to consider tasks that succeeded as fulfilling dependencies.
257 Whether to consider tasks that succeeded as fulfilling dependencies.
246
258
247 failure [default : False]
259 failure [default : False]
248 Whether to consider tasks that failed as fulfilling dependencies.
260 Whether to consider tasks that failed as fulfilling dependencies.
249 Using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
261 Using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
250 only when tasks have failed.
262 only when tasks have failed.
251
263
252 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
264 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
253 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
265 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
254 not care whether the task succeeds, and always want the second task to run, in which case you
266 not care whether the task succeeds, and always want the second task to run, in which case you
255 should use `success=failure=True`. The default behavior is to only use successes.
267 should use `success=failure=True`. The default behavior is to only use successes.
256
268
257 There are other switches for interpretation that are made at the *task* level. These are
269 There are other switches for interpretation that are made at the *task* level. These are
258 specified via keyword arguments to the client's :meth:`apply` method.
270 specified via keyword arguments to the client's :meth:`apply` method.
259
271
260 after,follow
272 after,follow
261 You may want to run a task *after* a given set of dependencies have been run and/or
273 You may want to run a task *after* a given set of dependencies have been run and/or
262 run it *where* another set of dependencies are met. To support this, every task has an
274 run it *where* another set of dependencies are met. To support this, every task has an
263 `after` dependency to restrict time, and a `follow` dependency to restrict
275 `after` dependency to restrict time, and a `follow` dependency to restrict
264 destination.
276 destination.
265
277
266 timeout
278 timeout
267 You may also want to set a time-limit for how long the scheduler should wait before a
279 You may also want to set a time-limit for how long the scheduler should wait before a
268 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
280 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
269 indicates that the task should never timeout. If the timeout is reached, and the
281 indicates that the task should never timeout. If the timeout is reached, and the
270 scheduler still hasn't been able to assign the task to an engine, the task will fail
282 scheduler still hasn't been able to assign the task to an engine, the task will fail
271 with a :class:`DependencyTimeout`.
283 with a :class:`DependencyTimeout`.
272
284
273 .. note::
285 .. note::
274
286
275 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
287 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
276 task to run after a job submitted via the MUX interface.
288 task to run after a job submitted via the MUX interface.
277
289
278 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
290 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
279 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
291 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
280 `follow` and `after` keywords to :meth:`client.apply`:
292 `follow` and `after` keywords to :meth:`client.apply`:
281
293
282 .. sourcecode:: ipython
294 .. sourcecode:: ipython
283
295
284 In [14]: client.block=False
296 In [14]: client.block=False
285
297
286 In [15]: ar = lview.apply(f, args, kwargs)
298 In [15]: ar = lview.apply(f, args, kwargs)
287
299
288 In [16]: ar2 = lview.apply(f2)
300 In [16]: ar2 = lview.apply(f2)
289
301
290 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
302 In [17]: with lview.temp_flags(after=[ar,ar2]):
291
303 ....: ar3 = lview.apply(f3)
292 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
293
304
305 In [18]: with lview.temp_flags(follow=[ar], timeout=2.5):
306 ....: ar4 = lview.apply(f3)
294
307
295 .. seealso::
308 .. seealso::
296
309
297 Some parallel workloads can be described as a `Directed Acyclic Graph
310 Some parallel workloads can be described as a `Directed Acyclic Graph
298 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
311 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
299 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
312 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
300 onto task dependencies.
313 onto task dependencies.
301
314
302
315
303 Impossible Dependencies
316 Impossible Dependencies
304 ***********************
317 ***********************
305
318
306 The schedulers do perform some analysis on graph dependencies to determine whether they
319 The schedulers do perform some analysis on graph dependencies to determine whether they
307 are impossible to meet. If the scheduler does discover that a dependency cannot be
320 are impossible to meet. If the scheduler does discover that a dependency cannot be
308 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
321 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
309 scheduler realizes that a task can never be run, it won't sit indefinitely in the
322 scheduler realizes that a task can never be run, it won't sit indefinitely in the
310 scheduler clogging the pipeline.
323 scheduler clogging the pipeline.
311
324
312 The basic cases that are checked:
325 The basic cases that are checked:
313
326
314 * depending on nonexistent messages
327 * depending on nonexistent messages
315 * `follow` dependencies were run on more than one machine and `all=True`
328 * `follow` dependencies were run on more than one machine and `all=True`
316 * any dependencies failed and `all=True,success=True,failure=False`
329 * any dependencies failed and `all=True,success=True,failure=False`
317 * all dependencies failed and `all=False,success=True,failure=False`
330 * all dependencies failed and `all=False,success=True,failure=False`
318
331
319 .. warning::
332 .. warning::
320
333
321 This analysis has not been proven to be rigorous, so it is likely possible for tasks
334 This analysis has not been proven to be rigorous, so it is likely possible for tasks
322 to become impossible to run in obscure situations, so a timeout may be a good choice.
335 to become impossible to run in obscure situations, so a timeout may be a good choice.
323
336
324
337
325 Retries and Resubmit
338 Retries and Resubmit
326 ====================
339 ====================
327
340
328 Retries
341 Retries
329 -------
342 -------
330
343
331 Another flag for tasks is `retries`. This is an integer, specifying how many times
344 Another flag for tasks is `retries`. This is an integer, specifying how many times
332 a task should be resubmitted after failure. This is useful for tasks that should still run
345 a task should be resubmitted after failure. This is useful for tasks that should still run
333 if their engine was shutdown, or may have some statistical chance of failing. The default
346 if their engine was shutdown, or may have some statistical chance of failing. The default
334 is to not retry tasks.
347 is to not retry tasks.
335
348
336 Resubmit
349 Resubmit
337 --------
350 --------
338
351
339 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
352 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
340 you have fixed the error, or because you want to restore the cluster to an interrupted state.
353 you have fixed the error, or because you want to restore the cluster to an interrupted state.
341 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
354 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
342 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
355 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
343 a task that is pending - only those that have finished, either successful or unsuccessful.
356 a task that is pending - only those that have finished, either successful or unsuccessful.
344
357
345 .. _parallel_schedulers:
358 .. _parallel_schedulers:
346
359
347 Schedulers
360 Schedulers
348 ==========
361 ==========
349
362
350 There are a variety of valid ways to determine where jobs should be assigned in a
363 There are a variety of valid ways to determine where jobs should be assigned in a
351 load-balancing situation. In IPython, we support several standard schemes, and
364 load-balancing situation. In IPython, we support several standard schemes, and
352 even make it easy to define your own. The scheme can be selected via the ``scheme``
365 even make it easy to define your own. The scheme can be selected via the ``scheme``
353 argument to :command:`ipcontroller`, or in the :attr:`TaskScheduler.schemename` attribute
366 argument to :command:`ipcontroller`, or in the :attr:`TaskScheduler.schemename` attribute
354 of a controller config object.
367 of a controller config object.
355
368
356 The built-in routing schemes:
369 The built-in routing schemes:
357
370
358 To select one of these schemes, simply do::
371 To select one of these schemes, simply do::
359
372
360 $ ipcontroller --scheme=<schemename>
373 $ ipcontroller --scheme=<schemename>
361 for instance:
374 for instance:
362 $ ipcontroller --scheme=lru
375 $ ipcontroller --scheme=lru
363
376
364 lru: Least Recently Used
377 lru: Least Recently Used
365
378
366 Always assign work to the least-recently-used engine. A close relative of
379 Always assign work to the least-recently-used engine. A close relative of
367 round-robin, it will be fair with respect to the number of tasks, agnostic
380 round-robin, it will be fair with respect to the number of tasks, agnostic
368 with respect to runtime of each task.
381 with respect to runtime of each task.
369
382
370 plainrandom: Plain Random
383 plainrandom: Plain Random
371
384
372 Randomly picks an engine on which to run.
385 Randomly picks an engine on which to run.
373
386
374 twobin: Two-Bin Random
387 twobin: Two-Bin Random
375
388
376 **Requires numpy**
389 **Requires numpy**
377
390
378 Pick two engines at random, and use the LRU of the two. This is known to be better
391 Pick two engines at random, and use the LRU of the two. This is known to be better
379 than plain random in many cases, but requires a small amount of computation.
392 than plain random in many cases, but requires a small amount of computation.
380
393
381 leastload: Least Load
394 leastload: Least Load
382
395
383 **This is the default scheme**
396 **This is the default scheme**
384
397
385 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
398 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
386
399
387 weighted: Weighted Two-Bin Random
400 weighted: Weighted Two-Bin Random
388
401
389 **Requires numpy**
402 **Requires numpy**
390
403
391 Pick two engines at random using the number of outstanding tasks as inverse weights,
404 Pick two engines at random using the number of outstanding tasks as inverse weights,
392 and use the one with the lower load.
405 and use the one with the lower load.
393
406
394
407
395 Pure ZMQ Scheduler
408 Pure ZMQ Scheduler
396 ------------------
409 ------------------
397
410
398 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
411 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
399 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all
412 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all
400 load-balancing. This scheduler does not support any of the advanced features of the Python
413 load-balancing. This scheduler does not support any of the advanced features of the Python
401 :class:`.Scheduler`.
414 :class:`.Scheduler`.
402
415
403 Disabled features when using the ZMQ Scheduler:
416 Disabled features when using the ZMQ Scheduler:
404
417
405 * Engine unregistration
418 * Engine unregistration
406 Task farming will be disabled if an engine unregisters.
419 Task farming will be disabled if an engine unregisters.
407 Further, if an engine is unregistered during computation, the scheduler may not recover.
420 Further, if an engine is unregistered during computation, the scheduler may not recover.
408 * Dependencies
421 * Dependencies
409 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
422 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
410 based on message content.
423 based on message content.
411 * Early destination notification
424 * Early destination notification
412 The Python schedulers know which engine gets which task, and notify the Hub. This
425 The Python schedulers know which engine gets which task, and notify the Hub. This
413 allows graceful handling of Engines coming and going. There is no way to know
426 allows graceful handling of Engines coming and going. There is no way to know
414 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
427 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
415 engine until they *finish*. This makes recovery from engine shutdown very difficult.
428 engine until they *finish*. This makes recovery from engine shutdown very difficult.
416
429
417
430
418 .. note::
431 .. note::
419
432
420 TODO: performance comparisons
433 TODO: performance comparisons
421
434
422
435
423
436
424
437
425 More details
438 More details
426 ============
439 ============
427
440
428 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
441 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
429 of flexibility in how tasks are defined and run. The next places to look are
442 of flexibility in how tasks are defined and run. The next places to look are
430 in the following classes:
443 in the following classes:
431
444
432 * :class:`~IPython.parallel.client.view.LoadBalancedView`
445 * :class:`~IPython.parallel.client.view.LoadBalancedView`
433 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
446 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
434 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
447 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
435 * :mod:`~IPython.parallel.controller.dependency`
448 * :mod:`~IPython.parallel.controller.dependency`
436
449
437 The following is an overview of how to use these classes together:
450 The following is an overview of how to use these classes together:
438
451
439 1. Create a :class:`Client` and :class:`LoadBalancedView`
452 1. Create a :class:`Client` and :class:`LoadBalancedView`
440 2. Define some functions to be run as tasks
453 2. Define some functions to be run as tasks
441 3. Submit your tasks using the :meth:`apply` method of your
454 3. Submit your tasks using the :meth:`apply` method of your
442 :class:`LoadBalancedView` instance.
455 :class:`LoadBalancedView` instance.
443 4. Use :meth:`Client.get_result` to get the results of the
456 4. Use :meth:`.Client.get_result` to get the results of the
444 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
457 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
445 for and then receive the results.
458 for and then receive the results.
446
459
447 .. seealso::
460 .. seealso::
448
461
449 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
462 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
General Comments 0
You need to be logged in to leave comments. Login now