From 498a93f13d89558c689414c2020b5504a6a5f11a 2011-04-08 00:38:20 From: MinRK Date: 2011-04-08 00:38:20 Subject: [PATCH] API update involving map and load-balancing --- diff --git a/IPython/utils/newserialized.py b/IPython/utils/newserialized.py index 2b5378a..a8e9cc4 100644 --- a/IPython/utils/newserialized.py +++ b/IPython/utils/newserialized.py @@ -147,9 +147,9 @@ class UnSerializeIt(UnSerialized): if globals().has_key('numpy') and typeDescriptor == 'ndarray': result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype']) result.shape = self.serialized.metadata['shape'] - # This is a hack to make the array writable. We are working with + # numpy arrays with frombuffer are read-only. We are working with # the numpy folks to address this issue. - result = result.copy() + # result = result.copy() elif typeDescriptor == 'pickle': result = pickle.loads(self.serialized.getData()) elif typeDescriptor in ('bytes', 'buffer'): diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index 0b5959f..4c0f598 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -21,7 +21,7 @@ from pprint import pprint pjoin = os.path.join import zmq -from zmq.eventloop import ioloop, zmqstream +# from zmq.eventloop import ioloop, zmqstream from IPython.utils.path import get_ipython_dir from IPython.external.decorator import decorator @@ -203,6 +203,7 @@ class Client(object): Attributes ---------- + ids : set of int engine IDs requesting the ids attribute always synchronizes the registration state. To request ids without synchronization, @@ -225,17 +226,23 @@ class Client(object): Methods ------- - spin : flushes incoming results and registration state changes - control methods spin, and requesting `ids` also ensures up to date - - barrier : wait on one or more msg_ids - execution methods: apply/apply_bound/apply_to/apply_bound + spin + flushes incoming results and registration state changes + control methods spin, and requesting `ids` also ensures up to date + + barrier + wait on one or more msg_ids + + execution methods + apply legacy: execute, run - query methods: queue_status, get_result, purge + query methods + queue_status, get_result, purge - control methods: abort, kill + control methods + abort, shutdown """ @@ -265,7 +272,6 @@ class Client(object): if context is None: context = zmq.Context() self.context = context - self.targets = 'all' self._setup_cluster_dir(profile, cluster_dir, ipython_dir) if self._cd is not None: @@ -634,42 +640,18 @@ class Client(object): #-------------------------------------------------------------------------- def __getitem__(self, key): - """Dict access returns DirectView multiplexer objects or, - if key is None, a LoadBalancedView.""" - if key is None: - return LoadBalancedView(self) - if isinstance(key, int): - if key not in self.ids: - raise IndexError("No such engine: %i"%key) - return DirectView(self, key) - - if isinstance(key, slice): - indices = range(len(self.ids))[key] - ids = sorted(self._ids) - key = [ ids[i] for i in indices ] - # newkeys = sorted(self._ids)[thekeys[k]] + """index access returns DirectView multiplexer objects - if isinstance(key, (tuple, list, xrange)): - _,targets = self._build_targets(list(key)) - return DirectView(self, targets) + Must be int, slice, or list/tuple/xrange of ints""" + if not isinstance(key, (int, slice, tuple, list, xrange)): + raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key))) else: - raise TypeError("key by int/iterable 
of ints only, not %s"%(type(key))) + return self.view(key, balanced=False) #-------------------------------------------------------------------------- # Begin public methods #-------------------------------------------------------------------------- - @property - def remote(self): - """property for convenient RemoteFunction generation. - - >>> @client.remote - ... def getpid(): - import os - return os.getpid() - """ - return remote(self, block=self.block) - def spin(self): """Flush any registration notifications and execution results waiting in the ZMQ queue. @@ -690,6 +672,7 @@ class Client(object): Parameters ---------- + msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects ints are indices to self.history strs are msg_ids @@ -700,6 +683,7 @@ class Client(object): Returns ------- + True : when all msg_ids are done False : timeout reached, some msg_ids still outstanding """ @@ -815,6 +799,7 @@ class Client(object): Parameters ---------- + code : str the code string to be executed targets : int/str/list of ints/strs @@ -824,7 +809,7 @@ class Client(object): whether or not to wait until done to return default: self.block """ - result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True) + result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False) return result def run(self, filename, targets='all', block=None): @@ -834,6 +819,7 @@ class Client(object): Parameters ---------- + filename : str The path to the file targets : int/str/list of ints/strs @@ -868,7 +854,8 @@ class Client(object): return list(Dependency(dep)) @defaultblock - def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, + def apply(self, f, args=None, kwargs=None, bound=True, block=None, + targets=None, balanced=None, after=None, follow=None, timeout=None): """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. @@ -905,11 +892,34 @@ class Client(object): if int: Run on single engine - after,follow,timeout only used in `apply_balanced`. See that docstring - for details. + balanced : bool, default None + whether to load-balance. This will default to True + if targets is unspecified, or False if targets is specified. + + The following arguments are only used when balanced is True: + after : Dependency or collection of msg_ids + Only for load-balanced execution (targets=None) + Specify a list of msg_ids as a time-based dependency. + This job will only be run *after* the dependencies + have been met. + + follow : Dependency or collection of msg_ids + Only for load-balanced execution (targets=None) + Specify a list of msg_ids as a location-based dependency. + This job will only be run on an engine where this dependency + is met. + + timeout : float/int or None + Only for load-balanced execution (targets=None) + Specify an amount of time (in seconds) for the scheduler to + wait for dependencies to be met before failing with a + DependencyTimeout. + + after,follow,timeout only used if `balanced=True`. 
Returns ------- + if block is False: return AsyncResult wrapping msg_ids output of AsyncResult.get() is identical to that of `apply(...block=True)` @@ -921,10 +931,21 @@ class Client(object): """ # defaults: - block = block if block is not None else self.block args = args if args is not None else [] kwargs = kwargs if kwargs is not None else {} + if balanced is None: + if targets is None: + # default to balanced if targets unspecified + balanced = True + else: + # otherwise default to multiplexing + balanced = False + + if targets is None and balanced is False: + # default to all if *not* balanced, and targets is unspecified + targets = 'all' + # enforce types of f,args,kwrags if not callable(f): raise TypeError("f must be callable, not %s"%type(f)) @@ -935,80 +956,27 @@ class Client(object): options = dict(bound=bound, block=block, targets=targets) - if targets is None: - return self.apply_balanced(f, args, kwargs, timeout=timeout, + if balanced: + return self._apply_balanced(f, args, kwargs, timeout=timeout, after=after, follow=follow, **options) - else: - if follow or after or timeout: - msg = "follow, after, and timeout args are only used for load-balanced" - msg += "execution." + elif follow or after or timeout: + msg = "follow, after, and timeout args are only used for" + msg += " load-balanced execution." raise ValueError(msg) + else: return self._apply_direct(f, args, kwargs, **options) - @defaultblock - def apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None, + def _apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None, after=None, follow=None, timeout=None): """call f(*args, **kwargs) remotely in a load-balanced manner. - Parameters - ---------- - - f : function - The fuction to be called remotely - args : tuple/list - The positional arguments passed to `f` - kwargs : dict - The keyword arguments passed to `f` - bound : bool (default: True) - Whether to execute in the Engine(s) namespace, or in a clean - namespace not affecting the engine. - block : bool (default: self.block) - Whether to wait for the result, or return immediately. - False: - returns AsyncResult - True: - returns actual result(s) of f(*args, **kwargs) - if multiple targets: - list of results, matching `targets` - targets : int,list of ints, 'all', None - Specify the destination of the job. - if None: - Submit via Task queue for load-balancing. - if 'all': - Run on all active engines - if list: - Run on each specified engine - if int: - Run on single engine - - after : Dependency or collection of msg_ids - Only for load-balanced execution (targets=None) - Specify a list of msg_ids as a time-based dependency. - This job will only be run *after* the dependencies - have been met. - - follow : Dependency or collection of msg_ids - Only for load-balanced execution (targets=None) - Specify a list of msg_ids as a location-based dependency. - This job will only be run on an engine where this dependency - is met. - - timeout : float/int or None - Only for load-balanced execution (targets=None) - Specify an amount of time (in seconds) for the scheduler to - wait for dependencies to be met before failing with a - DependencyTimeout. - - Returns - ------- - if block is False: - return AsyncResult wrapping msg_id - output of AsyncResult.get() is identical to that of `apply(...block=True)` - else: - wait for, and return actual result of `f(*args, **kwargs)` - + This is a private method, see `apply` for details. + Not to be called directly! 
""" + for kwarg in (bound, block, targets): + assert kwarg is not None, "kwarg %r must be specified!"%kwarg + if self._task_socket is None: msg = "Task farming is disabled" if self._task_scheme == 'pure': @@ -1025,7 +993,6 @@ class Client(object): if isinstance(f, dependent): # soft warn on functional dependencies warnings.warn(msg, RuntimeWarning) - # defaults: args = args if args is not None else [] @@ -1036,14 +1003,6 @@ class Client(object): else: idents = [] - # enforce types of f,args,kwrags - if not callable(f): - raise TypeError("f must be callable, not %s"%type(f)) - if not isinstance(args, (tuple, list)): - raise TypeError("args must be tuple or list, not %s"%type(args)) - if not isinstance(kwargs, dict): - raise TypeError("kwargs must be dict, not %s"%type(kwargs)) - after = self._build_dependency(after) follow = self._build_dependency(follow) subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) @@ -1064,13 +1023,17 @@ class Client(object): else: return ar - def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): + def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None): """Then underlying method for applying functions to specific engines via the MUX queue. + This is a private method, see `apply` for details. Not to be called directly! """ + for kwarg in (bound, block, targets): + assert kwarg is not None, "kwarg %r must be specified!"%kwarg + idents,targets = self._build_targets(targets) subheader = {} @@ -1095,103 +1058,46 @@ class Client(object): return ar #-------------------------------------------------------------------------- - # Map and decorators + # decorators #-------------------------------------------------------------------------- - def map(self, f, *sequences, **kwargs): - """Parallel version of builtin `map`, using all our engines. - - `block` and `targets` can be passed as keyword arguments only. - - There will be one task per target, so work will be chunked - if the sequences are longer than `targets`. - - Results can be iterated as they are ready, but will become available in chunks. - - Parameters - ---------- - - f : callable - function to be mapped - *sequences: one or more sequences of matching length - the sequences to be distributed and passed to `f` - block : bool - whether to wait for the result or not [default self.block] - targets : valid targets - targets to be used [default self.targets] - - Returns - ------- - - if block=False: - AsyncMapResult - An object like AsyncResult, but which reassembles the sequence of results - into a single list. AsyncMapResults can be iterated through before all - results are complete. - else: - the result of map(f,*sequences) - - """ - block = kwargs.get('block', self.block) - targets = kwargs.get('targets', self.targets) - assert len(sequences) > 0, "must have some sequences to map onto!" - pf = ParallelFunction(self, f, block=block, - bound=True, targets=targets) - return pf.map(*sequences) - - def imap(self, f, *sequences, **kwargs): - """Parallel version of builtin `itertools.imap`, load-balanced across all engines. - - Each element will be a separate task, and will be load-balanced. This - lets individual elements be ready for iteration as soon as they come. 
- - Parameters - ---------- - - f : callable - function to be mapped - *sequences: one or more sequences of matching length - the sequences to be distributed and passed to `f` - block : bool - whether to wait for the result or not [default self.block] - - Returns - ------- - - if block=False: - AsyncMapResult - An object like AsyncResult, but which reassembles the sequence of results - into a single list. AsyncMapResults can be iterated through before all - results are complete. - else: - the result of map(f,*sequences) - - """ - - block = kwargs.get('block', self.block) - - assert len(sequences) > 0, "must have some sequences to map onto!" - - pf = ParallelFunction(self, f, block=self.block, - bound=True, targets=None) - return pf.map(*sequences) - - def parallel(self, bound=True, targets='all', block=True): + @defaultblock + def parallel(self, bound=True, targets='all', block=None): """Decorator for making a ParallelFunction.""" return parallel(self, bound=bound, targets=targets, block=block) - def remote(self, bound=True, targets='all', block=True): + @defaultblock + def remote(self, bound=True, targets='all', block=None): """Decorator for making a RemoteFunction.""" return remote(self, bound=bound, targets=targets, block=block) def view(self, targets=None, balanced=False): """Method for constructing View objects""" - if not balanced: - if not targets: + if targets is None: + if balanced: + return LoadBalancedView(client=self) + else: targets = slice(None) - return self[targets] + + if balanced: + view_class = LoadBalancedView + else: + view_class = DirectView + if isinstance(targets, int): + if targets not in self.ids: + raise IndexError("No such engine: %i"%targets) + return view_class(client=self, targets=targets) + + if isinstance(targets, slice): + indices = range(len(self.ids))[targets] + ids = sorted(self._ids) + targets = [ ids[i] for i in indices ] + + if isinstance(targets, (tuple, list, xrange)): + _,targets = self._build_targets(list(targets)) + return view_class(client=self, targets=targets) else: - return LoadBalancedView(self, targets) + raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) #-------------------------------------------------------------------------- # Data movement @@ -1202,7 +1108,7 @@ class Client(object): """Push the contents of `ns` into the namespace on `target`""" if not isinstance(ns, dict): raise TypeError("Must be a dict, not %s"%type(ns)) - result = self.apply(_push, (ns,), targets=targets, block=block, bound=True) + result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False) return result @defaultblock @@ -1214,14 +1120,14 @@ class Client(object): for key in keys: if not isinstance(key, str): raise TypeError - result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) + result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False) return result + @defaultblock def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): """ Partition a Python sequence and send the partitions to a set of engines. """ - block = block if block is not None else self.block targets = self._build_targets(targets)[-1] mapObject = Map.dists[dist]() nparts = len(targets) @@ -1239,11 +1145,11 @@ class Client(object): else: return r + @defaultblock def gather(self, key, dist='b', targets='all', block=None): """ Gather a partitioned sequence on a set of engines as a single local seq. 
""" - block = block if block is not None else self.block targets = self._build_targets(targets)[-1] mapObject = Map.dists[dist]() @@ -1267,6 +1173,7 @@ class Client(object): Parameters ---------- + msg_ids : list of ints or msg_ids if int: Passed as index to self.history for convenience. @@ -1351,13 +1258,14 @@ class Client(object): return content @spinfirst - def queue_status(self, targets=None, verbose=False): + def queue_status(self, targets='all', verbose=False): """Fetch the status of engine queues. Parameters ---------- + targets : int/str/list of ints/strs - the engines on which to execute + the engines whose states are to be queried. default : all verbose : bool Whether to return lengths only, or lists of ids for each element @@ -1383,6 +1291,7 @@ class Client(object): Parameters ---------- + msg_ids : str or list of strs the msg_ids whose results should be forgotten. targets : int/str/list of ints/strs @@ -1404,59 +1313,6 @@ class Client(object): if content['status'] != 'ok': raise ss.unwrap_exception(content) - #---------------------------------------- - # activate for %px,%autopx magics - #---------------------------------------- - def activate(self): - """Make this `View` active for parallel magic commands. - - IPython has a magic command syntax to work with `MultiEngineClient` objects. - In a given IPython session there is a single active one. While - there can be many `Views` created and used by the user, - there is only one active one. The active `View` is used whenever - the magic commands %px and %autopx are used. - - The activate() method is called on a given `View` to make it - active. Once this has been done, the magic commands can be used. - """ - - try: - # This is injected into __builtins__. - ip = get_ipython() - except NameError: - print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." - else: - pmagic = ip.plugin_manager.get_plugin('parallelmagic') - if pmagic is not None: - pmagic.active_multiengine_client = self - else: - print "You must first load the parallelmagic extension " \ - "by doing '%load_ext parallelmagic'" - -class AsynClient(Client): - """An Asynchronous client, using the Tornado Event Loop. 
- !!!unfinished!!!""" - io_loop = None - _queue_stream = None - _notifier_stream = None - _task_stream = None - _control_stream = None - - def __init__(self, addr, context=None, username=None, debug=False, io_loop=None): - Client.__init__(self, addr, context, username, debug) - if io_loop is None: - io_loop = ioloop.IOLoop.instance() - self.io_loop = io_loop - - self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop) - self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop) - self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop) - self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop) - - def spin(self): - for stream in (self.queue_stream, self.notifier_stream, - self.task_stream, self.control_stream): - stream.flush() __all__ = [ 'Client', 'depend', diff --git a/IPython/zmq/parallel/remotefunction.py b/IPython/zmq/parallel/remotefunction.py index 4035f1a..388a7e6 100644 --- a/IPython/zmq/parallel/remotefunction.py +++ b/IPython/zmq/parallel/remotefunction.py @@ -17,7 +17,7 @@ from asyncresult import AsyncMapResult # Decorators #----------------------------------------------------------------------------- -def remote(client, bound=False, block=None, targets=None): +def remote(client, bound=False, block=None, targets=None, balanced=None): """Turn a function into a remote function. This method can be used for map: @@ -26,10 +26,10 @@ def remote(client, bound=False, block=None, targets=None): def func(a) """ def remote_function(f): - return RemoteFunction(client, f, bound, block, targets) + return RemoteFunction(client, f, bound, block, targets, balanced) return remote_function -def parallel(client, dist='b', bound=False, block=None, targets='all'): +def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None): """Turn a function into a parallel remote function. This method can be used for map: @@ -38,7 +38,7 @@ def parallel(client, dist='b', bound=False, block=None, targets='all'): def func(a) """ def parallel_function(f): - return ParallelFunction(client, f, dist, bound, block, targets) + return ParallelFunction(client, f, dist, bound, block, targets, balanced) return parallel_function #-------------------------------------------------------------------------- @@ -62,6 +62,8 @@ class RemoteFunction(object): to use the current `block` attribute of `client` targets : valid target list [default: all] The targets on which to execute. 
+ balanced : bool + Whether to load-balance with the Task scheduler or not """ client = None # the remote connection @@ -69,23 +71,30 @@ class RemoteFunction(object): block = None # whether to block bound = None # whether to affect the namespace targets = None # where to execute + balanced = None # whether to load-balance - def __init__(self, client, f, bound=False, block=None, targets=None): + def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None): self.client = client self.func = f self.block=block self.bound=bound self.targets=targets + if balanced is None: + if targets is None: + balanced = True + else: + balanced = False + self.balanced = balanced def __call__(self, *args, **kwargs): return self.client.apply(self.func, args=args, kwargs=kwargs, - block=self.block, targets=self.targets, bound=self.bound) + block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced) class ParallelFunction(RemoteFunction): """Class for mapping a function to sequences.""" - def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): - super(ParallelFunction, self).__init__(client,f,bound,block,targets) + def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None): + super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) mapClass = Map.dists[dist] self.mapObject = mapClass() @@ -93,21 +102,19 @@ class ParallelFunction(RemoteFunction): len_0 = len(sequences[0]) for s in sequences: if len(s)!=len_0: - raise ValueError('all sequences must have equal length') + msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) + raise ValueError(msg) - if self.targets is None: - # load-balanced: - engines = [None]*len_0 - elif isinstance(self.targets, int): - engines = [None]*self.targets + if self.balanced: + targets = [self.targets]*len_0 else: # multiplexed: - engines = self.client._build_targets(self.targets)[-1] + targets = self.client._build_targets(self.targets)[-1] - nparts = len(engines) + nparts = len(targets) msg_ids = [] # my_f = lambda *a: map(self.func, *a) - for index, engineid in enumerate(engines): + for index, t in enumerate(targets): args = [] for seq in sequences: part = self.mapObject.getPartition(seq, index, nparts) @@ -124,22 +131,26 @@ class ParallelFunction(RemoteFunction): args = [self.func]+args else: f=self.func - mid = self.client.apply(f, args=args, block=False, - bound=self.bound, - targets=engineid).msg_ids[0] - msg_ids.append(mid) + ar = self.client.apply(f, args=args, block=False, bound=self.bound, + targets=targets, balanced=self.balanced) + + msg_ids.append(ar.msg_ids[0]) r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__) if self.block: - r.wait() - return r.result + try: + return r.get() + except KeyboardInterrupt: + return r else: return r def map(self, *sequences): """call a function on each element of a sequence remotely.""" self._map = True - ret = self.__call__(*sequences) - del self._map + try: + ret = self.__call__(*sequences) + finally: + del self._map return ret diff --git a/IPython/zmq/parallel/util.py b/IPython/zmq/parallel/util.py index bf9df96..bb6222b 100644 --- a/IPython/zmq/parallel/util.py +++ b/IPython/zmq/parallel/util.py @@ -59,7 +59,7 @@ def validate_url(url): except ValueError: raise AssertionError("Invalid port %r in url: %r"%(port, url)) - assert pat.match(addr) is not None, 'Invalid url: %r'%url + assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url else: # only 
validate tcp urls currently diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index ead83d4..fb039ab 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -10,7 +10,11 @@ # Imports #----------------------------------------------------------------------------- +from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance + from IPython.external.decorator import decorator +from IPython.zmq.parallel.asyncresult import AsyncResult +from IPython.zmq.parallel.dependency import Dependency from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel #----------------------------------------------------------------------------- @@ -61,30 +65,29 @@ def spin_after(f, self, *args, **kwargs): # Classes #----------------------------------------------------------------------------- -class View(object): +class View(HasTraits): """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. Don't use this class, use subclasses. """ - block=None - bound=None - history=None - outstanding = set() - results = {} - + block=Bool(False) + bound=Bool(False) + history=List() + outstanding = Set() + results = Dict() + client = Instance('IPython.zmq.parallel.client.Client') + + _ntargets = Int(1) + _balanced = Bool(False) + _default_names = List(['block', 'bound']) _targets = None - _apply_name = 'apply' - _default_names = ['targets', 'block'] - def __init__(self, client, targets=None): - self.client = client + def __init__(self, client=None, targets=None): + super(View, self).__init__(client=client) self._targets = targets self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) self.block = client.block - self.bound=False - self.history = [] - self.outstanding = set() - self.results = {} + for name in self._default_names: setattr(self, name, getattr(self, name, None)) @@ -101,26 +104,46 @@ class View(object): @targets.setter def targets(self, value): - self._targets = value - # raise AttributeError("Cannot set my targets argument after construction!") + raise AttributeError("Cannot set View `targets` after construction!") def _defaults(self, *excludes): """return dict of our default attributes, excluding names given.""" - d = {} + d = dict(balanced=self._balanced, targets=self.targets) for name in self._default_names: if name not in excludes: d[name] = getattr(self, name) return d + def set_flags(self, **kwargs): + """set my attribute flags by keyword. + + A View is a wrapper for the Client's apply method, but + with attributes that specify keyword arguments, those attributes + can be set by keyword argument with this method. 
+ + Parameters + ---------- + + block : bool + whether to wait for results + bound : bool + whether to use the client's namespace + """ + for key in kwargs: + if key not in self._default_names: + raise KeyError("Invalid name: %r"%key) + for name in ('block', 'bound'): + if name in kwargs: + setattr(self, name, kwargs) + + #---------------------------------------------------------------- + # wrappers for client methods: + #---------------------------------------------------------------- @sync_results def spin(self): """spin the client, and sync""" self.client.spin() - @property - def _apply(self): - return getattr(self.client, self._apply_name) - @sync_results @save_ids def apply(self, f, *args, **kwargs): @@ -133,7 +156,7 @@ class View(object): else: returns actual result of f(*args, **kwargs) """ - return self._apply(f, args, kwargs, **self._defaults()) + return self.client.apply(f, args, kwargs, **self._defaults()) @save_ids def apply_async(self, f, *args, **kwargs): @@ -144,7 +167,7 @@ class View(object): returns msg_id """ d = self._defaults('block', 'bound') - return self._apply(f,args,kwargs, block=False, bound=False, **d) + return self.client.apply(f,args,kwargs, block=False, bound=False, **d) @spin_after @save_ids @@ -157,7 +180,7 @@ class View(object): returns: actual result of f(*args, **kwargs) """ d = self._defaults('block', 'bound') - return self._apply(f,args,kwargs, block=True, bound=False, **d) + return self.client.apply(f,args,kwargs, block=True, bound=False, **d) @sync_results @save_ids @@ -173,7 +196,7 @@ class View(object): """ d = self._defaults('bound') - return self._apply(f, args, kwargs, bound=True, **d) + return self.client.apply(f, args, kwargs, bound=True, **d) @sync_results @save_ids @@ -187,7 +210,7 @@ class View(object): """ d = self._defaults('block', 'bound') - return self._apply(f, args, kwargs, block=False, bound=True, **d) + return self.client.apply(f, args, kwargs, block=False, bound=True, **d) @spin_after @save_ids @@ -200,23 +223,7 @@ class View(object): """ d = self._defaults('block', 'bound') - return self._apply(f, args, kwargs, block=True, bound=True, **d) - - @spin_after - @save_ids - def map(self, f, *sequences): - """Parallel version of builtin `map`, using this view's engines.""" - if isinstance(self.targets, int): - targets = [self.targets] - else: - targets = self.targets - pf = ParallelFunction(self.client, f, block=self.block, - bound=True, targets=targets) - return pf.map(*sequences) - - def parallel(self, bound=True, block=True): - """Decorator for making a ParallelFunction""" - return parallel(self.client, bound=bound, targets=self.targets, block=block) + return self.client.apply(f, args, kwargs, block=True, bound=True, **d) def abort(self, msg_ids=None, block=None): """Abort jobs on my engines. 
@@ -240,6 +247,17 @@ class View(object): if targets is None or targets == 'all': targets = self.targets return self.client.purge_results(msg_ids=msg_ids, targets=targets) + + #------------------------------------------------------------------- + # Decorators + #------------------------------------------------------------------- + def parallel(self, bound=True, block=True): + """Decorator for making a ParallelFunction""" + return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced) + + def remote(self, bound=True, block=True): + """Decorator for making a RemoteFunction""" + return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced) @@ -262,6 +280,62 @@ class DirectView(View): """ + def __init__(self, client=None, targets=None): + super(DirectView, self).__init__(client=client, targets=targets) + self._balanced = False + + @spin_after + @save_ids + def map(self, f, *sequences, **kwargs): + """Parallel version of builtin `map`, using this View's `targets`. + + There will be one task per target, so work will be chunked + if the sequences are longer than `targets`. + + Results can be iterated as they are ready, but will become available in chunks. + + Parameters + ---------- + + f : callable + function to be mapped + *sequences: one or more sequences of matching length + the sequences to be distributed and passed to `f` + block : bool + whether to wait for the result or not [default self.block] + bound : bool + whether to wait for the result or not [default self.bound] + + Returns + ------- + + if block=False: + AsyncMapResult + An object like AsyncResult, but which reassembles the sequence of results + into a single list. AsyncMapResults can be iterated through before all + results are complete. + else: + the result of map(f,*sequences) + """ + + block = kwargs.get('block', self.block) + bound = kwargs.get('bound', self.bound) + for k in kwargs.keys(): + if k not in ['block', 'bound']: + raise TypeError("invalid keyword arg, %r"%k) + + assert len(sequences) > 0, "must have some sequences to map onto!" + pf = ParallelFunction(self.client, f, block=block, + bound=bound, targets=self.targets, balanced=False) + return pf.map(*sequences) + + def map_async(self, f, *sequences, **kwargs): + """Parallel version of builtin `map`, using this view's engines.""" + if 'block' in kwargs: + raise TypeError("map_async doesn't take a `block` keyword argument.") + kwargs['block'] = True + return self.map(f,*sequences,**kwargs) + @sync_results @save_ids def execute(self, code, block=True): @@ -358,14 +432,13 @@ class DirectView(View): class LoadBalancedView(View): - """An engine-agnostic View that only executes via the Task queue. + """An load-balancing View that only executes via the Task scheduler. 
- Typically created via: + Load-balanced views can be created with the client's `view` method: - >>> v = client[None] - + >>> v = client.view(balanced=True) - but can also be created with: + or targets can be specified, to restrict the potential destinations: >>> v = client.view([1,3],balanced=True) @@ -374,10 +447,126 @@ class LoadBalancedView(View): """ _apply_name = 'apply_balanced' - _default_names = ['targets', 'block', 'bound', 'follow', 'after', 'timeout'] + _default_names = ['block', 'bound', 'follow', 'after', 'timeout'] - def __init__(self, client, targets=None): - super(LoadBalancedView, self).__init__(client, targets) + def __init__(self, client=None, targets=None): + super(LoadBalancedView, self).__init__(client=client, targets=targets) self._ntargets = 1 - self._apply_name = 'apply_balanced' + + def _validate_dependency(self, dep): + """validate a dependency. + + For use in `set_flags`. + """ + if dep is None or isinstance(dep, (str, AsyncResult, Dependency)): + return True + elif isinstance(dep, (list,set, tuple)): + for d in dep: + if not isinstance(d, str, AsyncResult): + return False + elif isinstance(dep, dict): + if set(dep.keys()) != set(Dependency().as_dict().keys()): + return False + if not isinstance(dep['msg_ids'], list): + return False + for d in dep['msg_ids']: + if not isinstance(d, str): + return False + else: + return False + + def set_flags(self, **kwargs): + """set my attribute flags by keyword. + + A View is a wrapper for the Client's apply method, but + with attributes that specify keyword arguments, those attributes + can be set by keyword argument with this method. + + Parameters + ---------- + + block : bool + whether to wait for results + bound : bool + whether to use the engine's namespace + follow : Dependency, list, msg_id, AsyncResult + the location dependencies of tasks + after : Dependency, list, msg_id, AsyncResult + the time dependencies of tasks + timeout : int,None + the timeout to be used for tasks + """ + + super(LoadBalancedView, self).set_flags(**kwargs) + for name in ('follow', 'after'): + if name in kwargs: + value = kwargs[name] + if self._validate_dependency(value): + setattr(self, name, value) + else: + raise ValueError("Invalid dependency: %r"%value) + if 'timeout' in kwargs: + t = kwargs['timeout'] + if not isinstance(t, (int, long, float, None)): + raise TypeError("Invalid type for timeout: %r"%type(t)) + if t is not None: + if t < 0: + raise ValueError("Invalid timeout: %s"%t) + self.timeout = t + + @spin_after + @save_ids + def map(self, f, *sequences, **kwargs): + """Parallel version of builtin `map`, load-balanced by this View. + + Each element will be a separate task, and will be load-balanced. This + lets individual elements be available for iteration as soon as they arrive. + + Parameters + ---------- + + f : callable + function to be mapped + *sequences: one or more sequences of matching length + the sequences to be distributed and passed to `f` + block : bool + whether to wait for the result or not [default self.block] + bound : bool + whether to use the engine's namespace + + Returns + ------- + + if block=False: + AsyncMapResult + An object like AsyncResult, but which reassembles the sequence of results + into a single list. AsyncMapResults can be iterated through before all + results are complete. + else: + the result of map(f,*sequences) + + """ + + block = kwargs.get('block', self.block) + bound = kwargs.get('bound', self.bound) + + assert len(sequences) > 0, "must have some sequences to map onto!" 
+ + pf = ParallelFunction(self.client, f, block=block, bound=bound, + targets=self.targets, balanced=True) + return pf.map(*sequences) + + def map_async(self, f, *sequences, **kwargs): + """Parallel version of builtin `map`, using this view's engines. + + This is equivalent to map(...block=False) + + See `map` for details. + """ + + if 'block' in kwargs: + raise TypeError("map_async doesn't take a `block` keyword argument.") + kwargs['block'] = True + return self.map(f,*sequences,**kwargs) + diff --git a/docs/source/parallelz/parallel_multiengine.txt b/docs/source/parallelz/parallel_multiengine.txt index 48c5039..02b979b 100644 --- a/docs/source/parallelz/parallel_multiengine.txt +++ b/docs/source/parallelz/parallel_multiengine.txt @@ -37,7 +37,7 @@ module and then create a :class:`.Client` instance: In [2]: rc = client.Client() This form assumes that the default connection information (stored in -:file:`ipcontroller-client.json` found in `~/.ipython/clusterz_default/security`) is +:file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is accurate. If the controller was started on a remote machine, you must copy that connection file to the client machine, or enter its contents as arguments to the Client constructor: @@ -45,17 +45,17 @@ file to the client machine, or enter its contents as arguments to the Client con # If you have copied the json connector file from the controller: In [2]: rc = client.Client('/path/to/ipcontroller-client.json') - # for a remote controller at 10.0.1.5, visible from my.server.com: + # or for a remote controller at 10.0.1.5, visible from my.server.com: In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com') -To make sure there are engines connected to the controller, use can get a list +To make sure there are engines connected to the controller, users can get a list of engine ids: .. sourcecode:: ipython In [3]: rc.ids - Out[3]: set([0, 1, 2, 3]) + Out[3]: [0, 1, 2, 3] Here we see that there are four engines ready to do work for us. @@ -73,24 +73,25 @@ Parallel map Python's builtin :func:`map` functions allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, since IPython's interface is all about functions anyway, -you can just use the builtin :func:`map`, or a client's :meth:`map` method: +you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a +DirectView's :meth:`map` method: .. sourcecode:: ipython In [62]: serial_result = map(lambda x:x**10, range(32)) - In [66]: parallel_result = rc.map(lambda x: x**10, range(32)) + In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32)) - In [67]: serial_result==parallel_result + In [67]: serial_result==parallel_result.get() Out[67]: True .. note:: - The client's own version of :meth:`map` or that of :class:`.DirectView` do + The :class:`DirectView`'s version of :meth:`map` does not do any load balancing. For a load balanced version, use a :class:`LoadBalancedView`, or a :class:`ParallelFunction` with - `targets=None`. + `balanced=True`. .. seealso:: @@ -105,16 +106,18 @@ some decorators: .. sourcecode:: ipython - In [10]: @rc.remote(block=True) + In [10]: @rc.remote(block=True, targets=0) ....: def f(x): ....: return 10.0*x**4 ....: - In [11]: map(f, range(32)) # this is done in parallel + In [11]: map(f, range(32)) # this is done on engine 0 Out[11]: [0.0,10.0,160.0,...] -See the docstring for the :func:`parallel` and :func:`remote` decorators for -options. +.. 
seealso:: + + See the docstring for the :func:`parallel` and :func:`remote` decorators for + options. Calling Python functions ======================== @@ -141,7 +144,7 @@ Instead, they provide the signature:: In order to provide the nicer interface, we have :class:`View` classes, which wrap :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine the extra arguments. For instance, performing index-access on a client creates a -:class:`.LoadBalancedView`. +:class:`.DirectView`. .. sourcecode:: ipython @@ -218,7 +221,7 @@ unbound, unless called by the :meth:`apply_bound` method: .. sourcecode:: ipython - In [9]: rc[:]['b'] = 5 # assign b to 5 everywhere + In [9]: dview['b'] = 5 # assign b to 5 everywhere In [10]: v0 = rc[0] @@ -277,14 +280,14 @@ local Python/IPython session: ...: return time.time()-tic # In non-blocking mode - In [7]: pr = rc[:].apply_async(wait, 2) + In [7]: pr = dview.apply_async(wait, 2) # Now block for the result In [8]: pr.get() Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154] # Again in non-blocking mode - In [9]: pr = rc[:].apply_async(wait, 10) + In [9]: pr = dview.apply_async(wait, 10) # Poll to see if the result is ready In [10]: pr.ready() @@ -321,7 +324,7 @@ associated results are ready: In [72]: rc.block=False # A trivial list of AsyncResults objects - In [73]: pr_list = [rc[:].apply_async(wait, 3) for i in range(10)] + In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)] # Wait until all of them are done In [74]: rc.barrier(pr_list) @@ -332,63 +335,38 @@ associated results are ready: -The ``block`` and ``targets`` keyword arguments and attributes --------------------------------------------------------------- - -.. warning:: - - This is different now, I haven't updated this section. - -MinRK +The ``block`` keyword argument and attributes +--------------------------------------------- Most methods(like :meth:`apply`) accept -``block`` and ``targets`` as keyword arguments. As we have seen above, these -keyword arguments control the blocking mode and which engines the command is -applied to. The :class:`Client` class also has :attr:`block` and -:attr:`targets` attributes that control the default behavior when the keyword -arguments are not provided. Thus the following logic is used for :attr:`block` -and :attr:`targets`: +``block`` as a keyword argument. As we have seen above, these +keyword arguments control the blocking mode . The :class:`Client` class also has +a :attr:`block` attribute that controls the default behavior when the keyword +argument is not provided. Thus the following logic is used for :attr:`block`: * If no keyword argument is provided, the instance attributes are used. -* Keyword argument, if provided override the instance attributes. +* Keyword argument, if provided override the instance attributes for + the duration of a single call. The following examples demonstrate how to use the instance attributes: .. 
sourcecode:: ipython - In [16]: rc.targets = [0,2] - In [17]: rc.block = False - In [18]: pr = rc.execute('a=5') - - In [19]: pr.r - Out[19]: - - [0] In [6]: a=5 - [2] In [6]: a=5 + In [18]: ar = rc.apply(lambda : 10, targets=[0,2]) - # Note targets='all' means all engines - In [20]: rc.targets = 'all' + In [19]: ar.get() + Out[19]: [10,10] In [21]: rc.block = True - In [22]: rc.execute('b=10; print b') - Out[22]: - - [0] In [7]: b=10; print b - [0] Out[7]: 10 - - [1] In [6]: b=10; print b - [1] Out[6]: 10 - - [2] In [7]: b=10; print b - [2] Out[7]: 10 - - [3] In [6]: b=10; print b - [3] Out[6]: 10 + # Note targets='all' means all engines + In [22]: rc.apply(lambda : 42, targets='all') + Out[22]: [42, 42, 42, 42] -The :attr:`block` and :attr:`targets` instance attributes also determine the -behavior of the parallel magic commands. +The :attr:`block` and :attr:`targets` instance attributes of the +:class:`.DirectView` also determine the behavior of the parallel magic commands. Parallel magic commands @@ -567,9 +545,9 @@ appear as a local dictionary. Underneath, this uses :meth:`push` and In [50]: rc.block=True - In [51]: rc[:]['a']=['foo','bar'] + In [51]: dview['a']=['foo','bar'] - In [52]: rc[:]['a'] + In [52]: dview['a'] Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ] Scatter and gather @@ -585,13 +563,13 @@ between engines, MPI should be used: .. sourcecode:: ipython - In [58]: rc.scatter('a',range(16)) + In [58]: dview.scatter('a',range(16)) Out[58]: [None,None,None,None] - In [59]: rc[:]['a'] + In [59]: dview['a'] Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ] - In [60]: rc.gather('a') + In [60]: dview.gather('a') Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] Other things to look at @@ -606,14 +584,14 @@ basic effect using :meth:`scatter` and :meth:`gather`: .. sourcecode:: ipython - In [66]: rc.scatter('x',range(64)) + In [66]: dview.scatter('x',range(64)) Out[66]: [None,None,None,None] In [67]: px y = [i**10 for i in x] Parallel execution on engines: [0, 1, 2, 3] Out[67]: - In [68]: y = rc.gather('y') + In [68]: y = dview.gather('y') In [69]: print y [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...] diff --git a/docs/source/parallelz/parallel_task.txt b/docs/source/parallelz/parallel_task.txt index 99213ff..ec49a0f 100644 --- a/docs/source/parallelz/parallel_task.txt +++ b/docs/source/parallelz/parallel_task.txt @@ -41,8 +41,8 @@ module and then create a :class:`.Client` instance: In [2]: rc = client.Client() - In [3]: lview = rc[None] - Out[3]: + In [3]: lview = rc.view(balanced=True) + Out[3]: This form assumes that the controller was started on localhost with default @@ -66,7 +66,7 @@ objects, but *in parallel*. Like the multiengine interface, these can be implemented via the task interface. The exact same tools can perform these actions in load-balanced ways as well as multiplexed ways: a parallel version of :func:`map` and :func:`@parallel` function decorator. If one specifies the -argument `targets=None`, then they are dynamically load balanced. Thus, if the +argument `balanced=True`, then they are dynamically load balanced. Thus, if the execution time per item varies significantly, you should use the versions in the task interface. 
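
For example, the module-level :func:`parallel` decorator accepts ``balanced``
directly. The following is a minimal sketch (input numbers and output values are
illustrative, and it assumes the ``rc`` client created above): decorating a
function this way makes its :meth:`map` submit one dynamically load-balanced
task per element:

.. sourcecode:: ipython

    In [5]: from IPython.zmq.parallel.remotefunction import parallel

    In [6]: @parallel(rc, block=True, balanced=True)
       ...: def f(x):
       ...:     return 10.0*x**4
       ...:

    # each element becomes a separate task, assigned by the scheduler
    In [7]: f.map(range(32))
    Out[7]: [0.0, 10.0, 160.0, ...]
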
@@ -80,7 +80,7 @@ for the ``None`` element: In [63]: serial_result = map(lambda x:x**10, range(32)) - In [64]: parallel_result = tc[None].map(lambda x:x**10, range(32)) + In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True) In [65]: serial_result==parallel_result Out[65]: True @@ -258,13 +258,13 @@ you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje In [14]: client.block=False - In [15]: ar = client.apply(f, args, kwargs, targets=None) + In [15]: ar = client.apply(f, args, kwargs, balanced=True) - In [16]: ar2 = client.apply(f2, targets=None) + In [16]: ar2 = client.apply(f2, balanced=True) - In [17]: ar3 = client.apply(f3, after=[ar,ar2]) + In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True) - In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5) + In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True) .. seealso:: @@ -383,7 +383,7 @@ The following is an overview of how to use these classes together: 1. Create a :class:`Client`. 2. Define some functions to be run as tasks 3. Submit your tasks to using the :meth:`apply` method of your - :class:`Client` instance, specifying `targets=None`. This signals + :class:`Client` instance, specifying `balanced=True`. This signals the :class:`Client` to entrust the Scheduler with assigning tasks to engines. 4. Use :meth:`Client.get_results` to get the results of the tasks, or use the :meth:`AsyncResult.get` method of the results to wait