diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index e23ac0a..5db9882 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -10,8 +10,6 @@ # Imports #----------------------------------------------------------------------------- -from __future__ import print_function - import os import time from getpass import getpass @@ -57,7 +55,7 @@ def _clear(): """helper method for implementing `client.clear` via `client.apply`""" globals().clear() -def execute(code): +def _execute(code): """helper method for implementing `client.execute` via `client.apply`""" exec code in globals() @@ -79,8 +77,10 @@ def defaultblock(f, self, *args, **kwargs): block = self.block if block is None else block saveblock = self.block self.block = block - ret = f(self, *args, **kwargs) - self.block = saveblock + try: + ret = f(self, *args, **kwargs) + finally: + self.block = saveblock return ret @@ -198,6 +198,7 @@ class Client(object): results = None history = None debug = False + targets = None def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False, sshserver=None, sshkey=None, password=None, paramiko=None, @@ -205,6 +206,7 @@ class Client(object): if context is None: context = zmq.Context() self.context = context + self.targets = 'all' self._addr = addr self._ssh = bool(sshserver or sshkey or password) if self._ssh and sshserver is None: @@ -478,7 +480,7 @@ class Client(object): Parameters ---------- - msg_ids : int, str, or list of ints and/or strs + msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects ints are indices to self.history strs are msg_ids default: wait on all outstanding messages @@ -495,13 +497,18 @@ class Client(object): if msg_ids is None: theids = self.outstanding else: - if isinstance(msg_ids, (int, str)): + if isinstance(msg_ids, (int, str, AsyncResult)): msg_ids = [msg_ids] theids = set() for msg_id in msg_ids: if isinstance(msg_id, int): msg_id = self.history[msg_id] + elif isinstance(msg_id, AsyncResult): + map(theids.add, msg_id._msg_ids) + continue theids.add(msg_id) + if not theids.intersection(self.outstanding): + return True self.spin() while theids.intersection(self.outstanding): if timeout >= 0 and ( time.time()-tic ) > timeout: @@ -607,7 +614,7 @@ class Client(object): whether or not to wait until done to return default: self.block """ - result = self.apply(execute, (code,), targets=targets, block=block, bound=True) + result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True) return result def run(self, code, block=None): @@ -796,11 +803,11 @@ class Client(object): return pf.map(*sequences) def parallel(self, bound=True, targets='all', block=True): - """Decorator for making a ParallelFunction""" + """Decorator for making a ParallelFunction.""" return parallel(self, bound=bound, targets=targets, block=block) def remote(self, bound=True, targets='all', block=True): - """Decorator for making a RemoteFunction""" + """Decorator for making a RemoteFunction.""" return remote(self, bound=bound, targets=targets, block=block) #-------------------------------------------------------------------------- @@ -816,7 +823,7 @@ class Client(object): return result @defaultblock - def pull(self, keys, targets='all', block=True): + def pull(self, keys, targets='all', block=None): """Pull objects from `target`'s namespace by `keys`""" if isinstance(keys, str): pass @@ -827,11 +834,11 @@ class Client(object): result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) return result - @defaultblock def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): """ Partition a Python sequence and send the partitions to a set of engines. """ + block = block if block is not None else self.block targets = self._build_targets(targets)[-1] mapObject = Map.dists[dist]() nparts = len(targets) @@ -839,33 +846,31 @@ class Client(object): for index, engineid in enumerate(targets): partition = mapObject.getPartition(seq, index, nparts) if flatten and len(partition) == 1: - mid = self.push({key: partition[0]}, targets=engineid, block=False) + r = self.push({key: partition[0]}, targets=engineid, block=False) else: - mid = self.push({key: partition}, targets=engineid, block=False) - msg_ids.append(mid) + r = self.push({key: partition}, targets=engineid, block=False) + msg_ids.extend(r._msg_ids) r = AsyncResult(self, msg_ids) if block: - r.wait() - return + return r.get() else: return r - @defaultblock - def gather(self, key, dist='b', targets='all', block=True): + def gather(self, key, dist='b', targets='all', block=None): """ Gather a partitioned sequence on a set of engines as a single local seq. """ + block = block if block is not None else self.block targets = self._build_targets(targets)[-1] mapObject = Map.dists[dist]() msg_ids = [] for index, engineid in enumerate(targets): - msg_ids.append(self.pull(key, targets=engineid,block=False)) + msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids) r = AsyncMapResult(self, msg_ids, mapObject) if block: - r.wait() - return r.result + return r.get() else: return r @@ -980,6 +985,35 @@ class Client(object): if content['status'] != 'ok': raise ss.unwrap_exception(content) + #---------------------------------------- + # activate for %px,%autopx magics + #---------------------------------------- + def activate(self): + """Make this `View` active for parallel magic commands. + + IPython has a magic command syntax to work with `MultiEngineClient` objects. + In a given IPython session there is a single active one. While + there can be many `Views` created and used by the user, + there is only one active one. The active `View` is used whenever + the magic commands %px and %autopx are used. + + The activate() method is called on a given `View` to make it + active. Once this has been done, the magic commands can be used. + """ + + try: + # This is injected into __builtins__. + ip = get_ipython() + except NameError: + print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." + else: + pmagic = ip.plugin_manager.get_plugin('parallelmagic') + if pmagic is not None: + pmagic.active_multiengine_client = self + else: + print "You must first load the parallelmagic extension " \ + "by doing '%load_ext parallelmagic'" + class AsynClient(Client): """An Asynchronous client, using the Tornado Event Loop. !!!unfinished!!!""" diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index c2c1f72..b597eec 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -11,7 +11,7 @@ #----------------------------------------------------------------------------- from IPython.external.decorator import decorator -from IPython.zmq.parallel.remotefunction import ParallelFunction +from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel #----------------------------------------------------------------------------- # Decorators @@ -22,8 +22,10 @@ def myblock(f, self, *args, **kwargs): """override client.block with self.block during a call""" block = self.client.block self.client.block = self.block - ret = f(self, *args, **kwargs) - self.client.block = block + try: + ret = f(self, *args, **kwargs) + finally: + self.client.block = block return ret @decorator @@ -65,7 +67,6 @@ class View(object): Don't use this class, use subclasses. """ _targets = None - _ntargets = None block=None bound=None history=None @@ -75,7 +76,7 @@ class View(object): self._targets = targets self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) self.block = client.block - self.bound=True + self.bound=False self.history = [] self.outstanding = set() self.results = {} @@ -92,7 +93,8 @@ class View(object): @targets.setter def targets(self, value): - raise AttributeError("Cannot set my targets argument after construction!") + self._targets = value + # raise AttributeError("Cannot set my targets argument after construction!") @sync_results def spin(self): @@ -185,6 +187,10 @@ class View(object): bound=True, targets=targets) return pf.map(*sequences) + def parallel(self, bound=True, block=True): + """Decorator for making a ParallelFunction""" + return parallel(self.client, bound=bound, targets=self.targets, block=block) + def abort(self, msg_ids=None, block=None): """Abort jobs on my engines. @@ -202,11 +208,12 @@ class View(object): """Fetch the Queue status of my engines""" return self.client.queue_status(targets=self.targets, verbose=verbose) - def purge_results(self, msg_ids=[],targets=[]): + def purge_results(self, msg_ids=[], targets=[]): """Instruct the controller to forget specific results.""" if targets is None or targets == 'all': targets = self.targets return self.client.purge_results(msg_ids=msg_ids, targets=targets) + class DirectView(View): @@ -219,8 +226,16 @@ class DirectView(View): >>> dv_even = client[::2] >>> dv_some = client[1:3] + This object provides dictionary access + """ + @sync_results + @save_ids + def execute(self, code, block=True): + """execute some code on my targets.""" + return self.client.execute(code, block=self.block, targets=self.targets) + def update(self, ns): """update remote namespace with dict `ns`""" return self.client.push(ns, targets=self.targets, block=self.block) @@ -234,6 +249,8 @@ class DirectView(View): # block = block if block is not None else self.block return self.client.pull(key_s, block=True, targets=self.targets) + @sync_results + @save_ids def pull(self, key_s, block=True): """get object(s) by `key_s` from remote namespace will return one object if it is a key. @@ -252,6 +269,8 @@ class DirectView(View): return self.client.scatter(key, seq, dist=dist, flatten=flatten, targets=targets, block=block) + @sync_results + @save_ids def gather(self, key, dist='b', targets=None, block=True): """ Gather a partitioned sequence on a set of engines as a single local seq. @@ -278,6 +297,36 @@ class DirectView(View): block = block if block is not None else self.block return self.client.kill(targets=self.targets, block=block) + #---------------------------------------- + # activate for %px,%autopx magics + #---------------------------------------- + def activate(self): + """Make this `View` active for parallel magic commands. + + IPython has a magic command syntax to work with `MultiEngineClient` objects. + In a given IPython session there is a single active one. While + there can be many `Views` created and used by the user, + there is only one active one. The active `View` is used whenever + the magic commands %px and %autopx are used. + + The activate() method is called on a given `View` to make it + active. Once this has been done, the magic commands can be used. + """ + + try: + # This is injected into __builtins__. + ip = get_ipython() + except NameError: + print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." + else: + pmagic = ip.plugin_manager.get_plugin('parallelmagic') + if pmagic is not None: + pmagic.active_multiengine_client = self + else: + print "You must first load the parallelmagic extension " \ + "by doing '%load_ext parallelmagic'" + + class LoadBalancedView(View): """An engine-agnostic View that only executes via the Task queue.