@@ -0,0 +1,61 b''
 1 | """An example for handling results in a way that AsyncMapResult doesn't provide
 2 |
 3 | Specifically, out-of-order results with some special handling of metadata.
 4 |
 5 | This just submits a bunch of jobs, waits on the results, and prints the stdout
 6 | and results of each as they finish.
 7 |
 8 | Authors
 9 | -------
10 | * MinRK
11 | """
12 | import time
13 | import random
14 |
15 | from IPython import parallel
16 |
17 | # create client & views
18 | rc = parallel.Client()
19 | dv = rc[:]
20 | v = rc.load_balanced_view()
21 |
22 |
23 | # scatter 'id', so id=0,1,2 on engines 0,1,2
24 | dv.scatter('id', rc.ids, flatten=True)
25 | print dv['id']
26 |
27 |
28 | def sleep_here(count, t):
29 |     """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
30 |     import time, sys
31 |     print "hi from engine %i" % id
32 |     sys.stdout.flush()
33 |     time.sleep(t)
34 |     return count, t
35 |
36 | amr = v.map(sleep_here, range(100), [random.random() for i in range(100)], chunksize=2)
37 |
38 | pending = set(amr.msg_ids)
39 | while pending:
40 |     try:
41 |         rc.wait(pending, 1e-3)
42 |     except parallel.TimeoutError:
43 |         # ignore timeout errors, since they only mean that at least one task isn't done
44 |         pass
45 |     # finished is the set of msg_ids that are complete
46 |     finished = pending.difference(rc.outstanding)
47 |     # update pending to exclude those that just finished
48 |     pending = pending.difference(finished)
49 |     for msg_id in finished:
50 |         # we know these are done, so don't worry about blocking
51 |         ar = rc.get_result(msg_id)
52 |         print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
53 |         print "with stdout:"
54 |         print '    ' + ar.stdout.replace('\n', '\n    ').rstrip()
55 |         print "and results:"
56 |
57 |         # note that each job in a map always returns a list of length chunksize
58 |         # even if chunksize == 1
59 |         for (count, t) in ar.result:
60 |             print "  item %i: slept for %.2fs" % (count, t)
61 |
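The per-task results fetched with ``rc.get_result(msg_id)`` also carry timing metadata, which is the "special handling of metadata" this example leaves open-ended. A minimal sketch of extending the ``finished`` loop above (a hypothetical addition, not part of this changeset; ``md.started`` and ``md.completed`` are datetime objects, and ``total_seconds()`` requires Python 2.7+)::

    for msg_id in finished:
        ar = rc.get_result(msg_id)
        md = ar.metadata
        # subtracting the two datetimes yields the task's runtime as a timedelta
        runtime = md.completed - md.started
        print "job %s ran %.3fs on engine %i" % (msg_id, runtime.total_seconds(), ar.engine_id)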
@@ -0,0 +1,83 b''
 1 | """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
 2 |
 3 | This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
 4 |
 5 | Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
 6 | and you will see the print statements as they arrive, notably not waiting for the results
 7 | to finish.
 8 |
 9 | You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 | and easily filter by message type.
11 |
12 | Authors
13 | -------
14 | * MinRK
15 | """
16 |
17 | import os
18 | import sys
19 | import json
20 | import zmq
21 |
22 | from IPython.zmq.session import Session
23 | from IPython.parallel.util import disambiguate_url
24 | from IPython.utils.py3compat import str_to_bytes
25 | from IPython.utils.path import get_security_file
26 |
27 | def main(connection_file):
28 |     """watch iopub channel, and print messages"""
29 |
30 |     ctx = zmq.Context.instance()
31 |
32 |     with open(connection_file) as f:
33 |         cfg = json.loads(f.read())
34 |
35 |     location = cfg['location']
36 |     reg_url = cfg['url']
37 |     session = Session(key=str_to_bytes(cfg['exec_key']))
38 |
39 |     query = ctx.socket(zmq.DEALER)
40 |     query.connect(disambiguate_url(cfg['url'], location))
41 |     session.send(query, "connection_request")
42 |     idents, msg = session.recv(query, mode=0)
43 |     c = msg['content']
44 |     iopub_url = disambiguate_url(c['iopub'], location)
45 |     sub = ctx.socket(zmq.SUB)
46 |     # This will subscribe to all messages:
47 |     sub.setsockopt(zmq.SUBSCRIBE, b'')
48 |     # replace b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
49 |     # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
50 |     # to everything from engine 1, but there is no way to subscribe to
51 |     # just stdout from everyone.
52 |     # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
53 |     # engine 1's stderr and engine 2's stdout:
54 |     # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
55 |     # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
56 |     sub.connect(iopub_url)
57 |     while True:
58 |         try:
59 |             idents, msg = session.recv(sub, mode=0)
60 |         except KeyboardInterrupt:
61 |             return
62 |         # ident always length 1 here
63 |         topic = idents[0]
64 |         if msg['msg_type'] == 'stream':
65 |             # stdout/stderr
66 |             # stream names are in msg['content']['name'], if you want to handle
67 |             # them differently
68 |             print "%s: %s" % (topic, msg['content']['data'])
69 |         elif msg['msg_type'] == 'pyerr':
70 |             # Python traceback
71 |             c = msg['content']
72 |             print topic + ':'
73 |             for line in c['traceback']:
74 |                 # indent lines
75 |                 print '    ' + line
76 |
77 | if __name__ == '__main__':
78 |     if len(sys.argv) > 1:
79 |         cf = sys.argv[1]
80 |     else:
81 |         # This gets the security file for the default profile:
82 |         cf = get_security_file('ipcontroller-client.json')
83 |     main(cf)
\ No newline at end of file
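To see the watcher do something, run a printing job from another session while it is up. A minimal driver sketch (not part of this changeset; it assumes a running cluster and uses the same ``IPython.parallel`` API as the examples above)::

    from IPython import parallel

    rc = parallel.Client()
    dv = rc[:]

    def chatter(n):
        # each flushed print arrives at the watcher as an 'engine.N.stdout' stream message
        import sys, time
        for i in range(n):
            print "tick %i" % i
            sys.stdout.flush()
            time.sleep(1)

    # submit asynchronously: the watcher shows output as it arrives,
    # well before the job itself completes
    ar = dv.apply_async(chatter, 5)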
@@ -0,0 +1,52 b''
 1 | """Example of iteration through AsyncMapResult, without waiting for all results
 2 |
 3 | Authors
 4 | -------
 5 | * MinRK
 6 | """
 7 | import time
 8 |
 9 | from IPython import parallel
10 |
11 | # create client & view
12 | rc = parallel.Client()
13 | dv = rc[:]
14 | v = rc.load_balanced_view()
15 |
16 | # scatter 'id', so id=0,1,2 on engines 0,1,2
17 | dv.scatter('id', rc.ids, flatten=True)
18 | print "Engine IDs: ", dv['id']
19 |
20 | # create a Reference to `id`. This will be a different value on each engine
21 | ref = parallel.Reference('id')
22 | print "sleeping for `id` seconds on each engine"
23 | tic = time.time()
24 | ar = dv.apply(time.sleep, ref)
25 | for i, r in enumerate(ar):
26 |     print "%i: %.3f" % (i, time.time()-tic)
27 |
28 | def sleep_here(t):
29 |     import time
30 |     time.sleep(t)
31 |     return id, t
32 |
33 | # one call per task
34 | print "running with one call per task"
35 | amr = v.map(sleep_here, [.01*t for t in range(100)])
36 | tic = time.time()
37 | for i, r in enumerate(amr):
38 |     print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
39 |
40 | print "running with four calls per task"
41 | # with chunksize, we can have four calls per task
42 | amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
43 | tic = time.time()
44 | for i, r in enumerate(amr):
45 |     print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
46 |
47 | print "running with two calls per task, with unordered results"
48 | # We can even iterate through faster results first, with ordered=False
49 | amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
50 | tic = time.time()
51 | for i, r in enumerate(amr):
52 |     print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
1 | NO CONTENT: new file 100644, binary diff hidden
@@ -30,7 +30,8 b' A Sample DAG'
30 | 30 |
31 | 31 | Here, we have a very simple 5-node DAG:
32 | 32 |
33 |    | .. figure:: figs/
   | 33 | .. figure:: figs/simpledag.*
   | 34 |     :width: 600px
34 | 35 |
35 | 36 | With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 | 37 | depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
@@ -117,11 +118,13 b' on which it depends:'
117 | 118 | In [6]: results = {}
118 | 119 |
119 | 120 | In [7]: for node in G.topological_sort():
120 |     |    ...:
121 |     |    ...:
122 |     |    ...:
123 |     |    ...:
124 |     |    ...:
    | 121 |    ...:     # get list of AsyncResult objects from nodes
    | 122 |    ...:     # leading into this one as dependencies
    | 123 |    ...:     deps = [ results[n] for n in G.predecessors(node) ]
    | 124 |    ...:     # submit and store AsyncResult object
    | 125 |    ...:     with view.temp_flags(after=deps, block=False):
    | 126 |    ...:         results[node] = view.apply_with_flags(jobs[node])
    | 127 |
125 | 128 |
126 | 129 | Now that we have submitted all the jobs, we can wait for the results:
127 | 130 |
@@ -155,16 +158,17 b' will be at the top, and quick, small tasks will be at the bottom.'
155 | 158 | In [12]: pos = {}; colors = {}
156 | 159 |
157 | 160 | In [12]: for node in G:
158 |     |
159 |     |
160 |     |
161 |     |
162 |     |
    | 161 |    ....:     md = results[node].metadata
    | 162 |    ....:     start = date2num(md.started)
    | 163 |    ....:     runtime = date2num(md.completed) - start
    | 164 |    ....:     pos[node] = (start, runtime)
    | 165 |    ....:     colors[node] = md.engine_id
163 | 166 |
164 | 167 | In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
165 |     |
    | 168 |    ....:     cmap=gist_rainbow)
166 | 169 |
167 |     | .. figure:: figs/
    | 170 | .. figure:: figs/dagdeps.*
    | 171 |     :width: 600px
168 | 172 |
169 | 173 | Time started on x, runtime on y, and color-coded by engine-id (in this case there
170 | 174 | were four engines). Edges denote dependencies.
@@ -60,6 +60,10 b' the ``I`` in IPython. The following are some example usage cases for IPython:'
60 | 60 | Architecture overview
61 | 61 | =====================
62 | 62 |
   | 63 | .. figure:: figs/wideView.png
   | 64 |     :width: 300px
   | 65 |
   | 66 |
63 | 67 | The IPython architecture consists of four components:
64 | 68 |
65 | 69 | * The IPython engine.
@@ -99,7 +103,7 b' same machine as the Hub, but can be run anywhere from local threads or on remote'
 99 | 103 | The controller also provides a single point of contact for users who wish to
100 | 104 | utilize the engines connected to the controller. There are different ways of
101 | 105 | working with a controller. In IPython, all of these models are implemented via
102 |     | the
    | 106 | the :meth:`.View.apply` method, after
103 | 107 | constructing :class:`.View` objects to represent subsets of engines. The two
104 | 108 | primary models for interacting with engines are:
105 | 109 |
@@ -181,6 +185,34 b' ipcontroller-client.json'
181 | 185 | but since the controller may listen on different ports for clients and
182 | 186 | engines, it is stored separately.
183 | 187 |
    | 188 | ipcontroller-client.json will look something like this, under default localhost
    | 189 | circumstances:
    | 190 |
    | 191 | .. sourcecode:: python
    | 192 |
    | 193 |     {
    | 194 |       "url":"tcp:\/\/127.0.0.1:54424",
    | 195 |       "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
    | 196 |       "ssh":"",
    | 197 |       "location":"10.19.1.135"
    | 198 |     }
    | 199 |
    | 200 | If, however, you are running the controller on a worker node of a cluster, you will likely
    | 201 | need to use ssh tunnels to connect clients from your laptop to it. You will also
    | 202 | probably need to instruct the controller to listen for engines coming from other worker nodes
    | 203 | on the cluster. An example of ipcontroller-client.json, as created by::
    | 204 |
    | 205 |     $> ipcontroller --ip=0.0.0.0 --ssh=login.mycluster.com
    | 206 |
    | 207 |
    | 208 | .. sourcecode:: python
    | 209 |
    | 210 |     {
    | 211 |       "url":"tcp:\/\/*:54424",
    | 212 |       "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
    | 213 |       "ssh":"login.mycluster.com",
    | 214 |       "location":"10.0.0.2"
    | 215 |     }
184 | 216 | More details of how these JSON files are used are given below.
185 | 217 |
186 | 218 | A detailed description of the security model and its implementation in IPython
@@ -248,7 +280,7 b' then you would connect to it with:'
248 | 280 |
249 | 281 | .. sourcecode:: ipython
250 | 282 |
251 |     |     In [2]: c = Client(sshserver='myhub.example.com')
    | 283 |     In [2]: c = Client('/path/to/my/ipcontroller-client.json', sshserver='me@myhub.example.com')
252 | 284 |
253 | 285 | Where 'myhub.example.com' is the url or IP address of the machine on
254 | 286 | which the Hub process is running (or another machine that has direct access to the Hub's ports).
@@ -24,8 +24,8 b' the :command:`ipcluster` command::'
24 | 24 | For more detailed information about starting the controller and engines, see
25 | 25 | our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26 | 26 |
27 |    | Creating a ``
28 |    | ==============================
   | 27 | Creating a ``DirectView`` instance
   | 28 | ==================================
29 | 29 |
30 | 30 | The first step is to import the IPython :mod:`IPython.parallel`
31 | 31 | module and then create a :class:`.Client` instance:
@@ -117,10 +117,10 b' two decorators:'
117 | 117 | .. sourcecode:: ipython
118 | 118 |
119 | 119 |     In [10]: @dview.remote(block=True)
120 |     |
121 |     |
122 |     |
123 |     |
    | 120 |        ....: def getpid():
    | 121 |        ....:     import os
    | 122 |        ....:     return os.getpid()
    | 123 |        ....:
124 | 124 |
125 | 125 |     In [11]: getpid()
126 | 126 |     Out[11]: [12345, 12346, 12347, 12348]
@@ -135,8 +135,8 b' operations and distribute them, reconstructing the result.'
135 | 135 |     In [13]: A = np.random.random((64,48))
136 | 136 |
137 | 137 |     In [14]: @dview.parallel(block=True)
138 |     |
139 |     |
    | 138 |        ....: def pmul(A,B):
    | 139 |        ....:     return A*B
140 | 140 |
141 | 141 |     In [15]: C_local = A*A
142 | 142 |
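The hunk stops just short of invoking the parallel function; the call and comparison presumably look something like this (an illustrative sketch; prompt numbers and output are not from this diff)::

    In [16]: C_remote = pmul(A, A)   # element-wise product, split across engines

    In [17]: (C_local == C_remote).all()
    Out[17]: True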
@@ -183,6 +183,8 b' dv.track : bool'
183 | 183 |     This is primarily useful for non-copying sends of numpy arrays that you plan to
184 | 184 |     edit in-place. You need to know when it becomes safe to edit the buffer
185 | 185 |     without corrupting the message.
    | 186 | dv.targets : int, list of ints
    | 187 |     Which targets this view is associated with.
186 | 188 |
187 | 189 |
188 | 190 | Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
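The flags documented above are ordinary attributes, so a view's defaults can be set once and reused. A minimal sketch (assuming ``rc = parallel.Client()`` as in the examples above)::

    dv = rc[:]
    dv.block = True        # wait for results by default
    dv.track = False       # don't request ZMQ send-completion tracking
    dv.targets = [0, 2]    # subsequent calls address only engines 0 and 2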
@@ -260,10 +262,10 b' local Python/IPython session:'
260 | 262 |
261 | 263 |     # define our function
262 | 264 |     In [6]: def wait(t):
263 |     |
264 |     |
265 |     |
266 |     |
    | 265 |        ....:     import time
    | 266 |        ....:     tic = time.time()
    | 267 |        ....:     time.sleep(t)
    | 268 |        ....:     return time.time()-tic
267 | 269 |
268 | 270 |     # In non-blocking mode
269 | 271 |     In [7]: ar = dview.apply_async(wait, 2)
@@ -326,7 +328,7 b' and blocks until all of the associated results are ready:'
326 | 328 | The ``block`` and ``targets`` keyword arguments and attributes
327 | 329 | --------------------------------------------------------------
328 | 330 |
329 |     | Most DirectView methods (excluding :meth:`apply`
    | 331 | Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
330 | 332 | ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
331 | 333 | blocking mode and which engines the command is applied to. The :class:`View` class also has
332 | 334 | :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
@@ -362,11 +364,6 b' The :attr:`block` and :attr:`targets` instance attributes of the'
362 | 364 | Parallel magic commands
363 | 365 | -----------------------
364 | 366 |
365 |     | .. warning::
366 |     |
367 |     |     The magics have not been changed to work with the zeromq system. The
368 |     |     magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
369 |     |
370 | 367 | We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
371 | 368 | that make it more pleasant to execute Python commands on the engines
372 | 369 | interactively. These are simply shortcuts to :meth:`execute` and
@@ -376,9 +373,6 b' Python command on the engines specified by the :attr:`targets` attribute of the'
376 | 373 |
377 | 374 | .. sourcecode:: ipython
378 | 375 |
379 |     |     # load the parallel magic extension:
380 |     |     In [21]: %load_ext parallelmagic
381 |     |
382 | 376 |     # Create a DirectView for all targets
383 | 377 |     In [22]: dv = rc[:]
384 | 378 |
@@ -387,10 +381,10 b' Python command on the engines specified by the :attr:`targets` attribute of the'
387 | 381 |
388 | 382 |     In [24]: dv.block=True
389 | 383 |
390 |     |     In [25]: import numpy
391 |     |
392 |     |
393 |     |     Parallel execution on engines: [0, 1, 2, 3]
    | 384 |     # import numpy here and everywhere
    | 385 |     In [25]: with dv.sync_imports():
    | 386 |        ....:     import numpy
    | 387 |     importing numpy on engine(s)
394 | 388 |
395 | 389 |     In [27]: %px a = numpy.random.rand(2,2)
396 | 390 |     Parallel execution on engines: [0, 1, 2, 3]
@@ -400,10 +394,10 b' Python command on the engines specified by the :attr:`targets` attribute of the'
400 | 394 |
401 | 395 |     In [28]: dv['ev']
402 | 396 |     Out[28]: [ array([ 1.09522024, -0.09645227]),
403 |     |
404 |     |
405 |     |
406 |     |
    | 397 |        ....:   array([ 1.21435496, -0.35546712]),
    | 398 |        ....:   array([ 0.72180653,  0.07133042]),
    | 399 |        ....:   array([ 1.46384341,  1.04353244e-04])
    | 400 |        ....: ]
407 | 401 |
408 | 402 | The ``%result`` magic gets the most recent result, or takes an argument
409 | 403 | specifying the index of the result to be requested. It is simply a shortcut to the
@@ -415,9 +409,9 b' specifying the index of the result to be requested. It is simply a shortcut to t'
415 | 409 |
416 | 410 |     In [30]: %result
417 | 411 |     Out[30]: [ [ 1.28167017  0.14197338],
418 |     |
419 |     |
420 |     |
    | 412 |        ....:   [-0.14093616  1.27877273],
    | 413 |        ....:   [-0.37023573  1.06779409],
    | 414 |        ....:   [ 0.83664764 -0.25602658] ]
421 | 415 |
422 | 416 | The ``%autopx`` magic switches to a mode where everything you type is executed
423 | 417 | on the engines given by the :attr:`targets` attribute:
@@ -452,9 +446,9 b' on the engines given by the :attr:`targets` attribute:'
452 | 446 |
453 | 447 |     In [37]: dv['ans']
454 | 448 |     Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
455 |     |
456 |     |
457 |     |
    | 449 |        ....:   'Average max eigenvalue is: 10.2076902286',
    | 450 |        ....:   'Average max eigenvalue is: 10.1891484655',
    | 451 |        ....:   'Average max eigenvalue is: 10.1158837784',]
458 | 452 |
459 | 453 |
460 | 454 | Moving Python objects around
@@ -522,7 +516,7 b' follow that terminology. However, it is important to remember that in'
522 | 516 | IPython's :class:`Client` class, :meth:`scatter` is from the
523 | 517 | interactive IPython session to the engines and :meth:`gather` is from the
524 | 518 | engines back to the interactive IPython session. For scatter/gather operations
525 |     | between engines, MPI should be used
    | 519 | between engines, MPI, pyzmq, or some other direct interconnect should be used.
526 | 520 |
527 | 521 | .. sourcecode:: ipython
528 | 522 |
@@ -568,7 +562,7 b" created by a DirectView's :meth:`sync_imports` method:"
568 | 562 | .. sourcecode:: ipython
569 | 563 |
570 | 564 |     In [69]: with dview.sync_imports():
571 |     |
    | 565 |        ....:     import numpy
572 | 566 |     importing numpy on engine(s)
573 | 567 |
574 | 568 | Any imports made inside the block will also be performed on the view's engines.
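The same block can import several modules at once; a short sketch of the same mechanism (prompt numbers illustrative, not from this diff)::

    In [70]: with dview.sync_imports():
       ....:     import numpy
       ....:     import zmq
    importing numpy on engine(s)
    importing zmq on engine(s)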
@@ -588,15 +582,15 b' execution, and will fail with an UnmetDependencyError.'
588 | 582 |     In [69]: from IPython.parallel import require
589 | 583 |
590 | 584 |     In [70]: @require('re')
591 |     |
592 |     |
593 |     |
    | 585 |        ....: def findall(pat, x):
    | 586 |        ....:     # re is guaranteed to be available
    | 587 |        ....:     return re.findall(pat, x)
594 | 588 |
595 | 589 |     # you can also pass modules themselves, that you already have locally:
596 | 590 |     In [71]: @require(time)
597 |     |
598 |     |
599 |     |
    | 591 |        ....: def wait(t):
    | 592 |        ....:     time.sleep(t)
    | 593 |        ....:     return t
600 | 594 |
601 | 595 | .. _parallel_exceptions:
602 | 596 |
@@ -29,8 +29,8 b' the :command:`ipcluster` command::'
29 | 29 | For more detailed information about starting the controller and engines, see
30 | 30 | our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
31 | 31 |
32 |    | Creating a ``
33 |    | ==============================
   | 32 | Creating a ``LoadBalancedView`` instance
   | 33 | ========================================
34 | 34 |
35 | 35 | The first step is to import the IPython :mod:`IPython.parallel`
36 | 36 | module and then create a :class:`.Client` instance, and we will also be using
@@ -113,12 +113,24 b' that turns any Python function into a parallel function:'
113 | 113 |
114 | 114 |
115 | 115 | .. _parallel_taskmap:
116 |     | The AsyncMapResult
117 |     | ==================
    | 116 | Map results are iterable!
    | 117 | -------------------------
118 | 118 |
119 |     | When you call ``lview.map_async(f, sequence)``, or just :meth:`map` with `block=True`, then
120 |     | what you get in return will be an :class:`~AsyncMapResult` object. These are similar to
121 |     | AsyncResult objects, but with one key difference
    | 119 | When an AsyncResult object maps multiple results (e.g. the :class:`~AsyncMapResult`
    | 120 | object), you can iterate through the results, and act on them as they arrive:
    | 121 |
    | 122 | .. literalinclude:: ../../examples/parallel/itermapresult.py
    | 123 |    :language: python
    | 124 |    :lines: 9-34
    | 125 |
    | 126 | .. seealso::
    | 127 |
    | 128 |    When AsyncResult or the AsyncMapResult don't provide what you need (for instance,
    | 129 |    handling individual results as they arrive, but with metadata), you can always
    | 130 |    split the original result's ``msg_ids`` attribute, and handle the msg_ids as you like.
    | 131 |
    | 132 |    For an example of this, see :file:`docs/examples/parallel/customresult.py`
    | 133 |
122 | 134 |
123 | 135 | .. _parallel_dependencies:
124 | 136 |
@@ -166,8 +178,8 b' you specify are importable:'
166 | 178 | .. sourcecode:: ipython
167 | 179 |
168 | 180 |     In [10]: @require('numpy', 'zmq')
169 |     |
170 |     |
    | 181 |        ....: def myfunc():
    | 182 |        ....:     return dostuff()
171 | 183 |
172 | 184 | Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
173 | 185 | numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
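Submission itself is unchanged; the requirement simply travels with the function. A hedged sketch (``lview`` is a load-balanced view as elsewhere in this document, and ``dostuff`` remains an undefined placeholder from the example above)::

    In [11]: ar = lview.apply(myfunc)   # scheduled only on engines where numpy and zmq import cleanly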
@@ -184,16 +196,16 b' will be assigned to another engine. If the dependency returns *anything other th'
184 | 196 | .. sourcecode:: ipython
185 | 197 |
186 | 198 |     In [10]: def platform_specific(plat):
187 |     |
188 |     |
    | 199 |        ....:     import sys
    | 200 |        ....:     return sys.platform == plat
189 | 201 |
190 | 202 |     In [11]: @depend(platform_specific, 'darwin')
191 |     |
192 |     |
    | 203 |        ....: def mactask():
    | 204 |        ....:     do_mac_stuff()
193 | 205 |
194 | 206 |     In [12]: @depend(platform_specific, 'nt')
195 |     |
196 |     |
    | 207 |        ....: def wintask():
    | 208 |        ....:     do_windows_stuff()
197 | 209 |
198 | 210 | In this case, any time you apply ``mactask``, it will only run on an OSX machine.
199 | 211 | ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
@@ -209,7 +221,7 b' the :class:`dependent` object that the decorators use:'
209 | 221 | .. sourcecode:: ipython
210 | 222 |
211 | 223 |     In [13]: def mytask(*args):
212 |     |
    | 224 |        ....:     dostuff()
213 | 225 |
214 | 226 |     In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
215 | 227 |     # this is the same as decorating the declaration of mytask with @depend
@@ -222,8 +234,8 b' the :class:`dependent` object that the decorators use:'
222 | 234 |
223 | 235 |     # is equivalent to:
224 | 236 |     In [17]: @depend(g, *dargs, **dkwargs)
225 |     |
226 |     |
    | 237 |        ....: def t(a,b,c):
    | 238 |        ....:     # contents of f
227 | 239 |
228 | 240 | Graph Dependencies
229 | 241 | ------------------
@@ -287,10 +299,11 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
287 | 299 |
288 | 300 |     In [16]: ar2 = lview.apply(f2)
289 | 301 |
290 |     |     In [17]:
291 |     |
292 |     |     In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
    | 302 |     In [17]: with lview.temp_flags(after=[ar,ar2]):
    | 303 |        ....:     ar3 = lview.apply(f3)
293 | 304 |
    | 305 |     In [18]: with lview.temp_flags(follow=[ar], timeout=2.5):
    | 306 |        ....:     ar4 = lview.apply(f3)
294 | 307 |
295 | 308 | .. seealso::
296 | 309 |
@@ -440,7 +453,7 b' The following is an overview of how to use these classes together:'
440 | 453 | 2. Define some functions to be run as tasks
441 | 454 | 3. Submit your tasks using the :meth:`apply` method of your
442 | 455 |    :class:`LoadBalancedView` instance.
443 |     | 4. Use :meth:`Client.get_result` to get the results of the
    | 456 | 4. Use :meth:`.Client.get_result` to get the results of the
444 | 457 |    tasks, or use the :meth:`AsyncResult.get` method of the results to wait
445 | 458 |    for and then receive the results.
446 | 459 |
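Putting the steps above together, a compact end-to-end sketch (hedged; assumes a cluster already started with :command:`ipcluster`)::

    from IPython.parallel import Client

    rc = Client()                      # connect a client to the cluster
    lview = rc.load_balanced_view()    # scheduler picks the engines

    def task(x):                       # a function to run as a task
        return x ** 2

    ar = lview.apply(task, 4)          # submit; returns an AsyncResult
    print ar.get()                     # wait for, then receive, the result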