##// END OF EJS Templates
update parallel docs with some changes from scipy tutorial...
MinRK -
Show More
@@ -0,0 +1,61 b''
1 """An example for handling results in a way that AsyncMapResult doesn't provide
2
3 Specifically, out-of-order results with some special handing of metadata.
4
5 This just submits a bunch of jobs, waits on the results, and prints the stdout
6 and results of each as they finish.
7
8 Authors
9 -------
10 * MinRK
11 """
12 import time
13 import random
14
15 from IPython import parallel
16
17 # create client & views
18 rc = parallel.Client()
19 dv = rc[:]
20 v = rc.load_balanced_view()
21
22
23 # scatter 'id', so id=0,1,2 on engines 0,1,2
24 dv.scatter('id', rc.ids, flatten=True)
25 print dv['id']
26
27
28 def sleep_here(count, t):
29 """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
30 import time,sys
31 print "hi from engine %i" % id
32 sys.stdout.flush()
33 time.sleep(t)
34 return count,t
35
36 amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)
37
38 pending = set(amr.msg_ids)
39 while pending:
40 try:
41 rc.wait(pending, 1e-3)
42 except parallel.TimeoutError:
43 # ignore timeouterrors, since they only mean that at least one isn't done
44 pass
45 # finished is the set of msg_ids that are complete
46 finished = pending.difference(rc.outstanding)
47 # update pending to exclude those that just finished
48 pending = pending.difference(finished)
49 for msg_id in finished:
50 # we know these are done, so don't worry about blocking
51 ar = rc.get_result(msg_id)
52 print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
53 print "with stdout:"
54 print ' ' + ar.stdout.replace('\n', '\n ').rstrip()
55 print "and results:"
56
57 # note that each job in a map always returns a list of length chunksize
58 # even if chunksize == 1
59 for (count,t) in ar.result:
60 print " item %i: slept for %.2fs" % (count, t)
61
@@ -0,0 +1,83 b''
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
2
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
4
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
6 and you will see the print statements as they arrive, notably not waiting for the results
7 to finish.
8
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 and easily filter by message type.
11
12 Authors
13 -------
14 * MinRK
15 """
16
17 import os
18 import sys
19 import json
20 import zmq
21
22 from IPython.zmq.session import Session
23 from IPython.parallel.util import disambiguate_url
24 from IPython.utils.py3compat import str_to_bytes
25 from IPython.utils.path import get_security_file
26
27 def main(connection_file):
28 """watch iopub channel, and print messages"""
29
30 ctx = zmq.Context.instance()
31
32 with open(connection_file) as f:
33 cfg = json.loads(f.read())
34
35 location = cfg['location']
36 reg_url = cfg['url']
37 session = Session(key=str_to_bytes(cfg['exec_key']))
38
39 query = ctx.socket(zmq.DEALER)
40 query.connect(disambiguate_url(cfg['url'], location))
41 session.send(query, "connection_request")
42 idents,msg = session.recv(query, mode=0)
43 c = msg['content']
44 iopub_url = disambiguate_url(c['iopub'], location)
45 sub = ctx.socket(zmq.SUB)
46 # This will subscribe to all messages:
47 sub.setsockopt(zmq.SUBSCRIBE, b'')
48 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
49 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
50 # to everything from engine 1, but there is no way to subscribe to
51 # just stdout from everyone.
52 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
53 # engine 1's stderr and engine 2's stdout:
54 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
55 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
56 sub.connect(iopub_url)
57 while True:
58 try:
59 idents,msg = session.recv(sub, mode=0)
60 except KeyboardInterrupt:
61 return
62 # ident always length 1 here
63 topic = idents[0]
64 if msg['msg_type'] == 'stream':
65 # stdout/stderr
66 # stream names are in msg['content']['name'], if you want to handle
67 # them differently
68 print "%s: %s" % (topic, msg['content']['data'])
69 elif msg['msg_type'] == 'pyerr':
70 # Python traceback
71 c = msg['content']
72 print topic + ':'
73 for line in c['traceback']:
74 # indent lines
75 print ' ' + line
76
if __name__ == '__main__':
    # connection file from the command line, or the default profile's
    # ipcontroller-client.json security file
    cf = sys.argv[1] if len(sys.argv) > 1 else get_security_file('ipcontroller-client.json')
    main(cf)
