@@ -0,0 +1,61 @@
+"""An example for handling results in a way that AsyncMapResult doesn't provide
+
+Specifically, out-of-order results with some special handling of metadata.
+
+This just submits a bunch of jobs, waits on the results, and prints the stdout
+and results of each as they finish.
+
+Authors
+-------
+* MinRK
+"""
+import time
+import random
+
+from IPython import parallel
+
+# create client & views
+rc = parallel.Client()
+dv = rc[:]
+v = rc.load_balanced_view()
+
+
+# scatter 'id', so id=0,1,2 on engines 0,1,2
+dv.scatter('id', rc.ids, flatten=True)
+print dv['id']
+
+
+def sleep_here(count, t):
+    """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
+    import time, sys
+    print "hi from engine %i" % id
+    sys.stdout.flush()
+    time.sleep(t)
+    return count, t
+
+amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)
+
+pending = set(amr.msg_ids)
+while pending:
+    try:
+        rc.wait(pending, 1e-3)
+    except parallel.TimeoutError:
+        # ignore TimeoutErrors, since they only mean that at least one task isn't done
+        pass
+    # finished is the set of msg_ids that are complete
+    finished = pending.difference(rc.outstanding)
+    # update pending to exclude those that just finished
+    pending = pending.difference(finished)
+    for msg_id in finished:
+        # we know these are done, so don't worry about blocking
+        ar = rc.get_result(msg_id)
+        print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
+        print "with stdout:"
+        print '    ' + ar.stdout.replace('\n', '\n    ').rstrip()
+        print "and results:"
+
+        # note that each job in a map always returns a list of length chunksize,
+        # even if chunksize == 1
+        for (count, t) in ar.result:
+            print "  item %i: slept for %.2fs" % (count, t)
+
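A note on the metadata used above: a minimal sketch of what an AsyncResult carries (the keys shown are the ones this changeset itself touches; treat the exact set as version-dependent):

    ar = rc.get_result(msg_id)
    md = ar.metadata[0]                    # one metadata dict per message
    print md['engine_id']                  # which engine ran the task
    print md['started'], md['completed']   # timestamps, also used in the DAG docs below
    print md['stdout']                     # captured stdout, as printed above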
@@ -0,0 +1,83 @@
+"""A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
+
+This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
+
+Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
+and you will see the print statements as they arrive, notably not waiting for the results
+to finish.
+
+You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
+and easily filter by message type.
+
+Authors
+-------
+* MinRK
+"""
+
+import os
+import sys
+import json
+import zmq
+
+from IPython.zmq.session import Session
+from IPython.parallel.util import disambiguate_url
+from IPython.utils.py3compat import str_to_bytes
+from IPython.utils.path import get_security_file
+
+def main(connection_file):
+    """watch iopub channel, and print messages"""
+
+    ctx = zmq.Context.instance()
+
+    with open(connection_file) as f:
+        cfg = json.loads(f.read())
+
+    location = cfg['location']
+    reg_url = cfg['url']
+    session = Session(key=str_to_bytes(cfg['exec_key']))
+
+    query = ctx.socket(zmq.DEALER)
+    query.connect(disambiguate_url(cfg['url'], location))
+    session.send(query, "connection_request")
+    idents, msg = session.recv(query, mode=0)
+    c = msg['content']
+    iopub_url = disambiguate_url(c['iopub'], location)
+    sub = ctx.socket(zmq.SUB)
+    # This will subscribe to all messages:
+    sub.setsockopt(zmq.SUBSCRIBE, b'')
+    # replace b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
+    # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
+    # to everything from engine 1, but there is no way to subscribe to
+    # just stdout from everyone.
+    # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
+    # engine 1's stderr and engine 2's stdout:
+    # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
+    # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
+    sub.connect(iopub_url)
+    while True:
+        try:
+            idents, msg = session.recv(sub, mode=0)
+        except KeyboardInterrupt:
+            return
+        # ident always length 1 here
+        topic = idents[0]
+        if msg['msg_type'] == 'stream':
+            # stdout/stderr
+            # stream names are in msg['content']['name'], if you want to handle
+            # them differently
+            print "%s: %s" % (topic, msg['content']['data'])
+        elif msg['msg_type'] == 'pyerr':
+            # Python traceback
+            c = msg['content']
+            print topic + ':'
+            for line in c['traceback']:
+                # indent lines
+                print '    ' + line
+
+if __name__ == '__main__':
+    if len(sys.argv) > 1:
+        cf = sys.argv[1]
+    else:
+        # This gets the security file for the default profile:
+        cf = get_security_file('ipcontroller-client.json')
+    main(cf)
\ No newline at end of file
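To have something to watch, a sketch of a session you might run alongside this script (the function name is illustrative, not part of the file above):

    from IPython import parallel
    rc = parallel.Client()

    def noisy(x):
        import sys
        print "working on %r" % x
        sys.stdout.flush()   # flush, so the watcher sees output promptly
        return x

    rc[:].apply(noisy, 5)    # each engine's print arrives on the IOPub channel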
@@ -0,0 +1,65 @@
+"""Example of iteration through AsyncMapResults, without waiting for all results
+
+When you call view.map(func, sequence), you will receive a special AsyncMapResult
+object. These objects are used to reconstruct the results of the split call.
+One feature AsyncResults provide is that they are iterable *immediately*, so
+you can iterate through the actual results as they complete.
+
+This is useful if you submit a large number of tasks that may take some time,
+but want to perform logic on elements in the result, or even abort subsequent
+tasks in cases where you are searching for the first affirmative result.
+
+By default, the results will match the ordering of the submitted sequence, but
+if you call `map(..., ordered=False)`, then results will be provided to the iterator
+on a first-come, first-served basis.
+
+Authors
+-------
+* MinRK
+"""
+import time
+
+from IPython import parallel
+
+# create client & view
+rc = parallel.Client()
+dv = rc[:]
+v = rc.load_balanced_view()
+
+# scatter 'id', so id=0,1,2 on engines 0,1,2
+dv.scatter('id', rc.ids, flatten=True)
+print "Engine IDs: ", dv['id']
+
+# create a Reference to `id`. This will be a different value on each engine
+ref = parallel.Reference('id')
+print "sleeping for `id` seconds on each engine"
+tic = time.time()
+ar = dv.apply(time.sleep, ref)
+for i, r in enumerate(ar):
+    print "%i: %.3f" % (i, time.time()-tic)
+
+def sleep_here(t):
+    import time
+    time.sleep(t)
+    return id, t
+
+# one call per task
+print "running with one call per task"
+amr = v.map(sleep_here, [.01*t for t in range(100)])
+tic = time.time()
+for i, r in enumerate(amr):
+    print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
+
+print "running with four calls per task"
+# with chunksize, we can have four calls per task
+amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
+tic = time.time()
+for i, r in enumerate(amr):
+    print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
+
+print "running with two calls per task, with unordered results"
+# We can even iterate through faster results first, with ordered=False
+amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
+tic = time.time()
+for i, r in enumerate(amr):
+    print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
NO CONTENT: new file 100644, binary diff hidden
@@ -38,7 +38,7 @@ from IPython.core.profiledir import ProfileDir
 from IPython.utils.daemonize import daemonize
 from IPython.utils.importstring import import_item
 from IPython.utils.sysinfo import num_cpus
-from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
+from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List, Any,
                                      DottedObjectName)
 
 from IPython.parallel.apps.baseapp import (
@@ -233,6 +233,12 @@ class IPClusterEngines(BaseParallelApplication):
         help="""The number of engines to start. The default is to use one for each
         CPU on your machine""")
 
+    engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
+    def _engine_launcher_changed(self, name, old, new):
+        if isinstance(new, basestring):
+            self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
+                          " use engine_launcher_class" % self.__class__.__name__)
+            self.engine_launcher_class = new
     engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
         config=True,
         help="""The class for launching a set of Engines. Change this value
@@ -249,11 +255,22 @@ class IPClusterEngines(BaseParallelApplication):
         MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
         PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
         SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
+        LSFEngineSetLauncher : use LSF (bsub) to submit engines to a batch queue
         SSHEngineSetLauncher : use SSH to start the controller
                 Note that SSH does *not* move the connection files
                 around, so you will likely have to do this manually
                 unless the machines are on a shared file system.
         WindowsHPCEngineSetLauncher : use Windows HPC
+
+        If you are using one of IPython's builtin launchers, you can specify just the
+        prefix, e.g.:
+
+            c.IPClusterEngines.engine_launcher_class = 'SSH'
+
+        or:
+
+            ipcluster start --engines 'MPIExec'
+
         """
     )
     daemonize = Bool(False, config=True,
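A sketch of the two spellings the help text above calls equivalent (the full dotted name is inferred from the launcher module touched later in this diff, so treat it as an assumption):

    # in ipcluster_config.py
    c.IPClusterEngines.engine_launcher_class = 'SSH'
    # should select the same launcher as:
    c.IPClusterEngines.engine_launcher_class = 'IPython.parallel.apps.launcher.SSHEngineSetLauncher'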
@@ -265,9 +282,11 @@ class IPClusterEngines(BaseParallelApplication):
         if new:
             self.log_to_file = True
 
+    early_shutdown = Int(30, config=True, help="The timeout (in seconds)")
+    _stopping = False
+
     aliases = Dict(engine_aliases)
     flags = Dict(engine_flags)
-    _stopping = False
 
     @catch_config_error
     def initialize(self, argv=None):
@@ -277,7 +296,6 @@ class IPClusterEngines(BaseParallelApplication):
 
     def init_launchers(self):
         self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
-        self.engine_launcher.on_stop(lambda r: self.loop.stop())
 
     def init_signal(self):
         # Setup signals
@@ -304,13 +322,42 @@ class IPClusterEngines(BaseParallelApplication):
         )
         return launcher
 
+    def engines_started_ok(self):
+        self.log.info("Engines appear to have started successfully")
+        self.early_shutdown = 0
+
     def start_engines(self):
         self.log.info("Starting %i engines"%self.n)
         self.engine_launcher.start(self.n)
+        self.engine_launcher.on_stop(self.engines_stopped_early)
+        if self.early_shutdown:
+            ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
+
+    def engines_stopped_early(self, r):
+        if self.early_shutdown and not self._stopping:
+            self.log.error("""
+            Engines shut down early, they probably failed to connect.
+
+            Check the engine log files for output.
+
+            If your controller and engines are not on the same machine, you probably
+            have to instruct the controller to listen on an interface other than localhost.
+
+            You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
+
+            Be sure to read our security docs before instructing your controller to listen on
+            a public interface.
+            """)
+            self.stop_launchers()
+
+        return self.engines_stopped(r)
+
+    def engines_stopped(self, r):
+        return self.loop.stop()
 
     def stop_engines(self):
-        self.log.info("Stopping Engines...")
         if self.engine_launcher.running:
+            self.log.info("Stopping Engines...")
             d = self.engine_launcher.stop()
             return d
         else:
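The remedy named in that error message, as configuration (the trait name and flag are taken from the message itself; substitute the controller launcher class you actually use):

    # in ipcluster_config.py
    c.LocalControllerLauncher.controller_args = ["--ip=*"]   # no shell quoting needed here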
@@ -322,7 +369,7 @@ class IPClusterEngines(BaseParallelApplication):
         self.log.error("IPython cluster: stopping")
         self.stop_engines()
         # Wait a few seconds to let things shut down.
-        dc = ioloop.DelayedCallback(self.loop.stop,
+        dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
         dc.start()
 
     def sigint_handler(self, signum, frame):
@@ -394,6 +441,13 @@ class IPClusterStart(IPClusterEngines):
     delay = CFloat(1., config=True,
         help="delay (in s) between starting the controller and the engines")
 
+    controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
+    def _controller_launcher_changed(self, name, old, new):
+        if isinstance(new, basestring):
+            # old 0.11-style config
+            self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
+                          " use controller_launcher_class" % self.__class__.__name__)
+            self.controller_launcher_class = new
     controller_launcher_class = DottedObjectName('LocalControllerLauncher',
         config=True,
         help="""The class for launching a Controller. Change this value if you want
408 | MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe |
|
462 | MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe | |
409 | PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue |
|
463 | PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue | |
410 | SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue |
|
464 | SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue | |
|
465 | LSFControllerLauncher : use LSF (bsub) to submit engines to a batch queue | |||
411 | SSHControllerLauncher : use SSH to start the controller |
|
466 | SSHControllerLauncher : use SSH to start the controller | |
412 | WindowsHPCControllerLauncher : use Windows HPC |
|
467 | WindowsHPCControllerLauncher : use Windows HPC | |
|
468 | ||||
|
469 | If you are using one of IPython's builtin launchers, you can specify just the | |||
|
470 | prefix, e.g: | |||
|
471 | ||||
|
472 | c.IPClusterStart.controller_launcher_class = 'SSH' | |||
|
473 | ||||
|
474 | or: | |||
|
475 | ||||
|
476 | ipcluster start --controller 'MPIExec' | |||
|
477 | ||||
413 | """ |
|
478 | """ | |
414 | ) |
|
479 | ) | |
415 | reset = Bool(False, config=True, |
|
480 | reset = Bool(False, config=True, | |
@@ -423,7 +488,11 b' class IPClusterStart(IPClusterEngines):' | |||||
423 | self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller') |
|
488 | self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller') | |
424 | self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') |
|
489 | self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet') | |
425 | self.controller_launcher.on_stop(self.stop_launchers) |
|
490 | self.controller_launcher.on_stop(self.stop_launchers) | |
426 |
|
491 | |||
|
492 | def engines_stopped(self, r): | |||
|
493 | """prevent parent.engines_stopped from stopping everything on engine shutdown""" | |||
|
494 | pass | |||
|
495 | ||||
427 | def start_controller(self): |
|
496 | def start_controller(self): | |
428 | self.controller_launcher.start() |
|
497 | self.controller_launcher.start() | |
429 |
|
498 |
@@ -55,7 +55,7 @@ from IPython.parallel.controller.hub import HubFactory
 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
 from IPython.parallel.controller.sqlitedb import SQLiteDB
 
-from IPython.parallel.util import signal_children, split_url, asbytes
+from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
 
 # conditional import of MongoDB backend class
 
@@ -290,13 +290,15 @@ class IPControllerApp(BaseParallelApplication):
         mq = import_item(str(self.mq_class))
 
         hub = self.factory
-        # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
+        # disambiguate url, in case of *
+        monitor_url = disambiguate_url(hub.monitor_url)
+        # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
         # IOPub relay (in a Process)
         q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
         q.bind_in(hub.client_info['iopub'])
         q.bind_out(hub.engine_info['iopub'])
         q.setsockopt_out(zmq.SUBSCRIBE, b'')
-        q.connect_mon(hub.monitor_url)
+        q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
 
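A sketch of what disambiguate_url is for here (only the import is from this diff; the behavior described is an assumption based on its use): a socket cannot *connect* to a wildcard address, so the wildcard gets replaced with a concrete interface:

    from IPython.parallel.util import disambiguate_url
    disambiguate_url('tcp://*:10101')                # e.g. 'tcp://127.0.0.1:10101'
    disambiguate_url('tcp://*:10101', '10.0.0.101')  # e.g. 'tcp://10.0.0.101:10101'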
@@ -305,7 +307,7 @@ class IPControllerApp(BaseParallelApplication):
         q.bind_in(hub.client_info['mux'])
         q.setsockopt_in(zmq.IDENTITY, b'mux')
         q.bind_out(hub.engine_info['mux'])
-        q.connect_mon(hub.monitor_url)
+        q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
 
@@ -314,7 +316,7 @@ class IPControllerApp(BaseParallelApplication):
         q.bind_in(hub.client_info['control'])
         q.setsockopt_in(zmq.IDENTITY, b'control')
         q.bind_out(hub.engine_info['control'])
-        q.connect_mon(hub.monitor_url)
+        q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
         try:
@@ -329,7 +331,7 @@ class IPControllerApp(BaseParallelApplication):
             q.bind_in(hub.client_info['task'][1])
             q.setsockopt_in(zmq.IDENTITY, b'task')
             q.bind_out(hub.engine_info['task'])
-            q.connect_mon(hub.monitor_url)
+            q.connect_mon(monitor_url)
             q.daemon=True
             children.append(q)
         elif scheme == 'none':
@@ -338,7 +340,7 @@ class IPControllerApp(BaseParallelApplication):
         else:
             self.log.info("task::using Python %s Task scheduler"%scheme)
             sargs = (hub.client_info['task'][1], hub.engine_info['task'],
-                    hub.monitor_url, hub.client_info['notification'])
+                    monitor_url, disambiguate_url(hub.client_info['notification']))
             kwargs = dict(logname='scheduler', loglevel=self.log_level,
                            log_url = self.log_url, config=dict(self.config))
         if 'Process' in self.mq_class:
@@ -633,7 +633,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher):
                 d = el.start(user=user, hostname=host)
                 if i==0:
                     self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
-                self.launchers[host+str(i)] = el
+                self.launchers[ "%s/%i" % (host,i) ] = el
                 dlist.append(d)
         self.notify_start(dlist)
         return dlist
@@ -62,6 +62,7 @@ class AsyncResult(object):
         self._tracker = tracker
         self._ready = False
         self._success = None
+        self._metadata = None
         if len(msg_ids) == 1:
             self._single_result = not isinstance(targets, (list, tuple))
         else:
@@ -231,13 +232,13 @@ class AsyncResult(object):
         else:
             raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
 
-    @check_ready
     def __getattr__(self, key):
         """getattr maps to getitem for convenient attr access to metadata."""
-        if key not in self._metadata[0].keys():
+        try:
+            return self.__getitem__(key)
+        except (error.TimeoutError, KeyError):
             raise AttributeError("%r object has no attribute %r"%(
                     self.__class__.__name__, key))
-        return self.__getitem__(key)
 
     # asynchronous iterator:
     def __iter__(self):
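What the rewritten __getattr__ means in practice, as a sketch (it mirrors the test_getattr/test_getitem tests added later in this diff):

    ar = rc[:].apply_async(time.sleep, 0.5)
    ar.engine_id       # AttributeError while pending (the TimeoutError is swallowed)
    ar['engine_id']    # TimeoutError while pending
    ar.get()
    ar.engine_id       # now a list of engine ids, identical to ar['engine_id']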
@@ -261,12 +262,19 @@ class AsyncMapResult(AsyncResult):
     """Class for representing results of non-blocking gathers.
 
     This will properly reconstruct the gather.
+
+    This class is iterable at any time, and will wait on results as they come.
+
+    If ordered=False, then the first results to arrive will come first, otherwise
+    results will be yielded in the order they were submitted.
+
     """
 
-    def __init__(self, client, msg_ids, mapObject, fname=''):
+    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
         AsyncResult.__init__(self, client, msg_ids, fname=fname)
         self._mapObject = mapObject
         self._single_result = False
+        self.ordered = ordered
 
     def _reconstruct_result(self, res):
         """Perform the gather on the actual results."""
@@ -274,6 +282,13 @@ class AsyncMapResult(AsyncResult):
 
     # asynchronous iterator:
     def __iter__(self):
+        it = self._ordered_iter if self.ordered else self._unordered_iter
+        for r in it():
+            yield r
+
+    # asynchronous ordered iterator:
+    def _ordered_iter(self):
+        """iterator for results *as they arrive*, preserving submission order."""
         try:
             rlist = self.get(0)
         except error.TimeoutError:
@@ -294,6 +309,42 @@ class AsyncMapResult(AsyncResult):
             for r in rlist:
                 yield r
 
+    # asynchronous unordered iterator:
+    def _unordered_iter(self):
+        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
+        try:
+            rlist = self.get(0)
+        except error.TimeoutError:
+            pending = set(self.msg_ids)
+            while pending:
+                try:
+                    self._client.wait(pending, 1e-3)
+                except error.TimeoutError:
+                    # ignore timeout error, because that only means
+                    # *some* jobs are outstanding
+                    pass
+                # update ready set with those no longer outstanding:
+                ready = pending.difference(self._client.outstanding)
+                # update pending to exclude those that are finished
+                pending = pending.difference(ready)
+                while ready:
+                    msg_id = ready.pop()
+                    ar = AsyncResult(self._client, msg_id, self._fname)
+                    rlist = ar.get()
+                    try:
+                        for r in rlist:
+                            yield r
+                    except TypeError:
+                        # flattened, not a list
+                        # this could get broken by flattened data that returns iterables
+                        # but most calls to map do not expose the `flatten` argument
+                        yield rlist
+        else:
+            # already done
+            for r in rlist:
+                yield r
+
+
 
 class AsyncHubResult(AsyncResult):
     """Class to wrap pending results that must be requested from the Hub.
@@ -46,7 +46,7 @@ def remote(view, block=None, **flags):
     return remote_function
 
 @skip_doctest
-def parallel(view, dist='b', block=None, **flags):
+def parallel(view, dist='b', block=None, ordered=True, **flags):
     """Turn a function into a parallel remote function.
 
     This method can be used for map:
@@ -57,7 +57,7 @@ def parallel(view, dist='b', block=None, **flags):
     """
 
     def parallel_function(f):
-        return ParallelFunction(view, f, dist=dist, block=block, **flags)
+        return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
     return parallel_function
 
 #--------------------------------------------------------------------------
@@ -122,15 +122,19 @@ class ParallelFunction(RemoteFunction):
         to use the current `block` attribute of `view`
     chunksize : int or None
         The size of chunk to use when breaking up sequences in a load-balanced manner
+    ordered : bool [default: True]
+        Whether the results should be gathered as they arrive, or enforce the order of submission
    **flags : remaining kwargs are passed to View.temp_flags
     """
 
     chunksize=None
+    ordered=None
     mapObject=None
 
-    def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
+    def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
         super(ParallelFunction, self).__init__(view, f, block=block, **flags)
         self.chunksize = chunksize
+        self.ordered = ordered
 
         mapClass = Map.dists[dist]
         self.mapObject = mapClass()
@@ -186,7 +190,10 @@ class ParallelFunction(RemoteFunction):
 
             msg_ids.append(ar.msg_ids[0])
 
-        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__)
+        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
+                            fname=self.func.__name__,
+                            ordered=self.ordered
+        )
 
         if self.block:
             try:
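Usage sketch for the new flag on the decorator (assumes a running cluster; the import path is simply where this file lives in the tree, so treat it as an assumption):

    from IPython.parallel import Client
    from IPython.parallel.client.remotefunction import parallel

    v = Client().load_balanced_view()

    @parallel(v, block=False, ordered=False)
    def echo(x):
        return x

    amr = echo.map(range(10))   # AsyncMapResult; iterating yields results first-come, first-served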
@@ -992,7 +992,7 @@ class LoadBalancedView(View):
     @spin_after
     @save_ids
     def map(self, f, *sequences, **kwargs):
-        """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult
+        """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
 
         Parallel version of builtin `map`, load-balanced by this View.
 
@@ -1009,14 +1009,20 @@ class LoadBalancedView(View):
             function to be mapped
         *sequences: one or more sequences of matching length
             the sequences to be distributed and passed to `f`
-        block : bool
+        block : bool [default self.block]
             whether to wait for the result or not
         track : bool
             whether to create a MessageTracker to allow the user to
             safely edit after arrays and buffers during non-copying
             sends.
-        chunksize : int
-            how many elements should be in each task
+        chunksize : int [default 1]
+            how many elements should be in each task.
+        ordered : bool [default True]
+            Whether the results should be gathered as they arrive, or enforce
+            the order of submission.
+
+            Only applies when iterating through AsyncMapResult as results arrive.
+            Has no effect when block=True.
 
         Returns
         -------
@@ -1034,6 +1040,7 @@ class LoadBalancedView(View):
         # default
         block = kwargs.get('block', self.block)
         chunksize = kwargs.get('chunksize', 1)
+        ordered = kwargs.get('ordered', True)
 
         keyset = set(kwargs.keys())
         extra_keys = keyset.difference_update(set(['block', 'chunksize']))
@@ -1042,7 +1049,7 @@ class LoadBalancedView(View):
 
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
-        pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
+        pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
         return pf.map(*sequences)
 
 __all__ = ['LoadBalancedView', 'DirectView']
@@ -214,6 +214,14 @@ class EngineFactory(RegistrationFactory):
 
     def abort(self):
         self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
+        if self.url.startswith('127.'):
+            self.log.fatal("""
+            If the controller and engines are not on the same machine,
+            you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
+                c.HubFactory.ip='*' # for all interfaces, internal and external
+                c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
+            or tunnel connections via ssh.
+            """)
         self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
         time.sleep(1)
         sys.exit(255)
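The fix this message suggests, as configuration (copied from the message itself; read the security docs before listening on a public interface):

    # in ipcontroller_config.py
    c.HubFactory.ip = '*'              # all interfaces, internal and external
    c.HubFactory.ip = '192.168.1.101'  # or any interface the engines can see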
@@ -70,4 +70,46 @@ class AsyncResultTest(ClusterTestCase):
         self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
         for eid,r in d.iteritems():
             self.assertEquals(r, 5)
+
+    def test_list_amr(self):
+        ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
+        rlist = list(ar)
+
+    def test_getattr(self):
+        ar = self.client[:].apply_async(wait, 0.5)
+        self.assertRaises(AttributeError, lambda : ar._foo)
+        self.assertRaises(AttributeError, lambda : ar.__length_hint__())
+        self.assertRaises(AttributeError, lambda : ar.foo)
+        self.assertRaises(AttributeError, lambda : ar.engine_id)
+        self.assertFalse(hasattr(ar, '__length_hint__'))
+        self.assertFalse(hasattr(ar, 'foo'))
+        self.assertFalse(hasattr(ar, 'engine_id'))
+        ar.get(5)
+        self.assertRaises(AttributeError, lambda : ar._foo)
+        self.assertRaises(AttributeError, lambda : ar.__length_hint__())
+        self.assertRaises(AttributeError, lambda : ar.foo)
+        self.assertTrue(isinstance(ar.engine_id, list))
+        self.assertEquals(ar.engine_id, ar['engine_id'])
+        self.assertFalse(hasattr(ar, '__length_hint__'))
+        self.assertFalse(hasattr(ar, 'foo'))
+        self.assertTrue(hasattr(ar, 'engine_id'))
+
+    def test_getitem(self):
+        ar = self.client[:].apply_async(wait, 0.5)
+        self.assertRaises(TimeoutError, lambda : ar['foo'])
+        self.assertRaises(TimeoutError, lambda : ar['engine_id'])
+        ar.get(5)
+        self.assertRaises(KeyError, lambda : ar['foo'])
+        self.assertTrue(isinstance(ar['engine_id'], list))
+        self.assertEquals(ar.engine_id, ar['engine_id'])
+
+    def test_single_result(self):
+        ar = self.client[-1].apply_async(wait, 0.5)
+        self.assertRaises(TimeoutError, lambda : ar['foo'])
+        self.assertRaises(TimeoutError, lambda : ar['engine_id'])
+        self.assertTrue(ar.get(5) == 0.5)
+        self.assertTrue(isinstance(ar['engine_id'], int))
+        self.assertTrue(isinstance(ar.engine_id, int))
+        self.assertEquals(ar.engine_id, ar['engine_id'])
+
 
@@ -58,6 +58,44 @@ class TestLoadBalancedView(ClusterTestCase):
         r = self.view.map_sync(f, data)
         self.assertEquals(r, map(f, data))
 
+    def test_map_unordered(self):
+        def f(x):
+            return x**2
+        def slow_f(x):
+            import time
+            time.sleep(0.05*x)
+            return x**2
+        data = range(16,0,-1)
+        reference = map(f, data)
+
+        amr = self.view.map_async(slow_f, data, ordered=False)
+        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
+        # check individual elements, retrieved as they come
+        # list comprehension uses __iter__
+        astheycame = [ r for r in amr ]
+        # Ensure that at least one result came out of order:
+        self.assertNotEquals(astheycame, reference, "should not have preserved order")
+        self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
+
+    def test_map_ordered(self):
+        def f(x):
+            return x**2
+        def slow_f(x):
+            import time
+            time.sleep(0.05*x)
+            return x**2
+        data = range(16,0,-1)
+        reference = map(f, data)
+
+        amr = self.view.map_async(slow_f, data)
+        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
+        # check individual elements, retrieved as they come
+        # list(amr) uses __iter__
+        astheycame = list(amr)
+        # Ensure that results came in order
+        self.assertEquals(astheycame, reference)
+        self.assertEquals(amr.result, reference)
+
     def test_abort(self):
         view = self.view
         ar = self.client[:].apply_async(time.sleep, .5)
@@ -31,15 +31,14 @@ class Heartbeat(Thread):
     def __init__(self, context, addr=(LOCALHOST, 0)):
         Thread.__init__(self)
         self.context = context
-        self.addr = addr
-        self.ip = addr[0]
-        self.port = addr[1]
+        self.ip, self.port = addr
         if self.port == 0:
             s = socket.socket()
-            s.bind(self.addr)
+            # '*' means all interfaces to 0MQ, which is '' to socket.socket
+            s.bind(('' if self.ip == '*' else self.ip, 0))
             self.port = s.getsockname()[1]
             s.close()
-
+        self.addr = (self.ip, self.port)
         self.daemon = True
 
     def run(self):
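The wildcard translation in one runnable snippet (plain stdlib, independent of the class above):

    import socket
    s = socket.socket()
    s.bind(('', 0))            # '' is INADDR_ANY for socket.socket, like '*' in 0MQ
    print s.getsockname()[1]   # the kernel-assigned free port that the class reuses
    s.close()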
@@ -710,6 +710,9 @@ class KernelManager(HasTraits):
     # The addresses for the communication channels.
     connection_file = Unicode('')
     ip = Unicode(LOCALHOST)
+    def _ip_changed(self, name, old, new):
+        if new == '*':
+            self.ip = '0.0.0.0'
     shell_port = Int(0)
     iopub_port = Int(0)
     stdin_port = Int(0)
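A sketch of the effect of this traitlets change handler (illustrative only; a real KernelManager needs more setup before use):

    km = KernelManager()
    km.ip = '*'                  # assignment fires _ip_changed...
    assert km.ip == '0.0.0.0'    # ...which normalizes the 0MQ wildcard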
@@ -10,7 +10,7 @@ for working with Graphs is NetworkX_. Here, we will walk through a demo mapping
 a nx DAG to task dependencies.
 
 The full script that runs this demo can be found in
-:file:`docs/examples/newparallel/dagdeps.py`.
+:file:`docs/examples/parallel/dagdeps.py`.
 
 Why are DAGs good for task dependencies?
 ----------------------------------------
@@ -30,7 +30,8 @@ A Sample DAG
 
 Here, we have a very simple 5-node DAG:
 
-.. figure:: simpledag.*
+.. figure:: figs/simpledag.*
+    :width: 600px
 
 With NetworkX, an arrow is just a fattened bit on the edge. Here, we can see that task 0
 depends on nothing, and can run immediately. 1 and 2 depend on 0; 3 depends on
@@ -80,7 +81,7 @@ The code to generate the simple DAG:
 For demonstration purposes, we have a function that generates a random DAG with a given
 number of nodes and edges.
 
-.. literalinclude:: ../../examples/newparallel/dagdeps.py
+.. literalinclude:: ../../examples/parallel/dagdeps.py
     :language: python
     :lines: 20-36
 
@@ -117,11 +118,13 @@ on which it depends:
     In [6]: results = {}
 
     In [7]: for node in G.topological_sort():
-       ...:
-       ...:
-       ...:
-       ...:
-       ...:
+       ...:     # get list of AsyncResult objects from nodes
+       ...:     # leading into this one as dependencies
+       ...:     deps = [ results[n] for n in G.predecessors(node) ]
+       ...:     # submit and store AsyncResult object
+       ...:     with view.temp_flags(after=deps, block=False):
+       ...:         results[node] = view.apply_with_flags(jobs[node])
+
 
 Now that we have submitted all the jobs, we can wait for the results:
 
@@ -137,7 +140,7 @@ These objects store a variety of metadata about each task, including various tim
 We can validate that the dependencies were respected by checking that each task was
 started after all of its predecessors were completed:
 
-.. literalinclude:: ../../examples/newparallel/dagdeps.py
+.. literalinclude:: ../../examples/parallel/dagdeps.py
     :language: python
     :lines: 64-70
 
@@ -155,16 +158,17 @@ will be at the top, and quick, small tasks will be at the bottom.
     In [12]: pos = {}; colors = {}
 
     In [12]: for node in G:
-       ...:
-       ...:
-       ...:
-       ...:
-       ...:
+       ....:     md = results[node].metadata
+       ....:     start = date2num(md.started)
+       ....:     runtime = date2num(md.completed) - start
+       ....:     pos[node] = (start, runtime)
+       ....:     colors[node] = md.engine_id
 
     In [13]: nx.draw(G, pos, node_list=colors.keys(), node_color=colors.values(),
-       ...:
+       ....:     cmap=gist_rainbow)
 
-.. figure:: dagdeps.*
+.. figure:: figs/dagdeps.*
+    :width: 600px
 
 Time started on x, runtime on y, and color-coded by engine-id (in this case there
 were four engines). Edges denote dependencies.
NO CONTENT: file renamed from docs/source/parallel/asian_call.pdf to docs/source/parallel/figs/asian_call.pdf
NO CONTENT: file renamed from docs/source/parallel/asian_call.png to docs/source/parallel/figs/asian_call.png
NO CONTENT: file renamed from docs/source/parallel/asian_put.pdf to docs/source/parallel/figs/asian_put.pdf
NO CONTENT: file renamed from docs/source/parallel/asian_put.png to docs/source/parallel/figs/asian_put.png
NO CONTENT: file renamed from docs/source/parallel/dagdeps.pdf to docs/source/parallel/figs/dagdeps.pdf
NO CONTENT: file renamed from docs/source/parallel/dagdeps.png to docs/source/parallel/figs/dagdeps.png
NO CONTENT: file renamed from docs/source/parallel/hpc_job_manager.pdf to docs/source/parallel/figs/hpc_job_manager.pdf
NO CONTENT: file renamed from docs/source/parallel/hpc_job_manager.png to docs/source/parallel/figs/hpc_job_manager.png
NO CONTENT: file renamed from docs/source/parallel/ipcluster_create.pdf to docs/source/parallel/figs/ipcluster_create.pdf
NO CONTENT: file renamed from docs/source/parallel/ipcluster_create.png to docs/source/parallel/figs/ipcluster_create.png
NO CONTENT: file renamed from docs/source/parallel/ipcluster_start.pdf to docs/source/parallel/figs/ipcluster_start.pdf
NO CONTENT: file renamed from docs/source/parallel/ipcluster_start.png to docs/source/parallel/figs/ipcluster_start.png
NO CONTENT: file renamed from docs/source/parallel/ipython_shell.pdf to docs/source/parallel/figs/ipython_shell.pdf
NO CONTENT: file renamed from docs/source/parallel/ipython_shell.png to docs/source/parallel/figs/ipython_shell.png
NO CONTENT: file renamed from docs/source/parallel/mec_simple.pdf to docs/source/parallel/figs/mec_simple.pdf
NO CONTENT: file renamed from docs/source/parallel/mec_simple.png to docs/source/parallel/figs/mec_simple.png
NO CONTENT: file renamed from docs/source/parallel/parallel_pi.pdf to docs/source/parallel/figs/parallel_pi.pdf
NO CONTENT: file renamed from docs/source/parallel/parallel_pi.png to docs/source/parallel/figs/parallel_pi.png
NO CONTENT: file renamed from docs/source/parallel/simpledag.pdf to docs/source/parallel/figs/simpledag.pdf
NO CONTENT: file renamed from docs/source/parallel/simpledag.png to docs/source/parallel/figs/simpledag.png
NO CONTENT: file renamed from docs/source/parallel/single_digits.pdf to docs/source/parallel/figs/single_digits.pdf
NO CONTENT: file renamed from docs/source/parallel/single_digits.png to docs/source/parallel/figs/single_digits.png
NO CONTENT: file renamed from docs/source/parallel/two_digit_counts.pdf to docs/source/parallel/figs/two_digit_counts.pdf
NO CONTENT: file renamed from docs/source/parallel/two_digit_counts.png to docs/source/parallel/figs/two_digit_counts.png
@@ -4,7 +4,7 @@ Parallel examples
 
 .. note::
 
-    Performance numbers from ``IPython.kernel``, not newparallel.
+    Performance numbers from ``IPython.kernel``, not new ``IPython.parallel``.
 
 In this section we describe two more involved examples of using an IPython
 cluster to perform a parallel computation. In these examples, we will be using
@@ -27,7 +27,7 @@ million digits.
 
 In both the serial and parallel calculation we will be using functions defined
 in the :file:`pidigits.py` file, which is available in the
-:file:`docs/examples/newparallel` directory of the IPython source distribution.
+:file:`docs/examples/parallel` directory of the IPython source distribution.
 These functions provide basic facilities for working with the digits of pi and
 can be loaded into IPython by putting :file:`pidigits.py` in your current
 working directory and then doing:
@@ -75,7 +75,7 b' The resulting plot of the single digit counts shows that each digit occurs' | |||||
75 | approximately 1,000 times, but that with only 10,000 digits the |
|
75 | approximately 1,000 times, but that with only 10,000 digits the | |
76 | statistical fluctuations are still rather large: |
|
76 | statistical fluctuations are still rather large: | |
77 |
|
77 | |||
78 | .. image:: single_digits.* |
|
78 | .. image:: figs/single_digits.* | |
79 |
|
79 | |||
80 | It is clear that to reduce the relative fluctuations in the counts, we need |
|
80 | It is clear that to reduce the relative fluctuations in the counts, we need | |
81 | to look at many more digits of pi. That brings us to the parallel calculation. |
|
81 | to look at many more digits of pi. That brings us to the parallel calculation. | |
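The parallel calculation in the next hunks distributes the digit files across engines with :meth:`DirectView.map` and then reduces the per-file counts. As a rough sketch of that map/reduce shape (``count_pairs`` is a toy stand-in for the :file:`pidigits.py` helpers, and a running cluster is assumed):

.. sourcecode:: python

    from collections import Counter
    from IPython.parallel import Client

    def count_pairs(digits):
        # toy stand-in for the two-digit counting helper in pidigits.py
        from collections import Counter
        return Counter(digits[i:i+2] for i in range(len(digits) - 1))

    rc = Client()          # assumes an ipcluster is already running
    dv = rc[:]             # a DirectView on all engines

    # map: each chunk of digits is counted on some engine
    chunks = ['14159265358979', '32384626433832', '79502884197169']
    partial = dv.map_sync(count_pairs, chunks)

    # reduce: add up the per-chunk counts in a final serial step
    total = sum(partial, Counter())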
@@ -101,13 +101,13 @@ compute the two digit counts for the digits in a single file. Then in a final
 step the counts from each engine will be added up. To perform this
 calculation, we will need two top-level functions from :file:`pidigits.py`:
 
-.. literalinclude:: ../../examples/
+.. literalinclude:: ../../examples/parallel/pi/pidigits.py
    :language: python
   :lines: 47-62
 
 We will also use the :func:`plot_two_digit_freqs` function to plot the
 results. The code to run this calculation in parallel is contained in
-:file:`docs/examples/
+:file:`docs/examples/parallel/parallelpi.py`. This code can be run in parallel
 using IPython by following these steps:
 
 1. Use :command:`ipcluster` to start 15 engines. We used an 8 core (2 quad
@@ -188,7 +188,7 @@ most likely and that "06" and "07" are least likely. Further analysis would
 show that the relative size of the statistical fluctuations have decreased
 compared to the 10,000 digit calculation.
 
-.. image:: two_digit_counts.*
+.. image:: figs/two_digit_counts.*
 
 
 Parallel options pricing
@@ -209,12 +209,12 @@ simulation of the underlying asset price. In this example we use this approach
 to price both European and Asian (path dependent) options for various strike
 prices and volatilities.
 
-The code for this example can be found in the :file:`docs/examples/
+The code for this example can be found in the :file:`docs/examples/parallel`
 directory of the IPython source. The function :func:`price_options` in
 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
 the NumPy package and is shown here:
 
-.. literalinclude:: ../../examples/
+.. literalinclude:: ../../examples/parallel/options/mcpricer.py
    :language: python
 
 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
@@ -222,21 +222,21 @@ which distributes work to the engines using dynamic load balancing. This
 view is a wrapper of the :class:`Client` class shown in
 the previous example. The parallel calculation using :class:`LoadBalancedView` can
 be found in the file :file:`mcpricer.py`. The code in this file creates a
-:class:`
-:meth:`
+:class:`LoadBalancedView` instance and then submits a set of tasks using
+:meth:`LoadBalancedView.apply` that calculate the option prices for different
 volatilities and strike prices. The results are then plotted as a 2D contour
 plot using Matplotlib.
 
-.. literalinclude:: ../../examples/
+.. literalinclude:: ../../examples/parallel/options/mckernel.py
    :language: python
 
 To use this code, start an IPython cluster using :command:`ipcluster`, open
-IPython in the pylab mode with the file :file:`mc
+IPython in the pylab mode with the file :file:`mckernel.py` in your current
 working directory and then type:
 
 .. sourcecode:: ipython
 
-    In [7]: run mc
+    In [7]: run mckernel.py
     Submitted tasks: [0, 1, 2, ...]
 
 Once all the tasks have finished, the results can be plotted using the
@@ -257,9 +257,9 @@ entire calculation (10 strike prices, 10 volatilities, 100,000 paths for each)
 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable
 to the speedup observed in our previous example.
 
-.. image:: asian_call.*
+.. image:: figs/asian_call.*
 
-.. image:: asian_put.*
+.. image:: figs/asian_put.*
 
 Conclusion
 ==========
@@ -280,5 +280,5 @@ parallel architecture that have been demonstrated:
 
 .. note::
 
-    The newparallel code has never been run on Windows HPC Server, so the last
+    The new parallel code has never been run on Windows HPC Server, so the last
     conclusion is untested.
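For readers without the source tree handy, the submission pattern that :file:`mckernel.py` implements looks roughly like the following sketch; the ``price_options`` stand-in and the value grids are illustrative, not the changeset's actual code:

.. sourcecode:: python

    import numpy as np
    from IPython.parallel import Client

    def price_options(K, sigma):
        # stand-in for the Monte Carlo pricer in mcpricer.py
        return {'strike': K, 'sigma': sigma}

    rc = Client()
    lview = rc.load_balanced_view()

    strike_vals = np.linspace(90.0, 110.0, 10)
    sigma_vals = np.linspace(0.1, 0.4, 10)

    # one task per (strike, volatility) pair; the scheduler balances them
    async_results = [lview.apply_async(price_options, K, sigma)
                     for K in strike_vals for sigma in sigma_vals]

    prices = [ar.get() for ar in async_results]  # block until all finish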
@@ -60,6 +60,10 @@ the ``I`` in IPython. The following are some example usage cases for IPython:
 Architecture overview
 =====================
 
+.. figure:: figs/wideView.png
+    :width: 300px
+
+
 The IPython architecture consists of four components:
 
 * The IPython engine.
@@ -99,7 +103,7 @@ same machine as the Hub, but can be run anywhere from local threads or on remote
 The controller also provides a single point of contact for users who wish to
 utilize the engines connected to the controller. There are different ways of
 working with a controller. In IPython, all of these models are implemented via
-the
+the :meth:`.View.apply` method, after
 constructing :class:`.View` objects to represent subsets of engines. The two
 primary models for interacting with engines are:
 
@@ -181,6 +185,34 @@ ipcontroller-client.json
 but since the controller may listen on different ports for clients and
 engines, it is stored separately.
 
+ipcontroller-client.json will look something like this, under default localhost
+circumstances:
+
+.. sourcecode:: python
+
+    {
+      "url":"tcp:\/\/127.0.0.1:54424",
+      "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
+      "ssh":"",
+      "location":"10.19.1.135"
+    }
+
+If, however, you are running the controller on a work node on a cluster, you will likely
+need to use ssh tunnels to connect clients from your laptop to it. You will also
+probably need to instruct the controller to listen for engines coming from other work nodes
+on the cluster. An example of ipcontroller-client.json, as created by::
+
+    $> ipcontroller --ip=0.0.0.0 --ssh=login.mycluster.com
+
+
+.. sourcecode:: python
+
+    {
+      "url":"tcp:\/\/*:54424",
+      "exec_key":"a361fe89-92fc-4762-9767-e2f0a05e3130",
+      "ssh":"login.mycluster.com",
+      "location":"10.0.0.2"
+    }
+
 More details of how these JSON files are used are given below.
 
 A detailed description of the security model and its implementation in IPython
@@ -248,7 +280,7 @@ then you would connect to it with:
 
 .. sourcecode:: ipython
 
-    In [2]: c = Client(sshserver='myhub.example.com')
+    In [2]: c = Client('/path/to/my/ipcontroller-client.json', sshserver='me@myhub.example.com')
 
 Where 'myhub.example.com' is the url or IP address of the machine on
 which the Hub process is running (or another machine that has direct access to the Hub's ports).
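Putting the connection styles from this file together, a client can be created in any of these ways (paths and hostnames are placeholders):

.. sourcecode:: python

    from IPython.parallel import Client

    # same machine, default profile: the JSON file is found automatically
    rc = Client()

    # explicit connection file, e.g. copied over from the controller's host
    rc = Client('/path/to/ipcontroller-client.json')

    # from outside the cluster, tunneling through the login node with ssh
    rc = Client('/path/to/ipcontroller-client.json',
                sshserver='me@login.mycluster.com')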
@@ -24,8 +24,8 @@ the :command:`ipcluster` command::
 For more detailed information about starting the controller and engines, see
 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
 
-Creating a ``
-==============================
+Creating a ``DirectView`` instance
+==================================
 
 The first step is to import the IPython :mod:`IPython.parallel`
 module and then create a :class:`.Client` instance:
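The instantiation itself falls outside the hunk; the standard pattern is (a sketch, not the file's literal lines):

.. sourcecode:: python

    from IPython.parallel import Client

    rc = Client()     # connect to the default running cluster
    dview = rc[:]     # a DirectView spanning all engines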
@@ -117,10 +117,10 @@ two decorators:
 .. sourcecode:: ipython
 
     In [10]: @dview.remote(block=True)
+       ....: def getpid():
+       ....:     import os
+       ....:     return os.getpid()
+       ....:
 
     In [11]: getpid()
     Out[11]: [12345, 12346, 12347, 12348]
@@ -135,8 +135,8 @@ operations and distribute them, reconstructing the result.
     In [13]: A = np.random.random((64,48))
 
     In [14]: @dview.parallel(block=True)
+       ....: def pmul(A,B):
+       ....:     return A*B
 
     In [15]: C_local = A*A
 
@@ -183,6 +183,8 @@ dv.track : bool
     This is primarily useful for non-copying sends of numpy arrays that you plan to
     edit in-place. You need to know when it becomes safe to edit the buffer
     without corrupting the message.
+dv.targets : int, list of ints
+    which targets this view is associated with.
 
 
 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
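For reference, all of the usual index-access forms return :class:`DirectView` instances (a sketch of standard usage):

.. sourcecode:: python

    from IPython.parallel import Client

    rc = Client()
    dv_all = rc[:]       # every engine
    dv_even = rc[::2]    # engines 0, 2, 4, ...
    dv_first = rc[0]     # a view on engine 0 alone
    dv_pair = rc[1:3]    # engines 1 and 2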
@@ -260,10 +262,10 @@ local Python/IPython session:
 
     # define our function
     In [6]: def wait(t):
+       ....:     import time
+       ....:     tic = time.time()
+       ....:     time.sleep(t)
+       ....:     return time.time()-tic
 
     # In non-blocking mode
     In [7]: ar = dview.apply_async(wait, 2)
@@ -326,7 +328,7 @@ and blocks until all of the associated results are ready:
 The ``block`` and ``targets`` keyword arguments and attributes
 --------------------------------------------------------------
 
-Most DirectView methods (excluding :meth:`apply`
+Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
 blocking mode and which engines the command is applied to. The :class:`View` class also has
 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
@@ -362,11 +364,6 @@ The :attr:`block` and :attr:`targets` instance attributes of the
 Parallel magic commands
 -----------------------
 
-.. warning::
-
-    The magics have not been changed to work with the zeromq system. The
-    magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
-
 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
 that make it more pleasant to execute Python commands on the engines
 interactively. These are simply shortcuts to :meth:`execute` and
@@ -376,9 +373,6 @@ Python command on the engines specified by the :attr:`targets` attribute of the
 
 .. sourcecode:: ipython
 
-    # load the parallel magic extension:
-    In [21]: %load_ext parallelmagic
-
     # Create a DirectView for all targets
     In [22]: dv = rc[:]
 
@@ -387,10 +381,10 @@ Python command on the engines specified by the :attr:`targets` attribute of the
 
     In [24]: dv.block=True
 
-    In [25]: import numpy
-    Parallel execution on engines: [0, 1, 2, 3]
+    # import numpy here and everywhere
+    In [25]: with dv.sync_imports():
+       ....:     import numpy
+    importing numpy on engine(s)
 
     In [27]: %px a = numpy.random.rand(2,2)
     Parallel execution on engines: [0, 1, 2, 3]
@@ -400,10 +394,10 @@ Python command on the engines specified by the :attr:`targets` attribute of the
 
     In [28]: dv['ev']
     Out[28]: [ array([ 1.09522024, -0.09645227]),
+       ....:   array([ 1.21435496, -0.35546712]),
+       ....:   array([ 0.72180653, 0.07133042]),
+       ....:   array([ 1.46384341, 1.04353244e-04])
+       ....:   ]
 
 The ``%result`` magic gets the most recent result, or takes an argument
 specifying the index of the result to be requested. It is simply a shortcut to the
@@ -415,9 +409,9 @@ specifying the index of the result to be requested. It is simply a shortcut to t
 
     In [30]: %result
     Out[30]: [ [ 1.28167017  0.14197338],
+       ....:   [-0.14093616  1.27877273],
+       ....:   [-0.37023573  1.06779409],
+       ....:   [ 0.83664764 -0.25602658] ]
 
 The ``%autopx`` magic switches to a mode where everything you type is executed
 on the engines given by the :attr:`targets` attribute:
@@ -452,9 +446,9 @@ on the engines given by the :attr:`targets` attribute:
 
     In [37]: dv['ans']
     Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
+       ....:   'Average max eigenvalue is: 10.2076902286',
+       ....:   'Average max eigenvalue is: 10.1891484655',
+       ....:   'Average max eigenvalue is: 10.1158837784',]
 
 
 Moving Python objects around
@@ -522,7 +516,7 @@ follow that terminology. However, it is important to remember that in
 IPython's :class:`Client` class, :meth:`scatter` is from the
 interactive IPython session to the engines and :meth:`gather` is from the
 engines back to the interactive IPython session. For scatter/gather operations
-between engines, MPI should be used
+between engines, MPI, pyzmq, or some other direct interconnect should be used.
 
 .. sourcecode:: ipython
 
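The scatter/gather example itself falls outside this hunk; the shape of the operation is roughly (a sketch assuming ``rc`` and ``dview`` as above):

.. sourcecode:: python

    # partition a list across engines, transform it remotely, reassemble it
    dview.block = True
    dview.scatter('a', range(16))          # each engine gets a slice as `a`
    dview.execute('b = [2*x for x in a]')  # work on the local slice
    b = dview.gather('b')                  # pieces come back in engine order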
@@ -568,7 +562,7 @@ created by a DirectView's :meth:`sync_imports` method:
 .. sourcecode:: ipython
 
     In [69]: with dview.sync_imports():
+       ....:     import numpy
     importing numpy on engine(s)
 
 Any imports made inside the block will also be performed on the view's engines.
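Multiple modules can be pulled in within one block; each import runs locally and on every engine in the view (sketch):

.. sourcecode:: python

    with dview.sync_imports():
        import numpy
        import os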
@@ -588,15 +582,15 @@ execution, and will fail with an UnmetDependencyError.
     In [69]: from IPython.parallel import require
 
     In [70]: @require('re'):
+       ....: def findall(pat, x):
+       ....:     # re is guaranteed to be available
+       ....:     return re.findall(pat, x)
 
     # you can also pass modules themselves, that you already have locally:
     In [71]: @require(time):
+       ....: def wait(t):
+       ....:     time.sleep(t)
+       ....:     return t
 
 .. _parallel_exceptions:
 
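Note that the trailing colon after ``@require('re')`` in the hunk above is a typo carried over from the source file; the decorator is applied without it. A sketch of using such a function through a view:

.. sourcecode:: python

    from IPython.parallel import Client, require

    rc = Client()
    view = rc.load_balanced_view()

    @require('re')
    def findall(pat, x):
        # `re` is imported on the engine when the task runs
        return re.findall(pat, x)

    matches = view.apply_sync(findall, r'\d+', 'a1b22c333')
    # -> ['1', '22', '333']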
@@ -141,9 +141,39 @@ Using various batch systems with :command:`ipcluster`
 
 :command:`ipcluster` has a notion of Launchers that can start controllers
 and engines with various remote execution schemes. Currently supported
-models include :command:`ssh`, :command:`mpiexec`, PBS-style (Torque, SGE),
+models include :command:`ssh`, :command:`mpiexec`, PBS-style (Torque, SGE, LSF),
 and Windows HPC Server.
 
+In general, these are configured by the :attr:`IPClusterEngines.engine_set_launcher_class`,
+and :attr:`IPClusterStart.controller_launcher_class` configurables, which can be the
+fully specified object name (e.g. ``'IPython.parallel.apps.launcher.LocalControllerLauncher'``),
+but if you are using IPython's builtin launchers, you can specify just the class name,
+or even just the prefix e.g:
+
+.. sourcecode:: python
+
+    c.IPClusterEngines.engine_launcher_class = 'SSH'
+    # equivalent to
+    c.IPClusterEngines.engine_launcher_class = 'SSHEngineSetLauncher'
+    # both of which expand to
+    c.IPClusterEngines.engine_launcher_class = 'IPython.parallel.apps.launcher.SSHEngineSetLauncher'
+
+The shortest form being of particular use on the command line, where all you need to do to
+get an IPython cluster running with engines started with MPI is:
+
+.. sourcecode:: bash
+
+    $> ipcluster start --engines=MPIExec
+
+Assuming that the default MPI config is sufficient.
+
+.. note::
+
+    shortcuts for builtin launcher names were added in 0.12, as was the ``_class`` suffix
+    on the configurable names. If you use the old 0.11 names (e.g. ``engine_set_launcher``),
+    they will still work, but you will get a deprecation warning that the name has changed.
+
+
 .. note::
 
     The Launchers and configuration are designed in such a way that advanced
@@ -170,7 +200,7 @@ There, instruct ipcluster to use the MPIExec launchers by adding the lines:
 
 .. sourcecode:: python
 
-    c.IPClusterEngines.engine_launcher = '
+    c.IPClusterEngines.engine_launcher_class = 'MPIExecEngineSetLauncher'
 
 If the default MPI configuration is correct, then you can now start your cluster, with::
 
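The start command itself is cut off by the hunk boundary; a typical invocation, assuming four engines and a profile named ``mpi`` (illustrative, not the elided line):

.. sourcecode:: bash

    $> ipcluster start -n 4 --profile=mpi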
@@ -185,7 +215,7 @@ If you have a reason to also start the Controller with mpi, you can specify:
 
 .. sourcecode:: python
 
-    c.IPClusterStart.controller_launcher = '
+    c.IPClusterStart.controller_launcher_class = 'MPIExecControllerLauncher'
 
 .. note::
 
@@ -226,10 +256,8 @@ and engines:
 
 .. sourcecode:: python
 
-    c.IPClusterStart.controller_launcher = \
-        'IPython.parallel.apps.launcher.PBSControllerLauncher'
-    c.IPClusterEngines.engine_launcher = \
-        'IPython.parallel.apps.launcher.PBSEngineSetLauncher'
+    c.IPClusterStart.controller_launcher_class = 'PBSControllerLauncher'
+    c.IPClusterEngines.engine_launcher_class = 'PBSEngineSetLauncher'
 
 .. note::
 
@@ -355,12 +383,11 @@ To use this mode, select the SSH launchers in :file:`ipcluster_config.py`:
 
 .. sourcecode:: python
 
-    c.IPClusterEngines.engine_launcher = \
-        'IPython.parallel.apps.launcher.SSHEngineSetLauncher'
+    c.IPClusterEngines.engine_launcher_class = 'SSHEngineSetLauncher'
     # and if the Controller is also to be remote:
-    c.IPClusterStart.controller_launcher = \
-        'IPython.parallel.apps.launcher.SSHControllerLauncher'
+    c.IPClusterStart.controller_launcher_class = 'SSHControllerLauncher'
 
 
 The controller's remote location and configuration can be specified:
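Alongside the SSH launcher classes, the 0.12-era configuration lets you spell out where engines should run; a sketch with placeholder hostnames (check your version's :file:`ipcluster_config.py` comments for the exact attribute):

.. sourcecode:: python

    c.SSHEngineSetLauncher.engines = {
        'host1.example.com': 2,   # start two engines here
        'host2.example.com': 4,   # and four here
    }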
@@ -29,8 +29,8 @@ the :command:`ipcluster` command::
 For more detailed information about starting the controller and engines, see
 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
 
-Creating a ``
-==============================
+Creating a ``LoadBalancedView`` instance
+========================================
 
 The first step is to import the IPython :mod:`IPython.parallel`
 module and then create a :class:`.Client` instance, and we will also be using
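For reference, the resulting instantiation is typically (standard pattern, sketched):

.. sourcecode:: python

    from IPython.parallel import Client

    rc = Client()
    lview = rc.load_balanced_view()   # a view over all engines by default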
@@ -87,12 +87,12 @@ To load-balance :meth:`map`,simply use a LoadBalancedView:
 
     In [62]: lview.block = True
 
+    In [63]: serial_result = map(lambda x:x**10, range(32))
 
+    In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
 
+    In [65]: serial_result==parallel_result
+    Out[65]: True
 
 Parallel function decorator
 ---------------------------
@@ -111,6 +111,27 @@ that turns any Python function into a parallel function:
     In [11]: f.map(range(32))    # this is done in parallel
     Out[11]: [0.0,10.0,160.0,...]
 
+.. _parallel_taskmap:
+
+Map results are iterable!
+-------------------------
+
+When an AsyncResult object actually maps multiple results (e.g. the :class:`~AsyncMapResult`
+object), you can actually iterate through them, and act on the results as they arrive:
+
+.. literalinclude:: ../../examples/parallel/itermapresult.py
+   :language: python
+   :lines: 9-34
+
+.. seealso::
+
+    When AsyncResult or the AsyncMapResult don't provide what you need (for instance,
+    handling individual results as they arrive, but with metadata), you can always
+    just split the original result's ``msg_ids`` attribute, and handle them as you like.
+
+    For an example of this, see :file:`docs/examples/parallel/customresult.py`
+
+
 .. _parallel_dependencies:
 
 Dependencies
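The literalinclude above pulls in :file:`itermapresult.py`, which is not reproduced in this changeset view; the pattern it demonstrates is roughly (a sketch, with an illustrative task function):

.. sourcecode:: python

    from IPython.parallel import Client

    rc = Client()
    lview = rc.load_balanced_view()

    def echo_after(t):
        # illustrative task: sleep, then return the argument
        import time
        time.sleep(t)
        return t

    amr = lview.map_async(echo_after, [0.4, 0.1, 0.3])
    for r in amr:
        # each result is handled as soon as it is ready, in submission order
        print r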
@@ -157,8 +178,8 @@ you specify are importable:
 .. sourcecode:: ipython
 
     In [10]: @require('numpy', 'zmq')
+       ....: def myfunc():
+       ....:     return dostuff()
 
 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
@@ -175,16 +196,16 @@ will be assigned to another engine. If the dependency returns *anything other th
 .. sourcecode:: ipython
 
     In [10]: def platform_specific(plat):
+       ....:     import sys
+       ....:     return sys.platform == plat
 
     In [11]: @depend(platform_specific, 'darwin')
+       ....: def mactask():
+       ....:     do_mac_stuff()
 
     In [12]: @depend(platform_specific, 'nt')
+       ....: def wintask():
+       ....:     do_windows_stuff()
 
 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
@@ -200,7 +221,7 @@ the :class:`dependent` object that the decorators use:
 .. sourcecode::ipython
 
     In [13]: def mytask(*args):
+       ....:     dostuff()
 
     In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
     # this is the same as decorating the declaration of mytask with @depend
@@ -213,8 +234,8 @@ the :class:`dependent` object that the decorators use:
 
     # is equivalent to:
     In [17]: @depend(g, *dargs, **dkwargs)
+       ....: def t(a,b,c):
+       ....:     # contents of f
 
 Graph Dependencies
 ------------------
@@ -278,10 +299,11 @@ you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje
 
     In [16]: ar2 = lview.apply(f2)
 
-    In [17]:
-    In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
+    In [17]: with lview.temp_flags(after=[ar,ar2]):
+       ....:     ar3 = lview.apply(f3)
+
+    In [18]: with lview.temp_flags(follow=[ar], timeout=2.5)
+       ....:     ar4 = lview.apply(f3)
 
 .. seealso::
 
@@ -291,8 +313,6 @@ you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje
 onto task dependencies.
 
 
-
-
 Impossible Dependencies
 ***********************
 
@@ -433,7 +453,7 @@ The following is an overview of how to use these classes together:
 2. Define some functions to be run as tasks
 3. Submit your tasks to using the :meth:`apply` method of your
    :class:`LoadBalancedView` instance.
-4. Use :meth:`Client.get_result` to get the results of the
+4. Use :meth:`.Client.get_result` to get the results of the
    tasks, or use the :meth:`AsyncResult.get` method of the results to wait
   for and then receive the results.
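Condensing the four steps above into one sketch (the task body is illustrative):

.. sourcecode:: python

    from IPython.parallel import Client

    rc = Client()                      # 1. create a Client
    lview = rc.load_balanced_view()

    def task(x):                       # 2. define a function to run
        return x ** 2

    ar = lview.apply(task, 7)          # 3. submit it as a task
    print ar.get()                     # 4. wait for, then receive, the result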
@@ -120,7 +120,7 @@ opening a Windows Command Prompt and typing ``ipython``. This will
 start IPython's interactive shell and you should see something like the
 following screenshot:
 
-.. image:: ipython_shell.*
+.. image:: figs/ipython_shell.*
 
 Starting an IPython cluster
 ===========================
@@ -168,7 +168,7 @@ You should see a number of messages printed to the screen, ending with
 "IPython cluster: started". The result should look something like the following
 screenshot:
 
-.. image:: ipcluster_start.*
+.. image:: figs/ipcluster_start.*
 
 At this point, the controller and two engines are running on your local host.
 This configuration is useful for testing and for situations where you want to
@@ -210,7 +210,7 @@ The output of this command is shown in the screenshot below. Notice how
 :command:`ipcluster` prints out the location of the newly created cluster
 directory.
 
-.. image:: ipcluster_create.*
+.. image:: figs/ipcluster_create.*
 
 Configuring a cluster profile
 -----------------------------
@@ -232,10 +232,8 @@ will need to edit the following attributes in the file
 
     # Set these at the top of the file to tell ipcluster to use the
     # Windows HPC job scheduler.
-    c.IPClusterStart.controller_launcher = \
-        'IPython.parallel.apps.launcher.WindowsHPCControllerLauncher'
-    c.IPClusterEngines.engine_launcher = \
-        'IPython.parallel.apps.launcher.WindowsHPCEngineSetLauncher'
+    c.IPClusterStart.controller_launcher_class = 'WindowsHPCControllerLauncher'
+    c.IPClusterEngines.engine_launcher_class = 'WindowsHPCEngineSetLauncher'
 
     # Set these to the host name of the scheduler (head node) of your cluster.
     c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
@@ -279,7 +277,7 @@ must be run again to regenerate the XML job description files. The
 following screenshot shows what the HPC Job Manager interface looks like
 with a running IPython cluster.
 
-.. image:: hpc_job_manager.*
+.. image:: figs/hpc_job_manager.*
 
 Performing a simple interactive parallel computation
 ====================================================
@@ -330,5 +328,5 @@ The :meth:`map` method has the same signature as Python's builtin :func:`map`
 function, but runs the calculation in parallel. More involved examples of using
 :class:`MultiEngineClient` are provided in the examples that follow.
 
-.. image:: mec_simple.*
+.. image:: figs/mec_simple.*
 
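As a closing sketch of the :meth:`map` parallelism described above (assuming ``dview`` as created earlier):

.. sourcecode:: python

    serial = map(lambda x: x**10, range(32))
    parallel = dview.map_sync(lambda x: x**10, range(32))
    assert serial == parallel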