##// END OF EJS Templates
update parallel docs with some changes from scipy tutorial...
MinRK -
Show More
@@ -0,0 +1,61 b''
1 """An example for handling results in a way that AsyncMapResult doesn't provide
2
3 Specifically, out-of-order results with some special handing of metadata.
4
5 This just submits a bunch of jobs, waits on the results, and prints the stdout
6 and results of each as they finish.
7
8 Authors
9 -------
10 * MinRK
11 """
12 import time
13 import random
14
15 from IPython import parallel
16
17 # create client & views
18 rc = parallel.Client()
19 dv = rc[:]
20 v = rc.load_balanced_view()
21
22
23 # scatter 'id', so id=0,1,2 on engines 0,1,2
24 dv.scatter('id', rc.ids, flatten=True)
25 print dv['id']
26
27
28 def sleep_here(count, t):
29 """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
30 import time,sys
31 print "hi from engine %i" % id
32 sys.stdout.flush()
33 time.sleep(t)
34 return count,t
35
36 amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)
37
38 pending = set(amr.msg_ids)
39 while pending:
40 try:
41 rc.wait(pending, 1e-3)
42 except parallel.TimeoutError:
43 # ignore timeouterrors, since they only mean that at least one isn't done
44 pass
45 # finished is the set of msg_ids that are complete
46 finished = pending.difference(rc.outstanding)
47 # update pending to exclude those that just finished
48 pending = pending.difference(finished)
49 for msg_id in finished:
50 # we know these are done, so don't worry about blocking
51 ar = rc.get_result(msg_id)
52 print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
53 print "with stdout:"
54 print ' ' + ar.stdout.replace('\n', '\n ').rstrip()
55 print "and results:"
56
57 # note that each job in a map always returns a list of length chunksize
58 # even if chunksize == 1
59 for (count,t) in ar.result:
60 print " item %i: slept for %.2fs" % (count, t)
61
@@ -0,0 +1,83 b''
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
2
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
4
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
6 and you will see the print statements as they arrive, notably not waiting for the results
7 to finish.
8
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 and easily filter by message type.
11
12 Authors
13 -------
14 * MinRK
15 """
16
17 import os
18 import sys
19 import json
20 import zmq
21
22 from IPython.zmq.session import Session
23 from IPython.parallel.util import disambiguate_url
24 from IPython.utils.py3compat import str_to_bytes
25 from IPython.utils.path import get_security_file
26
27 def main(connection_file):
28 """watch iopub channel, and print messages"""
29
30 ctx = zmq.Context.instance()
31
32 with open(connection_file) as f:
33 cfg = json.loads(f.read())
34
35 location = cfg['location']
36 reg_url = cfg['url']
37 session = Session(key=str_to_bytes(cfg['exec_key']))
38
39 query = ctx.socket(zmq.DEALER)
40 query.connect(disambiguate_url(cfg['url'], location))
41 session.send(query, "connection_request")
42 idents,msg = session.recv(query, mode=0)
43 c = msg['content']
44 iopub_url = disambiguate_url(c['iopub'], location)
45 sub = ctx.socket(zmq.SUB)
46 # This will subscribe to all messages:
47 sub.setsockopt(zmq.SUBSCRIBE, b'')
48 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
49 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
50 # to everything from engine 1, but there is no way to subscribe to
51 # just stdout from everyone.
52 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
53 # engine 1's stderr and engine 2's stdout:
54 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
55 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
56 sub.connect(iopub_url)
57 while True:
58 try:
59 idents,msg = session.recv(sub, mode=0)
60 except KeyboardInterrupt:
61 return
62 # ident always length 1 here
63 topic = idents[0]
64 if msg['msg_type'] == 'stream':
65 # stdout/stderr
66 # stream names are in msg['content']['name'], if you want to handle
67 # them differently
68 print "%s: %s" % (topic, msg['content']['data'])
69 elif msg['msg_type'] == 'pyerr':
70 # Python traceback
71 c = msg['content']
72 print topic + ':'
73 for line in c['traceback']:
74 # indent lines
75 print ' ' + line
76
77 if __name__ == '__main__':
78 if len(sys.argv) > 1:
79 cf = sys.argv[1]
80 else:
81 # This gets the security file for the default profile:
82 cf = get_security_file('ipcontroller-client.json')
83 main(cf) No newline at end of file
@@ -0,0 +1,52 b''
1 """Example of iteration through AsyncMapResult, without waiting for all results
2
3 Authors
4 -------
5 * MinRK
6 """
7 import time
8
9 from IPython import parallel
10
11 # create client & view
12 rc = parallel.Client()
13 dv = rc[:]
14 v = rc.load_balanced_view()
15
16 # scatter 'id', so id=0,1,2 on engines 0,1,2
17 dv.scatter('id', rc.ids, flatten=True)
18 print "Engine IDs: ", dv['id']
19
20 # create a Reference to `id`. This will be a different value on each engine
21 ref = parallel.Reference('id')
22 print "sleeping for `id` seconds on each engine"
23 tic = time.time()
24 ar = dv.apply(time.sleep, ref)
25 for i,r in enumerate(ar):
26 print "%i: %.3f"%(i, time.time()-tic)
27
28 def sleep_here(t):
29 import time
30 time.sleep(t)
31 return id,t
32
33 # one call per task
34 print "running with one call per task"
35 amr = v.map(sleep_here, [.01*t for t in range(100)])
36 tic = time.time()
37 for i,r in enumerate(amr):
38 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
39
40 print "running with four calls per task"
41 # with chunksize, we can have four calls per task
42 amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
43 tic = time.time()
44 for i,r in enumerate(amr):
45 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
46
47 print "running with two calls per task, with unordered results"
48 # We can even iterate through faster results first, with ordered=False
49 amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
50 tic = time.time()
51 for i,r in enumerate(amr):
52 print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
1 NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
@@ -30,7 +30,8 b' A Sample DAG'
30
30
31 Here, we have a very simple 5-node DAG:
31 Here, we have a very simple 5-node DAG:
32
32
33 .. figure:: figs/ simpledag.*
33 .. figure:: figs/simpledag.*
34 :width: 600px
34
35
35 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
37 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
@@ -117,11 +118,13 b' on which it depends:'
117 In [6]: results = {}
118 In [6]: results = {}
118
119
119 In [7]: for node in G.topological_sort():
120 In [7]: for node in G.topological_sort():
120 ...: # get list of AsyncResult objects from nodes
121 ...: # get list of AsyncResult objects from nodes
121 ...: # leading into this one as dependencies
122 ...: # leading into this one as dependencies
122 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 ...: # submit and store AsyncResult object
124 ...: # submit and store AsyncResult object
124 ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False)
125 ...: with view.temp_flags(after=deps, block=False):
126 ...: results[node] = view.apply_with_flags(jobs[node])
127
125
128
126 Now that we have submitted all the jobs, we can wait for the results:
129 Now that we have submitted all the jobs, we can wait for the results:
127
130
@@ -155,16 +158,17 b' will be at the top, and quick, small tasks will be at the bottom.'
155 In [12]: pos = {}; colors = {}
158 In [12]: pos = {}; colors = {}
156
159
157 In [12]: for node in G:
160 In [12]: for node in G:
158 ...: md = results[node].metadata
161 ....: md = results[node].metadata
159 ...: start = date2num(md.started)
162 ....: start = date2num(md.started)
160 ...: runtime = date2num(md.completed) - start
163 ....: runtime = date2num(md.completed) - start
161 ...: pos[node] = (start, runtime)
164 ....: pos[node] = (start, runtime)
162 ...: colors[node] = md.engine_id
165 ....: colors[node] = md.engine_id
163
166
164 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
167 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
165 ...: cmap=gist_rainbow)
168 ....: cmap=gist_rainbow)
166
169
167 .. figure:: figs/ dagdeps.*
170 .. figure:: figs/dagdeps.*
171 :width: 600px
168
172
169 Time started on x, runtime on y, and color-coded by engine-id (in this case there
173 Time started on x, runtime on y, and color-coded by engine-id (in this case there
170 were four engines). Edges denote dependencies.
174 were four engines). Edges denote dependencies.
@@ -60,6 +60,10 b' the ``I`` in IPython. The following are some example usage cases for IPython:'
60 Architecture overview
60 Architecture overview
61 =====================
61 =====================
62
62
63 .. figure:: figs/wideView.png
64 :width: 300px
65
66
63 The IPython architecture consists of four components:
67 The IPython architecture consists of four components:
64
68
65 * The IPython engine.
69 * The IPython engine.
@@ -99,7 +103,7 b' same machine as the Hub, but can be run anywhere from local threads or on remote'
99 The controller also provides a single point of contact for users who wish to
103 The controller also provides a single point of contact for users who wish to
100 utilize the engines connected to the controller. There are different ways of
104 utilize the engines connected to the controller. There are different ways of
101 working with a controller. In IPython, all of these models are implemented via
105 working with a controller. In IPython, all of these models are implemented via
102 the client's :meth:`.View.apply` method, with various arguments, or
106 the :meth:`.View.apply` method, after
103 constructing :class:`.View` objects to represent subsets of engines. The two
107 constructing :class:`.View` objects to represent subsets of engines. The two
104 primary models for interacting with engines are:
108 primary models for interacting with engines are:
105
109
@@ -181,6 +185,34 b' ipcontroller-client.json'
181 but since the controller may listen on different ports for clients and
185 but since the controller may listen on different ports for clients and
182 engines, it is stored separately.
186 engines, it is stored separately.
183
187
188 ipcontroller-client.json will look something like this, under default localhost
189 circumstances:
190
191 .. sourcecode:: python
192
193 {
194 "url":"tcp:\/\/127.0.0.1:54424",
195 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
196 "ssh":"",
197 "location":"10.19.1.135"
198 }
199
200 If, however, you are running the controller on a work node on a cluster, you will likely
201 need to use ssh tunnels to connect clients from your laptop to it. You will also
202 probably need to instruct the controller to listen for engines coming from other work nodes
203 on the cluster. An example of ipcontroller-client.json, as created by::
204
205 $> ipcontroller --ip=0.0.0.0 --ssh=login.mycluster.com
206
207
208 .. sourcecode:: python
209
210 {
211 "url":"tcp:\/\/*:54424",
212 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
213 "ssh":"login.mycluster.com",
214 "location":"10.0.0.2"
215 }
184 More details of how these JSON files are used are given below.
216 More details of how these JSON files are used are given below.
185
217
186 A detailed description of the security model and its implementation in IPython
218 A detailed description of the security model and its implementation in IPython
@@ -248,7 +280,7 b' then you would connect to it with:'
248
280
249 .. sourcecode:: ipython
281 .. sourcecode:: ipython
250
282
251 In [2]: c = Client(sshserver='myhub.example.com')
283 In [2]: c = Client('/path/to/my/ipcontroller-client.json', sshserver='me@myhub.example.com')
252
284
253 Where 'myhub.example.com' is the url or IP address of the machine on
285 Where 'myhub.example.com' is the url or IP address of the machine on
254 which the Hub process is running (or another machine that has direct access to the Hub's ports).
286 which the Hub process is running (or another machine that has direct access to the Hub's ports).
@@ -24,8 +24,8 b' the :command:`ipcluster` command::'
24 For more detailed information about starting the controller and engines, see
24 For more detailed information about starting the controller and engines, see
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26
26
27 Creating a ``Client`` instance
27 Creating a ``DirectView`` instance
28 ==============================
28 ==================================
29
29
30 The first step is to import the IPython :mod:`IPython.parallel`
30 The first step is to import the IPython :mod:`IPython.parallel`
31 module and then create a :class:`.Client` instance:
31 module and then create a :class:`.Client` instance:
@@ -117,10 +117,10 b' two decorators:'
117 .. sourcecode:: ipython
117 .. sourcecode:: ipython
118
118
119 In [10]: @dview.remote(block=True)
119 In [10]: @dview.remote(block=True)
120 ...: def getpid():
120 ....: def getpid():
121 ...: import os
121 ....: import os
122 ...: return os.getpid()
122 ....: return os.getpid()
123 ...:
123 ....:
124
124
125 In [11]: getpid()
125 In [11]: getpid()
126 Out[11]: [12345, 12346, 12347, 12348]
126 Out[11]: [12345, 12346, 12347, 12348]
@@ -135,8 +135,8 b' operations and distribute them, reconstructing the result.'
135 In [13]: A = np.random.random((64,48))
135 In [13]: A = np.random.random((64,48))
136
136
137 In [14]: @dview.parallel(block=True)
137 In [14]: @dview.parallel(block=True)
138 ...: def pmul(A,B):
138 ....: def pmul(A,B):
139 ...: return A*B
139 ....: return A*B
140
140
141 In [15]: C_local = A*A
141 In [15]: C_local = A*A
142
142
@@ -183,6 +183,8 b' dv.track : bool'
183 This is primarily useful for non-copying sends of numpy arrays that you plan to
183 This is primarily useful for non-copying sends of numpy arrays that you plan to
184 edit in-place. You need to know when it becomes safe to edit the buffer
184 edit in-place. You need to know when it becomes safe to edit the buffer
185 without corrupting the message.
185 without corrupting the message.
186 dv.targets : int, list of ints
187 which targets this view is associated with.
186
188
187
189
188 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
190 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
@@ -260,10 +262,10 b' local Python/IPython session:'
260
262
261 # define our function
263 # define our function
262 In [6]: def wait(t):
264 In [6]: def wait(t):
263 ...: import time
265 ....: import time
264 ...: tic = time.time()
266 ....: tic = time.time()
265 ...: time.sleep(t)
267 ....: time.sleep(t)
266 ...: return time.time()-tic
268 ....: return time.time()-tic
267
269
268 # In non-blocking mode
270 # In non-blocking mode
269 In [7]: ar = dview.apply_async(wait, 2)
271 In [7]: ar = dview.apply_async(wait, 2)
@@ -326,7 +328,7 b' and blocks until all of the associated results are ready:'
326 The ``block`` and ``targets`` keyword arguments and attributes
328 The ``block`` and ``targets`` keyword arguments and attributes
327 --------------------------------------------------------------
329 --------------------------------------------------------------
328
330
329 Most DirectView methods (excluding :meth:`apply` and :meth:`map`) accept ``block`` and
331 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
330 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
332 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
331 blocking mode and which engines the command is applied to. The :class:`View` class also has
333 blocking mode and which engines the command is applied to. The :class:`View` class also has
332 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
334 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
@@ -362,11 +364,6 b' The :attr:`block` and :attr:`targets` instance attributes of the'
362 Parallel magic commands
364 Parallel magic commands
363 -----------------------
365 -----------------------
364
366
365 .. warning::
366
367 The magics have not been changed to work with the zeromq system. The
368 magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
369
370 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
367 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
371 that make it more pleasant to execute Python commands on the engines
368 that make it more pleasant to execute Python commands on the engines
372 interactively. These are simply shortcuts to :meth:`execute` and
369 interactively. These are simply shortcuts to :meth:`execute` and
@@ -376,9 +373,6 b' Python command on the engines specified by the :attr:`targets` attribute of the'
376
373
377 .. sourcecode:: ipython
374 .. sourcecode:: ipython
378
375
379 # load the parallel magic extension:
380 In [21]: %load_ext parallelmagic
381
382 # Create a DirectView for all targets
376 # Create a DirectView for all targets
383 In [22]: dv = rc[:]
377 In [22]: dv = rc[:]
384
378
@@ -387,10 +381,10 b' Python command on the engines specified by the :attr:`targets` attribute of the'
387
381
388 In [24]: dv.block=True
382 In [24]: dv.block=True
389
383
390 In [25]: import numpy
384 # import numpy here and everywhere
391
385 In [25]: with dv.sync_imports():
392 In [26]: %px import numpy
386 ....: import numpy
393 Parallel execution on engines: [0, 1, 2, 3]
387 importing numpy on engine(s)
394
388
395 In [27]: %px a = numpy.random.rand(2,2)
389 In [27]: %px a = numpy.random.rand(2,2)
396 Parallel execution on engines: [0, 1, 2, 3]
390 Parallel execution on engines: [0, 1, 2, 3]
@@ -400,10 +394,10 b' Python command on the engines specified by the :attr:`targets` attribute of the'
400
394
401 In [28]: dv['ev']
395 In [28]: dv['ev']
402 Out[28]: [ array([ 1.09522024, -0.09645227]),
396 Out[28]: [ array([ 1.09522024, -0.09645227]),
403 array([ 1.21435496, -0.35546712]),
397 ....: array([ 1.21435496, -0.35546712]),
404 array([ 0.72180653, 0.07133042]),
398 ....: array([ 0.72180653, 0.07133042]),
405 array([ 1.46384341e+00, 1.04353244e-04])
399 ....: array([ 1.46384341, 1.04353244e-04])
406 ]
400 ....: ]
407
401
408 The ``%result`` magic gets the most recent result, or takes an argument
402 The ``%result`` magic gets the most recent result, or takes an argument
409 specifying the index of the result to be requested. It is simply a shortcut to the
403 specifying the index of the result to be requested. It is simply a shortcut to the
@@ -415,9 +409,9 b' specifying the index of the result to be requested. It is simply a shortcut to t'
415
409
416 In [30]: %result
410 In [30]: %result
417 Out[30]: [ [ 1.28167017 0.14197338],
411 Out[30]: [ [ 1.28167017 0.14197338],
418 [-0.14093616 1.27877273],
412 ....: [-0.14093616 1.27877273],
419 [-0.37023573 1.06779409],
413 ....: [-0.37023573 1.06779409],
420 [ 0.83664764 -0.25602658] ]
414 ....: [ 0.83664764 -0.25602658] ]
421
415
422 The ``%autopx`` magic switches to a mode where everything you type is executed
416 The ``%autopx`` magic switches to a mode where everything you type is executed
423 on the engines given by the :attr:`targets` attribute:
417 on the engines given by the :attr:`targets` attribute:
@@ -452,9 +446,9 b' on the engines given by the :attr:`targets` attribute:'
452
446
453 In [37]: dv['ans']
447 In [37]: dv['ans']
454 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
448 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
455 'Average max eigenvalue is: 10.2076902286',
449 ....: 'Average max eigenvalue is: 10.2076902286',
456 'Average max eigenvalue is: 10.1891484655',
450 ....: 'Average max eigenvalue is: 10.1891484655',
457 'Average max eigenvalue is: 10.1158837784',]
451 ....: 'Average max eigenvalue is: 10.1158837784',]
458
452
459
453
460 Moving Python objects around
454 Moving Python objects around
@@ -522,7 +516,7 b' follow that terminology. However, it is important to remember that in'
522 IPython's :class:`Client` class, :meth:`scatter` is from the
516 IPython's :class:`Client` class, :meth:`scatter` is from the
523 interactive IPython session to the engines and :meth:`gather` is from the
517 interactive IPython session to the engines and :meth:`gather` is from the
524 engines back to the interactive IPython session. For scatter/gather operations
518 engines back to the interactive IPython session. For scatter/gather operations
525 between engines, MPI should be used:
519 between engines, MPI, pyzmq, or some other direct interconnect should be used.
526
520
527 .. sourcecode:: ipython
521 .. sourcecode:: ipython
528
522
@@ -568,7 +562,7 b" created by a DirectView's :meth:`sync_imports` method:"
568 .. sourcecode:: ipython
562 .. sourcecode:: ipython
569
563
570 In [69]: with dview.sync_imports():
564 In [69]: with dview.sync_imports():
571 ...: import numpy
565 ....: import numpy
572 importing numpy on engine(s)
566 importing numpy on engine(s)
573
567
574 Any imports made inside the block will also be performed on the view's engines.
568 Any imports made inside the block will also be performed on the view's engines.
@@ -588,15 +582,15 b' execution, and will fail with an UnmetDependencyError.'
588 In [69]: from IPython.parallel import require
582 In [69]: from IPython.parallel import require
589
583
590 In [70]: @require('re'):
584 In [70]: @require('re'):
591 ...: def findall(pat, x):
585 ....: def findall(pat, x):
592 ...: # re is guaranteed to be available
586 ....: # re is guaranteed to be available
593 ...: return re.findall(pat, x)
587 ....: return re.findall(pat, x)
594
588
595 # you can also pass modules themselves, that you already have locally:
589 # you can also pass modules themselves, that you already have locally:
596 In [71]: @require(time):
590 In [71]: @require(time):
597 ...: def wait(t):
591 ....: def wait(t):
598 ...: time.sleep(t)
592 ....: time.sleep(t)
599 ...: return t
593 ....: return t
600
594
601 .. _parallel_exceptions:
595 .. _parallel_exceptions:
602
596
@@ -29,8 +29,8 b' the :command:`ipcluster` command::'
29 For more detailed information about starting the controller and engines, see
29 For more detailed information about starting the controller and engines, see
30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
31
31
32 Creating a ``Client`` instance
32 Creating a ``LoadBalancedView`` instance
33 ==============================
33 ========================================
34
34
35 The first step is to import the IPython :mod:`IPython.parallel`
35 The first step is to import the IPython :mod:`IPython.parallel`
36 module and then create a :class:`.Client` instance, and we will also be using
36 module and then create a :class:`.Client` instance, and we will also be using
@@ -113,12 +113,24 b' that turns any Python function into a parallel function:'
113
113
114 .. _parallel_taskmap:
114 .. _parallel_taskmap:
115
115
116 The AsyncMapResult
116 Map results are iterable!
117 ==================
117 -------------------------
118
118
119 When you call ``lview.map_async(f, sequence)``, or just :meth:`map` with `block=True`, then
119 When an AsyncResult object actually maps multiple results (e.g. the :class:`~AsyncMapResult`
120 what you get in return will be an :class:`~AsyncMapResult` object. These are similar to
120 object), you can actually iterate through them, and act on the results as they arrive:
121 AsyncResult objects, but with one key difference
121
122 .. literalinclude:: ../../examples/parallel/itermapresult.py
123 :language: python
124 :lines: 9-34
125
126 .. seealso::
127
128 When AsyncResult or the AsyncMapResult don't provide what you need (for instance,
129 handling individual results as they arrive, but with metadata), you can always
130 just split the original result's ``msg_ids`` attribute, and handle them as you like.
131
132 For an example of this, see :file:`docs/examples/parallel/customresult.py`
133
122
134
123 .. _parallel_dependencies:
135 .. _parallel_dependencies:
124
136
@@ -166,8 +178,8 b' you specify are importable:'
166 .. sourcecode:: ipython
178 .. sourcecode:: ipython
167
179
168 In [10]: @require('numpy', 'zmq')
180 In [10]: @require('numpy', 'zmq')
169 ...: def myfunc():
181 ....: def myfunc():
170 ...: return dostuff()
182 ....: return dostuff()
171
183
172 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
184 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
173 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
185 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
@@ -184,16 +196,16 b' will be assigned to another engine. If the dependency returns *anything other th'
184 .. sourcecode:: ipython
196 .. sourcecode:: ipython
185
197
186 In [10]: def platform_specific(plat):
198 In [10]: def platform_specific(plat):
187 ...: import sys
199 ....: import sys
188 ...: return sys.platform == plat
200 ....: return sys.platform == plat
189
201
190 In [11]: @depend(platform_specific, 'darwin')
202 In [11]: @depend(platform_specific, 'darwin')
191 ...: def mactask():
203 ....: def mactask():
192 ...: do_mac_stuff()
204 ....: do_mac_stuff()
193
205
194 In [12]: @depend(platform_specific, 'nt')
206 In [12]: @depend(platform_specific, 'nt')
195 ...: def wintask():
207 ....: def wintask():
196 ...: do_windows_stuff()
208 ....: do_windows_stuff()
197
209
198 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
210 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
199 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
211 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
@@ -209,7 +221,7 b' the :class:`dependent` object that the decorators use:'
209 .. sourcecode::ipython
221 .. sourcecode::ipython
210
222
211 In [13]: def mytask(*args):
223 In [13]: def mytask(*args):
212 ...: dostuff()
224 ....: dostuff()
213
225
214 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
226 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
215 # this is the same as decorating the declaration of mytask with @depend
227 # this is the same as decorating the declaration of mytask with @depend
@@ -222,8 +234,8 b' the :class:`dependent` object that the decorators use:'
222
234
223 # is equivalent to:
235 # is equivalent to:
224 In [17]: @depend(g, *dargs, **dkwargs)
236 In [17]: @depend(g, *dargs, **dkwargs)
225 ...: def t(a,b,c):
237 ....: def t(a,b,c):
226 ...: # contents of f
238 ....: # contents of f
227
239
228 Graph Dependencies
240 Graph Dependencies
229 ------------------
241 ------------------
@@ -287,10 +299,11 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
287
299
288 In [16]: ar2 = lview.apply(f2)
300 In [16]: ar2 = lview.apply(f2)
289
301
290 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
302 In [17]: with lview.temp_flags(after=[ar,ar2]):
291
303 ....: ar3 = lview.apply(f3)
292 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
293
304
305 In [18]: with lview.temp_flags(follow=[ar], timeout=2.5)
306 ....: ar4 = lview.apply(f3)
294
307
295 .. seealso::
308 .. seealso::
296
309
@@ -440,7 +453,7 b' The following is an overview of how to use these classes together:'
440 2. Define some functions to be run as tasks
453 2. Define some functions to be run as tasks
441 3. Submit your tasks to using the :meth:`apply` method of your
454 3. Submit your tasks to using the :meth:`apply` method of your
442 :class:`LoadBalancedView` instance.
455 :class:`LoadBalancedView` instance.
443 4. Use :meth:`Client.get_result` to get the results of the
456 4. Use :meth:`.Client.get_result` to get the results of the
444 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
457 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
445 for and then receive the results.
458 for and then receive the results.
446
459
General Comments 0
You need to be logged in to leave comments. Login now