##// END OF EJS Templates
Merge pull request #920 from minrk/parallel...
Fernando Perez -
r5236:854f3d91 merge
parent child Browse files
Show More
@@ -0,0 +1,61 b''
1 """An example for handling results in a way that AsyncMapResult doesn't provide
2
3 Specifically, out-of-order results with some special handing of metadata.
4
5 This just submits a bunch of jobs, waits on the results, and prints the stdout
6 and results of each as they finish.
7
8 Authors
9 -------
10 * MinRK
11 """
12 import time
13 import random
14
15 from IPython import parallel
16
17 # create client & views
18 rc = parallel.Client()
19 dv = rc[:]
20 v = rc.load_balanced_view()
21
22
23 # scatter 'id', so id=0,1,2 on engines 0,1,2
24 dv.scatter('id', rc.ids, flatten=True)
25 print dv['id']
26
27
28 def sleep_here(count, t):
29 """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
30 import time,sys
31 print "hi from engine %i" % id
32 sys.stdout.flush()
33 time.sleep(t)
34 return count,t
35
36 amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)
37
38 pending = set(amr.msg_ids)
39 while pending:
40 try:
41 rc.wait(pending, 1e-3)
42 except parallel.TimeoutError:
43 # ignore timeouterrors, since they only mean that at least one isn't done
44 pass
45 # finished is the set of msg_ids that are complete
46 finished = pending.difference(rc.outstanding)
47 # update pending to exclude those that just finished
48 pending = pending.difference(finished)
49 for msg_id in finished:
50 # we know these are done, so don't worry about blocking
51 ar = rc.get_result(msg_id)
52 print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
53 print "with stdout:"
54 print ' ' + ar.stdout.replace('\n', '\n ').rstrip()
55 print "and results:"
56
57 # note that each job in a map always returns a list of length chunksize
58 # even if chunksize == 1
59 for (count,t) in ar.result:
60 print " item %i: slept for %.2fs" % (count, t)
61
@@ -0,0 +1,83 b''
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
2
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
4
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
6 and you will see the print statements as they arrive, notably not waiting for the results
7 to finish.
8
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 and easily filter by message type.
11
12 Authors
13 -------
14 * MinRK
15 """
16
17 import os
18 import sys
19 import json
20 import zmq
21
22 from IPython.zmq.session import Session
23 from IPython.parallel.util import disambiguate_url
24 from IPython.utils.py3compat import str_to_bytes
25 from IPython.utils.path import get_security_file
26
27 def main(connection_file):
28 """watch iopub channel, and print messages"""
29
30 ctx = zmq.Context.instance()
31
32 with open(connection_file) as f:
33 cfg = json.loads(f.read())
34
35 location = cfg['location']
36 reg_url = cfg['url']
37 session = Session(key=str_to_bytes(cfg['exec_key']))
38
39 query = ctx.socket(zmq.DEALER)
40 query.connect(disambiguate_url(cfg['url'], location))
41 session.send(query, "connection_request")
42 idents,msg = session.recv(query, mode=0)
43 c = msg['content']
44 iopub_url = disambiguate_url(c['iopub'], location)
45 sub = ctx.socket(zmq.SUB)
46 # This will subscribe to all messages:
47 sub.setsockopt(zmq.SUBSCRIBE, b'')
48 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
49 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
50 # to everything from engine 1, but there is no way to subscribe to
51 # just stdout from everyone.
52 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
53 # engine 1's stderr and engine 2's stdout:
54 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
55 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
56 sub.connect(iopub_url)
57 while True:
58 try:
59 idents,msg = session.recv(sub, mode=0)
60 except KeyboardInterrupt:
61 return
62 # ident always length 1 here
63 topic = idents[0]
64 if msg['msg_type'] == 'stream':
65 # stdout/stderr
66 # stream names are in msg['content']['name'], if you want to handle
67 # them differently
68 print "%s: %s" % (topic, msg['content']['data'])
69 elif msg['msg_type'] == 'pyerr':
70 # Python traceback
71 c = msg['content']
72 print topic + ':'
73 for line in c['traceback']:
74 # indent lines
75 print ' ' + line
76
77 if __name__ == '__main__':
78 if len(sys.argv) > 1:
79 cf = sys.argv[1]
80 else:
81 # This gets the security file for the default profile:
82 cf = get_security_file('ipcontroller-client.json')
83 main(cf) No newline at end of file
@@ -0,0 +1,65 b''
1 """Example of iteration through AsyncMapResults, without waiting for all results
2
3 When you call view.map(func, sequence), you will receive a special AsyncMapResult
4 object. These objects are used to reconstruct the results of the split call.
5 One feature AsyncResults provide is that they are iterable *immediately*, so
6 you can iterate through the actual results as they complete.
7
8 This is useful if you submit a large number of tasks that may take some time,
9 but want to perform logic on elements in the result, or even abort subsequent
10 tasks in cases where you are searching for the first affirmative result.
11
12 By default, the results will match the ordering of the submitted sequence, but
13 if you call `map(...ordered=False)`, then results will be provided to the iterator
14 on a first come first serve basis.
15
16 Authors
17 -------
18 * MinRK
19 """
20 import time
21
22 from IPython import parallel
23
24 # create client & view
25 rc = parallel.Client()
26 dv = rc[:]
27 v = rc.load_balanced_view()
28
29 # scatter 'id', so id=0,1,2 on engines 0,1,2
30 dv.scatter('id', rc.ids, flatten=True)
31 print "Engine IDs: ", dv['id']
32
33 # create a Reference to `id`. This will be a different value on each engine
34 ref = parallel.Reference('id')
35 print "sleeping for `id` seconds on each engine"
36 tic = time.time()
37 ar = dv.apply(time.sleep, ref)
38 for i,r in enumerate(ar):
39 print "%i: %.3f"%(i, time.time()-tic)
40
41 def sleep_here(t):
42 import time
43 time.sleep(t)
44 return id,t
45
46 # one call per task
47 print "running with one call per task"
48 amr = v.map(sleep_here, [.01*t for t in range(100)])
49 tic = time.time()
50 for i,r in enumerate(amr):
51 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
52
53 print "running with four calls per task"
54 # with chunksize, we can have four calls per task
55 amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
56 tic = time.time()
57 for i,r in enumerate(amr):
58 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
59
60 print "running with two calls per task, with unordered results"
61 # We can even iterate through faster results first, with ordered=False
62 amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
63 tic = time.time()
64 for i,r in enumerate(amr):
65 print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
1 NO CONTENT: new file 100644, binary diff hidden
NO CONTENT: new file 100644, binary diff hidden
@@ -38,7 +38,7 b' from IPython.core.profiledir import ProfileDir'
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List, Any,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
@@ -233,6 +233,12 b' class IPClusterEngines(BaseParallelApplication):'
233 help="""The number of engines to start. The default is to use one for each
233 help="""The number of engines to start. The default is to use one for each
234 CPU on your machine""")
234 CPU on your machine""")
235
235
236 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
237 def _engine_launcher_changed(self, name, old, new):
238 if isinstance(new, basestring):
239 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
240 " use engine_launcher_class" % self.__class__.__name__)
241 self.engine_launcher_class = new
236 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
242 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
237 config=True,
243 config=True,
238 help="""The class for launching a set of Engines. Change this value
244 help="""The class for launching a set of Engines. Change this value
@@ -249,11 +255,22 b' class IPClusterEngines(BaseParallelApplication):'
249 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
255 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
250 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
256 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
251 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
257 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
258 LSFEngineSetLauncher : use LSF (bsub) to submit engines to a batch queue
252 SSHEngineSetLauncher : use SSH to start the controller
259 SSHEngineSetLauncher : use SSH to start the controller
253 Note that SSH does *not* move the connection files
260 Note that SSH does *not* move the connection files
254 around, so you will likely have to do this manually
261 around, so you will likely have to do this manually
255 unless the machines are on a shared file system.
262 unless the machines are on a shared file system.
256 WindowsHPCEngineSetLauncher : use Windows HPC
263 WindowsHPCEngineSetLauncher : use Windows HPC
264
265 If you are using one of IPython's builtin launchers, you can specify just the
266 prefix, e.g:
267
268 c.IPClusterEngines.engine_launcher_class = 'SSH'
269
270 or:
271
272 ipcluster start --engines 'MPIExec'
273
257 """
274 """
258 )
275 )
259 daemonize = Bool(False, config=True,
276 daemonize = Bool(False, config=True,
@@ -265,9 +282,11 b' class IPClusterEngines(BaseParallelApplication):'
265 if new:
282 if new:
266 self.log_to_file = True
283 self.log_to_file = True
267
284
285 early_shutdown = Int(30, config=True, help="The timeout (in seconds)")
286 _stopping = False
287
268 aliases = Dict(engine_aliases)
288 aliases = Dict(engine_aliases)
269 flags = Dict(engine_flags)
289 flags = Dict(engine_flags)
270 _stopping = False
271
290
272 @catch_config_error
291 @catch_config_error
273 def initialize(self, argv=None):
292 def initialize(self, argv=None):
@@ -277,7 +296,6 b' class IPClusterEngines(BaseParallelApplication):'
277
296
278 def init_launchers(self):
297 def init_launchers(self):
279 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
298 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
280 self.engine_launcher.on_stop(lambda r: self.loop.stop())
281
299
282 def init_signal(self):
300 def init_signal(self):
283 # Setup signals
301 # Setup signals
@@ -304,13 +322,42 b' class IPClusterEngines(BaseParallelApplication):'
304 )
322 )
305 return launcher
323 return launcher
306
324
325 def engines_started_ok(self):
326 self.log.info("Engines appear to have started successfully")
327 self.early_shutdown = 0
328
307 def start_engines(self):
329 def start_engines(self):
308 self.log.info("Starting %i engines"%self.n)
330 self.log.info("Starting %i engines"%self.n)
309 self.engine_launcher.start(self.n)
331 self.engine_launcher.start(self.n)
332 self.engine_launcher.on_stop(self.engines_stopped_early)
333 if self.early_shutdown:
334 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
335
336 def engines_stopped_early(self, r):
337 if self.early_shutdown and not self._stopping:
338 self.log.error("""
339 Engines shutdown early, they probably failed to connect.
340
341 Check the engine log files for output.
342
343 If your controller and engines are not on the same machine, you probably
344 have to instruct the controller to listen on an interface other than localhost.
345
346 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
347
348 Be sure to read our security docs before instructing your controller to listen on
349 a public interface.
350 """)
351 self.stop_launchers()
352
353 return self.engines_stopped(r)
354
355 def engines_stopped(self, r):
356 return self.loop.stop()
310
357
311 def stop_engines(self):
358 def stop_engines(self):
312 self.log.info("Stopping Engines...")
313 if self.engine_launcher.running:
359 if self.engine_launcher.running:
360 self.log.info("Stopping Engines...")
314 d = self.engine_launcher.stop()
361 d = self.engine_launcher.stop()
315 return d
362 return d
316 else:
363 else:
@@ -322,7 +369,7 b' class IPClusterEngines(BaseParallelApplication):'
322 self.log.error("IPython cluster: stopping")
369 self.log.error("IPython cluster: stopping")
323 self.stop_engines()
370 self.stop_engines()
324 # Wait a few seconds to let things shut down.
371 # Wait a few seconds to let things shut down.
325 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
372 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
326 dc.start()
373 dc.start()
327
374
328 def sigint_handler(self, signum, frame):
375 def sigint_handler(self, signum, frame):
@@ -394,6 +441,13 b' class IPClusterStart(IPClusterEngines):'
394 delay = CFloat(1., config=True,
441 delay = CFloat(1., config=True,
395 help="delay (in s) between starting the controller and the engines")
442 help="delay (in s) between starting the controller and the engines")
396
443
444 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
445 def _controller_launcher_changed(self, name, old, new):
446 if isinstance(new, basestring):
447 # old 0.11-style config
448 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
449 " use controller_launcher_class" % self.__class__.__name__)
450 self.controller_launcher_class = new
397 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
451 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
398 config=True,
452 config=True,
399 help="""The class for launching a Controller. Change this value if you want
453 help="""The class for launching a Controller. Change this value if you want
@@ -408,8 +462,19 b' class IPClusterStart(IPClusterEngines):'
408 MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
462 MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
409 PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
463 PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
410 SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
464 SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
465 LSFControllerLauncher : use LSF (bsub) to submit engines to a batch queue
411 SSHControllerLauncher : use SSH to start the controller
466 SSHControllerLauncher : use SSH to start the controller
412 WindowsHPCControllerLauncher : use Windows HPC
467 WindowsHPCControllerLauncher : use Windows HPC
468
469 If you are using one of IPython's builtin launchers, you can specify just the
470 prefix, e.g:
471
472 c.IPClusterStart.controller_launcher_class = 'SSH'
473
474 or:
475
476 ipcluster start --controller 'MPIExec'
477
413 """
478 """
414 )
479 )
415 reset = Bool(False, config=True,
480 reset = Bool(False, config=True,
@@ -423,7 +488,11 b' class IPClusterStart(IPClusterEngines):'
423 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
488 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
424 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
489 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
425 self.controller_launcher.on_stop(self.stop_launchers)
490 self.controller_launcher.on_stop(self.stop_launchers)
426
491
492 def engines_stopped(self, r):
493 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
494 pass
495
427 def start_controller(self):
496 def start_controller(self):
428 self.controller_launcher.start()
497 self.controller_launcher.start()
429
498
@@ -55,7 +55,7 b' from IPython.parallel.controller.hub import HubFactory'
55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 from IPython.parallel.controller.sqlitedb import SQLiteDB
57
57
58 from IPython.parallel.util import signal_children, split_url, asbytes
58 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
59
59
60 # conditional import of MongoDB backend class
60 # conditional import of MongoDB backend class
61
61
@@ -290,13 +290,15 b' class IPControllerApp(BaseParallelApplication):'
290 mq = import_item(str(self.mq_class))
290 mq = import_item(str(self.mq_class))
291
291
292 hub = self.factory
292 hub = self.factory
293 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
293 # disambiguate url, in case of *
294 monitor_url = disambiguate_url(hub.monitor_url)
295 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
294 # IOPub relay (in a Process)
296 # IOPub relay (in a Process)
295 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
297 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
296 q.bind_in(hub.client_info['iopub'])
298 q.bind_in(hub.client_info['iopub'])
297 q.bind_out(hub.engine_info['iopub'])
299 q.bind_out(hub.engine_info['iopub'])
298 q.setsockopt_out(zmq.SUBSCRIBE, b'')
300 q.setsockopt_out(zmq.SUBSCRIBE, b'')
299 q.connect_mon(hub.monitor_url)
301 q.connect_mon(monitor_url)
300 q.daemon=True
302 q.daemon=True
301 children.append(q)
303 children.append(q)
302
304
@@ -305,7 +307,7 b' class IPControllerApp(BaseParallelApplication):'
305 q.bind_in(hub.client_info['mux'])
307 q.bind_in(hub.client_info['mux'])
306 q.setsockopt_in(zmq.IDENTITY, b'mux')
308 q.setsockopt_in(zmq.IDENTITY, b'mux')
307 q.bind_out(hub.engine_info['mux'])
309 q.bind_out(hub.engine_info['mux'])
308 q.connect_mon(hub.monitor_url)
310 q.connect_mon(monitor_url)
309 q.daemon=True
311 q.daemon=True
310 children.append(q)
312 children.append(q)
311
313
@@ -314,7 +316,7 b' class IPControllerApp(BaseParallelApplication):'
314 q.bind_in(hub.client_info['control'])
316 q.bind_in(hub.client_info['control'])
315 q.setsockopt_in(zmq.IDENTITY, b'control')
317 q.setsockopt_in(zmq.IDENTITY, b'control')
316 q.bind_out(hub.engine_info['control'])
318 q.bind_out(hub.engine_info['control'])
317 q.connect_mon(hub.monitor_url)
319 q.connect_mon(monitor_url)
318 q.daemon=True
320 q.daemon=True
319 children.append(q)
321 children.append(q)
320 try:
322 try:
@@ -329,7 +331,7 b' class IPControllerApp(BaseParallelApplication):'
329 q.bind_in(hub.client_info['task'][1])
331 q.bind_in(hub.client_info['task'][1])
330 q.setsockopt_in(zmq.IDENTITY, b'task')
332 q.setsockopt_in(zmq.IDENTITY, b'task')
331 q.bind_out(hub.engine_info['task'])
333 q.bind_out(hub.engine_info['task'])
332 q.connect_mon(hub.monitor_url)
334 q.connect_mon(monitor_url)
333 q.daemon=True
335 q.daemon=True
334 children.append(q)
336 children.append(q)
335 elif scheme == 'none':
337 elif scheme == 'none':
@@ -338,7 +340,7 b' class IPControllerApp(BaseParallelApplication):'
338 else:
340 else:
339 self.log.info("task::using Python %s Task scheduler"%scheme)
341 self.log.info("task::using Python %s Task scheduler"%scheme)
340 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
342 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
341 hub.monitor_url, hub.client_info['notification'])
343 monitor_url, disambiguate_url(hub.client_info['notification']))
342 kwargs = dict(logname='scheduler', loglevel=self.log_level,
344 kwargs = dict(logname='scheduler', loglevel=self.log_level,
343 log_url = self.log_url, config=dict(self.config))
345 log_url = self.log_url, config=dict(self.config))
344 if 'Process' in self.mq_class:
346 if 'Process' in self.mq_class:
@@ -633,7 +633,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
633 d = el.start(user=user, hostname=host)
633 d = el.start(user=user, hostname=host)
634 if i==0:
634 if i==0:
635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
636 self.launchers[host+str(i)] = el
636 self.launchers[ "%s/%i" % (host,i) ] = el
637 dlist.append(d)
637 dlist.append(d)
638 self.notify_start(dlist)
638 self.notify_start(dlist)
639 return dlist
639 return dlist
@@ -62,6 +62,7 b' class AsyncResult(object):'
62 self._tracker = tracker
62 self._tracker = tracker
63 self._ready = False
63 self._ready = False
64 self._success = None
64 self._success = None
65 self._metadata = None
65 if len(msg_ids) == 1:
66 if len(msg_ids) == 1:
66 self._single_result = not isinstance(targets, (list, tuple))
67 self._single_result = not isinstance(targets, (list, tuple))
67 else:
68 else:
@@ -231,13 +232,13 b' class AsyncResult(object):'
231 else:
232 else:
232 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
233 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
233
234
234 @check_ready
235 def __getattr__(self, key):
235 def __getattr__(self, key):
236 """getattr maps to getitem for convenient attr access to metadata."""
236 """getattr maps to getitem for convenient attr access to metadata."""
237 if key not in self._metadata[0].keys():
237 try:
238 return self.__getitem__(key)
239 except (error.TimeoutError, KeyError):
238 raise AttributeError("%r object has no attribute %r"%(
240 raise AttributeError("%r object has no attribute %r"%(
239 self.__class__.__name__, key))
241 self.__class__.__name__, key))
240 return self.__getitem__(key)
241
242
242 # asynchronous iterator:
243 # asynchronous iterator:
243 def __iter__(self):
244 def __iter__(self):
@@ -261,12 +262,19 b' class AsyncMapResult(AsyncResult):'
261 """Class for representing results of non-blocking gathers.
262 """Class for representing results of non-blocking gathers.
262
263
263 This will properly reconstruct the gather.
264 This will properly reconstruct the gather.
265
266 This class is iterable at any time, and will wait on results as they come.
267
268 If ordered=False, then the first results to arrive will come first, otherwise
269 results will be yielded in the order they were submitted.
270
264 """
271 """
265
272
266 def __init__(self, client, msg_ids, mapObject, fname=''):
273 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
267 AsyncResult.__init__(self, client, msg_ids, fname=fname)
274 AsyncResult.__init__(self, client, msg_ids, fname=fname)
268 self._mapObject = mapObject
275 self._mapObject = mapObject
269 self._single_result = False
276 self._single_result = False
277 self.ordered = ordered
270
278
271 def _reconstruct_result(self, res):
279 def _reconstruct_result(self, res):
272 """Perform the gather on the actual results."""
280 """Perform the gather on the actual results."""
@@ -274,6 +282,13 b' class AsyncMapResult(AsyncResult):'
274
282
275 # asynchronous iterator:
283 # asynchronous iterator:
276 def __iter__(self):
284 def __iter__(self):
285 it = self._ordered_iter if self.ordered else self._unordered_iter
286 for r in it():
287 yield r
288
289 # asynchronous ordered iterator:
290 def _ordered_iter(self):
291 """iterator for results *as they arrive*, preserving submission order."""
277 try:
292 try:
278 rlist = self.get(0)
293 rlist = self.get(0)
279 except error.TimeoutError:
294 except error.TimeoutError:
@@ -294,6 +309,42 b' class AsyncMapResult(AsyncResult):'
294 for r in rlist:
309 for r in rlist:
295 yield r
310 yield r
296
311
312 # asynchronous unordered iterator:
313 def _unordered_iter(self):
314 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
315 try:
316 rlist = self.get(0)
317 except error.TimeoutError:
318 pending = set(self.msg_ids)
319 while pending:
320 try:
321 self._client.wait(pending, 1e-3)
322 except error.TimeoutError:
323 # ignore timeout error, because that only means
324 # *some* jobs are outstanding
325 pass
326 # update ready set with those no longer outstanding:
327 ready = pending.difference(self._client.outstanding)
328 # update pending to exclude those that are finished
329 pending = pending.difference(ready)
330 while ready:
331 msg_id = ready.pop()
332 ar = AsyncResult(self._client, msg_id, self._fname)
333 rlist = ar.get()
334 try:
335 for r in rlist:
336 yield r
337 except TypeError:
338 # flattened, not a list
339 # this could get broken by flattened data that returns iterables
340 # but most calls to map do not expose the `flatten` argument
341 yield rlist
342 else:
343 # already done
344 for r in rlist:
345 yield r
346
347
297
348
298 class AsyncHubResult(AsyncResult):
349 class AsyncHubResult(AsyncResult):
299 """Class to wrap pending results that must be requested from the Hub.
350 """Class to wrap pending results that must be requested from the Hub.
@@ -46,7 +46,7 b' def remote(view, block=None, **flags):'
46 return remote_function
46 return remote_function
47
47
48 @skip_doctest
48 @skip_doctest
49 def parallel(view, dist='b', block=None, **flags):
49 def parallel(view, dist='b', block=None, ordered=True, **flags):
50 """Turn a function into a parallel remote function.
50 """Turn a function into a parallel remote function.
51
51
52 This method can be used for map:
52 This method can be used for map:
@@ -57,7 +57,7 b" def parallel(view, dist='b', block=None, **flags):"
57 """
57 """
58
58
59 def parallel_function(f):
59 def parallel_function(f):
60 return ParallelFunction(view, f, dist=dist, block=block, **flags)
60 return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
61 return parallel_function
61 return parallel_function
62
62
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
@@ -122,15 +122,19 b' class ParallelFunction(RemoteFunction):'
122 to use the current `block` attribute of `view`
122 to use the current `block` attribute of `view`
123 chunksize : int or None
123 chunksize : int or None
124 The size of chunk to use when breaking up sequences in a load-balanced manner
124 The size of chunk to use when breaking up sequences in a load-balanced manner
125 ordered : bool [default: True]
126 Whether
125 **flags : remaining kwargs are passed to View.temp_flags
127 **flags : remaining kwargs are passed to View.temp_flags
126 """
128 """
127
129
128 chunksize=None
130 chunksize=None
131 ordered=None
129 mapObject=None
132 mapObject=None
130
133
131 def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
134 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
132 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
135 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
133 self.chunksize = chunksize
136 self.chunksize = chunksize
137 self.ordered = ordered
134
138
135 mapClass = Map.dists[dist]
139 mapClass = Map.dists[dist]
136 self.mapObject = mapClass()
140 self.mapObject = mapClass()
@@ -186,7 +190,10 b' class ParallelFunction(RemoteFunction):'
186
190
187 msg_ids.append(ar.msg_ids[0])
191 msg_ids.append(ar.msg_ids[0])
188
192
189 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__)
193 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
194 fname=self.func.__name__,
195 ordered=self.ordered
196 )
190
197
191 if self.block:
198 if self.block:
192 try:
199 try:
@@ -992,7 +992,7 b' class LoadBalancedView(View):'
992 @spin_after
992 @spin_after
993 @save_ids
993 @save_ids
994 def map(self, f, *sequences, **kwargs):
994 def map(self, f, *sequences, **kwargs):
995 """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult
995 """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
996
996
997 Parallel version of builtin `map`, load-balanced by this View.
997 Parallel version of builtin `map`, load-balanced by this View.
998
998
@@ -1009,14 +1009,20 b' class LoadBalancedView(View):'
1009 function to be mapped
1009 function to be mapped
1010 *sequences: one or more sequences of matching length
1010 *sequences: one or more sequences of matching length
1011 the sequences to be distributed and passed to `f`
1011 the sequences to be distributed and passed to `f`
1012 block : bool
1012 block : bool [default self.block]
1013 whether to wait for the result or not [default self.block]
1013 whether to wait for the result or not
1014 track : bool
1014 track : bool
1015 whether to create a MessageTracker to allow the user to
1015 whether to create a MessageTracker to allow the user to
1016 safely edit after arrays and buffers during non-copying
1016 safely edit after arrays and buffers during non-copying
1017 sends.
1017 sends.
1018 chunksize : int
1018 chunksize : int [default 1]
1019 how many elements should be in each task [default 1]
1019 how many elements should be in each task.
1020 ordered : bool [default True]
1021 Whether the results should be gathered as they arrive, or enforce
1022 the order of submission.
1023
1024 Only applies when iterating through AsyncMapResult as results arrive.
1025 Has no effect when block=True.
1020
1026
1021 Returns
1027 Returns
1022 -------
1028 -------
@@ -1034,6 +1040,7 b' class LoadBalancedView(View):'
1034 # default
1040 # default
1035 block = kwargs.get('block', self.block)
1041 block = kwargs.get('block', self.block)
1036 chunksize = kwargs.get('chunksize', 1)
1042 chunksize = kwargs.get('chunksize', 1)
1043 ordered = kwargs.get('ordered', True)
1037
1044
1038 keyset = set(kwargs.keys())
1045 keyset = set(kwargs.keys())
1039 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
1046 extra_keys = keyset.difference_update(set(['block', 'chunksize']))
@@ -1042,7 +1049,7 b' class LoadBalancedView(View):'
1042
1049
1043 assert len(sequences) > 0, "must have some sequences to map onto!"
1050 assert len(sequences) > 0, "must have some sequences to map onto!"
1044
1051
1045 pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
1052 pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
1046 return pf.map(*sequences)
1053 return pf.map(*sequences)
1047
1054
1048 __all__ = ['LoadBalancedView', 'DirectView']
1055 __all__ = ['LoadBalancedView', 'DirectView']
@@ -214,6 +214,14 b' class EngineFactory(RegistrationFactory):'
214
214
215 def abort(self):
215 def abort(self):
216 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
216 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
217 if self.url.startswith('127.'):
218 self.log.fatal("""
219 If the controller and engines are not on the same machine,
220 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
221 c.HubFactory.ip='*' # for all interfaces, internal and external
222 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
223 or tunnel connections via ssh.
224 """)
217 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
225 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
218 time.sleep(1)
226 time.sleep(1)
219 sys.exit(255)
227 sys.exit(255)
@@ -70,4 +70,46 b' class AsyncResultTest(ClusterTestCase):'
70 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
70 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
71 for eid,r in d.iteritems():
71 for eid,r in d.iteritems():
72 self.assertEquals(r, 5)
72 self.assertEquals(r, 5)
73
74 def test_list_amr(self):
75 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
76 rlist = list(ar)
77
78 def test_getattr(self):
79 ar = self.client[:].apply_async(wait, 0.5)
80 self.assertRaises(AttributeError, lambda : ar._foo)
81 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
82 self.assertRaises(AttributeError, lambda : ar.foo)
83 self.assertRaises(AttributeError, lambda : ar.engine_id)
84 self.assertFalse(hasattr(ar, '__length_hint__'))
85 self.assertFalse(hasattr(ar, 'foo'))
86 self.assertFalse(hasattr(ar, 'engine_id'))
87 ar.get(5)
88 self.assertRaises(AttributeError, lambda : ar._foo)
89 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
90 self.assertRaises(AttributeError, lambda : ar.foo)
91 self.assertTrue(isinstance(ar.engine_id, list))
92 self.assertEquals(ar.engine_id, ar['engine_id'])
93 self.assertFalse(hasattr(ar, '__length_hint__'))
94 self.assertFalse(hasattr(ar, 'foo'))
95 self.assertTrue(hasattr(ar, 'engine_id'))
96
97 def test_getitem(self):
98 ar = self.client[:].apply_async(wait, 0.5)
99 self.assertRaises(TimeoutError, lambda : ar['foo'])
100 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
101 ar.get(5)
102 self.assertRaises(KeyError, lambda : ar['foo'])
103 self.assertTrue(isinstance(ar['engine_id'], list))
104 self.assertEquals(ar.engine_id, ar['engine_id'])
105
106 def test_single_result(self):
107 ar = self.client[-1].apply_async(wait, 0.5)
108 self.assertRaises(TimeoutError, lambda : ar['foo'])
109 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
110 self.assertTrue(ar.get(5) == 0.5)
111 self.assertTrue(isinstance(ar['engine_id'], int))
112 self.assertTrue(isinstance(ar.engine_id, int))
113 self.assertEquals(ar.engine_id, ar['engine_id'])
114
73
115
@@ -58,6 +58,44 b' class TestLoadBalancedView(ClusterTestCase):'
58 r = self.view.map_sync(f, data)
58 r = self.view.map_sync(f, data)
59 self.assertEquals(r, map(f, data))
59 self.assertEquals(r, map(f, data))
60
60
61 def test_map_unordered(self):
62 def f(x):
63 return x**2
64 def slow_f(x):
65 import time
66 time.sleep(0.05*x)
67 return x**2
68 data = range(16,0,-1)
69 reference = map(f, data)
70
71 amr = self.view.map_async(slow_f, data, ordered=False)
72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
73 # check individual elements, retrieved as they come
74 # list comprehension uses __iter__
75 astheycame = [ r for r in amr ]
76 # Ensure that at least one result came out of order:
77 self.assertNotEquals(astheycame, reference, "should not have preserved order")
78 self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
79
80 def test_map_ordered(self):
81 def f(x):
82 return x**2
83 def slow_f(x):
84 import time
85 time.sleep(0.05*x)
86 return x**2
87 data = range(16,0,-1)
88 reference = map(f, data)
89
90 amr = self.view.map_async(slow_f, data)
91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
92 # check individual elements, retrieved as they come
93 # list(amr) uses __iter__
94 astheycame = list(amr)
95 # Ensure that results came in order
96 self.assertEquals(astheycame, reference)
97 self.assertEquals(amr.result, reference)
98
61 def test_abort(self):
99 def test_abort(self):
62 view = self.view
100 view = self.view
63 ar = self.client[:].apply_async(time.sleep, .5)
101 ar = self.client[:].apply_async(time.sleep, .5)
@@ -31,15 +31,14 b' class Heartbeat(Thread):'
31 def __init__(self, context, addr=(LOCALHOST, 0)):
31 def __init__(self, context, addr=(LOCALHOST, 0)):
32 Thread.__init__(self)
32 Thread.__init__(self)
33 self.context = context
33 self.context = context
34 self.addr = addr
34 self.ip, self.port = addr
35 self.ip = addr[0]
36 self.port = addr[1]
37 if self.port == 0:
35 if self.port == 0:
38 s = socket.socket()
36 s = socket.socket()
39 s.bind(self.addr)
37 # '*' means all interfaces to 0MQ, which is '' to socket.socket
38 s.bind(('' if self.ip == '*' else self.ip, 0))
40 self.port = s.getsockname()[1]
39 self.port = s.getsockname()[1]
41 s.close()
40 s.close()
42 self.addr = (self.ip, self.port)
41 self.addr = (self.ip, self.port)
43 self.daemon = True
42 self.daemon = True
44
43
45 def run(self):
44 def run(self):
@@ -710,6 +710,9 b' class KernelManager(HasTraits):'
710 # The addresses for the communication channels.
710 # The addresses for the communication channels.
711 connection_file = Unicode('')
711 connection_file = Unicode('')
712 ip = Unicode(LOCALHOST)
712 ip = Unicode(LOCALHOST)
713 def _ip_changed(self, name, old, new):
714 if new == '*':
715 self.ip = '0.0.0.0'
713 shell_port = Int(0)
716 shell_port = Int(0)
714 iopub_port = Int(0)
717 iopub_port = Int(0)
715 stdin_port = Int(0)
718 stdin_port = Int(0)
@@ -10,7 +10,7 b' for working with Graphs is NetworkX_. Here, we will walk through a demo mapping'
10 a nx DAG to task dependencies.
10 a nx DAG to task dependencies.
11
11
12 The full script that runs this demo can be found in
12 The full script that runs this demo can be found in
13 :file:`docs/examples/newparallel/dagdeps.py`.
13 :file:`docs/examples/parallel/dagdeps.py`.
14
14
15 Why are DAGs good for task dependencies?
15 Why are DAGs good for task dependencies?
16 ----------------------------------------
16 ----------------------------------------
@@ -30,7 +30,8 b' A Sample DAG'
30
30
31 Here, we have a very simple 5-node DAG:
31 Here, we have a very simple 5-node DAG:
32
32
33 .. figure:: simpledag.*
33 .. figure:: figs/simpledag.*
34 :width: 600px
34
35
35 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
36 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
37 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
@@ -80,7 +81,7 b' The code to generate the simple DAG:'
80 For demonstration purposes, we have a function that generates a random DAG with a given
81 For demonstration purposes, we have a function that generates a random DAG with a given
81 number of nodes and edges.
82 number of nodes and edges.
82
83
83 .. literalinclude:: ../../examples/newparallel/dagdeps.py
84 .. literalinclude:: ../../examples/parallel/dagdeps.py
84 :language: python
85 :language: python
85 :lines: 20-36
86 :lines: 20-36
86
87
@@ -117,11 +118,13 b' on which it depends:'
117 In [6]: results = {}
118 In [6]: results = {}
118
119
119 In [7]: for node in G.topological_sort():
120 In [7]: for node in G.topological_sort():
120 ...: # get list of AsyncResult objects from nodes
121 ...: # get list of AsyncResult objects from nodes
121 ...: # leading into this one as dependencies
122 ...: # leading into this one as dependencies
122 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 ...: deps = [ results[n] for n in G.predecessors(node) ]
123 ...: # submit and store AsyncResult object
124 ...: # submit and store AsyncResult object
124 ...: results[node] = view.apply_with_flags(jobs[node], after=deps, block=False)
125 ...: with view.temp_flags(after=deps, block=False):
126 ...: results[node] = view.apply_with_flags(jobs[node])
127
125
128
126 Now that we have submitted all the jobs, we can wait for the results:
129 Now that we have submitted all the jobs, we can wait for the results:
127
130
@@ -137,7 +140,7 b' These objects store a variety of metadata about each task, including various tim'
137 We can validate that the dependencies were respected by checking that each task was
140 We can validate that the dependencies were respected by checking that each task was
138 started after all of its predecessors were completed:
141 started after all of its predecessors were completed:
139
142
140 .. literalinclude:: ../../examples/newparallel/dagdeps.py
143 .. literalinclude:: ../../examples/parallel/dagdeps.py
141 :language: python
144 :language: python
142 :lines: 64-70
145 :lines: 64-70
143
146
@@ -155,16 +158,17 b' will be at the top, and quick, small tasks will be at the bottom.'
155 In [12]: pos = {}; colors = {}
158 In [12]: pos = {}; colors = {}
156
159
157 In [12]: for node in G:
160 In [12]: for node in G:
158 ...: md = results[node].metadata
161 ....: md = results[node].metadata
159 ...: start = date2num(md.started)
162 ....: start = date2num(md.started)
160 ...: runtime = date2num(md.completed) - start
163 ....: runtime = date2num(md.completed) - start
161 ...: pos[node] = (start, runtime)
164 ....: pos[node] = (start, runtime)
162 ...: colors[node] = md.engine_id
165 ....: colors[node] = md.engine_id
163
166
164 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
167 In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
165 ...: cmap=gist_rainbow)
168 ....: cmap=gist_rainbow)
166
169
167 .. figure:: dagdeps.*
170 .. figure:: figs/dagdeps.*
171 :width: 600px
168
172
169 Time started on x, runtime on y, and color-coded by engine-id (in this case there
173 Time started on x, runtime on y, and color-coded by engine-id (in this case there
170 were four engines). Edges denote dependencies.
174 were four engines). Edges denote dependencies.
1 NO CONTENT: file renamed from docs/source/parallel/asian_call.pdf to docs/source/parallel/figs/asian_call.pdf
NO CONTENT: file renamed from docs/source/parallel/asian_call.pdf to docs/source/parallel/figs/asian_call.pdf
1 NO CONTENT: file renamed from docs/source/parallel/asian_call.png to docs/source/parallel/figs/asian_call.png
NO CONTENT: file renamed from docs/source/parallel/asian_call.png to docs/source/parallel/figs/asian_call.png
1 NO CONTENT: file renamed from docs/source/parallel/asian_put.pdf to docs/source/parallel/figs/asian_put.pdf
NO CONTENT: file renamed from docs/source/parallel/asian_put.pdf to docs/source/parallel/figs/asian_put.pdf
1 NO CONTENT: file renamed from docs/source/parallel/asian_put.png to docs/source/parallel/figs/asian_put.png
NO CONTENT: file renamed from docs/source/parallel/asian_put.png to docs/source/parallel/figs/asian_put.png
1 NO CONTENT: file renamed from docs/source/parallel/dagdeps.pdf to docs/source/parallel/figs/dagdeps.pdf
NO CONTENT: file renamed from docs/source/parallel/dagdeps.pdf to docs/source/parallel/figs/dagdeps.pdf
1 NO CONTENT: file renamed from docs/source/parallel/dagdeps.png to docs/source/parallel/figs/dagdeps.png
NO CONTENT: file renamed from docs/source/parallel/dagdeps.png to docs/source/parallel/figs/dagdeps.png
1 NO CONTENT: file renamed from docs/source/parallel/hpc_job_manager.pdf to docs/source/parallel/figs/hpc_job_manager.pdf
NO CONTENT: file renamed from docs/source/parallel/hpc_job_manager.pdf to docs/source/parallel/figs/hpc_job_manager.pdf
1 NO CONTENT: file renamed from docs/source/parallel/hpc_job_manager.png to docs/source/parallel/figs/hpc_job_manager.png
NO CONTENT: file renamed from docs/source/parallel/hpc_job_manager.png to docs/source/parallel/figs/hpc_job_manager.png
1 NO CONTENT: file renamed from docs/source/parallel/ipcluster_create.pdf to docs/source/parallel/figs/ipcluster_create.pdf
NO CONTENT: file renamed from docs/source/parallel/ipcluster_create.pdf to docs/source/parallel/figs/ipcluster_create.pdf
1 NO CONTENT: file renamed from docs/source/parallel/ipcluster_create.png to docs/source/parallel/figs/ipcluster_create.png
NO CONTENT: file renamed from docs/source/parallel/ipcluster_create.png to docs/source/parallel/figs/ipcluster_create.png
1 NO CONTENT: file renamed from docs/source/parallel/ipcluster_start.pdf to docs/source/parallel/figs/ipcluster_start.pdf
NO CONTENT: file renamed from docs/source/parallel/ipcluster_start.pdf to docs/source/parallel/figs/ipcluster_start.pdf
1 NO CONTENT: file renamed from docs/source/parallel/ipcluster_start.png to docs/source/parallel/figs/ipcluster_start.png
NO CONTENT: file renamed from docs/source/parallel/ipcluster_start.png to docs/source/parallel/figs/ipcluster_start.png
1 NO CONTENT: file renamed from docs/source/parallel/ipython_shell.pdf to docs/source/parallel/figs/ipython_shell.pdf
NO CONTENT: file renamed from docs/source/parallel/ipython_shell.pdf to docs/source/parallel/figs/ipython_shell.pdf
1 NO CONTENT: file renamed from docs/source/parallel/ipython_shell.png to docs/source/parallel/figs/ipython_shell.png
NO CONTENT: file renamed from docs/source/parallel/ipython_shell.png to docs/source/parallel/figs/ipython_shell.png
1 NO CONTENT: file renamed from docs/source/parallel/mec_simple.pdf to docs/source/parallel/figs/mec_simple.pdf
NO CONTENT: file renamed from docs/source/parallel/mec_simple.pdf to docs/source/parallel/figs/mec_simple.pdf
1 NO CONTENT: file renamed from docs/source/parallel/mec_simple.png to docs/source/parallel/figs/mec_simple.png
NO CONTENT: file renamed from docs/source/parallel/mec_simple.png to docs/source/parallel/figs/mec_simple.png
1 NO CONTENT: file renamed from docs/source/parallel/parallel_pi.pdf to docs/source/parallel/figs/parallel_pi.pdf
NO CONTENT: file renamed from docs/source/parallel/parallel_pi.pdf to docs/source/parallel/figs/parallel_pi.pdf
1 NO CONTENT: file renamed from docs/source/parallel/parallel_pi.png to docs/source/parallel/figs/parallel_pi.png
NO CONTENT: file renamed from docs/source/parallel/parallel_pi.png to docs/source/parallel/figs/parallel_pi.png
1 NO CONTENT: file renamed from docs/source/parallel/simpledag.pdf to docs/source/parallel/figs/simpledag.pdf
NO CONTENT: file renamed from docs/source/parallel/simpledag.pdf to docs/source/parallel/figs/simpledag.pdf
1 NO CONTENT: file renamed from docs/source/parallel/simpledag.png to docs/source/parallel/figs/simpledag.png
NO CONTENT: file renamed from docs/source/parallel/simpledag.png to docs/source/parallel/figs/simpledag.png
1 NO CONTENT: file renamed from docs/source/parallel/single_digits.pdf to docs/source/parallel/figs/single_digits.pdf
NO CONTENT: file renamed from docs/source/parallel/single_digits.pdf to docs/source/parallel/figs/single_digits.pdf
1 NO CONTENT: file renamed from docs/source/parallel/single_digits.png to docs/source/parallel/figs/single_digits.png
NO CONTENT: file renamed from docs/source/parallel/single_digits.png to docs/source/parallel/figs/single_digits.png
1 NO CONTENT: file renamed from docs/source/parallel/two_digit_counts.pdf to docs/source/parallel/figs/two_digit_counts.pdf
NO CONTENT: file renamed from docs/source/parallel/two_digit_counts.pdf to docs/source/parallel/figs/two_digit_counts.pdf
1 NO CONTENT: file renamed from docs/source/parallel/two_digit_counts.png to docs/source/parallel/figs/two_digit_counts.png
NO CONTENT: file renamed from docs/source/parallel/two_digit_counts.png to docs/source/parallel/figs/two_digit_counts.png
@@ -4,7 +4,7 b' Parallel examples'
4
4
5 .. note::
5 .. note::
6
6
7 Performance numbers from ``IPython.kernel``, not newparallel.
7 Performance numbers from ``IPython.kernel``, not new ``IPython.parallel``.
8
8
9 In this section we describe two more involved examples of using an IPython
9 In this section we describe two more involved examples of using an IPython
10 cluster to perform a parallel computation. In these examples, we will be using
10 cluster to perform a parallel computation. In these examples, we will be using
@@ -27,7 +27,7 b' million digits.'
27
27
28 In both the serial and parallel calculation we will be using functions defined
28 In both the serial and parallel calculation we will be using functions defined
29 in the :file:`pidigits.py` file, which is available in the
29 in the :file:`pidigits.py` file, which is available in the
30 :file:`docs/examples/newparallel` directory of the IPython source distribution.
30 :file:`docs/examples/parallel` directory of the IPython source distribution.
31 These functions provide basic facilities for working with the digits of pi and
31 These functions provide basic facilities for working with the digits of pi and
32 can be loaded into IPython by putting :file:`pidigits.py` in your current
32 can be loaded into IPython by putting :file:`pidigits.py` in your current
33 working directory and then doing:
33 working directory and then doing:
@@ -75,7 +75,7 b' The resulting plot of the single digit counts shows that each digit occurs'
75 approximately 1,000 times, but that with only 10,000 digits the
75 approximately 1,000 times, but that with only 10,000 digits the
76 statistical fluctuations are still rather large:
76 statistical fluctuations are still rather large:
77
77
78 .. image:: single_digits.*
78 .. image:: figs/single_digits.*
79
79
80 It is clear that to reduce the relative fluctuations in the counts, we need
80 It is clear that to reduce the relative fluctuations in the counts, we need
81 to look at many more digits of pi. That brings us to the parallel calculation.
81 to look at many more digits of pi. That brings us to the parallel calculation.
@@ -101,13 +101,13 b' compute the two digit counts for the digits in a single file. Then in a final'
101 step the counts from each engine will be added up. To perform this
101 step the counts from each engine will be added up. To perform this
102 calculation, we will need two top-level functions from :file:`pidigits.py`:
102 calculation, we will need two top-level functions from :file:`pidigits.py`:
103
103
104 .. literalinclude:: ../../examples/newparallel/pidigits.py
104 .. literalinclude:: ../../examples/parallel/pi/pidigits.py
105 :language: python
105 :language: python
106 :lines: 47-62
106 :lines: 47-62
107
107
108 We will also use the :func:`plot_two_digit_freqs` function to plot the
108 We will also use the :func:`plot_two_digit_freqs` function to plot the
109 results. The code to run this calculation in parallel is contained in
109 results. The code to run this calculation in parallel is contained in
110 :file:`docs/examples/newparallel/parallelpi.py`. This code can be run in parallel
110 :file:`docs/examples/parallel/parallelpi.py`. This code can be run in parallel
111 using IPython by following these steps:
111 using IPython by following these steps:
112
112
113 1. Use :command:`ipcluster` to start 15 engines. We used an 8 core (2 quad
113 1. Use :command:`ipcluster` to start 15 engines. We used an 8 core (2 quad
@@ -188,7 +188,7 b' most likely and that "06" and "07" are least likely. Further analysis would'
188 show that the relative size of the statistical fluctuations have decreased
188 show that the relative size of the statistical fluctuations have decreased
189 compared to the 10,000 digit calculation.
189 compared to the 10,000 digit calculation.
190
190
191 .. image:: two_digit_counts.*
191 .. image:: figs/two_digit_counts.*
192
192
193
193
194 Parallel options pricing
194 Parallel options pricing
@@ -209,12 +209,12 b' simulation of the underlying asset price. In this example we use this approach'
209 to price both European and Asian (path dependent) options for various strike
209 to price both European and Asian (path dependent) options for various strike
210 prices and volatilities.
210 prices and volatilities.
211
211
212 The code for this example can be found in the :file:`docs/examples/newparallel`
212 The code for this example can be found in the :file:`docs/examples/parallel`
213 directory of the IPython source. The function :func:`price_options` in
213 directory of the IPython source. The function :func:`price_options` in
214 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
214 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
215 the NumPy package and is shown here:
215 the NumPy package and is shown here:
216
216
217 .. literalinclude:: ../../examples/newparallel/mcpricer.py
217 .. literalinclude:: ../../examples/parallel/options/mcpricer.py
218 :language: python
218 :language: python
219
219
220 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
220 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
@@ -222,21 +222,21 b' which distributes work to the engines using dynamic load balancing. This'
222 view is a wrapper of the :class:`Client` class shown in
222 view is a wrapper of the :class:`Client` class shown in
223 the previous example. The parallel calculation using :class:`LoadBalancedView` can
223 the previous example. The parallel calculation using :class:`LoadBalancedView` can
224 be found in the file :file:`mcpricer.py`. The code in this file creates a
224 be found in the file :file:`mcpricer.py`. The code in this file creates a
225 :class:`TaskClient` instance and then submits a set of tasks using
225 :class:`LoadBalancedView` instance and then submits a set of tasks using
226 :meth:`TaskClient.run` that calculate the option prices for different
226 :meth:`LoadBalancedView.apply` that calculate the option prices for different
227 volatilities and strike prices. The results are then plotted as a 2D contour
227 volatilities and strike prices. The results are then plotted as a 2D contour
228 plot using Matplotlib.
228 plot using Matplotlib.
229
229
230 .. literalinclude:: ../../examples/newparallel/mcdriver.py
230 .. literalinclude:: ../../examples/parallel/options/mckernel.py
231 :language: python
231 :language: python
232
232
233 To use this code, start an IPython cluster using :command:`ipcluster`, open
233 To use this code, start an IPython cluster using :command:`ipcluster`, open
234 IPython in the pylab mode with the file :file:`mcdriver.py` in your current
234 IPython in the pylab mode with the file :file:`mckernel.py` in your current
235 working directory and then type:
235 working directory and then type:
236
236
237 .. sourcecode:: ipython
237 .. sourcecode:: ipython
238
238
239 In [7]: run mcdriver.py
239 In [7]: run mckernel.py
240 Submitted tasks: [0, 1, 2, ...]
240 Submitted tasks: [0, 1, 2, ...]
241
241
242 Once all the tasks have finished, the results can be plotted using the
242 Once all the tasks have finished, the results can be plotted using the
@@ -257,9 +257,9 b' entire calculation (10 strike prices, 10 volatilities, 100,000 paths for each)'
257 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable
257 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable
258 to the speedup observed in our previous example.
258 to the speedup observed in our previous example.
259
259
260 .. image:: asian_call.*
260 .. image:: figs/asian_call.*
261
261
262 .. image:: asian_put.*
262 .. image:: figs/asian_put.*
263
263
264 Conclusion
264 Conclusion
265 ==========
265 ==========
@@ -280,5 +280,5 b' parallel architecture that have been demonstrated:'
280
280
281 .. note::
281 .. note::
282
282
283 The newparallel code has never been run on Windows HPC Server, so the last
283 The new parallel code has never been run on Windows HPC Server, so the last
284 conclusion is untested.
284 conclusion is untested.
@@ -60,6 +60,10 b' the ``I`` in IPython. The following are some example usage cases for IPython:'
60 Architecture overview
60 Architecture overview
61 =====================
61 =====================
62
62
63 .. figure:: figs/wideView.png
64 :width: 300px
65
66
63 The IPython architecture consists of four components:
67 The IPython architecture consists of four components:
64
68
65 * The IPython engine.
69 * The IPython engine.
@@ -99,7 +103,7 b' same machine as the Hub, but can be run anywhere from local threads or on remote'
99 The controller also provides a single point of contact for users who wish to
103 The controller also provides a single point of contact for users who wish to
100 utilize the engines connected to the controller. There are different ways of
104 utilize the engines connected to the controller. There are different ways of
101 working with a controller. In IPython, all of these models are implemented via
105 working with a controller. In IPython, all of these models are implemented via
102 the client's :meth:`.View.apply` method, with various arguments, or
106 the :meth:`.View.apply` method, after
103 constructing :class:`.View` objects to represent subsets of engines. The two
107 constructing :class:`.View` objects to represent subsets of engines. The two
104 primary models for interacting with engines are:
108 primary models for interacting with engines are:
105
109
@@ -181,6 +185,34 b' ipcontroller-client.json'
181 but since the controller may listen on different ports for clients and
185 but since the controller may listen on different ports for clients and
182 engines, it is stored separately.
186 engines, it is stored separately.
183
187
188 ipcontroller-client.json will look something like this, under default localhost
189 circumstances:
190
191 .. sourcecode:: python
192
193 {
194 "url":"tcp:\/\/127.0.0.1:54424",
195 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
196 "ssh":"",
197 "location":"10.19.1.135"
198 }
199
200 If, however, you are running the controller on a work node on a cluster, you will likely
201 need to use ssh tunnels to connect clients from your laptop to it. You will also
202 probably need to instruct the controller to listen for engines coming from other work nodes
203 on the cluster. An example of ipcontroller-client.json, as created by::
204
205 $> ipcontroller --ip=0.0.0.0 --ssh=login.mycluster.com
206
207
208 .. sourcecode:: python
209
210 {
211 "url":"tcp:\/\/*:54424",
212 "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
213 "ssh":"login.mycluster.com",
214 "location":"10.0.0.2"
215 }
184 More details of how these JSON files are used are given below.
216 More details of how these JSON files are used are given below.
185
217
186 A detailed description of the security model and its implementation in IPython
218 A detailed description of the security model and its implementation in IPython
@@ -248,7 +280,7 b' then you would connect to it with:'
248
280
249 .. sourcecode:: ipython
281 .. sourcecode:: ipython
250
282
251 In [2]: c = Client(sshserver='myhub.example.com')
283 In [2]: c = Client('/path/to/my/ipcontroller-client.json', sshserver='me@myhub.example.com')
252
284
253 Where 'myhub.example.com' is the url or IP address of the machine on
285 Where 'myhub.example.com' is the url or IP address of the machine on
254 which the Hub process is running (or another machine that has direct access to the Hub's ports).
286 which the Hub process is running (or another machine that has direct access to the Hub's ports).
@@ -24,8 +24,8 b' the :command:`ipcluster` command::'
24 For more detailed information about starting the controller and engines, see
24 For more detailed information about starting the controller and engines, see
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26
26
27 Creating a ``Client`` instance
27 Creating a ``DirectView`` instance
28 ==============================
28 ==================================
29
29
30 The first step is to import the IPython :mod:`IPython.parallel`
30 The first step is to import the IPython :mod:`IPython.parallel`
31 module and then create a :class:`.Client` instance:
31 module and then create a :class:`.Client` instance:
@@ -117,10 +117,10 b' two decorators:'
117 .. sourcecode:: ipython
117 .. sourcecode:: ipython
118
118
119 In [10]: @dview.remote(block=True)
119 In [10]: @dview.remote(block=True)
120 ...: def getpid():
120 ....: def getpid():
121 ...: import os
121 ....: import os
122 ...: return os.getpid()
122 ....: return os.getpid()
123 ...:
123 ....:
124
124
125 In [11]: getpid()
125 In [11]: getpid()
126 Out[11]: [12345, 12346, 12347, 12348]
126 Out[11]: [12345, 12346, 12347, 12348]
@@ -135,8 +135,8 b' operations and distribute them, reconstructing the result.'
135 In [13]: A = np.random.random((64,48))
135 In [13]: A = np.random.random((64,48))
136
136
137 In [14]: @dview.parallel(block=True)
137 In [14]: @dview.parallel(block=True)
138 ...: def pmul(A,B):
138 ....: def pmul(A,B):
139 ...: return A*B
139 ....: return A*B
140
140
141 In [15]: C_local = A*A
141 In [15]: C_local = A*A
142
142
@@ -183,6 +183,8 b' dv.track : bool'
183 This is primarily useful for non-copying sends of numpy arrays that you plan to
183 This is primarily useful for non-copying sends of numpy arrays that you plan to
184 edit in-place. You need to know when it becomes safe to edit the buffer
184 edit in-place. You need to know when it becomes safe to edit the buffer
185 without corrupting the message.
185 without corrupting the message.
186 dv.targets : int, list of ints
187 which targets this view is associated with.
186
188
187
189
188 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
190 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
@@ -260,10 +262,10 b' local Python/IPython session:'
260
262
261 # define our function
263 # define our function
262 In [6]: def wait(t):
264 In [6]: def wait(t):
263 ...: import time
265 ....: import time
264 ...: tic = time.time()
266 ....: tic = time.time()
265 ...: time.sleep(t)
267 ....: time.sleep(t)
266 ...: return time.time()-tic
268 ....: return time.time()-tic
267
269
268 # In non-blocking mode
270 # In non-blocking mode
269 In [7]: ar = dview.apply_async(wait, 2)
271 In [7]: ar = dview.apply_async(wait, 2)
@@ -326,7 +328,7 b' and blocks until all of the associated results are ready:'
326 The ``block`` and ``targets`` keyword arguments and attributes
328 The ``block`` and ``targets`` keyword arguments and attributes
327 --------------------------------------------------------------
329 --------------------------------------------------------------
328
330
329 Most DirectView methods (excluding :meth:`apply` and :meth:`map`) accept ``block`` and
331 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
330 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
332 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
331 blocking mode and which engines the command is applied to. The :class:`View` class also has
333 blocking mode and which engines the command is applied to. The :class:`View` class also has
332 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
334 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
@@ -362,11 +364,6 b' The :attr:`block` and :attr:`targets` instance attributes of the'
362 Parallel magic commands
364 Parallel magic commands
363 -----------------------
365 -----------------------
364
366
365 .. warning::
366
367 The magics have not been changed to work with the zeromq system. The
368 magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
369
370 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
367 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
371 that make it more pleasant to execute Python commands on the engines
368 that make it more pleasant to execute Python commands on the engines
372 interactively. These are simply shortcuts to :meth:`execute` and
369 interactively. These are simply shortcuts to :meth:`execute` and
@@ -376,9 +373,6 b' Python command on the engines specified by the :attr:`targets` attribute of the'
376
373
377 .. sourcecode:: ipython
374 .. sourcecode:: ipython
378
375
379 # load the parallel magic extension:
380 In [21]: %load_ext parallelmagic
381
382 # Create a DirectView for all targets
376 # Create a DirectView for all targets
383 In [22]: dv = rc[:]
377 In [22]: dv = rc[:]
384
378
@@ -387,10 +381,10 b' Python command on the engines specified by the :attr:`targets` attribute of the'
387
381
388 In [24]: dv.block=True
382 In [24]: dv.block=True
389
383
390 In [25]: import numpy
384 # import numpy here and everywhere
391
385 In [25]: with dv.sync_imports():
392 In [26]: %px import numpy
386 ....: import numpy
393 Parallel execution on engines: [0, 1, 2, 3]
387 importing numpy on engine(s)
394
388
395 In [27]: %px a = numpy.random.rand(2,2)
389 In [27]: %px a = numpy.random.rand(2,2)
396 Parallel execution on engines: [0, 1, 2, 3]
390 Parallel execution on engines: [0, 1, 2, 3]
@@ -400,10 +394,10 b' Python command on the engines specified by the :attr:`targets` attribute of the'
400
394
401 In [28]: dv['ev']
395 In [28]: dv['ev']
402 Out[28]: [ array([ 1.09522024, -0.09645227]),
396 Out[28]: [ array([ 1.09522024, -0.09645227]),
403 array([ 1.21435496, -0.35546712]),
397 ....: array([ 1.21435496, -0.35546712]),
404 array([ 0.72180653, 0.07133042]),
398 ....: array([ 0.72180653, 0.07133042]),
405 array([ 1.46384341e+00, 1.04353244e-04])
399 ....: array([ 1.46384341, 1.04353244e-04])
406 ]
400 ....: ]
407
401
408 The ``%result`` magic gets the most recent result, or takes an argument
402 The ``%result`` magic gets the most recent result, or takes an argument
409 specifying the index of the result to be requested. It is simply a shortcut to the
403 specifying the index of the result to be requested. It is simply a shortcut to the
@@ -415,9 +409,9 b' specifying the index of the result to be requested. It is simply a shortcut to t'
415
409
416 In [30]: %result
410 In [30]: %result
417 Out[30]: [ [ 1.28167017 0.14197338],
411 Out[30]: [ [ 1.28167017 0.14197338],
418 [-0.14093616 1.27877273],
412 ....: [-0.14093616 1.27877273],
419 [-0.37023573 1.06779409],
413 ....: [-0.37023573 1.06779409],
420 [ 0.83664764 -0.25602658] ]
414 ....: [ 0.83664764 -0.25602658] ]
421
415
422 The ``%autopx`` magic switches to a mode where everything you type is executed
416 The ``%autopx`` magic switches to a mode where everything you type is executed
423 on the engines given by the :attr:`targets` attribute:
417 on the engines given by the :attr:`targets` attribute:
@@ -452,9 +446,9 b' on the engines given by the :attr:`targets` attribute:'
452
446
453 In [37]: dv['ans']
447 In [37]: dv['ans']
454 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
448 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
455 'Average max eigenvalue is: 10.2076902286',
449 ....: 'Average max eigenvalue is: 10.2076902286',
456 'Average max eigenvalue is: 10.1891484655',
450 ....: 'Average max eigenvalue is: 10.1891484655',
457 'Average max eigenvalue is: 10.1158837784',]
451 ....: 'Average max eigenvalue is: 10.1158837784',]
458
452
459
453
460 Moving Python objects around
454 Moving Python objects around
@@ -522,7 +516,7 b' follow that terminology. However, it is important to remember that in'
522 IPython's :class:`Client` class, :meth:`scatter` is from the
516 IPython's :class:`Client` class, :meth:`scatter` is from the
523 interactive IPython session to the engines and :meth:`gather` is from the
517 interactive IPython session to the engines and :meth:`gather` is from the
524 engines back to the interactive IPython session. For scatter/gather operations
518 engines back to the interactive IPython session. For scatter/gather operations
525 between engines, MPI should be used:
519 between engines, MPI, pyzmq, or some other direct interconnect should be used.
526
520
527 .. sourcecode:: ipython
521 .. sourcecode:: ipython
528
522
@@ -568,7 +562,7 b" created by a DirectView's :meth:`sync_imports` method:"
568 .. sourcecode:: ipython
562 .. sourcecode:: ipython
569
563
570 In [69]: with dview.sync_imports():
564 In [69]: with dview.sync_imports():
571 ...: import numpy
565 ....: import numpy
572 importing numpy on engine(s)
566 importing numpy on engine(s)
573
567
574 Any imports made inside the block will also be performed on the view's engines.
568 Any imports made inside the block will also be performed on the view's engines.
@@ -588,15 +582,15 b' execution, and will fail with an UnmetDependencyError.'
588 In [69]: from IPython.parallel import require
582 In [69]: from IPython.parallel import require
589
583
590 In [70]: @require('re')
584 In [70]: @require('re')
591 ...: def findall(pat, x):
585 ....: def findall(pat, x):
592 ...: # re is guaranteed to be available
586 ....: # re is guaranteed to be available
593 ...: return re.findall(pat, x)
587 ....: return re.findall(pat, x)
594
588
595 # you can also pass modules themselves, that you already have locally:
589 # you can also pass modules themselves, that you already have locally:
596 In [71]: @require(time)
590 In [71]: @require(time)
597 ...: def wait(t):
591 ....: def wait(t):
598 ...: time.sleep(t)
592 ....: time.sleep(t)
599 ...: return t
593 ....: return t
600
594
601 .. _parallel_exceptions:
595 .. _parallel_exceptions:
602
596
@@ -141,9 +141,39 b' Using various batch systems with :command:`ipcluster`'
141
141
142 :command:`ipcluster` has a notion of Launchers that can start controllers
142 :command:`ipcluster` has a notion of Launchers that can start controllers
143 and engines with various remote execution schemes. Currently supported
143 and engines with various remote execution schemes. Currently supported
144 models include :command:`ssh`, :command:`mpiexec`, PBS-style (Torque, SGE),
144 models include :command:`ssh`, :command:`mpiexec`, PBS-style (Torque, SGE, LSF),
145 and Windows HPC Server.
145 and Windows HPC Server.
146
146
147 In general, these are configured by the :attr:`IPClusterEngines.engine_set_launcher_class`,
148 and :attr:`IPClusterStart.controller_launcher_class` configurables, which can be the
149 fully specified object name (e.g. ``'IPython.parallel.apps.launcher.LocalControllerLauncher'``),
150 but if you are using IPython's builtin launchers, you can specify just the class name,
151 or even just the prefix, e.g.:
152
153 .. sourcecode:: python
154
155 c.IPClusterEngines.engine_launcher_class = 'SSH'
156 # equivalent to
157 c.IPClusterEngines.engine_launcher_class = 'SSHEngineSetLauncher'
158 # both of which expand to
159 c.IPClusterEngines.engine_launcher_class = 'IPython.parallel.apps.launcher.SSHEngineSetLauncher'
160
161 The shortest form is of particular use on the command line, where all you need to do to
162 get an IPython cluster running with engines started with MPI is:
163
164 .. sourcecode:: bash
165
166 $> ipcluster start --engines=MPIExec
167
168 Assuming that the default MPI config is sufficient.
169
170 .. note::
171
172 shortcuts for builtin launcher names were added in 0.12, as was the ``_class`` suffix
173 on the configurable names. If you use the old 0.11 names (e.g. ``engine_set_launcher``),
174 they will still work, but you will get a deprecation warning that the name has changed.
175
176
147 .. note::
177 .. note::
148
178
149 The Launchers and configuration are designed in such a way that advanced
179 The Launchers and configuration are designed in such a way that advanced
@@ -170,7 +200,7 b' There, instruct ipcluster to use the MPIExec launchers by adding the lines:'
170
200
171 .. sourcecode:: python
201 .. sourcecode:: python
172
202
173 c.IPClusterEngines.engine_launcher = 'IPython.parallel.apps.launcher.MPIExecEngineSetLauncher'
203 c.IPClusterEngines.engine_launcher_class = 'MPIExecEngineSetLauncher'
174
204
175 If the default MPI configuration is correct, then you can now start your cluster, with::
205 If the default MPI configuration is correct, then you can now start your cluster, with::
176
206
@@ -185,7 +215,7 b' If you have a reason to also start the Controller with mpi, you can specify:'
185
215
186 .. sourcecode:: python
216 .. sourcecode:: python
187
217
188 c.IPClusterStart.controller_launcher = 'IPython.parallel.apps.launcher.MPIExecControllerLauncher'
218 c.IPClusterStart.controller_launcher_class = 'MPIExecControllerLauncher'
189
219
190 .. note::
220 .. note::
191
221
@@ -226,10 +256,8 b' and engines:'
226
256
227 .. sourcecode:: python
257 .. sourcecode:: python
228
258
229 c.IPClusterStart.controller_launcher = \
259 c.IPClusterStart.controller_launcher_class = 'PBSControllerLauncher'
230 'IPython.parallel.apps.launcher.PBSControllerLauncher'
260 c.IPClusterEngines.engine_launcher_class = 'PBSEngineSetLauncher'
231 c.IPClusterEngines.engine_launcher = \
232 'IPython.parallel.apps.launcher.PBSEngineSetLauncher'
233
261
234 .. note::
262 .. note::
235
263
@@ -355,12 +383,11 b' To use this mode, select the SSH launchers in :file:`ipcluster_config.py`:'
355
383
356 .. sourcecode:: python
384 .. sourcecode:: python
357
385
358 c.IPClusterEngines.engine_launcher = \
386 c.IPClusterEngines.engine_launcher_class = 'SSHEngineSetLauncher'
359 'IPython.parallel.apps.launcher.SSHEngineSetLauncher'
360 # and if the Controller is also to be remote:
387 # and if the Controller is also to be remote:
361 c.IPClusterStart.controller_launcher = \
388 c.IPClusterStart.controller_launcher_class = 'SSHControllerLauncher'
362 'IPython.parallel.apps.launcher.SSHControllerLauncher'
389
363
390
364
391
365 The controller's remote location and configuration can be specified:
392 The controller's remote location and configuration can be specified:
366
393
@@ -29,8 +29,8 b' the :command:`ipcluster` command::'
29 For more detailed information about starting the controller and engines, see
29 For more detailed information about starting the controller and engines, see
30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
30 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
31
31
32 Creating a ``Client`` instance
32 Creating a ``LoadBalancedView`` instance
33 ==============================
33 ========================================
34
34
35 The first step is to import the IPython :mod:`IPython.parallel`
35 The first step is to import the IPython :mod:`IPython.parallel`
36 module and then create a :class:`.Client` instance, and we will also be using
36 module and then create a :class:`.Client` instance, and we will also be using
@@ -87,12 +87,12 b' To load-balance :meth:`map`,simply use a LoadBalancedView:'
87
87
88 In [62]: lview.block = True
88 In [62]: lview.block = True
89
89
90 In [63]: serial_result = map(lambda x:x**10, range(32))
90 In [63]: serial_result = map(lambda x:x**10, range(32))
91
91
92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
93
93
94 In [65]: serial_result==parallel_result
94 In [65]: serial_result==parallel_result
95 Out[65]: True
95 Out[65]: True
96
96
97 Parallel function decorator
97 Parallel function decorator
98 ---------------------------
98 ---------------------------
@@ -111,6 +111,27 b' that turns any Python function into a parallel function:'
111 In [11]: f.map(range(32)) # this is done in parallel
111 In [11]: f.map(range(32)) # this is done in parallel
112 Out[11]: [0.0,10.0,160.0,...]
112 Out[11]: [0.0,10.0,160.0,...]
113
113
114 .. _parallel_taskmap:
115
116 Map results are iterable!
117 -------------------------
118
119 When an AsyncResult object actually maps multiple results (e.g. the :class:`~AsyncMapResult`
120 object), you can iterate through them, and act on the results as they arrive:
121
122 .. literalinclude:: ../../examples/parallel/itermapresult.py
123 :language: python
124 :lines: 9-34
125
126 .. seealso::
127
128 When AsyncResult or the AsyncMapResult don't provide what you need (for instance,
129 handling individual results as they arrive, but with metadata), you can always
130 just split the original result's ``msg_ids`` attribute, and handle them as you like.
131
132 For an example of this, see :file:`docs/examples/parallel/customresult.py`
133
134
114 .. _parallel_dependencies:
135 .. _parallel_dependencies:
115
136
116 Dependencies
137 Dependencies
@@ -157,8 +178,8 b' you specify are importable:'
157 .. sourcecode:: ipython
178 .. sourcecode:: ipython
158
179
159 In [10]: @require('numpy', 'zmq')
180 In [10]: @require('numpy', 'zmq')
160 ...: def myfunc():
181 ....: def myfunc():
161 ...: return dostuff()
182 ....: return dostuff()
162
183
163 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
184 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
164 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
185 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
@@ -175,16 +196,16 b' will be assigned to another engine. If the dependency returns *anything other th'
175 .. sourcecode:: ipython
196 .. sourcecode:: ipython
176
197
177 In [10]: def platform_specific(plat):
198 In [10]: def platform_specific(plat):
178 ...: import sys
199 ....: import sys
179 ...: return sys.platform == plat
200 ....: return sys.platform == plat
180
201
181 In [11]: @depend(platform_specific, 'darwin')
202 In [11]: @depend(platform_specific, 'darwin')
182 ...: def mactask():
203 ....: def mactask():
183 ...: do_mac_stuff()
204 ....: do_mac_stuff()
184
205
185 In [12]: @depend(platform_specific, 'nt')
206 In [12]: @depend(platform_specific, 'nt')
186 ...: def wintask():
207 ....: def wintask():
187 ...: do_windows_stuff()
208 ....: do_windows_stuff()
188
209
189 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
210 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
190 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
211 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
@@ -200,7 +221,7 b' the :class:`dependent` object that the decorators use:'
200 .. sourcecode::ipython
221 .. sourcecode::ipython
201
222
202 In [13]: def mytask(*args):
223 In [13]: def mytask(*args):
203 ...: dostuff()
224 ....: dostuff()
204
225
205 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
226 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
206 # this is the same as decorating the declaration of mytask with @depend
227 # this is the same as decorating the declaration of mytask with @depend
@@ -213,8 +234,8 b' the :class:`dependent` object that the decorators use:'
213
234
214 # is equivalent to:
235 # is equivalent to:
215 In [17]: @depend(g, *dargs, **dkwargs)
236 In [17]: @depend(g, *dargs, **dkwargs)
216 ...: def t(a,b,c):
237 ....: def t(a,b,c):
217 ...: # contents of f
238 ....: # contents of f
218
239
219 Graph Dependencies
240 Graph Dependencies
220 ------------------
241 ------------------
@@ -278,10 +299,11 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
278
299
279 In [16]: ar2 = lview.apply(f2)
300 In [16]: ar2 = lview.apply(f2)
280
301
281 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
302 In [17]: with lview.temp_flags(after=[ar,ar2]):
282
303 ....: ar3 = lview.apply(f3)
283 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
284
304
305 In [18]: with lview.temp_flags(follow=[ar], timeout=2.5):
306 ....: ar4 = lview.apply(f3)
285
307
286 .. seealso::
308 .. seealso::
287
309
@@ -291,8 +313,6 b' you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje'
291 onto task dependencies.
313 onto task dependencies.
292
314
293
315
294
295
296 Impossible Dependencies
316 Impossible Dependencies
297 ***********************
317 ***********************
298
318
@@ -433,7 +453,7 b' The following is an overview of how to use these classes together:'
433 2. Define some functions to be run as tasks
453 2. Define some functions to be run as tasks
434 3. Submit your tasks to using the :meth:`apply` method of your
454 3. Submit your tasks to using the :meth:`apply` method of your
435 :class:`LoadBalancedView` instance.
455 :class:`LoadBalancedView` instance.
436 4. Use :meth:`Client.get_result` to get the results of the
456 4. Use :meth:`.Client.get_result` to get the results of the
437 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
457 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
438 for and then receive the results.
458 for and then receive the results.
439
459
@@ -120,7 +120,7 b' opening a Windows Command Prompt and typing ``ipython``. This will'
120 start IPython's interactive shell and you should see something like the
120 start IPython's interactive shell and you should see something like the
121 following screenshot:
121 following screenshot:
122
122
123 .. image:: ipython_shell.*
123 .. image:: figs/ipython_shell.*
124
124
125 Starting an IPython cluster
125 Starting an IPython cluster
126 ===========================
126 ===========================
@@ -168,7 +168,7 b' You should see a number of messages printed to the screen, ending with'
168 "IPython cluster: started". The result should look something like the following
168 "IPython cluster: started". The result should look something like the following
169 screenshot:
169 screenshot:
170
170
171 .. image:: ipcluster_start.*
171 .. image:: figs/ipcluster_start.*
172
172
173 At this point, the controller and two engines are running on your local host.
173 At this point, the controller and two engines are running on your local host.
174 This configuration is useful for testing and for situations where you want to
174 This configuration is useful for testing and for situations where you want to
@@ -210,7 +210,7 b' The output of this command is shown in the screenshot below. Notice how'
210 :command:`ipcluster` prints out the location of the newly created cluster
210 :command:`ipcluster` prints out the location of the newly created cluster
211 directory.
211 directory.
212
212
213 .. image:: ipcluster_create.*
213 .. image:: figs/ipcluster_create.*
214
214
215 Configuring a cluster profile
215 Configuring a cluster profile
216 -----------------------------
216 -----------------------------
@@ -232,10 +232,8 b' will need to edit the following attributes in the file'
232
232
233 # Set these at the top of the file to tell ipcluster to use the
233 # Set these at the top of the file to tell ipcluster to use the
234 # Windows HPC job scheduler.
234 # Windows HPC job scheduler.
235 c.IPClusterStart.controller_launcher = \
235 c.IPClusterStart.controller_launcher_class = 'WindowsHPCControllerLauncher'
236 'IPython.parallel.apps.launcher.WindowsHPCControllerLauncher'
236 c.IPClusterEngines.engine_launcher_class = 'WindowsHPCEngineSetLauncher'
237 c.IPClusterEngines.engine_launcher = \
238 'IPython.parallel.apps.launcher.WindowsHPCEngineSetLauncher'
239
237
240 # Set these to the host name of the scheduler (head node) of your cluster.
238 # Set these to the host name of the scheduler (head node) of your cluster.
241 c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
239 c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
@@ -279,7 +277,7 b' must be run again to regenerate the XML job description files. The'
279 following screenshot shows what the HPC Job Manager interface looks like
277 following screenshot shows what the HPC Job Manager interface looks like
280 with a running IPython cluster.
278 with a running IPython cluster.
281
279
282 .. image:: hpc_job_manager.*
280 .. image:: figs/hpc_job_manager.*
283
281
284 Performing a simple interactive parallel computation
282 Performing a simple interactive parallel computation
285 ====================================================
283 ====================================================
@@ -330,5 +328,5 b" The :meth:`map` method has the same signature as Python's builtin :func:`map`"
330 function, but runs the calculation in parallel. More involved examples of using
328 function, but runs the calculation in parallel. More involved examples of using
331 :class:`MultiEngineClient` are provided in the examples that follow.
329 :class:`MultiEngineClient` are provided in the examples that follow.
332
330
333 .. image:: mec_simple.*
331 .. image:: figs/mec_simple.*
334
332
General Comments 0
You need to be logged in to leave comments. Login now