update parallel docs with some changes from scipy tutorial...
MinRK
@@ -0,0 +1,61 b''
1 """An example for handling results in a way that AsyncMapResult doesn't provide
2
3 Specifically, out-of-order results with some special handling of metadata.
4
5 This just submits a bunch of jobs, waits on the results, and prints the stdout
6 and results of each as they finish.
7
8 Authors
9 -------
10 * MinRK
11 """
12 import time
13 import random
14
15 from IPython import parallel
16
17 # create client & views
18 rc = parallel.Client()
19 dv = rc[:]
20 v = rc.load_balanced_view()
21
22
23 # scatter 'id', so id=0,1,2 on engines 0,1,2
24 dv.scatter('id', rc.ids, flatten=True)
25 print dv['id']
26
27
28 def sleep_here(count, t):
29 """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
30 import time,sys
31 print "hi from engine %i" % id
32 sys.stdout.flush()
33 time.sleep(t)
34 return count,t
35
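# 100 jobs with chunksize=2 gives 50 tasks; each task runs sleep_here twice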
36 amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)
37
38 pending = set(amr.msg_ids)
39 while pending:
40 try:
41 rc.wait(pending, 1e-3)
42 except parallel.TimeoutError:
43         # ignore TimeoutErrors, since they only mean that at least one task isn't done
44 pass
45 # finished is the set of msg_ids that are complete
46 finished = pending.difference(rc.outstanding)
47 # update pending to exclude those that just finished
48 pending = pending.difference(finished)
49 for msg_id in finished:
50 # we know these are done, so don't worry about blocking
51 ar = rc.get_result(msg_id)
52 print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
53 print "with stdout:"
54 print ' ' + ar.stdout.replace('\n', '\n ').rstrip()
55 print "and results:"
56
57 # note that each job in a map always returns a list of length chunksize
58 # even if chunksize == 1
59 for (count,t) in ar.result:
60 print " item %i: slept for %.2fs" % (count, t)
61
@@ -0,0 +1,83 b''
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
2
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
4
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
6 and you will see the print output as it arrives, notably without waiting for the results
7 to finish.
8
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 and easily filter by message type.
11
12 Authors
13 -------
14 * MinRK
15 """
16
17 import os
18 import sys
19 import json
20 import zmq
21
22 from IPython.zmq.session import Session
23 from IPython.parallel.util import disambiguate_url
24 from IPython.utils.py3compat import str_to_bytes
25 from IPython.utils.path import get_security_file
26
27 def main(connection_file):
28 """watch iopub channel, and print messages"""
29
30 ctx = zmq.Context.instance()
31
32 with open(connection_file) as f:
33         cfg = json.load(f)
34
35 location = cfg['location']
36 reg_url = cfg['url']
37 session = Session(key=str_to_bytes(cfg['exec_key']))
38
39 query = ctx.socket(zmq.DEALER)
40     query.connect(disambiguate_url(reg_url, location))
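    # ask the Hub for its connection info; the reply content includes the iopub url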
41 session.send(query, "connection_request")
42 idents,msg = session.recv(query, mode=0)
43 c = msg['content']
44 iopub_url = disambiguate_url(c['iopub'], location)
45 sub = ctx.socket(zmq.SUB)
46 # This will subscribe to all messages:
47 sub.setsockopt(zmq.SUBSCRIBE, b'')
48     # replace b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
49     # 0MQ subscriptions are simple prefix matches, so 'engine.1.' subscribes
50 # to everything from engine 1, but there is no way to subscribe to
51 # just stdout from everyone.
52 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
53 # engine 1's stderr and engine 2's stdout:
54 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
55 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
56 sub.connect(iopub_url)
57 while True:
58 try:
59 idents,msg = session.recv(sub, mode=0)
60 except KeyboardInterrupt:
61 return
62 # ident always length 1 here
63 topic = idents[0]
64 if msg['msg_type'] == 'stream':
65 # stdout/stderr
66 # stream names are in msg['content']['name'], if you want to handle
67 # them differently
68 print "%s: %s" % (topic, msg['content']['data'])
69 elif msg['msg_type'] == 'pyerr':
70 # Python traceback
71 c = msg['content']
72 print topic + ':'
73 for line in c['traceback']:
74 # indent lines
75 print ' ' + line
76
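# optionally pass the path to a connection file (e.g. ipcontroller-client.json) as the first argument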
77 if __name__ == '__main__':
78 if len(sys.argv) > 1:
79 cf = sys.argv[1]
80 else:
81 # This gets the security file for the default profile:
82 cf = get_security_file('ipcontroller-client.json')
83     main(cf)
@@ -0,0 +1,52 b''
1 """Example of iteration through AsyncMapResult, without waiting for all results
2
3 Authors
4 -------
5 * MinRK
6 """
7 import time
8
9 from IPython import parallel
10
11 # create client & view
12 rc = parallel.Client()
13 dv = rc[:]
14 v = rc.load_balanced_view()
15
16 # scatter 'id', so id=0,1,2 on engines 0,1,2
17 dv.scatter('id', rc.ids, flatten=True)
18 print "Engine IDs: ", dv['id']
19
20 # create a Reference to `id`. This will be a different value on each engine
21 ref = parallel.Reference('id')
22 print "sleeping for `id` seconds on each engine"
23 tic = time.time()
24 ar = dv.apply(time.sleep, ref)
25 for i,r in enumerate(ar):
26 print "%i: %.3f"%(i, time.time()-tic)
27
28 def sleep_here(t):
29 import time
30 time.sleep(t)
31 return id,t
32
33 # one call per task
34 print "running with one call per task"
35 amr = v.map(sleep_here, [.01*t for t in range(100)])
36 tic = time.time()
37 for i,r in enumerate(amr):
38 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
39
40 print "running with four calls per task"
41 # with chunksize, we can have four calls per task
42 amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
43 tic = time.time()
44 for i,r in enumerate(amr):
45 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
46
47 print "running with two calls per task, with unordered results"
48 # We can even iterate through faster results first, with ordered=False
49 amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
50 tic = time.time()
51 for i,r in enumerate(amr):
52 print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
@@ -1,173 +1,177 b''
1 1 .. _dag_dependencies:
2 2
3 3 ================
4 4 DAG Dependencies
5 5 ================
6 6
7 7 Often, a parallel workflow is described in terms of a `Directed Acyclic Graph
8 8 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_ or DAG. A popular library
9 9 for working with graphs is NetworkX_. Here, we will walk through a demo mapping
10 10 a NetworkX DAG to task dependencies.
11 11
12 12 The full script that runs this demo can be found in
13 13 :file:`docs/examples/parallel/dagdeps.py`.
14 14
15 15 Why are DAGs good for task dependencies?
16 16 ----------------------------------------
17 17
18 18 The 'G' in DAG is 'Graph'. A Graph is a collection of **nodes** and **edges** that connect
19 19 the nodes. For our purposes, each node would be a task, and each edge would be a
20 20 dependency. The 'D' in DAG stands for 'Directed'. This means that each edge has a
21 21 direction associated with it. So we can interpret the edge (a,b) as meaning that b depends
22 22 on a, whereas the edge (b,a) would mean a depends on b. The 'A' is 'Acyclic', meaning that
23 23 there must not be any closed loops in the graph. This is important for dependencies,
24 24 because if a loop were closed, then a task could ultimately depend on itself, and never be
25 25 able to run. If your workflow can be described as a DAG, then it is impossible for your
26 26 dependencies to cause a deadlock.
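
NetworkX can check this property directly, which makes a cheap sanity check
before submitting work (a one-line sketch, assuming a ``DiGraph`` named ``G``
as constructed below):

.. sourcecode:: python

    import networkx as nx

    # a cycle in the graph could deadlock submission, so verify acyclicity
    assert nx.is_directed_acyclic_graph(G)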
27 27
28 28 A Sample DAG
29 29 ------------
30 30
31 31 Here, we have a very simple 5-node DAG:
32 32
33 33 .. figure:: figs/simpledag.*
34 :width: 600px
34 35
35 36 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 37 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
37 38 1 and 2; and 4 depends only on 1.
38 39
39 40 A possible sequence of events for this workflow:
40 41
41 42 0. Task 0 can run right away
42 43 1. 0 finishes, so 1,2 can start
43 44 2. 1 finishes, 3 is still waiting on 2, but 4 can start right away
44 45 3. 2 finishes, and 3 can finally start
45 46
46 47
47 48 Further, taking failures into account, assuming all dependencies are run with the default
48 49 `success=True,failure=False`, the following cases would occur for each node's failure:
49 50
50 51 0. fails: all other tasks fail as Impossible
51 52 1. fails: 2 can still succeed, but 3,4 are unreachable
52 53 2. fails: 3 becomes unreachable, but 4 is unaffected
53 54 3. and 4. are terminal, and their failure can have no effect on other nodes
54 55
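These default flags can also be inverted, e.g. to run a cleanup task only when
something fails. A short sketch (``ar1``, ``ar2``, and ``cleanup`` are
hypothetical stand-ins; :meth:`temp_flags` appears again below):

.. sourcecode:: python

    from IPython.parallel import Dependency

    # run `cleanup` if at least one of the two parent tasks failed
    deps = Dependency([ar1, ar2], success=False, failure=True, all=False)
    with view.temp_flags(after=deps):
        view.apply_async(cleanup)
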
55 56 The code to generate the simple DAG:
56 57
57 58 .. sourcecode:: python
58 59
59 60 import networkx as nx
60 61
61 62 G = nx.DiGraph()
62 63
63 64 # add 5 nodes, labeled 0-4:
64 65 map(G.add_node, range(5))
65 66 # 1,2 depend on 0:
66 67 G.add_edge(0,1)
67 68 G.add_edge(0,2)
68 69 # 3 depends on 1,2
69 70 G.add_edge(1,3)
70 71 G.add_edge(2,3)
71 72 # 4 depends on 1
72 73 G.add_edge(1,4)
73 74
74 75 # now draw the graph:
75 76 pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
76 77 3 : (0,2), 4 : (2,2)}
77 78 nx.draw(G, pos, edge_color='r')
78 79
79 80
80 81 For demonstration purposes, we have a function that generates a random DAG with a given
81 82 number of nodes and edges.
82 83
83 84 .. literalinclude:: ../../examples/parallel/dagdeps.py
84 85 :language: python
85 86 :lines: 20-36
86 87
87 88 So first, we start with a graph of 32 nodes, with 128 edges:
88 89
89 90 .. sourcecode:: ipython
90 91
91 92 In [2]: G = random_dag(32,128)
92 93
93 94 Now, we need to build our dict of jobs corresponding to the nodes on the graph:
94 95
95 96 .. sourcecode:: ipython
96 97
97 98 In [3]: jobs = {}
98 99
99 100 # in reality, each job would presumably be different
100 101 # randomwait is just a function that sleeps for a random interval
101 102 In [4]: for node in G:
102 103 ...: jobs[node] = randomwait
103 104
104 105 Once we have a dict of jobs matching the nodes on the graph, we can start submitting jobs,
105 106 and linking up the dependencies. Since we don't know a job's msg_id until it is submitted,
106 107 which is necessary for building dependencies, it is critical that we don't submit a job
107 108 before the jobs it depends on. Fortunately, NetworkX provides a
108 109 :meth:`topological_sort` method which ensures exactly this. It presents an iterable that
109 110 guarantees that when you arrive at a node, you have already visited all the nodes
110 111 on which it depends:
111 112
112 113 .. sourcecode:: ipython
113 114
114 115 In [5]: rc = Client()
115 116 In [5]: view = rc.load_balanced_view()
116 117
117 118 In [6]: results = {}
118 119
119 120 In [7]: for node in G.topological_sort():
120 121 ...: # get list of AsyncResult objects from nodes
121 122 ...: # leading into this one as dependencies
122 123 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 124 ...: # submit and store AsyncResult object
124 ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False)
125 ...: with view.temp_flags(after=deps, block=False):
126 ...: results[node] = view.apply(jobs[node])
127
125 128
126 129 Now that we have submitted all the jobs, we can wait for the results:
127 130
128 131 .. sourcecode:: ipython
129 132
130 133 In [8]: view.wait(results.values())
131 134
132 135 Now, at least we know that all the jobs ran and did not fail (``r.get()`` would have
133 136 raised an error if a task failed). But we don't know that the ordering was properly
134 137 respected. For this, we can use the :attr:`metadata` attribute of each AsyncResult.
135 138
136 139 These objects store a variety of metadata about each task, including various timestamps.
137 140 We can validate that the dependencies were respected by checking that each task was
138 141 started after all of its predecessors were completed:
139 142
140 143 .. literalinclude:: ../../examples/parallel/dagdeps.py
141 144 :language: python
142 145 :lines: 64-70
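
In outline, the check is just a comparison of timestamps (a sketch reusing
``G`` and ``results`` from above):

.. sourcecode:: python

    for node in G:
        started = results[node].metadata.started
        for parent in G.predecessors(node):
            # each task must start only after all its predecessors completed
            assert started > results[parent].metadata.completed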
143 146
144 147 We can also validate the graph visually. By drawing the graph with each node's x-position
145 148 as its start time, all arrows must be pointing to the right if dependencies were respected.
146 149 For spreading, the y-position will be the runtime of the task, so long tasks
147 150 will be at the top, and quick, small tasks will be at the bottom.
148 151
149 152 .. sourcecode:: ipython
150 153
151 154 In [10]: from matplotlib.dates import date2num
152 155
153 156 In [11]: from matplotlib.cm import gist_rainbow
154 157
155 158 In [12]: pos = {}; colors = {}
156 159
157 160 In [12]: for node in G:
158 ...: md = results[node].metadata
159 ...: start = date2num(md.started)
160 ...: runtime = date2num(md.completed) - start
161 ...: pos[node] = (start, runtime)
162 ...: colors[node] = md.engine_id
161 ....: md = results[node].metadata
162 ....: start = date2num(md.started)
163 ....: runtime = date2num(md.completed) - start
164 ....: pos[node] = (start, runtime)
165 ....: colors[node] = md.engine_id
163 166
164 167 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
165 ...: cmap=gist_rainbow)
168 ....: cmap=gist_rainbow)
166 169
167 170 .. figure:: figs/dagdeps.*
171 :width: 600px
168 172
169 173 Time started on x, runtime on y, and color-coded by engine-id (in this case there
170 174 were four engines). Edges denote dependencies.
171 175
172 176
173 177 .. _NetworkX: http://networkx.lanl.gov/
@@ -1,263 +1,295 b''
1 1 .. _parallel_overview:
2 2
3 3 ============================
4 4 Overview and getting started
5 5 ============================
6 6
7 7 Introduction
8 8 ============
9 9
10 10 This section gives an overview of IPython's sophisticated and powerful
11 11 architecture for parallel and distributed computing. This architecture
12 12 abstracts out parallelism in a very general way, which enables IPython to
13 13 support many different styles of parallelism including:
14 14
15 15 * Single program, multiple data (SPMD) parallelism.
16 16 * Multiple program, multiple data (MPMD) parallelism.
17 17 * Message passing using MPI.
18 18 * Task farming.
19 19 * Data parallel.
20 20 * Combinations of these approaches.
21 21 * Custom user defined approaches.
22 22
23 23 Most importantly, IPython enables all types of parallel applications to
24 24 be developed, executed, debugged and monitored *interactively*. Hence,
25 25 the ``I`` in IPython. The following are some example use cases for IPython:
26 26
27 27 * Quickly parallelize algorithms that are embarrassingly parallel
28 28 using a number of simple approaches. Many simple things can be
29 29 parallelized interactively in one or two lines of code.
30 30
31 31 * Steer traditional MPI applications on a supercomputer from an
32 32 IPython session on your laptop.
33 33
34 34 * Analyze and visualize large datasets (that could be remote and/or
35 35 distributed) interactively using IPython and tools like
36 36 matplotlib/TVTK.
37 37
38 38 * Develop, test and debug new parallel algorithms
39 39 (that may use MPI) interactively.
40 40
41 41 * Tie together multiple MPI jobs running on different systems into
42 42 one giant distributed and parallel system.
43 43
44 44 * Start a parallel job on your cluster and then have a remote
45 45 collaborator connect to it and pull back data into their
46 46 local IPython session for plotting and analysis.
47 47
48 48 * Run a set of tasks on a set of CPUs using dynamic load balancing.
49 49
50 50 .. tip::
51 51
52 52 At the SciPy 2011 conference in Austin, Min Ragan-Kelley presented a
53 53 complete 4-hour tutorial on the use of these features, and all the materials
54 54 for the tutorial are now `available online`__. That tutorial provides an
55 55 excellent, hands-on oriented complement to the reference documentation
56 56 presented here.
57 57
58 58 .. __: http://minrk.github.com/scipy-tutorial-2011
59 59
60 60 Architecture overview
61 61 =====================
62 62
63 .. figure:: figs/wideView.png
64 :width: 300px
65
66
63 67 The IPython architecture consists of four components:
64 68
65 69 * The IPython engine.
66 70 * The IPython hub.
67 71 * The IPython schedulers.
68 72 * The controller client.
69 73
70 74 These components live in the :mod:`IPython.parallel` package and are
71 75 installed with IPython. They do, however, have additional dependencies
72 76 that must be installed. For more information, see our
73 77 :ref:`installation documentation <install_index>`.
74 78
75 79 .. TODO: include zmq in install_index
76 80
77 81 IPython engine
78 82 ---------------
79 83
80 84 The IPython engine is a Python instance that takes Python commands over a
81 85 network connection. Eventually, the IPython engine will be a full IPython
82 86 interpreter, but for now, it is a regular Python interpreter. The engine
83 87 can also handle incoming and outgoing Python objects sent over a network
84 88 connection. When multiple engines are started, parallel and distributed
85 89 computing becomes possible. An important feature of an IPython engine is
86 90 that it blocks while user code is being executed. Read on for how the
87 91 IPython controller solves this problem to expose a clean asynchronous API
88 92 to the user.
89 93
90 94 IPython controller
91 95 ------------------
92 96
93 97 The IPython controller processes provide an interface for working with a set of engines.
94 98 At a general level, the controller is a collection of processes to which IPython engines
95 99 and clients can connect. The controller is composed of a :class:`Hub` and a collection of
96 100 :class:`Schedulers`. These Schedulers typically run in separate processes on the
97 101 same machine as the Hub, but can run anywhere, from local threads to remote machines.
98 102
99 103 The controller also provides a single point of contact for users who wish to
100 104 utilize the engines connected to the controller. There are different ways of
101 105 working with a controller. In IPython, all of these models are implemented via
102 the client's :meth:`.View.apply` method, with various arguments, or
106 the :meth:`.View.apply` method, after
103 107 constructing :class:`.View` objects to represent subsets of engines. The two
104 108 primary models for interacting with engines are:
105 109
106 110 * A **Direct** interface, where engines are addressed explicitly.
107 111 * A **LoadBalanced** interface, where the Scheduler is trusted with assigning work to
108 112 appropriate engines.
109 113
110 114 Advanced users can readily extend the View models to enable other
111 115 styles of parallelism.
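
Both models start from the same :class:`~.parallel.Client`; a minimal sketch:

.. sourcecode:: ipython

    In [1]: from IPython.parallel import Client

    In [2]: rc = Client()

    In [3]: dview = rc[:]                     # DirectView: engines addressed explicitly

    In [4]: lview = rc.load_balanced_view()   # LoadBalancedView: the Scheduler assigns work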
112 116
113 117 .. note::
114 118
115 119 A single controller and set of engines can be used with multiple models
116 120 simultaneously. This opens the door for lots of interesting things.
117 121
118 122
119 123 The Hub
120 124 *******
121 125
122 126 The center of an IPython cluster is the Hub. This is the process that keeps
123 127 track of engine connections, schedulers, clients, as well as all task requests and
124 128 results. The primary role of the Hub is to facilitate queries of the cluster state, and
125 129 minimize the necessary information required to establish the many connections involved in
126 130 connecting new clients and engines.
127 131
128 132
129 133 Schedulers
130 134 **********
131 135
132 136 All actions that can be performed on the engine go through a Scheduler. While the engines
133 137 themselves block when user code is run, the schedulers hide that from the user to provide
134 138 a fully asynchronous interface to a set of engines.
135 139
136 140
137 141 IPython client and views
138 142 ------------------------
139 143
140 144 There is one primary object, the :class:`~.parallel.Client`, for connecting to a cluster.
141 145 For each execution model, there is a corresponding :class:`~.parallel.View`. These views
142 146 allow users to interact with a set of engines through the interface. Here are the two default
143 147 views:
144 148
145 149 * The :class:`DirectView` class for explicit addressing.
146 150 * The :class:`LoadBalancedView` class for destination-agnostic scheduling.
147 151
148 152 Security
149 153 --------
150 154
151 155 IPython uses ZeroMQ for networking, which has provided many advantages, but
152 156 one of the setbacks is its utter lack of security [ZeroMQ]_. By default, no IPython
153 157 connections are encrypted, but open ports only listen on localhost. The only
154 158 source of security for IPython is via ssh-tunnel. IPython supports both shell
155 159 (`openssh`) and `paramiko` based tunnels for connections. There is a key necessary
156 160 to submit requests, but due to the lack of encryption, it does not provide
157 161 significant security if loopback traffic is compromised.
158 162
159 163 In our architecture, the controller is the only process that listens on
160 164 network ports, and is thus the main point of vulnerability. The standard model
161 165 for secure connections is to designate that the controller listen on
162 166 localhost, and use ssh-tunnels to connect clients and/or
163 167 engines.
164 168
165 169 To connect and authenticate to the controller an engine or client needs
166 170 some information that the controller has stored in a JSON file.
167 171 Thus, the JSON files need to be copied to a location where
168 172 the clients and engines can find them. Typically, this is the
169 173 :file:`~/.ipython/profile_default/security` directory on the host where the
170 174 client/engine is running (which could be a different host than the controller).
171 175 Once the JSON files are copied over, everything should work fine.
172 176
173 177 Currently, there are two JSON files that the controller creates:
174 178
175 179 ipcontroller-engine.json
176 180 This JSON file has the information necessary for an engine to connect
177 181 to a controller.
178 182
179 183 ipcontroller-client.json
180 184 The client's connection information. This may not differ from the engine's,
181 185 but since the controller may listen on different ports for clients and
182 186 engines, it is stored separately.
183 187
188 ipcontroller-client.json will look something like this, under default localhost
189 circumstances:
190
191 .. sourcecode:: python
192
193 {
194 "url":"tcp:\/\/127.0.0.1:54424",
195 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
196 "ssh":"",
197 "location":"10.19.1.135"
198 }
199
200 If, however, you are running the controller on a worker node of a cluster, you will likely
201 need to use ssh tunnels to connect clients from your laptop to it. You will also
202 probably need to instruct the controller to listen for engines coming from other worker nodes
203 on the cluster. An example of ipcontroller-client.json, as created by::
204
205 $> ipcontroller --ip=0.0.0.0 --ssh=login.mycluster.com
206
207
208 .. sourcecode:: python
209
210 {
211 "url":"tcp:\/\/*:54424",
212 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
213 "ssh":"login.mycluster.com",
214 "location":"10.0.0.2"
215 }
184 216 More details of how these JSON files are used are given below.
185 217
186 218 A detailed description of the security model and its implementation in IPython
187 219 can be found :ref:`here <parallelsecurity>`.
188 220
189 221 .. warning::
190 222
191 223 Even at its most secure, the Controller listens on ports on localhost, and
192 224 every time you make a tunnel, you open a localhost port on the connecting
193 225 machine that points to the Controller. If localhost on the Controller's
194 226 machine, or the machine of any client or engine, is untrusted, then your
195 227 Controller is insecure. There is no way around this with ZeroMQ.
196 228
197 229
198 230
199 231 Getting Started
200 232 ===============
201 233
202 234 To use IPython for parallel computing, you need to start one instance of the
203 235 controller and one or more instances of the engine. Initially, it is best to
204 236 simply start a controller and engines on a single host using the
205 237 :command:`ipcluster` command. To start a controller and 4 engines on your
206 238 localhost, just do::
207 239
208 240 $ ipcluster start -n 4
209 241
210 242 More details about starting the IPython controller and engines can be found
211 243 :ref:`here <parallel_process>`.
212 244
213 245 Once you have started the IPython controller and one or more engines, you
214 246 are ready to use the engines to do something useful. To make sure
215 247 everything is working correctly, try the following commands:
216 248
217 249 .. sourcecode:: ipython
218 250
219 251 In [1]: from IPython.parallel import Client
220 252
221 253 In [2]: c = Client()
222 254
223 255 In [4]: c.ids
224 256 Out[4]: set([0, 1, 2, 3])
225 257
226 258 In [5]: c[:].apply_sync(lambda : "Hello, World")
227 259 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
228 260
229 261
230 262 When a client is created with no arguments, it tries to find the corresponding JSON file
231 263 in the local `~/.ipython/profile_default/security` directory. If you created the cluster with
232 264 a specific profile, you can use that profile with the Client. This should cover most cases:
233 265
234 266 .. sourcecode:: ipython
235 267
236 268 In [2]: c = Client(profile='myprofile')
237 269
238 270 If you have put the JSON file in a different location or it has a different name, create the
239 271 client like this:
240 272
241 273 .. sourcecode:: ipython
242 274
243 275 In [2]: c = Client('/path/to/my/ipcontroller-client.json')
244 276
245 277 Remember, a client needs to be able to see the Hub's ports to connect. So if the Hub is on a
246 278 different machine, you may need to use an ssh server to tunnel access to that machine,
247 279 and then connect to it with:
248 280
249 281 .. sourcecode:: ipython
250 282
251 In [2]: c = Client(sshserver='myhub.example.com')
283 In [2]: c = Client('/path/to/my/ipcontroller-client.json', sshserver='me@myhub.example.com')
252 284
253 285 Where 'myhub.example.com' is the url or IP address of the machine on
254 286 which the Hub process is running (or another machine that has direct access to the Hub's ports).
255 287
256 288 The SSH server may already be specified in ipcontroller-client.json, if the controller was
257 289 given the ``--ssh`` option at launch time.
258 290
259 291 You are now ready to learn more about the :ref:`Direct
260 292 <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
261 293 controller.
262 294
263 295 .. [ZeroMQ] ZeroMQ. http://www.zeromq.org
@@ -1,847 +1,841 b''
1 1 .. _parallel_multiengine:
2 2
3 3 ==========================
4 4 IPython's Direct interface
5 5 ==========================
6 6
7 7 The direct, or multiengine, interface represents one possible way of working with a set of
8 8 IPython engines. The basic idea behind the multiengine interface is that the
9 9 capabilities of each engine are directly and explicitly exposed to the user.
10 10 Thus, in the multiengine interface, each engine is given an id that is used to
11 11 identify the engine and give it work to do. This interface is very intuitive
12 12 and is designed with interactive usage in mind, and is the best place for
13 13 new users of IPython to begin.
14 14
15 15 Starting the IPython controller and engines
16 16 ===========================================
17 17
18 18 To follow along with this tutorial, you will need to start the IPython
19 19 controller and four IPython engines. The simplest way of doing this is to use
20 20 the :command:`ipcluster` command::
21 21
22 22 $ ipcluster start -n 4
23 23
24 24 For more detailed information about starting the controller and engines, see
25 25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26 26
27 Creating a ``Client`` instance
28 ==============================
27 Creating a ``DirectView`` instance
28 ==================================
29 29
30 30 The first step is to import the :mod:`IPython.parallel`
31 31 module and then create a :class:`.Client` instance:
32 32
33 33 .. sourcecode:: ipython
34 34
35 35 In [1]: from IPython.parallel import Client
36 36
37 37 In [2]: rc = Client()
38 38
39 39 This form assumes that the default connection information (stored in
40 40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/profile_default/security`) is
41 41 accurate. If the controller was started on a remote machine, you must copy that connection
42 42 file to the client machine, or enter its contents as arguments to the Client constructor:
43 43
44 44 .. sourcecode:: ipython
45 45
46 46 # If you have copied the json connector file from the controller:
47 47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
48 48 # or to connect with a specific profile you have set up:
49 49 In [3]: rc = Client(profile='mpi')
50 50
51 51
52 52 To make sure there are engines connected to the controller, users can get a list
53 53 of engine ids:
54 54
55 55 .. sourcecode:: ipython
56 56
57 57 In [3]: rc.ids
58 58 Out[3]: [0, 1, 2, 3]
59 59
60 60 Here we see that there are four engines ready to do work for us.
61 61
62 62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 63 constructed via list-access to the client:
64 64
65 65 .. sourcecode:: ipython
66 66
67 67 In [4]: dview = rc[:] # use all engines
68 68
69 69 .. seealso::
70 70
71 71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72 72
73 73
74 74 Quick and easy parallelism
75 75 ==========================
76 76
77 77 In many cases, you simply want to apply a Python function to a sequence of
78 78 objects, but *in parallel*. The client interface provides a simple way
79 79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 Python's builtin :func:`map` function allows a function to be applied to a
85 85 sequence element-by-element. This type of code is typically trivial to
86 86 parallelize. In fact, since IPython's interface is all about functions anyway,
87 87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
88 88 DirectView's :meth:`map` method:
89 89
90 90 .. sourcecode:: ipython
91 91
92 92 In [62]: serial_result = map(lambda x:x**10, range(32))
93 93
94 94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
95 95
96 96 In [67]: serial_result==parallel_result
97 97 Out[67]: True
98 98
99 99
100 100 .. note::
101 101
102 102 The :class:`DirectView`'s version of :meth:`map` does
103 103 not do dynamic load balancing. For a load balanced version, use a
104 104 :class:`LoadBalancedView`.
105 105
106 106 .. seealso::
107 107
108 108 :meth:`map` is implemented via :class:`ParallelFunction`.
109 109
110 110 Remote function decorators
111 111 --------------------------
112 112
113 113 Remote functions are just like normal functions, but when they are called,
114 114 they execute on one or more engines, rather than locally. IPython provides
115 115 two decorators:
116 116
117 117 .. sourcecode:: ipython
118 118
119 119 In [10]: @dview.remote(block=True)
120 ...: def getpid():
121 ...: import os
122 ...: return os.getpid()
123 ...:
120 ....: def getpid():
121 ....: import os
122 ....: return os.getpid()
123 ....:
124 124
125 125 In [11]: getpid()
126 126 Out[11]: [12345, 12346, 12347, 12348]
127 127
128 128 The ``@parallel`` decorator creates parallel functions that break up element-wise
129 129 operations and distribute them, reconstructing the result.
130 130
131 131 .. sourcecode:: ipython
132 132
133 133 In [12]: import numpy as np
134 134
135 135 In [13]: A = np.random.random((64,48))
136 136
137 137 In [14]: @dview.parallel(block=True)
138 ...: def pmul(A,B):
139 ...: return A*B
138 ....: def pmul(A,B):
139 ....: return A*B
140 140
141 141 In [15]: C_local = A*A
142 142
143 143 In [16]: C_remote = pmul(A,A)
144 144
145 145 In [17]: (C_local == C_remote).all()
146 146 Out[17]: True
147 147
148 148 .. seealso::
149 149
150 150 See the docstrings for the :func:`parallel` and :func:`remote` decorators for
151 151 options.
152 152
153 153 Calling Python functions
154 154 ========================
155 155
156 156 The most basic type of operation that can be performed on the engines is to
157 157 execute Python code or call Python functions. Executing Python code can be
158 158 done in blocking or non-blocking mode (non-blocking is default) using the
159 159 :meth:`.View.execute` method, and calling functions can be done via the
160 160 :meth:`.View.apply` method.
161 161
162 162 apply
163 163 -----
164 164
165 165 The main method for doing remote execution (in fact, all methods that
166 166 communicate with the engines are built on top of it), is :meth:`View.apply`.
167 167
168 168 We strive to provide the cleanest interface we can, so `apply` has the following
169 169 signature:
170 170
171 171 .. sourcecode:: python
172 172
173 173 view.apply(f, *args, **kwargs)
174 174
175 175 There are various ways to call functions with IPython, and these flags are set as
176 176 attributes of the View. The ``DirectView`` has just two of these flags:
177 177
178 178 dv.block : bool
179 179 whether to wait for the result, or return an :class:`AsyncResult` object
180 180 immediately
181 181 dv.track : bool
182 182 whether to instruct pyzmq to track when zeromq is done sending the message.
183 183 This is primarily useful for non-copying sends of numpy arrays that you plan to
184 184 edit in-place. You need to know when it becomes safe to edit the buffer
185 185 without corrupting the message.
186 dv.targets : int, list of ints
187 which targets this view is associated with.
186 188
187 189
188 190 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
189 191
190 192 .. sourcecode:: ipython
191 193
192 194 In [4]: view = rc[1:3]
193 195 Out[4]: <DirectView [1, 2]>
194 196
195 197 In [5]: view.apply<tab>
196 198 view.apply view.apply_async view.apply_sync
197 199
198 200 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
199 201
200 202 Blocking execution
201 203 ------------------
202 204
203 205 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
204 206 these examples) submits the command to the controller, which places the
205 207 command in the engines' queues for execution. The :meth:`apply` call then
206 208 blocks until the engines are done executing the command:
207 209
208 210 .. sourcecode:: ipython
209 211
210 212 In [2]: dview = rc[:] # A DirectView of all engines
211 213 In [3]: dview.block=True
212 214 In [4]: dview['a'] = 5
213 215
214 216 In [5]: dview['b'] = 10
215 217
216 218 In [6]: dview.apply(lambda x: a+b+x, 27)
217 219 Out[6]: [42, 42, 42, 42]
218 220
219 221 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
220 222 method:
221 223
222 224 In [7]: dview.block=False
223 225
224 226 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
225 227 Out[8]: [42, 42, 42, 42]
226 228
227 229 Python commands can be executed as strings on specific engines by using a View's ``execute``
228 230 method:
229 231
230 232 .. sourcecode:: ipython
231 233
232 234 In [6]: rc[::2].execute('c=a+b')
233 235
234 236 In [7]: rc[1::2].execute('c=a-b')
235 237
236 238 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
237 239 Out[8]: [15, -5, 15, -5]
238 240
239 241
240 242 Non-blocking execution
241 243 ----------------------
242 244
243 245 In non-blocking mode, :meth:`apply` submits the command to be executed and
244 246 then returns a :class:`AsyncResult` object immediately. The
245 247 :class:`AsyncResult` object gives you a way of getting a result at a later
246 248 time through its :meth:`get` method.
247 249
248 250 .. Note::
249 251
250 252 The :class:`AsyncResult` object provides a superset of the interface in
251 253 :py:class:`multiprocessing.pool.AsyncResult`. See the
252 254 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
253 255 for more.
254 256
255 257
256 258 This allows you to quickly submit long running commands without blocking your
257 259 local Python/IPython session:
258 260
259 261 .. sourcecode:: ipython
260 262
261 263 # define our function
262 264 In [6]: def wait(t):
263 ...: import time
264 ...: tic = time.time()
265 ...: time.sleep(t)
266 ...: return time.time()-tic
265 ....: import time
266 ....: tic = time.time()
267 ....: time.sleep(t)
268 ....: return time.time()-tic
267 269
268 270 # In non-blocking mode
269 271 In [7]: ar = dview.apply_async(wait, 2)
270 272
271 273 # Now block for the result
272 274 In [8]: ar.get()
273 275 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
274 276
275 277 # Again in non-blocking mode
276 278 In [9]: ar = dview.apply_async(wait, 10)
277 279
278 280 # Poll to see if the result is ready
279 281 In [10]: ar.ready()
280 282 Out[10]: False
281 283
282 284 # ask for the result, but wait a maximum of 1 second:
283 285 In [45]: ar.get(1)
284 286 ---------------------------------------------------------------------------
285 287 TimeoutError Traceback (most recent call last)
286 288 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
287 289 ----> 1 ar.get(1)
288 290
289 291 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
290 292 62 raise self._exception
291 293 63 else:
292 294 ---> 64 raise error.TimeoutError("Result not ready.")
293 295 65
294 296 66 def ready(self):
295 297
296 298 TimeoutError: Result not ready.
297 299
298 300 .. Note::
299 301
300 302 Note the import inside the function. This is a common model, to ensure
301 303 that the appropriate modules are imported where the task is run. You can
302 304 also manually import modules into the engine(s) namespace(s) via
303 305 :meth:`view.execute('import numpy')`.
304 306
305 307 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
306 308 are done. For this, there is the method :meth:`wait`. This method takes a
307 309 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
308 310 and blocks until all of the associated results are ready:
309 311
310 312 .. sourcecode:: ipython
311 313
312 314 In [72]: dview.block=False
313 315
314 316 # A trivial list of AsyncResults objects
315 317 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
316 318
317 319 # Wait until all of them are done
318 320 In [74]: dview.wait(pr_list)
319 321
320 322 # Then, their results are ready using get() or the `.r` attribute
321 323 In [75]: pr_list[0].get()
322 324 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
323 325
324 326
325 327
326 328 The ``block`` and ``targets`` keyword arguments and attributes
327 329 --------------------------------------------------------------
328 330
329 Most DirectView methods (excluding :meth:`apply` and :meth:`map`) accept ``block`` and
331 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
330 332 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
331 333 blocking mode and which engines the command is applied to. The :class:`View` class also has
332 334 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
333 335 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
334 336
335 337 * If no keyword argument is provided, the instance attributes are used.
336 338 * Keyword arguments, if provided, override the instance attributes for
337 339 the duration of a single call.
338 340
339 341 The following examples demonstrate how to use the instance attributes:
340 342
341 343 .. sourcecode:: ipython
342 344
343 345 In [16]: dview.targets = [0,2]
344 346
345 347 In [17]: dview.block = False
346 348
347 349 In [18]: ar = dview.apply(lambda : 10)
348 350
349 351 In [19]: ar.get()
350 352 Out[19]: [10, 10]
351 353
352 354 In [16]: dview.targets = v.client.ids # all engines (4)
353 355
354 356 In [21]: dview.block = True
355 357
356 358 In [22]: dview.apply(lambda : 42)
357 359 Out[22]: [42, 42, 42, 42]
358 360
359 361 The :attr:`block` and :attr:`targets` instance attributes of the
360 362 :class:`.DirectView` also determine the behavior of the parallel magic commands.
361 363
362 364 Parallel magic commands
363 365 -----------------------
364 366
365 .. warning::
366
367 The magics have not been changed to work with the zeromq system. The
368 magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
369
370 367 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
371 368 that make it more pleasant to execute Python commands on the engines
372 369 interactively. These are simply shortcuts to :meth:`execute` and
373 370 :meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
374 371 Python command on the engines specified by the :attr:`targets` attribute of the
375 372 :class:`DirectView` instance:
376 373
377 374 .. sourcecode:: ipython
378 375
379 # load the parallel magic extension:
380 In [21]: %load_ext parallelmagic
381
382 376 # Create a DirectView for all targets
383 377 In [22]: dv = rc[:]
384 378
385 379 # Make this DirectView active for parallel magic commands
386 380 In [23]: dv.activate()
387 381
388 382 In [24]: dv.block=True
389 383
390 In [25]: import numpy
391
392 In [26]: %px import numpy
393 Parallel execution on engines: [0, 1, 2, 3]
384 # import numpy here and everywhere
385 In [25]: with dv.sync_imports():
386 ....: import numpy
387 importing numpy on engine(s)
394 388
395 389 In [27]: %px a = numpy.random.rand(2,2)
396 390 Parallel execution on engines: [0, 1, 2, 3]
397 391
398 392 In [28]: %px ev = numpy.linalg.eigvals(a)
399 393 Parallel execution on engines: [0, 1, 2, 3]
400 394
401 395 In [28]: dv['ev']
402 396 Out[28]: [ array([ 1.09522024, -0.09645227]),
403 array([ 1.21435496, -0.35546712]),
404 array([ 0.72180653, 0.07133042]),
405 array([ 1.46384341e+00, 1.04353244e-04])
406 ]
397 ....: array([ 1.21435496, -0.35546712]),
398 ....: array([ 0.72180653, 0.07133042]),
399 ....: array([ 1.46384341, 1.04353244e-04])
400 ....: ]
407 401
408 402 The ``%result`` magic gets the most recent result, or takes an argument
409 403 specifying the index of the result to be requested. It is simply a shortcut to the
410 404 :meth:`get_result` method:
411 405
412 406 .. sourcecode:: ipython
413 407
414 408 In [29]: dv.apply_async(lambda : ev)
415 409
416 410 In [30]: %result
417 411 Out[30]: [ [ 1.28167017 0.14197338],
418 [-0.14093616 1.27877273],
419 [-0.37023573 1.06779409],
420 [ 0.83664764 -0.25602658] ]
412 ....: [-0.14093616 1.27877273],
413 ....: [-0.37023573 1.06779409],
414 ....: [ 0.83664764 -0.25602658] ]
421 415
422 416 The ``%autopx`` magic switches to a mode where everything you type is executed
423 417 on the engines given by the :attr:`targets` attribute:
424 418
425 419 .. sourcecode:: ipython
426 420
427 421 In [30]: dv.block=False
428 422
429 423 In [31]: %autopx
430 424 Auto Parallel Enabled
431 425 Type %autopx to disable
432 426
433 427 In [32]: max_evals = []
434 428 <IPython.parallel.AsyncResult object at 0x17b8a70>
435 429
436 430 In [33]: for i in range(100):
437 431 ....: a = numpy.random.rand(10,10)
438 432 ....: a = a+a.transpose()
439 433 ....: evals = numpy.linalg.eigvals(a)
440 434 ....: max_evals.append(evals[0].real)
441 435 ....:
442 436 ....:
443 437 <IPython.parallel.AsyncResult object at 0x17af8f0>
444 438
445 439 In [34]: %autopx
446 440 Auto Parallel Disabled
447 441
448 442 In [35]: dv.block=True
449 443
450 444 In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
451 445 Parallel execution on engines: [0, 1, 2, 3]
452 446
453 447 In [37]: dv['ans']
454 448 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
455 'Average max eigenvalue is: 10.2076902286',
456 'Average max eigenvalue is: 10.1891484655',
457 'Average max eigenvalue is: 10.1158837784',]
449 ....: 'Average max eigenvalue is: 10.2076902286',
450 ....: 'Average max eigenvalue is: 10.1891484655',
451 ....: 'Average max eigenvalue is: 10.1158837784',]
458 452
459 453
460 454 Moving Python objects around
461 455 ============================
462 456
463 457 In addition to calling functions and executing code on engines, you can
464 458 transfer Python objects to and from your IPython session and the engines. In
465 459 IPython, these operations are called :meth:`push` (sending an object to the
466 460 engines) and :meth:`pull` (getting an object from the engines).
467 461
468 462 Basic push and pull
469 463 -------------------
470 464
471 465 Here are some examples of how you use :meth:`push` and :meth:`pull`:
472 466
473 467 .. sourcecode:: ipython
474 468
475 469 In [38]: dview.push(dict(a=1.03234,b=3453))
476 470 Out[38]: [None,None,None,None]
477 471
478 472 In [39]: dview.pull('a')
479 473 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
480 474
481 475 In [40]: dview.pull('b', targets=0)
482 476 Out[40]: 3453
483 477
484 478 In [41]: dview.pull(('a','b'))
485 479 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
486 480
487 481 In [43]: dview.push(dict(c='speed'))
488 482 Out[43]: [None,None,None,None]
489 483
490 484 In non-blocking mode :meth:`push` and :meth:`pull` also return
491 485 :class:`AsyncResult` objects:
492 486
493 487 .. sourcecode:: ipython
494 488
495 489 In [48]: ar = dview.pull('a', block=False)
496 490
497 491 In [49]: ar.get()
498 492 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
499 493
500 494
501 495 Dictionary interface
502 496 --------------------
503 497
504 498 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
505 499 dictionary-style access by key and methods such as :meth:`get` and
506 500 :meth:`update` for convenience. This makes the remote namespaces of the engines
507 501 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
508 502
509 503 .. sourcecode:: ipython
510 504
511 505 In [51]: dview['a']=['foo','bar']
512 506
513 507 In [52]: dview['a']
514 508 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
515 509
516 510 Scatter and gather
517 511 ------------------
518 512
519 513 Sometimes it is useful to partition a sequence and push the partitions to
520 514 different engines. In MPI language, this is known as scatter/gather and we
521 515 follow that terminology. However, it is important to remember that in
522 516 IPython's :class:`Client` class, :meth:`scatter` is from the
523 517 interactive IPython session to the engines and :meth:`gather` is from the
524 518 engines back to the interactive IPython session. For scatter/gather operations
525 between engines, MPI should be used:
519 between engines, MPI, pyzmq, or some other direct interconnect should be used.
526 520
527 521 .. sourcecode:: ipython
528 522
529 523 In [58]: dview.scatter('a',range(16))
530 524 Out[58]: [None,None,None,None]
531 525
532 526 In [59]: dview['a']
533 527 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
534 528
535 529 In [60]: dview.gather('a')
536 530 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
537 531
538 532 Other things to look at
539 533 =======================
540 534
541 535 How to do parallel list comprehensions
542 536 --------------------------------------
543 537
544 538 In many cases list comprehensions are nicer than using the map function. While
545 539 we don't have fully parallel list comprehensions, it is simple to get the
546 540 basic effect using :meth:`scatter` and :meth:`gather`:
547 541
548 542 .. sourcecode:: ipython
549 543
550 544 In [66]: dview.scatter('x',range(64))
551 545
552 546 In [67]: %px y = [i**10 for i in x]
553 547 Parallel execution on engines: [0, 1, 2, 3]
554 548 Out[67]:
555 549
556 550 In [68]: y = dview.gather('y')
557 551
558 552 In [69]: print y
559 553 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
560 554
561 555 Remote imports
562 556 --------------
563 557
564 558 Sometimes you will want to import packages both in your interactive session
565 559 and on your remote engines. This can be done with the :class:`ContextManager`
566 560 created by a DirectView's :meth:`sync_imports` method:
567 561
568 562 .. sourcecode:: ipython
569 563
570 564 In [69]: with dview.sync_imports():
571 ...: import numpy
565 ....: import numpy
572 566 importing numpy on engine(s)
573 567
574 568 Any imports made inside the block will also be performed on the view's engines.
575 569 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
576 570 whether the local imports should also be performed. However, support for `local=False`
577 571 has not been implemented, so only packages that can be imported locally will work
578 572 this way.
579 573
580 574 You can also specify imports via the ``@require`` decorator. This is a decorator
581 575 designed for use in Dependencies, but can be used to handle remote imports as well.
582 576 Modules or module names passed to ``@require`` will be imported before the decorated
583 577 function is called. If they cannot be imported, the decorated function will never
584 578 execute, and will fail with an UnmetDependencyError.
585 579
586 580 .. sourcecode:: ipython
587 581
588 582 In [69]: from IPython.parallel import require
589 583
590 584 In [70]: @require('re')
591 ...: def findall(pat, x):
592 ...: # re is guaranteed to be available
593 ...: return re.findall(pat, x)
585 ....: def findall(pat, x):
586 ....: # re is guaranteed to be available
587 ....: return re.findall(pat, x)
594 588
595 589 # you can also pass modules themselves, that you already have locally:
596 590 In [71]: @require(time)
597 ...: def wait(t):
598 ...: time.sleep(t)
599 ...: return t
591 ....: def wait(t):
592 ....: time.sleep(t)
593 ....: return t
600 594
601 595 .. _parallel_exceptions:
602 596
603 597 Parallel exceptions
604 598 -------------------
605 599
606 600 In the multiengine interface, parallel commands can raise Python exceptions,
607 601 just like serial commands. But, it is a little subtle, because a single
608 602 parallel command can actually raise multiple exceptions (one for each engine
609 603 the command was run on). To express this idea, we have a
610 604 :exc:`CompositeError` exception class that will be raised in most cases. The
611 605 :exc:`CompositeError` class is a special type of exception that wraps one or
612 606 more other types of exceptions. Here is how it works:
613 607
614 608 .. sourcecode:: ipython
615 609
616 610 In [76]: dview.block=True
617 611
618 612 In [77]: dview.execute('1/0')
619 613 ---------------------------------------------------------------------------
620 614 CompositeError Traceback (most recent call last)
621 615 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
622 616 ----> 1 dview.execute('1/0')
623 617
624 618 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
625 619 591 default: self.block
626 620 592 """
627 621 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
628 622 594
629 623 595 def run(self, filename, targets=None, block=None):
630 624
631 625 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
632 626
633 627 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
634 628 55 def sync_results(f, self, *args, **kwargs):
635 629 56 """sync relevant results from self.client to our results attribute."""
636 630 ---> 57 ret = f(self, *args, **kwargs)
637 631 58 delta = self.outstanding.difference(self.client.outstanding)
638 632 59 completed = self.outstanding.intersection(delta)
639 633
640 634 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
641 635
642 636 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
643 637 44 n_previous = len(self.client.history)
644 638 45 try:
645 639 ---> 46 ret = f(self, *args, **kwargs)
646 640 47 finally:
647 641 48 nmsgs = len(self.client.history) - n_previous
648 642
649 643 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
650 644 529 if block:
651 645 530 try:
652 646 --> 531 return ar.get()
653 647 532 except KeyboardInterrupt:
654 648 533 pass
655 649
656 650 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
657 651 101 return self._result
658 652 102 else:
659 653 --> 103 raise self._exception
660 654 104 else:
661 655 105 raise error.TimeoutError("Result not ready.")
662 656
663 657 CompositeError: one or more exceptions from call to method: _execute
664 658 [0:apply]: ZeroDivisionError: integer division or modulo by zero
665 659 [1:apply]: ZeroDivisionError: integer division or modulo by zero
666 660 [2:apply]: ZeroDivisionError: integer division or modulo by zero
667 661 [3:apply]: ZeroDivisionError: integer division or modulo by zero
668 662
669 663 Notice how the error message printed when :exc:`CompositeError` is raised has
670 664 information about the individual exceptions that were raised on each engine.
671 665 If you want, you can even raise one of these original exceptions:
672 666
673 667 .. sourcecode:: ipython
674 668
675 669 In [80]: try:
676 670 ....: dview.execute('1/0')
677 671 ....: except parallel.error.CompositeError, e:
678 672 ....: e.raise_exception()
679 673 ....:
680 674 ....:
681 675 ---------------------------------------------------------------------------
682 676 RemoteError Traceback (most recent call last)
683 677 /home/user/<ipython-input-17-8597e7e39858> in <module>()
684 678 2 dview.execute('1/0')
685 679 3 except CompositeError as e:
686 680 ----> 4 e.raise_exception()
687 681
688 682 /path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
689 683 266 raise IndexError("an exception with index %i does not exist"%excid)
690 684 267 else:
691 685 --> 268 raise RemoteError(en, ev, etb, ei)
692 686 269
693 687 270
694 688
695 689 RemoteError: ZeroDivisionError(integer division or modulo by zero)
696 690 Traceback (most recent call last):
697 691 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
698 692 exec code in working,working
699 693 File "<string>", line 1, in <module>
700 694 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
701 695 exec code in globals()
702 696 File "<string>", line 1, in <module>
703 697 ZeroDivisionError: integer division or modulo by zero
704 698
705 699 If you are working in IPython, you can simply type ``%debug`` after one of
706 700 these :exc:`CompositeError` exceptions is raised, and inspect the exception
707 701 instance:
708 702
709 703 .. sourcecode:: ipython
710 704
711 705 In [81]: dview.execute('1/0')
712 706 ---------------------------------------------------------------------------
713 707 CompositeError Traceback (most recent call last)
714 708 /home/user/<ipython-input-10-5d56b303a66c> in <module>()
715 709 ----> 1 dview.execute('1/0')
716 710
717 711 /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
718 712 591 default: self.block
719 713 592 """
720 714 --> 593 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
721 715 594
722 716 595 def run(self, filename, targets=None, block=None):
723 717
724 718 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
725 719
726 720 /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
727 721 55 def sync_results(f, self, *args, **kwargs):
728 722 56 """sync relevant results from self.client to our results attribute."""
729 723 ---> 57 ret = f(self, *args, **kwargs)
730 724 58 delta = self.outstanding.difference(self.client.outstanding)
731 725 59 completed = self.outstanding.intersection(delta)
732 726
733 727 /home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)
734 728
735 729 /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
736 730 44 n_previous = len(self.client.history)
737 731 45 try:
738 732 ---> 46 ret = f(self, *args, **kwargs)
739 733 47 finally:
740 734 48 nmsgs = len(self.client.history) - n_previous
741 735
742 736 /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
743 737 529 if block:
744 738 530 try:
745 739 --> 531 return ar.get()
746 740 532 except KeyboardInterrupt:
747 741 533 pass
748 742
749 743 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
750 744 101 return self._result
751 745 102 else:
752 746 --> 103 raise self._exception
753 747 104 else:
754 748 105 raise error.TimeoutError("Result not ready.")
755 749
756 750 CompositeError: one or more exceptions from call to method: _execute
757 751 [0:apply]: ZeroDivisionError: integer division or modulo by zero
758 752 [1:apply]: ZeroDivisionError: integer division or modulo by zero
759 753 [2:apply]: ZeroDivisionError: integer division or modulo by zero
760 754 [3:apply]: ZeroDivisionError: integer division or modulo by zero
761 755
762 756 In [82]: %debug
763 757 > /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
764 758 102 else:
765 759 --> 103 raise self._exception
766 760 104 else:
767 761
768 762     # With the debugger running, self._exception is the exception instance. We can
769 763     # tab-complete on it and see the extra methods that are available.
770 764 ipdb> self._exception.<tab>
771 765 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
772 766 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
773 767 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
774 768 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
775 769 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
776 770 ipdb> self._exception.print_tracebacks()
777 771 [0:apply]:
778 772 Traceback (most recent call last):
779 773 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
780 774 exec code in working,working
781 775 File "<string>", line 1, in <module>
782 776 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
783 777 exec code in globals()
784 778 File "<string>", line 1, in <module>
785 779 ZeroDivisionError: integer division or modulo by zero
786 780
787 781
788 782 [1:apply]:
789 783 Traceback (most recent call last):
790 784 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
791 785 exec code in working,working
792 786 File "<string>", line 1, in <module>
793 787 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
794 788 exec code in globals()
795 789 File "<string>", line 1, in <module>
796 790 ZeroDivisionError: integer division or modulo by zero
797 791
798 792
799 793 [2:apply]:
800 794 Traceback (most recent call last):
801 795 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
802 796 exec code in working,working
803 797 File "<string>", line 1, in <module>
804 798 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
805 799 exec code in globals()
806 800 File "<string>", line 1, in <module>
807 801 ZeroDivisionError: integer division or modulo by zero
808 802
809 803
810 804 [3:apply]:
811 805 Traceback (most recent call last):
812 806 File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
813 807 exec code in working,working
814 808 File "<string>", line 1, in <module>
815 809 File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
816 810 exec code in globals()
817 811 File "<string>", line 1, in <module>
818 812 ZeroDivisionError: integer division or modulo by zero
819 813
820 814
821 815 All of this same error handling magic even works in non-blocking mode:
822 816
823 817 .. sourcecode:: ipython
824 818
825 819 In [83]: dview.block=False
826 820
827 821 In [84]: ar = dview.execute('1/0')
828 822
829 823 In [85]: ar.get()
830 824 ---------------------------------------------------------------------------
831 825 CompositeError Traceback (most recent call last)
832 826 /home/user/<ipython-input-21-8531eb3d26fb> in <module>()
833 827 ----> 1 ar.get()
834 828
835 829 /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
836 830 101 return self._result
837 831 102 else:
838 832 --> 103 raise self._exception
839 833 104 else:
840 834 105 raise error.TimeoutError("Result not ready.")
841 835
842 836 CompositeError: one or more exceptions from call to method: _execute
843 837 [0:apply]: ZeroDivisionError: integer division or modulo by zero
844 838 [1:apply]: ZeroDivisionError: integer division or modulo by zero
845 839 [2:apply]: ZeroDivisionError: integer division or modulo by zero
846 840 [3:apply]: ZeroDivisionError: integer division or modulo by zero
847 841
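If you prefer to handle the error programmatically instead of letting it print, the
same methods shown above apply. A minimal sketch, catching the error from the
non-blocking example and printing the remote tracebacks:

.. sourcecode:: ipython

    In [86]: try:
       ....:     ar.get()
       ....: except parallel.error.CompositeError as e:
       ....:     e.print_tracebacks()
       ....:
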
@@ -1,449 +1,462 b''
1 1 .. _parallel_task:
2 2
3 3 ==========================
4 4 The IPython task interface
5 5 ==========================
6 6
7 7 The task interface to the cluster presents the engines as a fault-tolerant,
8 8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 9 the task interface the user has no direct access to individual engines. By
10 10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 11 simpler and more powerful.
12 12
13 13 Best of all, you can use both of these interfaces at the same time, taking
14 14 advantage of their respective strengths. When you can break your work up into
15 15 segments that do not depend on previous execution, the task interface is ideal.
16 16 But it also has more power and flexibility, allowing
17 17 you to guide the distribution of jobs, without having to assign tasks to
18 18 engines explicitly.
19 19
20 20 Starting the IPython controller and engines
21 21 ===========================================
22 22
23 23 To follow along with this tutorial, you will need to start the IPython
24 24 controller and four IPython engines. The simplest way of doing this is to use
25 25 the :command:`ipcluster` command::
26 26
27 27 $ ipcluster start -n 4
28 28
29 29 For more detailed information about starting the controller and engines, see
30 30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
31 31
32 Creating a ``Client`` instance
33 ==============================
32 Creating a ``LoadBalancedView`` instance
33 ========================================
34 34
35 35 The first step is to import the :mod:`IPython.parallel` module
36 36 and then create a :class:`.Client` instance; we will also be using
37 37 a :class:`LoadBalancedView`, here called `lview`:
38 38
39 39 .. sourcecode:: ipython
40 40
41 41 In [1]: from IPython.parallel import Client
42 42
43 43 In [2]: rc = Client()
44 44
45 45
46 46 This form assumes that the controller was started on localhost with default
47 47 configuration. If not, the location of the controller must be given as an
48 48 argument to the constructor:
49 49
50 50 .. sourcecode:: ipython
51 51
52 52 # for a visible LAN controller listening on an external port:
53 53 In [2]: rc = Client('tcp://192.168.1.16:10101')
54 54 # or to connect with a specific profile you have set up:
55 55 In [3]: rc = Client(profile='mpi')
56 56
57 57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 58 be constructed via the client's :meth:`load_balanced_view` method:
59 59
60 60 .. sourcecode:: ipython
61 61
62 62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
63 63
64 64 .. seealso::
65 65
66 66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
67 67
68 68
69 69 Quick and easy parallelism
70 70 ==========================
71 71
72 72 In many cases, you simply want to apply a Python function to a sequence of
73 73 objects, but *in parallel*. Like the multiengine interface, these can be
74 74 implemented via the task interface. The exact same tools can perform these
75 75 actions in load-balanced ways as well as multiplexed ways: a parallel version
76 76 of :func:`map` and the :func:`@parallel` function decorator. When used through
77 77 a :class:`LoadBalancedView`, they are dynamically load balanced. Thus, if the
78 78 execution time per item varies significantly, you should use the versions in
79 79 the task interface.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 To load-balance :meth:`map`, simply use a :class:`LoadBalancedView`:
85 85
86 86 .. sourcecode:: ipython
87 87
88 88 In [62]: lview.block = True
89 89
90 90 In [63]: serial_result = map(lambda x:x**10, range(32))
91 91
92 92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
93 93
94 94 In [65]: serial_result==parallel_result
95 95 Out[65]: True
96 96
97 97 Parallel function decorator
98 98 ---------------------------
99 99
100 100 Parallel functions are just like normal functions, but they can be called on
101 101 sequences and *in parallel*. The multiengine interface provides a decorator
102 102 that turns any Python function into a parallel function:
103 103
104 104 .. sourcecode:: ipython
105 105
106 106 In [10]: @lview.parallel()
107 107 ....: def f(x):
108 108 ....: return 10.0*x**4
109 109 ....:
110 110
111 111 In [11]: f.map(range(32)) # this is done in parallel
112 112 Out[11]: [0.0,10.0,160.0,...]
113 113
114 114 .. _parallel_taskmap:
115 115
116 The AsyncMapResult
117 ==================
116 Map results are iterable!
117 -------------------------
118
119 When an AsyncResult object maps multiple results (e.g. the :class:`~AsyncMapResult`
120 object), you can iterate through them, and act on the results as they arrive:
121
122 .. literalinclude:: ../../examples/parallel/itermapresult.py
123 :language: python
124 :lines: 9-34
125
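In short, a minimal sketch of the same pattern:

.. sourcecode:: ipython

    In [66]: amr = lview.map_async(lambda x: 2*x, range(8))

    In [67]: for r in amr:
       ....:     print r
       ....:
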
126 .. seealso::
127
128 When the AsyncResult or AsyncMapResult objects don't provide what you need (for instance,
129 handling individual results as they arrive, but with metadata), you can always
130 just split the original result's ``msg_ids`` attribute, and handle them as you like.
131
132 For an example of this, see :file:`docs/examples/parallel/customresult.py`
118 133
119 When you call ``lview.map_async(f, sequence)``, or just :meth:`map` with `block=True`, then
120 what you get in return will be an :class:`~AsyncMapResult` object. These are similar to
121 AsyncResult objects, but with one key difference
122 134
123 135 .. _parallel_dependencies:
124 136
125 137 Dependencies
126 138 ============
127 139
128 140 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
129 141 may want to associate some kind of `Dependency` that describes when, where, or whether
130 142 a task can be run. In IPython, we provide two types of dependencies:
131 143 `Functional Dependencies`_ and `Graph Dependencies`_.
132 144
133 145 .. note::
134 146
135 147 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
136 148 and you will see errors or warnings if you try to use dependencies with the pure
137 149 scheduler.
138 150
139 151 Functional Dependencies
140 152 -----------------------
141 153
142 154 Functional dependencies are used to determine whether a given engine is capable of running
143 155 a particular task. This is implemented via a special :class:`Exception` class,
144 156 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
145 157 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
146 158 the error up to the client like any other error, catches the error, and submits the task
147 159 to a different engine. This will be repeated as needed, but a task will never be submitted
148 160 to a given engine a second time.
149 161
150 162 You can raise the :class:`UnmetDependency` yourself, but IPython provides
151 163 decorators that facilitate this behavior.
152 164
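For instance, a minimal sketch of raising it by hand; ``pycuda`` and
``do_gpu_stuff`` are hypothetical placeholders here:

.. sourcecode:: ipython

    In [8]: def gpu_task(x):
       ...:     from IPython.parallel.error import UnmetDependency
       ...:     try:
       ...:         import pycuda  # hypothetical requirement
       ...:     except ImportError:
       ...:         # tell the scheduler to try this task on another engine
       ...:         raise UnmetDependency()
       ...:     return do_gpu_stuff(x)
       ...:
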
153 165 There are two decorators and a class used for functional dependencies:
154 166
155 167 .. sourcecode:: ipython
156 168
157 169 In [9]: from IPython.parallel import depend, require, dependent
158 170
159 171 @require
160 172 ********
161 173
162 174 The simplest sort of dependency is requiring that a Python module is available. The
163 175 ``@require`` decorator lets you define a function that will only run on engines where names
164 176 you specify are importable:
165 177
166 178 .. sourcecode:: ipython
167 179
168 180 In [10]: @require('numpy', 'zmq')
169 ...: def myfunc():
170 ...: return dostuff()
181 ....: def myfunc():
182 ....: return dostuff()
171 183
172 184 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
173 185 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
174 186
175 187 @depend
176 188 *******
177 189
178 190 The ``@depend`` decorator lets you decorate any function with any *other* function to
179 191 evaluate the dependency. The dependency function will be called at the start of the task,
180 192 and if it returns ``False``, then the dependency will be considered unmet, and the task
181 193 will be assigned to another engine. If the dependency returns *anything other than
182 194 ``False``*, the rest of the task will continue.
183 195
184 196 .. sourcecode:: ipython
185 197
186 198 In [10]: def platform_specific(plat):
187 ...: import sys
188 ...: return sys.platform == plat
199 ....: import sys
200 ....: return sys.platform == plat
189 201
190 202 In [11]: @depend(platform_specific, 'darwin')
191 ...: def mactask():
192 ...: do_mac_stuff()
203 ....: def mactask():
204 ....: do_mac_stuff()
193 205
194 206 In [12]: @depend(platform_specific, 'nt')
195 ...: def wintask():
196 ...: do_windows_stuff()
207 ....: def wintask():
208 ....: do_windows_stuff()
197 209
198 210 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
199 211 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
200 212 signature.
201 213
202 214 dependents
203 215 **********
204 216
205 217 You don't have to use the decorators on your tasks. If, for instance, you want
206 218 to run tasks with a single function but varying dependencies, you can directly construct
207 219 the :class:`dependent` object that the decorators use:
208 220
209 221 .. sourcecode:: ipython
210 222
211 223 In [13]: def mytask(*args):
212 ...: dostuff()
224 ....: dostuff()
213 225
214 226 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
215 227 # this is the same as decorating the declaration of mytask with @depend
216 228 # but you can do it again:
217 229
218 230 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
219 231
220 232 # in general:
221 233 In [16]: t = dependent(f, g, *dargs, **dkwargs)
222 234
223 235 # is equivalent to:
224 236 In [17]: @depend(g, *dargs, **dkwargs)
225 ...: def t(a,b,c):
226 ...: # contents of f
237 ....: def t(a,b,c):
238 ....: # contents of f
227 239
228 240 Graph Dependencies
229 241 ------------------
230 242
231 243 Sometimes you want to restrict the time and/or location to run a given task as a function
232 244 of the time and/or location of other tasks. This is implemented via a subclass of
233 245 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
234 246 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
235 247 has been met.
236 248
237 249 The switches we provide for interpreting whether a given dependency set has been met:
238 250
239 251 any|all
240 252 Whether the dependency is considered met if *any* of the dependencies are done, or
241 253 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
242 254 boolean attribute, which defaults to ``True``.
243 255
244 256 success [default: True]
245 257 Whether to consider tasks that succeeded as fulfilling dependencies.
246 258
247 259 failure [default: False]
248 260 Whether to consider tasks that failed as fulfilling dependencies.
249 261 Using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
250 262 only when tasks have failed.
251 263
252 264 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
253 265 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
254 266 not care whether the task succeeds, and always want the second task to run, in which case you
255 267 should use `success=failure=True`. The default behavior is to only use successes.
256 268
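For the non-default cases, a sketch of constructing the :class:`Dependency`
directly (``ar1`` and ``ar2`` are assumed to be earlier :class:`AsyncResult`
objects); the resulting set can be passed wherever msg_ids are accepted:

.. sourcecode:: ipython

    In [13]: from IPython.parallel import Dependency

    # met as soon as *any* of the listed tasks finishes, pass or fail:
    In [14]: dep = Dependency(ar1.msg_ids + ar2.msg_ids,
       ....:                  all=False, success=True, failure=True)
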
257 269 There are other switches for interpretation that are made at the *task* level. These are
258 270 specified via keyword arguments to the client's :meth:`apply` method.
259 271
260 272 after,follow
261 273 You may want to run a task *after* a given set of dependencies have been run and/or
262 274 run it *where* another set of dependencies are met. To support this, every task has an
263 275 `after` dependency to restrict time, and a `follow` dependency to restrict
264 276 destination.
265 277
266 278 timeout
267 279 You may also want to set a time-limit for how long the scheduler should wait before a
268 280 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
269 281 indicates that the task should never timeout. If the timeout is reached, and the
270 282 scheduler still hasn't been able to assign the task to an engine, the task will fail
271 283 with a :class:`DependencyTimeout`.
272 284
273 285 .. note::
274 286
275 287 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
276 288 task to run after a job submitted via the MUX interface.
277 289
278 290 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
279 291 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
280 292 `follow` and `after` keywords to :meth:`client.apply`:
281 293
282 294 .. sourcecode:: ipython
283 295
284 296 In [14]: client.block=False
285 297
286 298 In [15]: ar = lview.apply(f, args, kwargs)
287 299
288 300 In [16]: ar2 = lview.apply(f2)
289 301
290 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
291
292 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
302 In [17]: with lview.temp_flags(after=[ar,ar2]):
303 ....: ar3 = lview.apply(f3)
293 304
305 In [18]: with lview.temp_flags(follow=[ar], timeout=2.5):
306 ....: ar4 = lview.apply(f3)
294 307
295 308 .. seealso::
296 309
297 310 Some parallel workloads can be described as a `Directed Acyclic Graph
298 311 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
299 312 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
300 313 onto task dependencies.
301 314
302 315
303 316 Impossible Dependencies
304 317 ***********************
305 318
306 319 The schedulers do perform some analysis on graph dependencies to determine whether they
307 320 can ever be met. If the scheduler discovers that a dependency cannot be
308 321 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, a
309 322 task that the scheduler knows can never run won't sit indefinitely in the
310 323 scheduler clogging the pipeline.
311 324
312 325 The basic cases that are checked:
313 326
314 327 * depending on nonexistent messages
315 328 * `follow` dependencies were run on more than one machine and `all=True`
316 329 * any dependencies failed and `all=True,success=True,failure=False`
317 330 * all dependencies failed and `all=False,success=True,failure=False`
318 331
319 332 .. warning::
320 333
321 334 This analysis has not been proven to be rigorous, so it is possible for tasks
322 335 to become impossible to run in obscure situations; a timeout may be a good choice.
323 336
324 337
325 338 Retries and Resubmit
326 339 ====================
327 340
328 341 Retries
329 342 -------
330 343
331 344 Another flag for tasks is `retries`. This is an integer, specifying how many times
332 345 a task should be resubmitted after failure. This is useful for tasks that should still run
333 346 if their engine was shut down, or that have some statistical chance of failing. The default
334 347 is to not retry tasks.
335 348
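A sketch, using the same :meth:`temp_flags` pattern shown earlier
(``flaky_task`` is a hypothetical placeholder):

.. sourcecode:: ipython

    In [19]: with lview.temp_flags(retries=2):
       ....:     ar = lview.apply(flaky_task)
       ....:
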
336 349 Resubmit
337 350 --------
338 351
339 352 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
340 353 you have fixed the error, or because you want to restore the cluster to an interrupted state.
341 354 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
342 355 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
343 356 a task that is pending - only those that have finished, either successfully or unsuccessfully.
344 357
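A minimal sketch, reusing the placeholder function ``f`` from the examples above:

.. sourcecode:: ipython

    In [20]: ar = lview.apply_async(f)

    In [21]: ar.wait()  # resubmission requires the task to have finished

    In [22]: ar2 = rc.resubmit(ar.msg_ids)
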
345 358 .. _parallel_schedulers:
346 359
347 360 Schedulers
348 361 ==========
349 362
350 363 There are a variety of valid ways to determine where jobs should be assigned in a
351 364 load-balancing situation. In IPython, we support several standard schemes, and
352 365 even make it easy to define your own. The scheme can be selected via the ``scheme``
353 366 argument to :command:`ipcontroller`, or in the :attr:`TaskScheduler.schemename` attribute
354 367 of a controller config object.
355 368
356 369 The built-in routing schemes:
357 370
358 371 To select one of these schemes, simply do::
359 372
360 373 $ ipcontroller --scheme=<schemename>
361 374 for instance:
362 375 $ ipcontroller --scheme=lru
363 376
364 377 lru: Least Recently Used
365 378
366 379 Always assign work to the least-recently-used engine. A close relative of
367 380 round-robin, it will be fair with respect to the number of tasks, agnostic
368 381 with respect to runtime of each task.
369 382
370 383 plainrandom: Plain Random
371 384
372 385 Randomly picks an engine on which to run.
373 386
374 387 twobin: Two-Bin Random
375 388
376 389 **Requires numpy**
377 390
378 391 Pick two engines at random, and use the LRU of the two (sketched after this list).
379 392 This is known to be better than plain random in many cases, but requires a small amount of computation.
380 393
381 394 leastload: Least Load
382 395
383 396 **This is the default scheme**
384 397
385 398 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks ties).
386 399
387 400 weighted: Weighted Two-Bin Random
388 401
389 402 **Requires numpy**
390 403
391 404 Pick two engines at random using the number of outstanding tasks as inverse weights,
392 405 and use the one with the lower load.
393 406
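As a rough illustration of the two-bin idea (not IPython's actual
implementation, which also covers the weighted variant):

.. sourcecode:: python

    import random

    def two_bin(loads):
        """Pick two engine indices at random; return the less-loaded one."""
        a, b = random.sample(range(len(loads)), 2)
        return a if loads[a] <= loads[b] else b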
394 407
395 408 Pure ZMQ Scheduler
396 409 ------------------
397 410
398 411 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
399 412 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all
400 413 load-balancing. This scheduler does not support any of the advanced features of the Python
401 414 :class:`.Scheduler`.
402 415
403 416 Disabled features when using the ZMQ Scheduler:
404 417
405 418 * Engine unregistration
406 419 Task farming will be disabled if an engine unregisters.
407 420 Further, if an engine is unregistered during computation, the scheduler may not recover.
408 421 * Dependencies
409 422 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
410 423 based on message content.
411 424 * Early destination notification
412 425 The Python schedulers know which engine gets which task, and notify the Hub. This
413 426 allows graceful handling of Engines coming and going. There is no way to know
414 427 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
415 428 engine until they *finish*. This makes recovery from engine shutdown very difficult.
416 429
417 430
418 431 .. note::
419 432
420 433 TODO: performance comparisons
421 434
422 435
423 436
424 437
425 438 More details
426 439 ============
427 440
428 441 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
429 442 of flexibility in how tasks are defined and run. The next places to look are
430 443 in the following classes:
431 444
432 445 * :class:`~IPython.parallel.client.view.LoadBalancedView`
433 446 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
434 447 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
435 448 * :mod:`~IPython.parallel.controller.dependency`
436 449
437 450 The following is an overview of how to use these classes together:
438 451
439 452 1. Create a :class:`Client` and :class:`LoadBalancedView`
440 453 2. Define some functions to be run as tasks
441 454 3. Submit your tasks using the :meth:`apply` method of your
442 455 :class:`LoadBalancedView` instance.
443 4. Use :meth:`Client.get_result` to get the results of the
456 4. Use :meth:`.Client.get_result` to get the results of the
444 457 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
445 458 for and then receive the results.
446 459
447 460 .. seealso::
448 461
449 462 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.