"""Views of remote engines. Authors: * Min RK """ from __future__ import print_function #----------------------------------------------------------------------------- # Copyright (C) 2010-2011 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- import imp import sys import warnings from contextlib import contextmanager from types import ModuleType import zmq from IPython.testing.skipdoctest import skip_doctest from IPython.utils.traitlets import ( HasTraits, Any, Bool, List, Dict, Set, Instance, CFloat, Integer ) from IPython.external.decorator import decorator from IPython.parallel import util from IPython.parallel.controller.dependency import Dependency, dependent from IPython.utils.py3compat import string_types from . import map as Map from .asyncresult import AsyncResult, AsyncMapResult from .remotefunction import ParallelFunction, parallel, remote, getname #----------------------------------------------------------------------------- # Decorators #----------------------------------------------------------------------------- @decorator def save_ids(f, self, *args, **kwargs): """Keep our history and outstanding attributes up to date after a method call.""" n_previous = len(self.client.history) try: ret = f(self, *args, **kwargs) finally: nmsgs = len(self.client.history) - n_previous msg_ids = self.client.history[-nmsgs:] self.history.extend(msg_ids) map(self.outstanding.add, msg_ids) return ret @decorator def sync_results(f, self, *args, **kwargs): """sync relevant results from self.client to our results attribute.""" if self._in_sync_results: return f(self, *args, **kwargs) self._in_sync_results = True try: ret = f(self, *args, **kwargs) finally: self._in_sync_results = False self._sync_results() return ret @decorator def spin_after(f, self, *args, **kwargs): """call spin after the method.""" ret = f(self, *args, **kwargs) self.spin() return ret #----------------------------------------------------------------------------- # Classes #----------------------------------------------------------------------------- @skip_doctest class View(HasTraits): """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes. Don't use this class, use subclasses. Methods ------- spin flushes incoming results and registration state changes control methods spin, and requesting `ids` also ensures up to date wait wait on one or more msg_ids execution methods apply legacy: execute, run data movement push, pull, scatter, gather query methods get_result, queue_status, purge_results, result_status control methods abort, shutdown """ # flags block=Bool(False) track=Bool(True) targets = Any() history=List() outstanding = Set() results = Dict() client = Instance('IPython.parallel.Client') _socket = Instance('zmq.Socket') _flag_names = List(['targets', 'block', 'track']) _in_sync_results = Bool(False) _targets = Any() _idents = Any() def __init__(self, client=None, socket=None, **flags): super(View, self).__init__(client=client, _socket=socket) self.results = client.results self.block = client.block self.set_flags(**flags) assert not self.__class__ is View, "Don't use base View objects, use subclasses" def __repr__(self): strtargets = str(self.targets) if len(strtargets) > 16: strtargets = strtargets[:12]+'...]' return "<%s %s>"%(self.__class__.__name__, strtargets) def __len__(self): if isinstance(self.targets, list): return len(self.targets) elif isinstance(self.targets, int): return 1 else: return len(self.client) def set_flags(self, **kwargs): """set my attribute flags by keyword. Views determine behavior with a few attributes (`block`, `track`, etc.). These attributes can be set all at once by name with this method. Parameters ---------- block : bool whether to wait for results track : bool whether to create a MessageTracker to allow the user to safely edit after arrays and buffers during non-copying sends. """ for name, value in kwargs.iteritems(): if name not in self._flag_names: raise KeyError("Invalid name: %r"%name) else: setattr(self, name, value) @contextmanager def temp_flags(self, **kwargs): """temporarily set flags, for use in `with` statements. See set_flags for permanent setting of flags Examples -------- >>> view.track=False ... >>> with view.temp_flags(track=True): ... ar = view.apply(dostuff, my_big_array) ... ar.tracker.wait() # wait for send to finish >>> view.track False """ # preflight: save flags, and set temporaries saved_flags = {} for f in self._flag_names: saved_flags[f] = getattr(self, f) self.set_flags(**kwargs) # yield to the with-statement block try: yield finally: # postflight: restore saved flags self.set_flags(**saved_flags) #---------------------------------------------------------------- # apply #---------------------------------------------------------------- def _sync_results(self): """to be called by @sync_results decorator after submitting any tasks. """ delta = self.outstanding.difference(self.client.outstanding) completed = self.outstanding.intersection(delta) self.outstanding = self.outstanding.difference(completed) @sync_results @save_ids def _really_apply(self, f, args, kwargs, block=None, **options): """wrapper for client.send_apply_request""" raise NotImplementedError("Implement in subclasses") def apply(self, f, *args, **kwargs): """calls f(*args, **kwargs) on remote engines, returning the result. This method sets all apply flags via this View's attributes. if self.block is False: returns AsyncResult else: returns actual result of f(*args, **kwargs) """ return self._really_apply(f, args, kwargs) def apply_async(self, f, *args, **kwargs): """calls f(*args, **kwargs) on remote engines in a nonblocking manner. returns AsyncResult """ return self._really_apply(f, args, kwargs, block=False) @spin_after def apply_sync(self, f, *args, **kwargs): """calls f(*args, **kwargs) on remote engines in a blocking manner, returning the result. returns: actual result of f(*args, **kwargs) """ return self._really_apply(f, args, kwargs, block=True) #---------------------------------------------------------------- # wrappers for client and control methods #---------------------------------------------------------------- @sync_results def spin(self): """spin the client, and sync""" self.client.spin() @sync_results def wait(self, jobs=None, timeout=-1): """waits on one or more `jobs`, for up to `timeout` seconds. Parameters ---------- jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects ints are indices to self.history strs are msg_ids default: wait on all outstanding messages timeout : float a time in seconds, after which to give up. default is -1, which means no timeout Returns ------- True : when all msg_ids are done False : timeout reached, some msg_ids still outstanding """ if jobs is None: jobs = self.history return self.client.wait(jobs, timeout) def abort(self, jobs=None, targets=None, block=None): """Abort jobs on my engines. Parameters ---------- jobs : None, str, list of strs, optional if None: abort all jobs. else: abort specific msg_id(s). """ block = block if block is not None else self.block targets = targets if targets is not None else self.targets jobs = jobs if jobs is not None else list(self.outstanding) return self.client.abort(jobs=jobs, targets=targets, block=block) def queue_status(self, targets=None, verbose=False): """Fetch the Queue status of my engines""" targets = targets if targets is not None else self.targets return self.client.queue_status(targets=targets, verbose=verbose) def purge_results(self, jobs=[], targets=[]): """Instruct the controller to forget specific results.""" if targets is None or targets == 'all': targets = self.targets return self.client.purge_results(jobs=jobs, targets=targets) def shutdown(self, targets=None, restart=False, hub=False, block=None): """Terminates one or more engine processes, optionally including the hub. """ block = self.block if block is None else block if targets is None or targets == 'all': targets = self.targets return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block) @spin_after def get_result(self, indices_or_msg_ids=None): """return one or more results, specified by history index or msg_id. See client.get_result for details. """ if indices_or_msg_ids is None: indices_or_msg_ids = -1 if isinstance(indices_or_msg_ids, int): indices_or_msg_ids = self.history[indices_or_msg_ids] elif isinstance(indices_or_msg_ids, (list,tuple,set)): indices_or_msg_ids = list(indices_or_msg_ids) for i,index in enumerate(indices_or_msg_ids): if isinstance(index, int): indices_or_msg_ids[i] = self.history[index] return self.client.get_result(indices_or_msg_ids) #------------------------------------------------------------------- # Map #------------------------------------------------------------------- @sync_results def map(self, f, *sequences, **kwargs): """override in subclasses""" raise NotImplementedError def map_async(self, f, *sequences, **kwargs): """Parallel version of builtin `map`, using this view's engines. This is equivalent to map(...block=False) See `self.map` for details. """ if 'block' in kwargs: raise TypeError("map_async doesn't take a `block` keyword argument.") kwargs['block'] = False return self.map(f,*sequences,**kwargs) def map_sync(self, f, *sequences, **kwargs): """Parallel version of builtin `map`, using this view's engines. This is equivalent to map(...block=True) See `self.map` for details. """ if 'block' in kwargs: raise TypeError("map_sync doesn't take a `block` keyword argument.") kwargs['block'] = True return self.map(f,*sequences,**kwargs) def imap(self, f, *sequences, **kwargs): """Parallel version of `itertools.imap`. See `self.map` for details. """ return iter(self.map_async(f,*sequences, **kwargs)) #------------------------------------------------------------------- # Decorators #------------------------------------------------------------------- def remote(self, block=None, **flags): """Decorator for making a RemoteFunction""" block = self.block if block is None else block return remote(self, block=block, **flags) def parallel(self, dist='b', block=None, **flags): """Decorator for making a ParallelFunction""" block = self.block if block is None else block return parallel(self, dist=dist, block=block, **flags) @skip_doctest class DirectView(View): """Direct Multiplexer View of one or more engines. These are created via indexed access to a client: >>> dv_1 = client[1] >>> dv_all = client[:] >>> dv_even = client[::2] >>> dv_some = client[1:3] This object provides dictionary access to engine namespaces: # push a=5: >>> dv['a'] = 5 # pull 'foo': >>> db['foo'] """ def __init__(self, client=None, socket=None, targets=None): super(DirectView, self).__init__(client=client, socket=socket, targets=targets) @property def importer(self): """sync_imports(local=True) as a property. See sync_imports for details. """ return self.sync_imports(True) @contextmanager def sync_imports(self, local=True, quiet=False): """Context Manager for performing simultaneous local and remote imports. 'import x as y' will *not* work. The 'as y' part will simply be ignored. If `local=True`, then the package will also be imported locally. If `quiet=True`, no output will be produced when attempting remote imports. Note that remote-only (`local=False`) imports have not been implemented. >>> with view.sync_imports(): ... from numpy import recarray importing recarray from numpy on engine(s) """ from IPython.utils.py3compat import builtin_mod local_import = builtin_mod.__import__ modules = set() results = [] @util.interactive def remote_import(name, fromlist, level): """the function to be passed to apply, that actually performs the import on the engine, and loads up the user namespace. """ import sys user_ns = globals() mod = __import__(name, fromlist=fromlist, level=level) if fromlist: for key in fromlist: user_ns[key] = getattr(mod, key) else: user_ns[name] = sys.modules[name] def view_import(name, globals={}, locals={}, fromlist=[], level=0): """the drop-in replacement for __import__, that optionally imports locally as well. """ # don't override nested imports save_import = builtin_mod.__import__ builtin_mod.__import__ = local_import if imp.lock_held(): # this is a side-effect import, don't do it remotely, or even # ignore the local effects return local_import(name, globals, locals, fromlist, level) imp.acquire_lock() if local: mod = local_import(name, globals, locals, fromlist, level) else: raise NotImplementedError("remote-only imports not yet implemented") imp.release_lock() key = name+':'+','.join(fromlist or []) if level <= 0 and key not in modules: modules.add(key) if not quiet: if fromlist: print("importing %s from %s on engine(s)"%(','.join(fromlist), name)) else: print("importing %s on engine(s)"%name) results.append(self.apply_async(remote_import, name, fromlist, level)) # restore override builtin_mod.__import__ = save_import return mod # override __import__ builtin_mod.__import__ = view_import try: # enter the block yield except ImportError: if local: raise else: # ignore import errors if not doing local imports pass finally: # always restore __import__ builtin_mod.__import__ = local_import for r in results: # raise possible remote ImportErrors here r.get() @sync_results @save_ids def _really_apply(self, f, args=None, kwargs=None, targets=None, block=None, track=None): """calls f(*args, **kwargs) on remote engines, returning the result. This method sets all of `apply`'s flags via this View's attributes. Parameters ---------- f : callable args : list [default: empty] kwargs : dict [default: empty] targets : target list [default: self.targets] where to run block : bool [default: self.block] whether to block track : bool [default: self.track] whether to ask zmq to track the message, for safe non-copying sends Returns ------- if self.block is False: returns AsyncResult else: returns actual result of f(*args, **kwargs) on the engine(s) This will be a list of self.targets is also a list (even length 1), or the single result if self.targets is an integer engine id """ args = [] if args is None else args kwargs = {} if kwargs is None else kwargs block = self.block if block is None else block track = self.track if track is None else track targets = self.targets if targets is None else targets _idents, _targets = self.client._build_targets(targets) msg_ids = [] trackers = [] for ident in _idents: msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, ident=ident) if track: trackers.append(msg['tracker']) msg_ids.append(msg['header']['msg_id']) if isinstance(targets, int): msg_ids = msg_ids[0] tracker = None if track is False else zmq.MessageTracker(*trackers) ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker) if block: try: return ar.get() except KeyboardInterrupt: pass return ar @sync_results def map(self, f, *sequences, **kwargs): """view.map(f, *sequences, block=self.block) => list|AsyncMapResult Parallel version of builtin `map`, using this View's `targets`. There will be one task per target, so work will be chunked if the sequences are longer than `targets`. Results can be iterated as they are ready, but will become available in chunks. Parameters ---------- f : callable function to be mapped *sequences: one or more sequences of matching length the sequences to be distributed and passed to `f` block : bool whether to wait for the result or not [default self.block] Returns ------- if block=False: AsyncMapResult An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete. else: list the result of map(f,*sequences) """ block = kwargs.pop('block', self.block) for k in kwargs.keys(): if k not in ['block', 'track']: raise TypeError("invalid keyword arg, %r"%k) assert len(sequences) > 0, "must have some sequences to map onto!" pf = ParallelFunction(self, f, block=block, **kwargs) return pf.map(*sequences) @sync_results @save_ids def execute(self, code, silent=True, targets=None, block=None): """Executes `code` on `targets` in blocking or nonblocking manner. ``execute`` is always `bound` (affects engine namespace) Parameters ---------- code : str the code string to be executed block : bool whether or not to wait until done to return default: self.block """ block = self.block if block is None else block targets = self.targets if targets is None else targets _idents, _targets = self.client._build_targets(targets) msg_ids = [] trackers = [] for ident in _idents: msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident) msg_ids.append(msg['header']['msg_id']) if isinstance(targets, int): msg_ids = msg_ids[0] ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets) if block: try: ar.get() except KeyboardInterrupt: pass return ar def run(self, filename, targets=None, block=None): """Execute contents of `filename` on my engine(s). This simply reads the contents of the file and calls `execute`. Parameters ---------- filename : str The path to the file targets : int/str/list of ints/strs the engines on which to execute default : all block : bool whether or not to wait until done default: self.block """ with open(filename, 'r') as f: # add newline in case of trailing indented whitespace # which will cause SyntaxError code = f.read()+'\n' return self.execute(code, block=block, targets=targets) def update(self, ns): """update remote namespace with dict `ns` See `push` for details. """ return self.push(ns, block=self.block, track=self.track) def push(self, ns, targets=None, block=None, track=None): """update remote namespace with dict `ns` Parameters ---------- ns : dict dict of keys with which to update engine namespace(s) block : bool [default : self.block] whether to wait to be notified of engine receipt """ block = block if block is not None else self.block track = track if track is not None else self.track targets = targets if targets is not None else self.targets # applier = self.apply_sync if block else self.apply_async if not isinstance(ns, dict): raise TypeError("Must be a dict, not %s"%type(ns)) return self._really_apply(util._push, kwargs=ns, block=block, track=track, targets=targets) def get(self, key_s): """get object(s) by `key_s` from remote namespace see `pull` for details. """ # block = block if block is not None else self.block return self.pull(key_s, block=True) def pull(self, names, targets=None, block=None): """get object(s) by `name` from remote namespace will return one object if it is a key. can also take a list of keys, in which case it will return a list of objects. """ block = block if block is not None else self.block targets = targets if targets is not None else self.targets applier = self.apply_sync if block else self.apply_async if isinstance(names, string_types): pass elif isinstance(names, (list,tuple,set)): for key in names: if not isinstance(key, string_types): raise TypeError("keys must be str, not type %r"%type(key)) else: raise TypeError("names must be strs, not %r"%names) return self._really_apply(util._pull, (names,), block=block, targets=targets) def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None, track=None): """ Partition a Python sequence and send the partitions to a set of engines. """ block = block if block is not None else self.block track = track if track is not None else self.track targets = targets if targets is not None else self.targets # construct integer ID list: targets = self.client._build_targets(targets)[1] mapObject = Map.dists[dist]() nparts = len(targets) msg_ids = [] trackers = [] for index, engineid in enumerate(targets): partition = mapObject.getPartition(seq, index, nparts) if flatten and len(partition) == 1: ns = {key: partition[0]} else: ns = {key: partition} r = self.push(ns, block=False, track=track, targets=engineid) msg_ids.extend(r.msg_ids) if track: trackers.append(r._tracker) if track: tracker = zmq.MessageTracker(*trackers) else: tracker = None r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, tracker=tracker) if block: r.wait() else: return r @sync_results @save_ids def gather(self, key, dist='b', targets=None, block=None): """ Gather a partitioned sequence on a set of engines as a single local seq. """ block = block if block is not None else self.block targets = targets if targets is not None else self.targets mapObject = Map.dists[dist]() msg_ids = [] # construct integer ID list: targets = self.client._build_targets(targets)[1] for index, engineid in enumerate(targets): msg_ids.extend(self.pull(key, block=False, targets=engineid).msg_ids) r = AsyncMapResult(self.client, msg_ids, mapObject, fname='gather') if block: try: return r.get() except KeyboardInterrupt: pass return r def __getitem__(self, key): return self.get(key) def __setitem__(self,key, value): self.update({key:value}) def clear(self, targets=None, block=None): """Clear the remote namespaces on my engines.""" block = block if block is not None else self.block targets = targets if targets is not None else self.targets return self.client.clear(targets=targets, block=block) #---------------------------------------- # activate for %px, %autopx, etc. magics #---------------------------------------- def activate(self, suffix=''): """Activate IPython magics associated with this View Defines the magics `%px, %autopx, %pxresult, %%px, %pxconfig` Parameters ---------- suffix: str [default: ''] The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time. e.g. ``rc[::2].activate(suffix='_even')`` will give you the magics ``%px_even``, ``%pxresult_even``, etc. for running magics on the even engines. """ from IPython.parallel.client.magics import ParallelMagics try: # This is injected into __builtins__. ip = get_ipython() except NameError: print("The IPython parallel magics (%px, etc.) only work within IPython.") return M = ParallelMagics(ip, self, suffix) ip.magics_manager.register(M) @skip_doctest class LoadBalancedView(View): """An load-balancing View that only executes via the Task scheduler. Load-balanced views can be created with the client's `view` method: >>> v = client.load_balanced_view() or targets can be specified, to restrict the potential destinations: >>> v = client.client.load_balanced_view([1,3]) which would restrict loadbalancing to between engines 1 and 3. """ follow=Any() after=Any() timeout=CFloat() retries = Integer(0) _task_scheme = Any() _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries']) def __init__(self, client=None, socket=None, **flags): super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags) self._task_scheme=client._task_scheme def _validate_dependency(self, dep): """validate a dependency. For use in `set_flags`. """ if dep is None or isinstance(dep, string_types + (AsyncResult, Dependency)): return True elif isinstance(dep, (list,set, tuple)): for d in dep: if not isinstance(d, string_types + (AsyncResult,)): return False elif isinstance(dep, dict): if set(dep.keys()) != set(Dependency().as_dict().keys()): return False if not isinstance(dep['msg_ids'], list): return False for d in dep['msg_ids']: if not isinstance(d, string_types): return False else: return False return True def _render_dependency(self, dep): """helper for building jsonable dependencies from various input forms.""" if isinstance(dep, Dependency): return dep.as_dict() elif isinstance(dep, AsyncResult): return dep.msg_ids elif dep is None: return [] else: # pass to Dependency constructor return list(Dependency(dep)) def set_flags(self, **kwargs): """set my attribute flags by keyword. A View is a wrapper for the Client's apply method, but with attributes that specify keyword arguments, those attributes can be set by keyword argument with this method. Parameters ---------- block : bool whether to wait for results track : bool whether to create a MessageTracker to allow the user to safely edit after arrays and buffers during non-copying sends. after : Dependency or collection of msg_ids Only for load-balanced execution (targets=None) Specify a list of msg_ids as a time-based dependency. This job will only be run *after* the dependencies have been met. follow : Dependency or collection of msg_ids Only for load-balanced execution (targets=None) Specify a list of msg_ids as a location-based dependency. This job will only be run on an engine where this dependency is met. timeout : float/int or None Only for load-balanced execution (targets=None) Specify an amount of time (in seconds) for the scheduler to wait for dependencies to be met before failing with a DependencyTimeout. retries : int Number of times a task will be retried on failure. """ super(LoadBalancedView, self).set_flags(**kwargs) for name in ('follow', 'after'): if name in kwargs: value = kwargs[name] if self._validate_dependency(value): setattr(self, name, value) else: raise ValueError("Invalid dependency: %r"%value) if 'timeout' in kwargs: t = kwargs['timeout'] if not isinstance(t, (int, long, float, type(None))): raise TypeError("Invalid type for timeout: %r"%type(t)) if t is not None: if t < 0: raise ValueError("Invalid timeout: %s"%t) self.timeout = t @sync_results @save_ids def _really_apply(self, f, args=None, kwargs=None, block=None, track=None, after=None, follow=None, timeout=None, targets=None, retries=None): """calls f(*args, **kwargs) on a remote engine, returning the result. This method temporarily sets all of `apply`'s flags for a single call. Parameters ---------- f : callable args : list [default: empty] kwargs : dict [default: empty] block : bool [default: self.block] whether to block track : bool [default: self.track] whether to ask zmq to track the message, for safe non-copying sends !!!!!! TODO: THE REST HERE !!!! Returns ------- if self.block is False: returns AsyncResult else: returns actual result of f(*args, **kwargs) on the engine(s) This will be a list of self.targets is also a list (even length 1), or the single result if self.targets is an integer engine id """ # validate whether we can run if self._socket.closed: msg = "Task farming is disabled" if self._task_scheme == 'pure': msg += " because the pure ZMQ scheduler cannot handle" msg += " disappearing engines." raise RuntimeError(msg) if self._task_scheme == 'pure': # pure zmq scheme doesn't support extra features msg = "Pure ZMQ scheduler doesn't support the following flags:" "follow, after, retries, targets, timeout" if (follow or after or retries or targets or timeout): # hard fail on Scheduler flags raise RuntimeError(msg) if isinstance(f, dependent): # soft warn on functional dependencies warnings.warn(msg, RuntimeWarning) # build args args = [] if args is None else args kwargs = {} if kwargs is None else kwargs block = self.block if block is None else block track = self.track if track is None else track after = self.after if after is None else after retries = self.retries if retries is None else retries follow = self.follow if follow is None else follow timeout = self.timeout if timeout is None else timeout targets = self.targets if targets is None else targets if not isinstance(retries, int): raise TypeError('retries must be int, not %r'%type(retries)) if targets is None: idents = [] else: idents = self.client._build_targets(targets)[0] # ensure *not* bytes idents = [ ident.decode() for ident in idents ] after = self._render_dependency(after) follow = self._render_dependency(follow) metadata = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, metadata=metadata) tracker = None if track is False else msg['tracker'] ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), targets=None, tracker=tracker) if block: try: return ar.get() except KeyboardInterrupt: pass return ar @sync_results @save_ids def map(self, f, *sequences, **kwargs): """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult Parallel version of builtin `map`, load-balanced by this View. `block`, and `chunksize` can be specified by keyword only. Each `chunksize` elements will be a separate task, and will be load-balanced. This lets individual elements be available for iteration as soon as they arrive. Parameters ---------- f : callable function to be mapped *sequences: one or more sequences of matching length the sequences to be distributed and passed to `f` block : bool [default self.block] whether to wait for the result or not track : bool whether to create a MessageTracker to allow the user to safely edit after arrays and buffers during non-copying sends. chunksize : int [default 1] how many elements should be in each task. ordered : bool [default True] Whether the results should be gathered as they arrive, or enforce the order of submission. Only applies when iterating through AsyncMapResult as results arrive. Has no effect when block=True. Returns ------- if block=False: AsyncMapResult An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete. else: the result of map(f,*sequences) """ # default block = kwargs.get('block', self.block) chunksize = kwargs.get('chunksize', 1) ordered = kwargs.get('ordered', True) keyset = set(kwargs.keys()) extra_keys = keyset.difference_update(set(['block', 'chunksize'])) if extra_keys: raise TypeError("Invalid kwargs: %s"%list(extra_keys)) assert len(sequences) > 0, "must have some sequences to map onto!" pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered) return pf.map(*sequences) __all__ = ['LoadBalancedView', 'DirectView']