From 498a93f13d89558c689414c2020b5504a6a5f11a 2011-04-08 00:38:20
From: MinRK <benjaminrk@gmail.com>
Date: 2011-04-08 00:38:20
Subject: [PATCH] API update involving map and load-balancing

---

diff --git a/IPython/utils/newserialized.py b/IPython/utils/newserialized.py
index 2b5378a..a8e9cc4 100644
--- a/IPython/utils/newserialized.py
+++ b/IPython/utils/newserialized.py
@@ -147,9 +147,9 @@ class UnSerializeIt(UnSerialized):
         if globals().has_key('numpy') and typeDescriptor == 'ndarray':
                 result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype'])
                 result.shape = self.serialized.metadata['shape']
-                # This is a hack to make the array writable.  We are working with
+                # numpy arrays created with frombuffer are read-only.  We are working with
                 # the numpy folks to address this issue.
-                result = result.copy()
+                # result = result.copy()
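+                # Note: without the copy, the returned array shares the message
+                # buffer and is read-only; callers that need a writable array
+                # must copy it themselves.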
         elif typeDescriptor == 'pickle':
             result = pickle.loads(self.serialized.getData())
         elif typeDescriptor in ('bytes', 'buffer'):
diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py
index 0b5959f..4c0f598 100644
--- a/IPython/zmq/parallel/client.py
+++ b/IPython/zmq/parallel/client.py
@@ -21,7 +21,7 @@ from pprint import pprint
 pjoin = os.path.join
 
 import zmq
-from zmq.eventloop import ioloop, zmqstream
+# from zmq.eventloop import ioloop, zmqstream
 
 from IPython.utils.path import get_ipython_dir
 from IPython.external.decorator import decorator
@@ -203,6 +203,7 @@ class Client(object):
     
     Attributes
     ----------
+    
     ids : set of int engine IDs
         requesting the ids attribute always synchronizes
         the registration state. To request ids without synchronization,
@@ -225,17 +226,23 @@ class Client(object):
     
     Methods
     -------
-    spin : flushes incoming results and registration state changes
-            control methods spin, and requesting `ids` also ensures up to date
-            
-    barrier : wait on one or more msg_ids
     
-    execution methods: apply/apply_bound/apply_to/apply_bound
+    spin
+        flushes incoming results and registration state changes.
+        Control methods spin, and requesting `ids` also ensures the state is up to date.
+    
+    barrier
+        wait on one or more msg_ids
+    
+    execution methods
+        apply
         legacy: execute, run
     
-    query methods: queue_status, get_result, purge
+    query methods
+        queue_status, get_result, purge
     
-    control methods: abort, kill
+    control methods
+        abort, shutdown
     
     """
     
@@ -265,7 +272,6 @@ class Client(object):
         if context is None:
             context = zmq.Context()
         self.context = context
-        self.targets = 'all'
         
         self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
         if self._cd is not None:
@@ -634,42 +640,18 @@ class Client(object):
     #--------------------------------------------------------------------------
     
     def __getitem__(self, key):
-        """Dict access returns DirectView multiplexer objects or,
-        if key is None, a LoadBalancedView."""
-        if key is None:
-            return LoadBalancedView(self)
-        if isinstance(key, int):
-            if key not in self.ids:
-                raise IndexError("No such engine: %i"%key)
-            return DirectView(self, key)
-        
-        if isinstance(key, slice):
-            indices = range(len(self.ids))[key]
-            ids = sorted(self._ids)
-            key = [ ids[i] for i in indices ]
-            # newkeys = sorted(self._ids)[thekeys[k]]
+        """index access returns DirectView multiplexer objects
         
-        if isinstance(key, (tuple, list, xrange)):
-            _,targets = self._build_targets(list(key))
-            return DirectView(self, targets)
+        Must be int, slice, or list/tuple/xrange of ints"""
+        if not isinstance(key, (int, slice, tuple, list, xrange)):
+            raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
         else:
-            raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
+            return self.view(key, balanced=False)
     
     #--------------------------------------------------------------------------
     # Begin public methods
     #--------------------------------------------------------------------------
     
-    @property
-    def remote(self):
-        """property for convenient RemoteFunction generation.
-        
-        >>> @client.remote
-        ... def getpid():
-                import os
-                return os.getpid()
-        """
-        return remote(self, block=self.block)
-    
     def spin(self):
         """Flush any registration notifications and execution results
         waiting in the ZMQ queue.
@@ -690,6 +672,7 @@ class Client(object):
         
         Parameters
         ----------
+        
         msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
                 ints are indices to self.history
                 strs are msg_ids
@@ -700,6 +683,7 @@ class Client(object):
         
         Returns
         -------
+        
         True : when all msg_ids are done
         False : timeout reached, some msg_ids still outstanding
         """
@@ -815,6 +799,7 @@ class Client(object):
         
         Parameters
         ----------
+        
         code : str
                 the code string to be executed
         targets : int/str/list of ints/strs
@@ -824,7 +809,7 @@ class Client(object):
                 whether or not to wait until done to return
                 default: self.block
         """
-        result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
+        result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False)
         return result
     
     def run(self, filename, targets='all', block=None):
@@ -834,6 +819,7 @@ class Client(object):
         
         Parameters
         ----------
+        
         filename : str
                 The path to the file
         targets : int/str/list of ints/strs
@@ -868,7 +854,8 @@ class Client(object):
             return list(Dependency(dep))
         
     @defaultblock
-    def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
+    def apply(self, f, args=None, kwargs=None, bound=True, block=None,
+                        targets=None, balanced=None,
                         after=None, follow=None, timeout=None):
         """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
         
@@ -905,11 +892,34 @@ class Client(object):
             if int:
                 Run on single engine
         
-        after,follow,timeout only used in `apply_balanced`. See that docstring
-        for details.
+        balanced : bool, default None
+            whether to load-balance.  This will default to True
+            if targets is unspecified, or False if targets is specified.
+            
+            The following arguments are only used when balanced is True:
+        after : Dependency or collection of msg_ids
+            Only for load-balanced execution (targets=None)
+            Specify a list of msg_ids as a time-based dependency.
+            This job will only be run *after* the dependencies
+            have been met.
+            
+        follow : Dependency or collection of msg_ids
+            Only for load-balanced execution (targets=None)
+            Specify a list of msg_ids as a location-based dependency.
+            This job will only be run on an engine where this dependency
+            is met.
+        
+        timeout : float/int or None
+            Only for load-balanced execution (targets=None)
+            Specify an amount of time (in seconds) for the scheduler to
+            wait for dependencies to be met before failing with a
+            DependencyTimeout.
+        
+        after, follow, and timeout are only used if `balanced=True`.
         
         Returns
         -------
+        
         if block is False:
             return AsyncResult wrapping msg_ids
             output of AsyncResult.get() is identical to that of `apply(...block=True)`
@@ -921,10 +931,21 @@ class Client(object):
         """
         
         # defaults:
-        block = block if block is not None else self.block
         args = args if args is not None else []
         kwargs = kwargs if kwargs is not None else {}
         
+        if balanced is None:
+            if targets is None:
+                # default to balanced if targets unspecified
+                balanced = True
+            else:
+                # otherwise default to multiplexing
+                balanced = False
+        
+        if targets is None and balanced is False:
+            # default to all if *not* balanced, and targets is unspecified
+            targets = 'all'
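+        # For example (illustrative):
+        #   apply(f)                           -> balanced=True,  via the Task scheduler
+        #   apply(f, targets=[0,1])            -> balanced=False, multiplexed to engines 0,1
+        #   apply(f, targets=[0,1], balanced=True) -> load-balanced, restricted to 0,1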
+        
         # enforce types of f,args,kwrags
         if not callable(f):
             raise TypeError("f must be callable, not %s"%type(f))
@@ -935,80 +956,27 @@ class Client(object):
         
         options  = dict(bound=bound, block=block, targets=targets)
             
-        if targets is None:
-            return self.apply_balanced(f, args, kwargs, timeout=timeout, 
+        if balanced:
+            return self._apply_balanced(f, args, kwargs, timeout=timeout, 
                                         after=after, follow=follow, **options)
-        else:
-            if follow or after or timeout:
-                msg = "follow, after, and timeout args are only used for load-balanced"
-                msg += "execution."
+        elif follow or after or timeout:
+                msg = "follow, after, and timeout args are only used for"
+                msg += " load-balanced execution."
                 raise ValueError(msg)
+        else:
             return self._apply_direct(f, args, kwargs, **options)
     
-    @defaultblock
-    def apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
+    def _apply_balanced(self, f, args, kwargs, bound=True, block=None, targets=None,
                             after=None, follow=None, timeout=None):
         """call f(*args, **kwargs) remotely in a load-balanced manner.
         
-        Parameters
-        ----------
-        
-        f : function
-            The fuction to be called remotely
-        args : tuple/list
-            The positional arguments passed to `f`
-        kwargs : dict
-            The keyword arguments passed to `f`
-        bound : bool (default: True)
-            Whether to execute in the Engine(s) namespace, or in a clean
-            namespace not affecting the engine.
-        block : bool (default: self.block)
-            Whether to wait for the result, or return immediately.
-            False:
-                returns AsyncResult
-            True:
-                returns actual result(s) of f(*args, **kwargs)
-                if multiple targets:
-                    list of results, matching `targets`
-        targets : int,list of ints, 'all', None
-            Specify the destination of the job.
-            if None:
-                Submit via Task queue for load-balancing.
-            if 'all':
-                Run on all active engines
-            if list:
-                Run on each specified engine
-            if int:
-                Run on single engine
-        
-        after : Dependency or collection of msg_ids
-            Only for load-balanced execution (targets=None)
-            Specify a list of msg_ids as a time-based dependency.
-            This job will only be run *after* the dependencies
-            have been met.
-            
-        follow : Dependency or collection of msg_ids
-            Only for load-balanced execution (targets=None)
-            Specify a list of msg_ids as a location-based dependency.
-            This job will only be run on an engine where this dependency
-            is met.
-        
-        timeout : float/int or None
-            Only for load-balanced execution (targets=None)
-            Specify an amount of time (in seconds) for the scheduler to
-            wait for dependencies to be met before failing with a
-            DependencyTimeout.
-        
-        Returns
-        -------
-        if block is False:
-            return AsyncResult wrapping msg_id
-            output of AsyncResult.get() is identical to that of `apply(...block=True)`
-        else:
-            wait for, and return actual result of `f(*args, **kwargs)`
-        
+        This is a private method, see `apply` for details.
+        Not to be called directly!
         """
         
+        for kwarg in (bound, block, targets):
+            assert kwarg is not None, "kwarg %r must be specified!"%kwarg
+        
         if self._task_socket is None:
             msg = "Task farming is disabled"
             if self._task_scheme == 'pure':
@@ -1025,7 +993,6 @@ class Client(object):
             if isinstance(f, dependent):
                 # soft warn on functional dependencies
                 warnings.warn(msg, RuntimeWarning)
-            
         
         # defaults:
         args = args if args is not None else []
@@ -1036,14 +1003,6 @@ class Client(object):
         else:
             idents = []
         
-        # enforce types of f,args,kwrags
-        if not callable(f):
-            raise TypeError("f must be callable, not %s"%type(f))
-        if not isinstance(args, (tuple, list)):
-            raise TypeError("args must be tuple or list, not %s"%type(args))
-        if not isinstance(kwargs, dict):
-            raise TypeError("kwargs must be dict, not %s"%type(kwargs))
-        
         after = self._build_dependency(after)
         follow = self._build_dependency(follow)
         subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
@@ -1064,13 +1023,17 @@ class Client(object):
         else:
             return ar
     
-    def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
+    def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
         """Then underlying method for applying functions to specific engines
         via the MUX queue.
         
+        This is a private method, see `apply` for details.
         Not to be called directly!
         """
         
+        for kwarg in (bound, block, targets):
+            assert kwarg is not None, "kwarg %r must be specified!"%kwarg
+        
         idents,targets = self._build_targets(targets)
         
         subheader = {}
@@ -1095,103 +1058,46 @@ class Client(object):
             return ar
     
     #--------------------------------------------------------------------------
-    # Map and decorators
+    # decorators
     #--------------------------------------------------------------------------
     
-    def map(self, f, *sequences, **kwargs):
-        """Parallel version of builtin `map`, using all our engines.
-        
-        `block` and `targets` can be passed as keyword arguments only.
-        
-        There will be one task per target, so work will be chunked
-        if the sequences are longer than `targets`.  
-        
-        Results can be iterated as they are ready, but will become available in chunks.
-        
-        Parameters
-        ----------
-        
-        f : callable
-            function to be mapped
-        *sequences: one or more sequences of matching length
-            the sequences to be distributed and passed to `f`
-        block : bool
-            whether to wait for the result or not [default self.block]
-        targets : valid targets
-            targets to be used [default self.targets]
-        
-        Returns
-        -------
-        
-        if block=False:
-            AsyncMapResult
-                An object like AsyncResult, but which reassembles the sequence of results
-                into a single list. AsyncMapResults can be iterated through before all
-                results are complete.
-            else:
-                the result of map(f,*sequences)
-        
-        """
-        block = kwargs.get('block', self.block)
-        targets = kwargs.get('targets', self.targets)
-        assert len(sequences) > 0, "must have some sequences to map onto!"
-        pf = ParallelFunction(self, f, block=block,
-                        bound=True, targets=targets)
-        return pf.map(*sequences)
-    
-    def imap(self, f, *sequences, **kwargs):
-        """Parallel version of builtin `itertools.imap`, load-balanced across all engines.
-        
-        Each element will be a separate task, and will be load-balanced.  This
-        lets individual elements be ready for iteration as soon as they come.
-        
-        Parameters
-        ----------
-        
-        f : callable
-            function to be mapped
-        *sequences: one or more sequences of matching length
-            the sequences to be distributed and passed to `f`
-        block : bool
-            whether to wait for the result or not [default self.block]
-        
-        Returns
-        -------
-        
-        if block=False:
-            AsyncMapResult
-                An object like AsyncResult, but which reassembles the sequence of results
-                into a single list. AsyncMapResults can be iterated through before all
-                results are complete.
-            else:
-                the result of map(f,*sequences)
-        
-        """
-        
-        block = kwargs.get('block', self.block)
-        
-        assert len(sequences) > 0, "must have some sequences to map onto!"
-        
-        pf = ParallelFunction(self, f, block=self.block,
-                        bound=True, targets=None)
-        return pf.map(*sequences)
-    
-    def parallel(self, bound=True, targets='all', block=True):
+    @defaultblock
+    def parallel(self, bound=True, targets='all', block=None):
         """Decorator for making a ParallelFunction."""
         return parallel(self, bound=bound, targets=targets, block=block)
     
-    def remote(self, bound=True, targets='all', block=True):
+    @defaultblock
+    def remote(self, bound=True, targets='all', block=None):
         """Decorator for making a RemoteFunction."""
         return remote(self, bound=bound, targets=targets, block=block)
     
     def view(self, targets=None, balanced=False):
         """Method for constructing View objects"""
-        if not balanced:
-            if not targets:
+        if targets is None:
+            if balanced:
+                return LoadBalancedView(client=self)
+            else:
                 targets = slice(None)
-            return self[targets]
+        
+        if balanced:
+            view_class = LoadBalancedView
+        else:
+            view_class = DirectView
+        if isinstance(targets, int):
+            if targets not in self.ids:
+                raise IndexError("No such engine: %i"%targets)
+            return view_class(client=self, targets=targets)
+        
+        if isinstance(targets, slice):
+            indices = range(len(self.ids))[targets]
+            ids = sorted(self._ids)
+            targets = [ ids[i] for i in indices ]
+        
+        if isinstance(targets, (tuple, list, xrange)):
+            _,targets = self._build_targets(list(targets))
+            return view_class(client=self, targets=targets)
         else:
-            return LoadBalancedView(self, targets)
+            raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
     
     #--------------------------------------------------------------------------
     # Data movement
@@ -1202,7 +1108,7 @@ class Client(object):
         """Push the contents of `ns` into the namespace on `target`"""
         if not isinstance(ns, dict):
             raise TypeError("Must be a dict, not %s"%type(ns))
-        result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
+        result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
         return result
     
     @defaultblock
@@ -1214,14 +1120,14 @@ class Client(object):
             for key in keys:
                 if not isinstance(key, str):
                     raise TypeError
-        result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
+        result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
         return result
     
+    @defaultblock
     def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
         """
         Partition a Python sequence and send the partitions to a set of engines.
         """
-        block = block if block is not None else self.block
         targets = self._build_targets(targets)[-1]
         mapObject = Map.dists[dist]()
         nparts = len(targets)
@@ -1239,11 +1145,11 @@ class Client(object):
         else:
             return r
     
+    @defaultblock
     def gather(self, key, dist='b', targets='all', block=None):
         """
         Gather a partitioned sequence on a set of engines as a single local seq.
         """
-        block = block if block is not None else self.block
         
         targets = self._build_targets(targets)[-1]
         mapObject = Map.dists[dist]()
@@ -1267,6 +1173,7 @@ class Client(object):
         
         Parameters
         ----------
+        
         msg_ids : list of ints or msg_ids
             if int:
                 Passed as index to self.history for convenience.
@@ -1351,13 +1258,14 @@ class Client(object):
         return content
 
     @spinfirst
-    def queue_status(self, targets=None, verbose=False):
+    def queue_status(self, targets='all', verbose=False):
         """Fetch the status of engine queues.
         
         Parameters
         ----------
+        
         targets : int/str/list of ints/strs
-                the engines on which to execute
+                the engines whose states are to be queried.
                 default : all
         verbose : bool
                 Whether to return lengths only, or lists of ids for each element
@@ -1383,6 +1291,7 @@ class Client(object):
         
         Parameters
         ----------
+        
         msg_ids : str or list of strs
                 the msg_ids whose results should be forgotten.
         targets : int/str/list of ints/strs
@@ -1404,59 +1313,6 @@ class Client(object):
         if content['status'] != 'ok':
             raise ss.unwrap_exception(content)
 
-    #----------------------------------------
-    # activate for %px,%autopx magics
-    #----------------------------------------
-    def activate(self):
-        """Make this `View` active for parallel magic commands.
-        
-        IPython has a magic command syntax to work with `MultiEngineClient` objects.
-        In a given IPython session there is a single active one.  While
-        there can be many `Views` created and used by the user, 
-        there is only one active one.  The active `View` is used whenever 
-        the magic commands %px and %autopx are used.
-        
-        The activate() method is called on a given `View` to make it 
-        active.  Once this has been done, the magic commands can be used.
-        """
-        
-        try:
-            # This is injected into __builtins__.
-            ip = get_ipython()
-        except NameError:
-            print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
-        else:
-            pmagic = ip.plugin_manager.get_plugin('parallelmagic')
-            if pmagic is not None:
-                pmagic.active_multiengine_client = self
-            else:
-                print "You must first load the parallelmagic extension " \
-                      "by doing '%load_ext parallelmagic'"
-
-class AsynClient(Client):
-    """An Asynchronous client, using the Tornado Event Loop.
-    !!!unfinished!!!"""
-    io_loop = None
-    _queue_stream = None
-    _notifier_stream = None
-    _task_stream = None
-    _control_stream = None
-    
-    def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
-        Client.__init__(self, addr, context, username, debug)
-        if io_loop is None:
-            io_loop = ioloop.IOLoop.instance()
-        self.io_loop = io_loop
-        
-        self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
-        self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
-        self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
-        self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
-    
-    def spin(self):
-        for stream in (self.queue_stream, self.notifier_stream, 
-                        self.task_stream, self.control_stream):
-            stream.flush()
 
 __all__ = [ 'Client', 
             'depend', 
diff --git a/IPython/zmq/parallel/remotefunction.py b/IPython/zmq/parallel/remotefunction.py
index 4035f1a..388a7e6 100644
--- a/IPython/zmq/parallel/remotefunction.py
+++ b/IPython/zmq/parallel/remotefunction.py
@@ -17,7 +17,7 @@ from asyncresult import AsyncMapResult
 # Decorators
 #-----------------------------------------------------------------------------
 
-def remote(client, bound=False, block=None, targets=None):
+def remote(client, bound=False, block=None, targets=None, balanced=None):
     """Turn a function into a remote function.
     
     This method can be used for map:
@@ -26,10 +26,10 @@ def remote(client, bound=False, block=None, targets=None):
         def func(a)
     """
     def remote_function(f):
-        return RemoteFunction(client, f, bound, block, targets)
+        return RemoteFunction(client, f, bound, block, targets, balanced)
     return remote_function
 
-def parallel(client, dist='b', bound=False, block=None, targets='all'):
+def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None):
     """Turn a function into a parallel remote function.
     
     This method can be used for map:
@@ -38,7 +38,7 @@ def parallel(client, dist='b', bound=False, block=None, targets='all'):
         def func(a)
     """
     def parallel_function(f):
-        return ParallelFunction(client, f, dist, bound, block, targets)
+        return ParallelFunction(client, f, dist, bound, block, targets, balanced)
     return parallel_function
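+
+# Illustrative usage of these decorators (assuming a connected Client `rc`):
+#
+#   @remote(rc, block=True, targets='all')
+#   def getpid():
+#       import os
+#       return os.getpid()
+#
+#   getpid()    # runs once on each engine, returning a list of pids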
 
 #--------------------------------------------------------------------------
@@ -62,6 +62,8 @@ class RemoteFunction(object):
         to use the current `block` attribute of `client`
     targets : valid target list [default: all]
         The targets on which to execute.
+    balanced : bool
+        Whether to load-balance with the Task scheduler or not
     """
     
     client = None # the remote connection
@@ -69,23 +71,30 @@ class RemoteFunction(object):
     block = None # whether to block
     bound = None # whether to affect the namespace
     targets = None # where to execute
+    balanced = None # whether to load-balance
     
-    def __init__(self, client, f, bound=False, block=None, targets=None):
+    def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
         self.client = client
         self.func = f
         self.block=block
         self.bound=bound
         self.targets=targets
+        if balanced is None:
+            if targets is None:
+                balanced = True
+            else:
+                balanced = False
+        self.balanced = balanced
     
     def __call__(self, *args, **kwargs):
         return self.client.apply(self.func, args=args, kwargs=kwargs,
-                block=self.block, targets=self.targets, bound=self.bound)
+                block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
     
 
 class ParallelFunction(RemoteFunction):
     """Class for mapping a function to sequences."""
-    def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'):
-        super(ParallelFunction, self).__init__(client,f,bound,block,targets)
+    def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None):
+        super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
         mapClass = Map.dists[dist]
         self.mapObject = mapClass()
     
@@ -93,21 +102,19 @@ class ParallelFunction(RemoteFunction):
         len_0 = len(sequences[0])
         for s in sequences:
             if len(s)!=len_0:
-                raise ValueError('all sequences must have equal length')
+                msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
+                raise ValueError(msg)
         
-        if self.targets is None:
-            # load-balanced:
-            engines = [None]*len_0
-        elif isinstance(self.targets, int):
-            engines = [None]*self.targets
+        if self.balanced:
+            targets = [self.targets]*len_0
         else:
             # multiplexed:
-            engines = self.client._build_targets(self.targets)[-1]
+            targets = self.client._build_targets(self.targets)[-1]
         
-        nparts = len(engines)
+        nparts = len(targets)
         msg_ids = []
         # my_f = lambda *a: map(self.func, *a)
-        for index, engineid in enumerate(engines):
+        for index, t in enumerate(targets):
             args = []
             for seq in sequences:
                 part = self.mapObject.getPartition(seq, index, nparts)
@@ -124,22 +131,26 @@ class ParallelFunction(RemoteFunction):
                 args = [self.func]+args
             else:
                 f=self.func
-            mid = self.client.apply(f, args=args, block=False, 
-                        bound=self.bound,
-                        targets=engineid).msg_ids[0]
-            msg_ids.append(mid)
+            ar = self.client.apply(f, args=args, block=False, bound=self.bound, 
+                        targets=t, balanced=self.balanced)
+            
+            msg_ids.append(ar.msg_ids[0])
         
         r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
         if self.block:
-            r.wait()
-            return r.result
+            try:
+                return r.get()
+            except KeyboardInterrupt:
+                return r
         else:
             return r
     
     def map(self, *sequences):
         """call a function on each element of a sequence remotely."""
         self._map = True
-        ret = self.__call__(*sequences)
-        del self._map
+        try:
+            ret = self.__call__(*sequences)
+        finally:
+            del self._map
         return ret
 
diff --git a/IPython/zmq/parallel/util.py b/IPython/zmq/parallel/util.py
index bf9df96..bb6222b 100644
--- a/IPython/zmq/parallel/util.py
+++ b/IPython/zmq/parallel/util.py
@@ -59,7 +59,7 @@ def validate_url(url):
         except ValueError:
             raise AssertionError("Invalid port %r in url: %r"%(port, url))
         
-        assert pat.match(addr) is not None, 'Invalid url: %r'%url
+        assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
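+        # e.g. both 'tcp://127.0.0.1:5555' and the wildcard form 'tcp://*:5555'
+        # should now validate (illustrative addresses)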
         
     else:
         # only validate tcp urls currently
diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py
index ead83d4..fb039ab 100644
--- a/IPython/zmq/parallel/view.py
+++ b/IPython/zmq/parallel/view.py
@@ -10,7 +10,11 @@
 # Imports
 #-----------------------------------------------------------------------------
 
+from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
+
 from IPython.external.decorator import decorator
+from IPython.zmq.parallel.asyncresult import AsyncResult
+from IPython.zmq.parallel.dependency import Dependency
-from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
+from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel, remote
 
 #-----------------------------------------------------------------------------
@@ -61,30 +65,29 @@ def spin_after(f, self, *args, **kwargs):
 # Classes
 #-----------------------------------------------------------------------------
 
-class View(object):
+class View(HasTraits):
     """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
     
     Don't use this class, use subclasses.
     """
-    block=None
-    bound=None
-    history=None
-    outstanding = set()
-    results = {}
-    
+    block=Bool(False)
+    bound=Bool(False)
+    history=List()
+    outstanding = Set()
+    results = Dict()
+    client = Instance('IPython.zmq.parallel.client.Client')
+    
+    _ntargets = Int(1)
+    _balanced = Bool(False)
+    _default_names = List(['block', 'bound'])
     _targets = None
-    _apply_name = 'apply'
-    _default_names = ['targets', 'block']
     
-    def __init__(self, client, targets=None):
-        self.client = client
+    def __init__(self, client=None, targets=None):
+        super(View, self).__init__(client=client)
         self._targets = targets
         self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
         self.block = client.block
-        self.bound=False
-        self.history = []
-        self.outstanding = set()
-        self.results = {}
+        
         for name in self._default_names:
             setattr(self, name, getattr(self, name, None))
         
@@ -101,26 +104,46 @@ class View(object):
 
     @targets.setter
     def targets(self, value):
-        self._targets = value
-        # raise AttributeError("Cannot set my targets argument after construction!")
+        raise AttributeError("Cannot set View `targets` after construction!")
     
     def _defaults(self, *excludes):
         """return dict of our default attributes, excluding names given."""
-        d = {}
+        d = dict(balanced=self._balanced, targets=self.targets)
         for name in self._default_names:
             if name not in excludes:
                 d[name] = getattr(self, name)
         return d
     
+    def set_flags(self, **kwargs):
+        """set my attribute flags by keyword.
+        
+        A View is a wrapper for the Client's apply method, with
+        attributes that provide default keyword arguments; those
+        attributes can be set by keyword with this method.
+        
+        Parameters
+        ----------
+        
+        block : bool
+            whether to wait for results
+        bound : bool
+            whether to use the engine's namespace
+        """
+        for key in kwargs:
+            if key not in self._default_names:
+                raise KeyError("Invalid name: %r"%key)
+        for name in ('block', 'bound'):
+            if name in kwargs:
+                setattr(self, name, kwargs[name])
+    
+    #----------------------------------------------------------------
+    # wrappers for client methods:
+    #----------------------------------------------------------------
     @sync_results
     def spin(self):
         """spin the client, and sync"""
         self.client.spin()
     
-    @property
-    def _apply(self):
-        return getattr(self.client, self._apply_name)
-
     @sync_results
     @save_ids
     def apply(self, f, *args, **kwargs):
@@ -133,7 +156,7 @@ class View(object):
         else:
             returns actual result of f(*args, **kwargs)
         """
-        return self._apply(f, args, kwargs, **self._defaults())
+        return self.client.apply(f, args, kwargs, **self._defaults())
 
     @save_ids
     def apply_async(self, f, *args, **kwargs):
@@ -144,7 +167,7 @@ class View(object):
         returns msg_id
         """
         d = self._defaults('block', 'bound')
-        return self._apply(f,args,kwargs, block=False, bound=False, **d)
+        return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
 
     @spin_after
     @save_ids
@@ -157,7 +180,7 @@ class View(object):
         returns: actual result of f(*args, **kwargs)
         """
         d = self._defaults('block', 'bound')
-        return self._apply(f,args,kwargs, block=True, bound=False, **d)
+        return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
 
     @sync_results
     @save_ids
@@ -173,7 +196,7 @@ class View(object):
         
         """
         d = self._defaults('bound')
-        return self._apply(f, args, kwargs, bound=True, **d)
+        return self.client.apply(f, args, kwargs, bound=True, **d)
 
     @sync_results
     @save_ids
@@ -187,7 +210,7 @@ class View(object):
         
         """
         d = self._defaults('block', 'bound')
-        return self._apply(f, args, kwargs, block=False, bound=True, **d)
+        return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
 
     @spin_after
     @save_ids
@@ -200,23 +223,7 @@ class View(object):
         
         """
         d = self._defaults('block', 'bound')
-        return self._apply(f, args, kwargs, block=True, bound=True, **d)
-    
-    @spin_after
-    @save_ids
-    def map(self, f, *sequences):
-        """Parallel version of builtin `map`, using this view's engines."""
-        if isinstance(self.targets, int):
-            targets = [self.targets]
-        else:
-            targets = self.targets
-        pf = ParallelFunction(self.client, f, block=self.block,
-                        bound=True, targets=targets)
-        return pf.map(*sequences)
-    
-    def parallel(self, bound=True, block=True):
-        """Decorator for making a ParallelFunction"""
-        return parallel(self.client, bound=bound, targets=self.targets, block=block)
+        return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
     
     def abort(self, msg_ids=None, block=None):
         """Abort jobs on my engines.
@@ -240,6 +247,17 @@ class View(object):
         if targets is None or targets == 'all':
             targets = self.targets
         return self.client.purge_results(msg_ids=msg_ids, targets=targets)
+        
+    #-------------------------------------------------------------------
+    # Decorators
+    #-------------------------------------------------------------------
+    def parallel(self, bound=True, block=True):
+        """Decorator for making a ParallelFunction"""
+        return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
+    
+    def remote(self, bound=True, block=True):
+        """Decorator for making a RemoteFunction"""
+        return remote(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
     
 
 
@@ -262,6 +280,62 @@ class DirectView(View):
     
     """
     
+    def __init__(self, client=None, targets=None):
+        super(DirectView, self).__init__(client=client, targets=targets)
+        self._balanced = False
+    
+    @spin_after
+    @save_ids
+    def map(self, f, *sequences, **kwargs):
+        """Parallel version of builtin `map`, using this View's `targets`.
+        
+        There will be one task per target, so work will be chunked
+        if the sequences are longer than `targets`.  
+        
+        Results can be iterated as they are ready, but will become available in chunks.
+        
+        Parameters
+        ----------
+        
+        f : callable
+            function to be mapped
+        *sequences: one or more sequences of matching length
+            the sequences to be distributed and passed to `f`
+        block : bool
+            whether to wait for the result or not [default self.block]
+        bound : bool
+            whether to execute in the engine's namespace [default self.bound]
+        
+        Returns
+        -------
+        
+        if block=False:
+            AsyncMapResult
+                An object like AsyncResult, but which reassembles the sequence of results
+                into a single list. AsyncMapResults can be iterated through before all
+                results are complete.
+            else:
+                the result of map(f,*sequences)
+        """
+        
+        block = kwargs.get('block', self.block)
+        bound = kwargs.get('bound', self.bound)
+        for k in kwargs.keys():
+            if k not in ['block', 'bound']:
+                raise TypeError("invalid keyword arg, %r"%k)
+        
+        assert len(sequences) > 0, "must have some sequences to map onto!"
+        pf = ParallelFunction(self.client, f, block=block,
+                        bound=bound, targets=self.targets, balanced=False)
+        return pf.map(*sequences)
+    
+    def map_async(self, f, *sequences, **kwargs):
+        """Parallel version of builtin `map`, using this view's engines."""
+        if 'block' in kwargs:
+            raise TypeError("map_async doesn't take a `block` keyword argument.")
+        kwargs['block'] = False
+        return self.map(f,*sequences,**kwargs)
+    
     @sync_results
     @save_ids
     def execute(self, code, block=True):
@@ -358,14 +432,13 @@ class DirectView(View):
 
     
 class LoadBalancedView(View):
-    """An engine-agnostic View that only executes via the Task queue.
+    """An load-balancing View that only executes via the Task scheduler.
     
-    Typically created via:
+    Load-balanced views can be created with the client's `view` method:
     
-    >>> v = client[None]
-    <LoadBalancedView None>
+    >>> v = client.view(balanced=True)
     
-    but can also be created with:
+    or targets can be specified, to restrict the potential destinations:
     
     >>> v = client.view([1,3],balanced=True)
     
@@ -374,10 +447,126 @@ class LoadBalancedView(View):
     """
     
     _apply_name = 'apply_balanced'
-    _default_names = ['targets', 'block', 'bound', 'follow', 'after', 'timeout']
+    _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
     
-    def __init__(self, client, targets=None):
-        super(LoadBalancedView, self).__init__(client, targets)
+    def __init__(self, client=None, targets=None):
+        super(LoadBalancedView, self).__init__(client=client, targets=targets)
         self._ntargets = 1
-        self._apply_name = 'apply_balanced'
+    
+    def _validate_dependency(self, dep):
+        """validate a dependency.
+        
+        For use in `set_flags`.
+        """
+        if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
+            return True
+        elif isinstance(dep, (list,set, tuple)):
+            for d in dep:
+                if not isinstance(d, (str, AsyncResult)):
+                    return False
+        elif isinstance(dep, dict):
+            if set(dep.keys()) != set(Dependency().as_dict().keys()):
+                return False
+            if not isinstance(dep['msg_ids'], list):
+                return False
+            for d in dep['msg_ids']:
+                if not isinstance(d, str):
+                    return False
+        else:
+            return False
+        
+        return True
+        
+    def set_flags(self, **kwargs):
+        """set my attribute flags by keyword.
+        
+        A View is a wrapper for the Client's apply method, with
+        attributes that provide default keyword arguments; those
+        attributes can be set by keyword with this method.
+        
+        Parameters
+        ----------
+        
+        block : bool
+            whether to wait for results
+        bound : bool
+            whether to use the engine's namespace
+        follow : Dependency, list, msg_id, AsyncResult
+            the location dependencies of tasks
+        after : Dependency, list, msg_id, AsyncResult
+            the time dependencies of tasks
+        timeout : int,None
+            the timeout to be used for tasks
+        """
+        
+        super(LoadBalancedView, self).set_flags(**kwargs)
+        for name in ('follow', 'after'):
+            if name in kwargs:
+                value = kwargs[name]
+                if self._validate_dependency(value):
+                    setattr(self, name, value)
+                else:
+                    raise ValueError("Invalid dependency: %r"%value)
+        if 'timeout' in kwargs:
+            t = kwargs['timeout']
+            if not isinstance(t, (int, long, float, type(None))):
+                raise TypeError("Invalid type for timeout: %r"%type(t))
+            if t is not None:
+                if t < 0:
+                    raise ValueError("Invalid timeout: %s"%t)
+            self.timeout = t
+                
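+    # Illustrative usage (assuming a LoadBalancedView `lview` and an AsyncResult `ar`):
+    #   lview.set_flags(block=True, timeout=10)   # wait for results; 10s scheduler timeout
+    #   lview.set_flags(after=[ar])               # subsequent tasks run only after `ar`
+    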
+    @spin_after
+    @save_ids
+    def map(self, f, *sequences, **kwargs):
+        """Parallel version of builtin `map`, load-balanced by this View.
+        
+        Each element will be a separate task, and will be load-balanced.  This
+        lets individual elements be available for iteration as soon as they arrive.
+        
+        Parameters
+        ----------
+        
+        f : callable
+            function to be mapped
+        *sequences: one or more sequences of matching length
+            the sequences to be distributed and passed to `f`
+        block : bool
+            whether to wait for the result or not [default self.block]
+        bound : bool
+            whether to use the engine's namespace
+        
+        Returns
+        -------
+        
+        if block=False:
+            AsyncMapResult
+                An object like AsyncResult, but which reassembles the sequence of results
+                into a single list. AsyncMapResults can be iterated through before all
+                results are complete.
+            else:
+                the result of map(f,*sequences)
+        
+        """
+        
+        block = kwargs.get('block', self.block)
+        bound = kwargs.get('bound', self.bound)
+        
+        assert len(sequences) > 0, "must have some sequences to map onto!"
+        
+        pf = ParallelFunction(self.client, f, block=block, bound=bound, 
+                                targets=self.targets, balanced=True)
+        return pf.map(*sequences)
+    
+    def map_async(self, f, *sequences, **kwargs):
+        """Parallel version of builtin `map`, using this view's engines.
+        
+        This is equivalent to map(...block=False)
+        
+        See `map` for details.
+        """
+        
+        if 'block' in kwargs:
+            raise TypeError("map_async doesn't take a `block` keyword argument.")
+        kwargs['block'] = False
+        return self.map(f,*sequences,**kwargs)
+    
     
diff --git a/docs/source/parallelz/parallel_multiengine.txt b/docs/source/parallelz/parallel_multiengine.txt
index 48c5039..02b979b 100644
--- a/docs/source/parallelz/parallel_multiengine.txt
+++ b/docs/source/parallelz/parallel_multiengine.txt
@@ -37,7 +37,7 @@ module and then create a :class:`.Client` instance:
     In [2]: rc = client.Client()
 
 This form assumes that the default connection information (stored in
-:file:`ipcontroller-client.json` found in `~/.ipython/clusterz_default/security`) is
+:file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is
 accurate. If the controller was started on a remote machine, you must copy that connection
 file to the client machine, or enter its contents as arguments to the Client constructor:
 
@@ -45,17 +45,17 @@ file to the client machine, or enter its contents as arguments to the Client con
 
     # If you have copied the json connector file from the controller:
     In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
-    # for a remote controller at 10.0.1.5, visible from my.server.com:
+    # or for a remote controller at 10.0.1.5, visible from my.server.com:
     In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
     
 
-To make sure there are engines connected to the controller, use can get a list
+To make sure there are engines connected to the controller, users can get a list
 of engine ids:
 
 .. sourcecode:: ipython
 
     In [3]: rc.ids
-    Out[3]: set([0, 1, 2, 3])
+    Out[3]: [0, 1, 2, 3]
 
 Here we see that there are four engines ready to do work for us.
 
@@ -73,24 +73,25 @@ Parallel map
 Python's builtin :func:`map` functions allows a function to be applied to a
 sequence element-by-element. This type of code is typically trivial to
 parallelize. In fact, since IPython's interface is all about functions anyway,
-you can just use the builtin :func:`map`, or a client's :meth:`map` method:
+you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a 
+DirectView's :meth:`map` method:
 
 .. sourcecode:: ipython
 
     In [62]: serial_result = map(lambda x:x**10, range(32))
 
-    In [66]: parallel_result = rc.map(lambda x: x**10, range(32))
+    In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32))
 
-    In [67]: serial_result==parallel_result
+    In [67]: serial_result==parallel_result.get()
     Out[67]: True
 
 
 .. note::
 
-    The client's own version of :meth:`map` or that of :class:`.DirectView` do
+    The :class:`DirectView`'s version of :meth:`map` does
     not do any load balancing. For a load balanced version, use a
     :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
-    `targets=None`.
+    `balanced=True`.
 
 .. seealso::
     
@@ -105,16 +106,18 @@ some decorators:
 
 .. sourcecode:: ipython
 
-    In [10]: @rc.remote(block=True)
+    In [10]: @rc.remote(block=True, targets=0)
        ....: def f(x):
        ....:     return 10.0*x**4
        ....: 
 
-    In [11]: map(f, range(32))    # this is done in parallel
+    In [11]: map(f, range(32))    # this is done on engine 0
     Out[11]: [0.0,10.0,160.0,...]
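+
+The :func:`parallel` decorator works in much the same way; a rough sketch (the
+function name ``g`` is illustrative, and the output is abbreviated):
+
+.. sourcecode:: ipython
+
+    In [12]: @rc.parallel(block=True)
+       ....: def g(x):
+       ....:     return 10.0*x**4
+       ....: 
+
+    In [13]: g.map(range(32))    # the sequence is chunked and computed in parallel
+    Out[13]: [0.0, 10.0, 160.0,...]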
 
-See the docstring for the :func:`parallel` and :func:`remote` decorators for
-options.
+.. seealso::
+
+    See the docstring for the :func:`parallel` and :func:`remote` decorators for
+    options.
 
 Calling Python functions
 ========================
@@ -141,7 +144,7 @@ Instead, they provide the signature::
 In order to provide the nicer interface, we have :class:`View` classes, which wrap
 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
 the extra arguments. For instance, performing index-access on a client creates a
-:class:`.LoadBalancedView`.
+:class:`.DirectView`.
 
 .. sourcecode:: ipython
 
@@ -218,7 +221,7 @@ unbound, unless called by the :meth:`apply_bound` method:
 
 .. sourcecode:: ipython
 
-    In [9]: rc[:]['b'] = 5 # assign b to 5 everywhere
+    In [9]: dview['b'] = 5 # assign b to 5 everywhere
     
     In [10]: v0 = rc[0]
     
@@ -277,14 +280,14 @@ local Python/IPython session:
        ...:     return time.time()-tic
     
     # In non-blocking mode
-    In [7]: pr = rc[:].apply_async(wait, 2)
+    In [7]: pr = dview.apply_async(wait, 2)
 
     # Now block for the result
     In [8]: pr.get()
     Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
 
     # Again in non-blocking mode
-    In [9]: pr = rc[:].apply_async(wait, 10)
+    In [9]: pr = dview.apply_async(wait, 10)
 
     # Poll to see if the result is ready
     In [10]: pr.ready()
@@ -321,7 +324,7 @@ associated results are ready:
     In [72]: rc.block=False
 
     # A trivial list of AsyncResults objects
-    In [73]: pr_list = [rc[:].apply_async(wait, 3) for i in range(10)]
+    In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
 
     # Wait until all of them are done
     In [74]: rc.barrier(pr_list)
@@ -332,63 +335,38 @@ associated results are ready:
     
 
 
-The ``block`` and ``targets`` keyword arguments and attributes
---------------------------------------------------------------
-
-.. warning::
-
-    This is different now, I haven't updated this section.
-    -MinRK
+The ``block`` keyword argument and attributes
+---------------------------------------------
 
 Most methods(like :meth:`apply`) accept
-``block`` and ``targets`` as keyword arguments. As we have seen above, these
-keyword arguments control the blocking mode and which engines the command is
-applied to. The :class:`Client` class also has :attr:`block` and
-:attr:`targets` attributes that control the default behavior when the keyword
-arguments are not provided. Thus the following logic is used for :attr:`block`
-and :attr:`targets`:
+``block`` as a keyword argument. As we have seen above, this
+keyword argument controls the blocking mode. The :class:`Client` class also has
+a :attr:`block` attribute that controls the default behavior when the keyword
+argument is not provided. Thus the following logic is used for :attr:`block`:
 
 * If no keyword argument is provided, the instance attributes are used.
-* Keyword argument, if provided override the instance attributes.
+* The keyword argument, if provided, overrides the instance attribute for
+  the duration of a single call.
 
 The following examples demonstrate how to use the instance attributes:
 
 .. sourcecode:: ipython
 
-    In [16]: rc.targets = [0,2]
-
     In [17]: rc.block = False
 
-    In [18]: pr = rc.execute('a=5')
-
-    In [19]: pr.r
-    Out[19]: 
-    <Results List>
-    [0] In [6]: a=5
-    [2] In [6]: a=5
+    In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
 
-    # Note targets='all' means all engines
-    In [20]: rc.targets = 'all'
+    In [19]: ar.get()
+    Out[19]: [10,10]
 
     In [21]: rc.block = True
 
-    In [22]: rc.execute('b=10; print b')
-    Out[22]: 
-    <Results List>
-    [0] In [7]: b=10; print b
-    [0] Out[7]: 10
-
-    [1] In [6]: b=10; print b
-    [1] Out[6]: 10
-
-    [2] In [7]: b=10; print b
-    [2] Out[7]: 10
-
-    [3] In [6]: b=10; print b
-    [3] Out[6]: 10
+    # Note targets='all' means all engines
+    In [22]: rc.apply(lambda : 42, targets='all')
+    Out[22]: [42, 42, 42, 42]
 
-The :attr:`block` and :attr:`targets` instance attributes also determine the
-behavior of the parallel magic commands.
+The :attr:`block` and :attr:`targets` instance attributes of the
+:class:`.DirectView` also determine the behavior of the parallel magic commands.
 
 
 Parallel magic commands
@@ -567,9 +545,9 @@ appear as a local dictionary. Underneath, this uses :meth:`push` and
 
     In [50]: rc.block=True
 
-    In [51]: rc[:]['a']=['foo','bar']
+    In [51]: dview['a']=['foo','bar']
 
-    In [52]: rc[:]['a']
+    In [52]: dview['a']
     Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
 
 Scatter and gather
@@ -585,13 +563,13 @@ between engines, MPI should be used:
 
 .. sourcecode:: ipython
 
-    In [58]: rc.scatter('a',range(16))
+    In [58]: dview.scatter('a',range(16))
     Out[58]: [None,None,None,None]
 
-    In [59]: rc[:]['a']
+    In [59]: dview['a']
     Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
 
-    In [60]: rc.gather('a')
+    In [60]: dview.gather('a')
     Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
 
 Other things to look at
@@ -606,14 +584,14 @@ basic effect using :meth:`scatter` and :meth:`gather`:
 
 .. sourcecode:: ipython
 
-    In [66]: rc.scatter('x',range(64))
+    In [66]: dview.scatter('x',range(64))
     Out[66]: [None,None,None,None]
 
     In [67]: px y = [i**10 for i in x]
     Parallel execution on engines: [0, 1, 2, 3]
     Out[67]: 
 
-    In [68]: y = rc.gather('y')
+    In [68]: y = dview.gather('y')
 
     In [69]: print y
     [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
diff --git a/docs/source/parallelz/parallel_task.txt b/docs/source/parallelz/parallel_task.txt
index 99213ff..ec49a0f 100644
--- a/docs/source/parallelz/parallel_task.txt
+++ b/docs/source/parallelz/parallel_task.txt
@@ -41,8 +41,8 @@ module and then create a :class:`.Client` instance:
 	
 	In [2]: rc = client.Client()
 	
-	In [3]: lview = rc[None]
-	Out[3]: <LoadBalancedView tcp://127.0.0.1:10101>
+	In [3]: lview = rc.view(balanced=True)
+	Out[3]: <LoadBalancedView None>
     
 
 This form assumes that the controller was started on localhost with default
@@ -66,7 +66,7 @@ objects, but *in parallel*. Like the multiengine interface, these can be
 implemented via the task interface. The exact same tools can perform these
 actions in load-balanced ways as well as multiplexed ways: a parallel version
 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
-argument `targets=None`, then they are dynamically load balanced. Thus, if the
+argument `balanced=True`, then they are dynamically load balanced. Thus, if the
 execution time per item varies significantly, you should use the versions in
 the task interface.
 
@@ -80,7 +80,7 @@ for the ``None`` element:
 
 	In [63]: serial_result = map(lambda x:x**10, range(32))
 
-	In [64]: parallel_result = tc[None].map(lambda x:x**10, range(32))
+	In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True)
 
 	In [65]: serial_result==parallel_result
 	Out[65]: True
@@ -258,13 +258,13 @@ you can skip using Dependency objects, and just pass msg_ids or AsyncResult obje
 
     In [14]: client.block=False
 
-    In [15]: ar = client.apply(f, args, kwargs, targets=None)
+    In [15]: ar = client.apply(f, args, kwargs, balanced=True)
 
-    In [16]: ar2 = client.apply(f2, targets=None)
+    In [16]: ar2 = client.apply(f2, balanced=True)
 
-    In [17]: ar3 = client.apply(f3, after=[ar,ar2])
+    In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
 
-    In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5)
+    In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
 
 
 .. seealso::
@@ -383,7 +383,7 @@ The following is an overview of how to use these classes together:
 1. Create a :class:`Client`.
 2. Define some functions to be run as tasks
 3. Submit your tasks to using the :meth:`apply` method of your
-   :class:`Client` instance, specifying `targets=None`. This signals
+   :class:`Client` instance, specifying `balanced=True`. This signals
    the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
 4. Use :meth:`Client.get_results` to get the results of the
    tasks, or use the :meth:`AsyncResult.get` method of the results to wait