diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index 6dce41c..39e40e6 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -29,10 +29,16 @@ from view import DirectView, LoadBalancedView from dependency import Dependency, depend, require import error +#-------------------------------------------------------------------------- +# helpers for implementing old MEC API via client.apply +#-------------------------------------------------------------------------- + def _push(ns): + """helper method for implementing `client.push` via `client.apply`""" globals().update(ns) def _pull(keys): + """helper method for implementing `client.pull` via `client.apply`""" g = globals() if isinstance(keys, (list,tuple, set)): for key in keys: @@ -45,10 +51,13 @@ def _pull(keys): return g.get(keys) def _clear(): + """helper method for implementing `client.clear` via `client.apply`""" globals().clear() def execute(code): + """helper method for implementing `client.execute` via `client.apply`""" exec code in globals() + #-------------------------------------------------------------------------- # Decorators for Client methods @@ -613,7 +622,7 @@ class Client(object): error = ss.unwrap_exception(msg['content']) if error: - return error + raise error #-------------------------------------------------------------------------- # Execution methods @@ -623,6 +632,8 @@ class Client(object): def execute(self, code, targets='all', block=None): """Executes `code` on `targets` in blocking or nonblocking manner. + ``execute`` is always `bound` (affects engine namespace) + Parameters ---------- code : str @@ -634,11 +645,7 @@ class Client(object): whether or not to wait until done to return default: self.block """ - # block = self.block if block is None else block - # saveblock = self.block - # self.block = block result = self.apply(execute, (code,), targets=targets, block=block, bound=True) - # self.block = saveblock return result def run(self, code, block=None): @@ -646,6 +653,8 @@ class Client(object): Calls to this are load-balanced. + ``run`` is never `bound` (no effect on engine namespace) + Parameters ---------- code : str @@ -908,7 +917,7 @@ class Client(object): the engines on which to execute default : all verbose : bool - whether to return lengths only, or lists of ids for each element + Whether to return lengths only, or lists of ids for each element """ targets = self._build_targets(targets)[1] content = dict(targets=targets, verbose=verbose) @@ -927,12 +936,16 @@ class Client(object): """Tell the controller to forget results. Individual results can be purged by msg_id, or the entire - history of specific targets can + history of specific targets can be purged. Parameters ---------- + msg_ids : str or list of strs + the msg_ids whose results should be forgotten. targets : int/str/list of ints/strs - the targets + The targets, by uuid or int_id, whose entire history is to be purged. + Use `targets='all'` to scrub everything from the controller's memory. + default : None """ if not targets and not msg_ids: diff --git a/IPython/zmq/parallel/streamsession.py b/IPython/zmq/parallel/streamsession.py index 6f2229a..af3e0c1 100644 --- a/IPython/zmq/parallel/streamsession.py +++ b/IPython/zmq/parallel/streamsession.py @@ -201,7 +201,7 @@ def serialize_object(obj, threshold=64e-6): def unserialize_object(bufs): - """reconstruct an object serialized by serialize_object from data buffers""" + """reconstruct an object serialized by serialize_object from data buffers.""" bufs = list(bufs) sobj = pickle.loads(bufs.pop(0)) if isinstance(sobj, (list, tuple)): @@ -402,7 +402,7 @@ class StreamSession(object): return omsg def send_raw(self, stream, msg, flags=0, copy=True, ident=None): - """Send a raw message via idents. + """Send a raw message via ident path. Parameters ---------- @@ -444,9 +444,23 @@ class StreamSession(object): raise e def feed_identities(self, msg, copy=True): - """This is a completely horrible thing, but it strips the zmq - ident prefixes off of a message. It will break if any identities - are unpackable by self.unpack.""" + """feed until DELIM is reached, then return the prefix as idents and remainder as + msg. This is easily broken by setting an IDENT to DELIM, but that would be silly. + + Parameters + ---------- + msg : a list of Message or bytes objects + the message to be split + copy : bool + flag determining whether the arguments are bytes or Messages + + Returns + ------- + (idents,msg) : two lists + idents will always be a list of bytes - the indentity prefix + msg will be a list of bytes or Messages, unchanged from input + msg should be unpackable via self.unpack_message at this point. + """ msg = list(msg) idents = [] while len(msg) > 3: diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index 222f2c5..081c14e 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -1,11 +1,24 @@ -#!/usr/bin/env python -"""Views""" +"""Views of remote engines""" +#----------------------------------------------------------------------------- +# Copyright (C) 2010 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- from IPython.external.decorator import decorator +#----------------------------------------------------------------------------- +# Decorators +#----------------------------------------------------------------------------- @decorator def myblock(f, self, *args, **kwargs): + """override client.block with self.block during a call""" block = self.client.block self.client.block = self.block ret = f(self, *args, **kwargs) @@ -14,6 +27,7 @@ def myblock(f, self, *args, **kwargs): @decorator def save_ids(f, self, *args, **kwargs): + """Keep our history and outstanding attributes up to date after a method call.""" ret = f(self, *args, **kwargs) msg_ids = self.client.history[-self._ntargets:] self.history.extend(msg_ids) @@ -22,6 +36,7 @@ def save_ids(f, self, *args, **kwargs): @decorator def sync_results(f, self, *args, **kwargs): + """sync relevant results from self.client to our results attribute.""" ret = f(self, *args, **kwargs) delta = self.outstanding.difference(self.client.outstanding) completed = self.outstanding.intersection(delta) @@ -32,13 +47,20 @@ def sync_results(f, self, *args, **kwargs): @decorator def spin_after(f, self, *args, **kwargs): + """call spin after the method.""" ret = f(self, *args, **kwargs) self.spin() return ret +#----------------------------------------------------------------------------- +# Classes +#----------------------------------------------------------------------------- class View(object): - """Base View class""" + """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. + + Don't use this class, use subclasses. + """ _targets = None _ntargets = None block=None @@ -67,7 +89,7 @@ class View(object): @targets.setter def targets(self, value): - raise TypeError("Cannot set my targets argument after construction!") + raise AttributeError("Cannot set my targets argument after construction!") @sync_results def spin(self): @@ -175,7 +197,16 @@ class View(object): class DirectView(View): - """Direct Multiplexer View""" + """Direct Multiplexer View of one or more engines. + + These are created via indexed access to a client: + + >>> dv_1 = client[1] + >>> dv_all = client[:] + >>> dv_even = client[::2] + >>> dv_some = client[1:3] + + """ def update(self, ns): """update remote namespace with dict `ns`""" @@ -214,6 +245,19 @@ class DirectView(View): return self.client.kill(targets=self.targets, block=block) class LoadBalancedView(View): + """An engine-agnostic View that only executes via the Task queue. + + Typically created via: + + >>> lbv = client[None] + + + but can also be created with: + + >>> lbc = LoadBalancedView(client) + + TODO: allow subset of engines across which to balance. + """ def __repr__(self): return "<%s %s>"%(self.__class__.__name__, self.client._addr)