@@ -0,0 +1,52 b''
1 """Example of iteration through AsyncMapResult, without waiting for all results
2
3 Authors
4 -------
5 * MinRK
6 """
7 import time
8
9 from IPython import parallel
10
11 # create client & view
12 rc = parallel.Client()
13 dv = rc[:]
14 v = rc.load_balanced_view()
15
16 # scatter 'id', so id=0,1,2 on engines 0,1,2
17 dv.scatter('id', rc.ids, flatten=True)
18 print "Engine IDs: ", dv['id']
19
20 # create a Reference to `id`. This will be a different value on each engine
21 ref = parallel.Reference('id')
22 print "sleeping for `id` seconds on each engine"
23 tic = time.time()
24 ar = dv.apply(time.sleep, ref)
25 for i,r in enumerate(ar):
26 print "%i: %.3f"%(i, time.time()-tic)
27
28 def sleep_here(t):
29 import time
30 time.sleep(t)
31 return id,t
32
33 # one call per task
34 print "running with one call per task"
35 amr = v.map(sleep_here, [.01*t for t in range(100)])
36 tic = time.time()
37 for i,r in enumerate(amr):
38 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
39
40 print "running with four calls per task"
41 # with chunksize, we can have four calls per task
42 amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
43 tic = time.time()
44 for i,r in enumerate(amr):
45 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
46
47 print "running with two calls per task, with unordered results"
48 # We can even iterate through faster results first, with ordered=False
49 amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
50 tic = time.time()
51 for i,r in enumerate(amr):
52 print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
1 NO CONTENT: new file 100644, binary diff hidden
@@ -31,6 +31,7 b' A Sample DAG'
31 31 Here, we have a very simple 5-node DAG:
32 32
33 33 .. figure:: figs/simpledag.*
34 :width: 600px
34 35
35 36 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 37 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
@@ -121,7 +122,9 b' on which it depends:'
121 122 ...: # leading into this one as dependencies
122 123 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 124 ...: # submit and store AsyncResult object
124 ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False)
125 ...: with view.temp_flags(after=deps, block=False):
126 ...:     results[node] = view.apply(jobs[node])
127
125 128
126 129 Now that we have submitted all the jobs, we can wait for the results:
127 130
@@ -155,16 +158,17 b' will be at the top, and quick, small tasks will be at the bottom.'
155 158 In [12]: pos = {}; colors = {}
156 159
157 160 In [12]: for node in G:
158 ...: md = results[node].metadata
159 ...: start = date2num(md.started)
160 ...: runtime = date2num(md.completed) - start
161 ...: pos[node] = (start, runtime)
162 ...: colors[node] = md.engine_id
161 ....: md = results[node].metadata
162 ....: start = date2num(md.started)
163 ....: runtime = date2num(md.completed) - start
164 ....: pos[node] = (start, runtime)
165 ....: colors[node] = md.engine_id
163 166
164 167 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
165 ...: cmap=gist_rainbow)
168 ....: cmap=gist_rainbow)
166 169
167 170 .. figure:: figs/dagdeps.*
171 :width: 600px
168 172
169 173 Time started on x, runtime on y, and color-coded by engine-id (in this case there
170 174 were four engines). Edges denote dependencies.
@@ -60,6 +60,10 b' the ``I`` in IPython. The following are some example usage cases for IPython:'
60 60 Architecture overview
61 61 =====================
62 62
63 .. figure:: figs/wideView.png
64 :width: 300px
65
66
63 67 The IPython architecture consists of four components:
64 68
65 69 * The IPython engine.
@@ -99,7 +103,7 b' same machine as the Hub, but can be run anywhere from local threads or on remote'
99 103 The controller also provides a single point of contact for users who wish to
100 104 utilize the engines connected to the controller. There are different ways of
101 105 working with a controller. In IPython, all of these models are implemented via
102 the client's :meth:`.View.apply` method, with various arguments, or
106 the :meth:`.View.apply` method, after
103 107 constructing :class:`.View` objects to represent subsets of engines. The two
104 108 primary models for interacting with engines are:
105 109
@@ -181,6 +185,34 b' ipcontroller-client.json'
181 185 but since the controller may listen on different ports for clients and
182 186 engines, it is stored separately.
183 187
188 ipcontroller-client.json will look something like this, under default localhost
189 circumstances:
190
191 .. sourcecode:: python
192
193 {
194 "url":"tcp:\/\/127.0.0.1:54424",
195 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
196 "ssh":"",
197 "location":"10.19.1.135"
198 }
199
200 If, however, you are running the controller on a work node on a cluster, you will likely
201 need to use ssh tunnels to connect clients from your laptop to it. You will also
202 probably need to instruct the controller to listen for engines coming from other work nodes
203 on the cluster. An example of ipcontroller-client.json, as created by::
204
205 $> ipcontroller --ip=0.0.0.0 --ssh=login.mycluster.com
206
207
208 .. sourcecode:: python
209
210 {
211 "url":"tcp:\/\/*:54424",
212 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
213 "ssh":"login.mycluster.com",
214 "location":"10.0.0.2"
215 }
184 216 More details of how these JSON files are used are given below.
185 217
186 218 A detailed description of the security model and its implementation in IPython
@@ -248,7 +280,7 b' then you would connect to it with:'
248 280
249 281 .. sourcecode:: ipython
250 282
251 In [2]: c = Client(sshserver='myhub.example.com')
283 In [2]: c = Client('/path/to/my/ipcontroller-client.json', sshserver='me@myhub.example.com')
252 284
253 285 Where 'myhub.example.com' is the url or IP address of the machine on
254 286 which the Hub process is running (or another machine that has direct access to the Hub's ports).
@@ -24,8 +24,8 b' the :command:`ipcluster` command::'
24 24 For more detailed information about starting the controller and engines, see
25 25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26 26
27 Creating a ``Client`` instance
28 ==============================
27 Creating a ``DirectView`` instance
28 ==================================
29 29
30 30 The first step is to import the IPython :mod:`IPython.parallel`
31 31 module and then create a :class:`.Client` instance:
@@ -117,10 +117,10 b' two decorators:'
117 117 .. sourcecode:: ipython
118 118
119 119 In [10]: @dview.remote(block=True)
120 ...: def getpid():
121 ...: import os
122 ...: return os.getpid()
123 ...:
120 ....: def getpid():
121 ....: import os
122 ....: return os.getpid()
123 ....:
124 124
125 125 In [11]: getpid()
126 126 Out[11]: [12345, 12346, 12347, 12348]
@@ -135,8 +135,8 b' operations and distribute them, reconstructing the result.'
135 135 In [13]: A = np.random.random((64,48))
136 136
137 137 In [14]: @dview.parallel(block=True)
138 ...: def pmul(A,B):
139 ...: return A*B
138 ....: def pmul(A,B):
139 ....: return A*B
140 140
141 141 In [15]: C_local = A*A
142 142
@@ -183,6 +183,8 b' dv.track : bool'
183 183 This is primarily useful for non-copying sends of numpy arrays that you plan to
184 184 edit in-place. You need to know when it becomes safe to edit the buffer
185 185 without corrupting the message.
186 dv.targets : int, list of ints
187 which targets this view is associated with.
186 188
187 189
188 190 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
@@ -260,10 +262,10 b' local Python/IPython session:'
260 262
261 263 # define our function
262 264 In [6]: def wait(t):
263 ...: import time
264 ...: tic = time.time()
265 ...: time.sleep(t)
266 ...: return time.time()-tic
265 ....: import time
266 ....: tic = time.time()
267 ....: time.sleep(t)
268 ....: return time.time()-tic
267 269
268 270 # In non-blocking mode
269 271 In [7]: ar = dview.apply_async(wait, 2)
@@ -326,7 +328,7 b' and blocks until all of the associated results are ready:'
326 328 The ``block`` and ``targets`` keyword arguments and attributes
327 329 --------------------------------------------------------------
328 330
329 Most DirectView methods (excluding :meth:`apply` and :meth:`map`) accept ``block`` and
331 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
330 332 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
331 333 blocking mode and which engines the command is applied to. The :class:`View` class also has
332 334 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
@@ -362,11 +364,6 b' The :attr:`block` and :attr:`targets` instance attributes of the'
362 364 Parallel magic commands
363 365 -----------------------
364 366
365 .. warning::
366
367 The magics have not been changed to work with the zeromq system. The
368 magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
369
370 367 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
371 368 that make it more pleasant to execute Python commands on the engines
372 369 interactively. These are simply shortcuts to :meth:`execute` and
@@ -376,9 +373,6 b' Python command on the engines specified by the :attr:`targets` attribute of the'
376 373
377 374 .. sourcecode:: ipython
378 375
379 # load the parallel magic extension:
380 In [21]: %load_ext parallelmagic
381
382 376 # Create a DirectView for all targets
383 377 In [22]: dv = rc[:]
384 378
@@ -387,10 +381,10 b' Python command on the engines specified by the :attr:`targets` attribute of the'
387 381
388 382 In [24]: dv.block=True
389 383
390 In [25]: import numpy
391
392 In [26]: %px import numpy
393 Parallel execution on engines: [0, 1, 2, 3]
384 # import numpy here and everywhere
385 In [25]: with dv.sync_imports():
386 ....: import numpy
387 importing numpy on engine(s)
394 388
395 389 In [27]: %px a = numpy.random.rand(2,2)
396 390 Parallel execution on engines: [0, 1, 2, 3]
@@ -400,10 +394,10 b' Python command on the engines specified by the :attr:`targets` attribute of the'
400 394
401 395 In [28]: dv['ev']
402 396 Out[28]: [ array([ 1.09522024, -0.09645227]),
403 array([ 1.21435496, -0.35546712]),
404 array([ 0.72180653, 0.07133042]),
405 array([ 1.46384341e+00, 1.04353244e-04])
406 ]
397 ....: array([ 1.21435496, -0.35546712]),
398 ....: array([ 0.72180653, 0.07133042]),
399 ....: array([ 1.46384341, 1.04353244e-04])
400 ....: ]
407 401
408 402 The ``%result`` magic gets the most recent result, or takes an argument
409 403 specifying the index of the result to be requested. It is simply a shortcut to the
@@ -415,9 +409,9 b' specifying the index of the result to be requested. It is simply a shortcut to t'
415 409
416 410 In [30]: %result
417 411 Out[30]: [ [ 1.28167017 0.14197338],
418 [-0.14093616 1.27877273],
419 [-0.37023573 1.06779409],
420 [ 0.83664764 -0.25602658] ]
412 ....: [-0.14093616 1.27877273],
413 ....: [-0.37023573 1.06779409],
414 ....: [ 0.83664764 -0.25602658] ]
421 415
422 416 The ``%autopx`` magic switches to a mode where everything you type is executed
423 417 on the engines given by the :attr:`targets` attribute:
@@ -452,9 +446,9 b' on the engines given by the :attr:`targets` attribute:'
452 446
453 447 In [37]: dv['ans']
454 448 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
455 'Average max eigenvalue is: 10.2076902286',
456 'Average max eigenvalue is: 10.1891484655',
457 'Average max eigenvalue is: 10.1158837784',]
449 ....: 'Average max eigenvalue is: 10.2076902286',
450 ....: 'Average max eigenvalue is: 10.1891484655',
451 ....: 'Average max eigenvalue is: 10.1158837784',]
458 452
459 453
460 454 Moving Python objects around
@@ -522,7 +516,7 b' follow that terminology. However, it is important to remember that in'
522 516 IPython's :class:`Client` class, :meth:`scatter` is from the
523 517 interactive IPython session to the engines and :meth:`gather` is from the
524 518 engines back to the interactive IPython session. For scatter/gather operations
525 between engines, MPI should be used:
519 between engines, MPI, pyzmq, or some other direct interconnect should be used.
526 520
527 521 .. sourcecode:: ipython
528 522
@@ -568,7 +562,7 b" created by a DirectView's :meth:`sync_imports` method:"
568 562 .. sourcecode:: ipython
569 563
570 564 In [69]: with dview.sync_imports():
571 ...: import numpy
565 ....: import numpy
572 566 importing numpy on engine(s)
573 567
574 568 Any imports made inside the block will also be performed on the view's engines.
@@ -588,15 +582,15 b' execution, and will fail with an UnmetDependencyError.'
588 582 In [69]: from IPython.parallel import require
589 583
590 584 In [70]: @require('re')
591 ...: def findall(pat, x):
592 ...: # re is guaranteed to be available
593 ...: return re.findall(pat, x)
585 ....: def findall(pat, x):
586 ....: # re is guaranteed to be available
587 ....: return re.findall(pat, x)
594 588
595 589 # you can also pass modules themselves, that you already have locally:
596 590 In [71]: @require(time)
597 ...: def wait(t):
598 ...: time.sleep(t)
599 ...: return t
591 ....: def wait(t):
592 ....: time.sleep(t)
593 ....: return t
600 594
601 595 .. _parallel_exceptions:
602 596
@@ -29,8 +29,8 b' the :command:`ipcluster` command::'
29 29 For more detailed information about starting the controller and engines, see
30 30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
31 31
32 Creating a ``Client`` instance
33 ==============================
32 Creating a ``LoadBalancedView`` instance
33 ========================================
34 34
35 35 The first step is to import the IPython :mod:`IPython.parallel`
36 36 module and then create a :class:`.Client` instance, and we will also be using
@@ -113,12 +113,24 b' that turns any Python function into a parallel function:'
113 113
114 114 .. _parallel_taskmap:
115 115
116 The AsyncMapResult
117 ==================
116 Map results are iterable!
117 -------------------------
118
119 When an AsyncResult object actually maps multiple results (e.g. the :class:`~AsyncMapResult`
120 object), you can actually iterate through them, and act on the results as they arrive:
121
122 .. literalinclude:: ../../examples/parallel/itermapresult.py
123 :language: python
124 :lines: 9-34
125
126 .. seealso::
127
128 When AsyncResult or the AsyncMapResult don't provide what you need (for instance,
129 handling individual results as they arrive, but with metadata), you can always
130 just split the original result's ``msg_ids`` attribute, and handle them as you like.
131
132 For an example of this, see :file:`docs/examples/parallel/customresult.py`
118 133
119 When you call ``lview.map_async(f, sequence)``, or just :meth:`map` with `block=True`, then
120 what you get in return will be an :class:`~AsyncMapResult` object. These are similar to
121 AsyncResult objects, but with one key difference
122 134
123 135 .. _parallel_dependencies:
124 136
@@ -166,8 +178,8 b' you specify are importable:'
166 178 .. sourcecode:: ipython
167 179
168 180 In [10]: @require('numpy', 'zmq')
169 ...: def myfunc():
170 ...: return dostuff()
181 ....: def myfunc():
182 ....: return dostuff()
171 183
172 184 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
173 185 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
@@ -184,16 +196,16 b' will be assigned to another engine. If the dependency returns *anything other th'
184 196 .. sourcecode:: ipython
185 197
186 198 In [10]: def platform_specific(plat):
187 ...: import sys
188 ...: return sys.platform == plat
199 ....: import sys
200 ....: return sys.platform == plat
189 201
190 202 In [11]: @depend(platform_specific, 'darwin')
191 ...: def mactask():
192 ...: do_mac_stuff()
203 ....: def mactask():
204 ....: do_mac_stuff()
193 205
194 206 In [12]: @depend(platform_specific, 'nt')
195 ...: def wintask():
196 ...: do_windows_stuff()
207 ....: def wintask():
208 ....: do_windows_stuff()
197 209
198 210 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
199 211 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
@@ -209,7 +221,7 b' the :class:`dependent` object that the decorators use:'
209 221 .. sourcecode:: ipython
210 222
211 223 In [13]: def mytask(*args):
212 ...: dostuff()
224 ....: dostuff()
213 225
214 226 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
215 227 # this is the same as decorating the declaration of mytask with @depend
@@ -222,8 +234,8 b' the :class:`dependent` object that the decorators use:'
222 234
223 235 # is equivalent to:
224 236 In [17]: @depend(g, *dargs, **dkwargs)
225 ...: def t(a,b,c):
226 ...: # contents of f
237 ....: def t(a,b,c):
238 ....: # contents of f
227 239
228 240 Graph Dependencies
229 241 ------------------
@@ -287,10 +299,11 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
287 299
288 300 In [16]: ar2 = lview.apply(f2)
289 301
290 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
291
292 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
302 In [17]: with lview.temp_flags(after=[ar,ar2]):
303 ....: ar3 = lview.apply(f3)
293 304
305 In [18]: with lview.temp_flags(follow=[ar], timeout=2.5):
306 ....: ar4 = lview.apply(f3)
294 307
295 308 .. seealso::
296 309
@@ -440,7 +453,7 b' The following is an overview of how to use these classes together:'
440 453 2. Define some functions to be run as tasks
441 454 3. Submit your tasks to using the :meth:`apply` method of your
442 455 :class:`LoadBalancedView` instance.
443 4. Use :meth:`Client.get_result` to get the results of the
456 4. Use :meth:`.Client.get_result` to get the results of the
444 457 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
445 458 for and then receive the results.
446 459
General Comments 0
You need to be logged in to leave comments. Login now