Using dill to pickle anything

IPython.parallel does little in the way of serialization. Apart from custom zero-copy handling of numpy arrays, it does only the bare minimum needed to make basic interactively defined functions and classes sendable.

There are a few projects that extend pickle to make just about anything sendable, and one of these is dill.

To install dill:

pip install dill

First, as always, we create a task function, this time with a closure

In [1]:
def make_closure(a):
    """make a function with a closure, and return it"""
    def has_closure(b):
        return a * b
    return has_closure
In [2]:
closed = make_closure(5)
In [3]:
closed(2)
Out[3]:
10
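What makes closed a closure is that the 5 it multiplies by is not part of its code. The value is stored in a cell object attached to the function. The standard __code__.co_freevars and __closure__ attributes expose this, as the following snippet shows. It is a Python 3 snippet; the notebook itself ran on Python 2.7.

```python
def make_closure(a):
    """make a function with a closure, and return it"""
    def has_closure(b):
        return a * b
    return has_closure


closed = make_closure(5)

# Names that has_closure borrows from the enclosing make_closure call.
print(closed.__code__.co_freevars)  # ('a',)

# One cell per captured name; each cell holds the captured value.
print([cell.cell_contents for cell in closed.__closure__])  # [5]

# make_closure itself captures nothing, so it has no closure.
print(make_closure.__closure__)  # None
```

This captured state is what a serializer has to transmit along with the code.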
In [4]:
import pickle

Without help, pickle can't deal with closures

In [5]:
pickle.dumps(closed)
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<ipython-input-5-0f1f376cfea0> in <module>()
----> 1 pickle.dumps(closed)

/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc in dumps(obj, protocol)
   1372 def dumps(obj, protocol=None):
   1373     file = StringIO()
-> 1374     Pickler(file, protocol).dump(obj)
   1375     return file.getvalue()
   1376 

/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc in dump(self, obj)
    222         if self.proto >= 2:
    223             self.write(PROTO + chr(self.proto))
--> 224         self.save(obj)
    225         self.write(STOP)
    226 

/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc in save_global(self, obj, name, pack)
    746             raise PicklingError(
    747                 "Can't pickle %r: it's not found as %s.%s" %
--> 748                 (obj, module, name))
    749         else:
    750             if klass is not obj:

PicklingError: Can't pickle <function has_closure at 0x10d2552a8>: it's not found as __main__.has_closure
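The error message hints at the cause. pickle saves a function by reference, recording only its module and name, and the unpickler looks it up again by that name. A nested function such as has_closure cannot be found that way, and its captured value would be lost in any case. The following Python 3 illustration uses double, which is just an example function. Depending on the version, recent Python releases report the closure failure as AttributeError or PicklingError rather than with the Python 2 message above.

```python
import pickle


def double(x):
    return 2 * x


def make_closure(a):
    def has_closure(b):
        return a * b
    return has_closure


# A module-level function is pickled as a reference: the payload holds its
# name, not its bytecode, and unpickling looks the same object up again.
payload = pickle.dumps(double)
print(b"double" in payload)              # True
print(pickle.loads(payload) is double)   # True

# A nested function has no importable name, so pickling it fails.
try:
    pickle.dumps(make_closure(5))
except (pickle.PicklingError, AttributeError) as err:
    print(type(err).__name__, err)
```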

But after we import dill, magic happens

In [6]:
import dill
In [7]:
pickle.dumps(closed)[:64] + '...'
Out[7]:
"cdill.dill\n_load_type\np0\n(S'FunctionType'\np1\ntp2\nRp3\n(cdill.dill..."

From now on, closures and most other objects that plain pickle rejects are picklable.
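How can a library teach pickle about closures? The sketch below shows one general approach using only the Python 3.8+ standard library. It is not dill's actual implementation, and the names ClosurePickler, dumps_closures and _rebuild_function are made up for this illustration. A pickle.Pickler subclass overrides reducer_override so that any function with a closure is saved by value, as marshalled bytecode plus the values in its cells. Everything else is pickled normally.

```python
import builtins
import io
import marshal
import pickle
import types


def _rebuild_function(code_bytes, name, cell_values):
    """Recreate a function from marshalled bytecode and captured values."""
    code = marshal.loads(code_bytes)
    # One fresh cell per captured value (types.CellType needs Python 3.8+).
    closure = tuple(types.CellType(value) for value in cell_values)
    # Simplification: only builtins are available as globals, and default
    # argument values are not restored.
    return types.FunctionType(code, {"__builtins__": builtins}, name, None, closure)


class ClosurePickler(pickle.Pickler):
    """Pickler that saves closures by value instead of by qualified name."""

    def reducer_override(self, obj):
        if isinstance(obj, types.FunctionType) and obj.__closure__:
            # Cell values are pickled recursively, so a closure that captures
            # another closure works too. Empty cells and closures that capture
            # themselves are not handled by this sketch.
            cell_values = tuple(cell.cell_contents for cell in obj.__closure__)
            return _rebuild_function, (marshal.dumps(obj.__code__), obj.__name__, cell_values)
        # Anything else, including ordinary functions, is pickled as usual.
        return NotImplemented


def dumps_closures(obj):
    """Serialize obj with ClosurePickler; plain pickle.loads reads the result."""
    buffer = io.BytesIO()
    ClosurePickler(buffer, protocol=pickle.HIGHEST_PROTOCOL).dump(obj)
    return buffer.getvalue()


def make_closure(a):
    """make a function with a closure, and return it"""
    def has_closure(b):
        return a * b
    return has_closure


restored = pickle.loads(dumps_closures(make_closure(5)))
print(restored(2))  # 10
```

The reducer refers only to _rebuild_function, which plain pickle resolves by module and name. As a result, the bytes load with ordinary pickle.loads, provided the loading side can import that helper. marshal output is tied to the Python version, so this only works when both sides run the same interpreter.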

Now use this in IPython.parallel

As usual, we start by creating our Client and View

In [8]:
from IPython import parallel
rc = parallel.Client()
view = rc.load_balanced_view()
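Conceptually, apply_sync serializes the function and its arguments and ships them to an engine. The engine runs the call and serializes the result for the trip back. The stand-in below mimics only those serialization round trips, using plain pickle in a single process. fake_apply_sync is hypothetical and not part of IPython, which has its own serialization layer. The point is to show which objects have to survive pickling in each direction.

```python
import operator
import pickle


def fake_apply_sync(f, *args):
    """Hypothetical stand-in for a View's apply_sync: only the pickling steps.

    A real View sends these bytes to an engine over the network; here
    everything happens in one process.
    """
    # Client side: serialize the function and its arguments.
    request = pickle.dumps((f, args))
    # "Engine" side: deserialize, call, then serialize the result.
    func, call_args = pickle.loads(request)
    reply = pickle.dumps(func(*call_args))
    # Client side again: deserialize the result.
    return pickle.loads(reply)


print(fake_apply_sync(operator.mul, 5, 3))  # 15
```

Both directions matter. A closure passed as f fails at the first pickle.dumps, and a function that returns a closure fails when its result is serialized.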

Now let's try sending our function with a closure:

In [9]:
view.apply_sync(closed, 3)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-23a646829fdc> in <module>()
----> 1 view.apply_sync(closed, 3)

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in apply_sync(self, f, *args, **kwargs)

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in spin_after(f, self, *args, **kwargs)
     73 def spin_after(f, self, *args, **kwargs):
     74     """call spin after the method."""
---> 75     ret = f(self, *args, **kwargs)
     76     self.spin()
     77     return ret

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in apply_sync(self, f, *args, **kwargs)
    248         returns: actual result of f(*args, **kwargs)
    249         """
--> 250         return self._really_apply(f, args, kwargs, block=True)
    251 
    252     #----------------------------------------------------------------

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
     64     self._in_sync_results = True
     65     try:
---> 66         ret = f(self, *args, **kwargs)
     67     finally:
     68         self._in_sync_results = False

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
     49     n_previous = len(self.client.history)
     50     try:
---> 51         ret = f(self, *args, **kwargs)
     52     finally:
     53         nmsgs = len(self.client.history) - n_previous

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)
   1053 
   1054         msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
-> 1055                                 metadata=metadata)
   1056         tracker = None if track is False else msg['tracker']
   1057 

/Users/minrk/dev/ip/mine/IPython/parallel/client/client.pyc in send_apply_request(self, socket, f, args, kwargs, metadata, track, ident)
   1252         bufs = serialize.pack_apply_message(f, args, kwargs,
   1253             buffer_threshold=self.session.buffer_threshold,
-> 1254             item_threshold=self.session.item_threshold,
   1255         )
   1256 

/Users/minrk/dev/ip/mine/IPython/kernel/zmq/serialize.pyc in pack_apply_message(f, args, kwargs, buffer_threshold, item_threshold)
    163     info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
    164 
--> 165     msg = [pickle.dumps(can(f),-1)]
    166     msg.append(pickle.dumps(info, -1))
    167     msg.extend(arg_bufs)

/Users/minrk/dev/ip/mine/IPython/utils/codeutil.pyc in reduce_code(co)
     36 def reduce_code(co):
     37     if co.co_freevars or co.co_cellvars:
---> 38         raise ValueError("Sorry, cannot pickle code objects with closures")
     39     args =  [co.co_argcount, co.co_nlocals, co.co_stacksize,
     40             co.co_flags, co.co_code, co.co_consts, co.co_names,

ValueError: Sorry, cannot pickle code objects with closures
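The last frame shows the condition that fails: reduce_code refuses any code object that has free variables or cell variables. Free variables (co_freevars) are names captured from an enclosing function. Cell variables (co_cellvars) are local names that some nested function captures. The helper below mirrors that condition and is written only for this illustration (uses_closure_vars is a hypothetical name). It flags both the closure itself and the factory that creates it.

```python
def make_closure(a):
    """make a function with a closure, and return it"""
    def has_closure(b):
        return a * b
    return has_closure


def square(x):
    return x * x


def uses_closure_vars(func):
    """Mirror the condition in reduce_code: any free or cell variables."""
    code = func.__code__
    return bool(code.co_freevars or code.co_cellvars)


print(uses_closure_vars(make_closure(5)))  # True: 'a' is a free variable
print(uses_closure_vars(make_closure))     # True: 'a' is a cell variable
print(uses_closure_vars(square))           # False
```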

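Oops, no dice. As the last frames of the traceback show, IPython still passes functions through its own canning step (can(f) in pack_apply_message), and reduce_code refuses code objects with closures. For IPython to work with dill, one or two more steps are needed, and IPython performs them for you if you call pickleutil.use_dill: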

In [10]:
from IPython.utils import pickleutil
pickleutil.use_dill()

Now let's try again

In [11]:
view.apply_sync(closed, 3)
Out[11]:
15

Yay! With dill enabled, we can send closures and other previously unpicklable objects to our engines. And that's it!

But wait, there's more!

In [12]:
view.apply_sync(make_closure, 2)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/Users/minrk/dev/ip/mine/IPython/kernel/zmq/serialize.pyc in serialize_object(obj, buffer_threshold, item_threshold)
    100         buffers.extend(_extract_buffers(cobj, buffer_threshold))
    101 
--> 102     buffers.insert(0, pickle.dumps(cobj,-1))
    103     return buffers
    104 

/Users/minrk/dev/ip/mine/IPython/utils/codeutil.pyc in reduce_code(co)
     36 def reduce_code(co):
     37     if co.co_freevars or co.co_cellvars:
---> 38         raise ValueError("Sorry, cannot pickle code objects with closures")
     39     args =  [co.co_argcount, co.co_nlocals, co.co_stacksize,
     40             co.co_flags, co.co_code, co.co_consts, co.co_names,

ValueError: Sorry, cannot pickle code objects with closures

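This time the error comes from serialize_object. The closure returned by make_closure has to be serialized on the engine, and the engines are not using dill yet. If we want dill support for objects coming back from the engines, we need to call use_dill() there as well.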

DirectView objects have a method to call use_dill locally and on every engine:

In [13]:
rc[:].use_dill()
Out[13]:
<AsyncResult: use_dill>

This is equivalent to

from IPython.utils.pickleutil import use_dill
use_dill()
rc[:].apply(use_dill)

Let's give it a try now:

In [14]:
remote_closure = view.apply_sync(make_closure, 4)
remote_closure(5)
Out[14]:
20

At this point, we can send and receive all kinds of objects:

In [15]:
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner

So outer returns a function with a closure, which returns a function with a closure.

Now we can call outer on an engine, call the closure it returns here, and call the resulting closure on an engine again. That engine may be a different one, since the view is load balanced. Just for good measure, we pass the last closure through a lambda that we define here and call there.

In [16]:
view.apply_sync(lambda f: f(3),view.apply_sync(outer, 1)(2))
Out[16]:
6
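The one-liner in In [16] packs three hops into one expression. The code below unrolls it into separate steps and runs them locally, with no engines involved. The comments say where each step ran in the notebook. The hypothetical helper captured shows which values each closure carries with it and therefore has to be serialized along the way.

```python
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner


def captured(func):
    """Map each captured variable name of func to the value in its cell."""
    names = func.__code__.co_freevars
    return {name: cell.cell_contents for name, cell in zip(names, func.__closure__ or ())}


# In [16], view.apply_sync(outer, 1) calls outer on an engine, and the
# returned closure travels back to the client.
inner = outer(1)
print(captured(inner))  # {'a': 1}

# inner(2) is called here, on the client; the result captures a and b.
inner_again = inner(2)
print(captured(inner_again))  # a -> 1, b -> 2

# The lambda and inner_again are then both sent to an engine (the
# load-balanced view may pick a different one), where f(3) runs.
result = (lambda f: f(3))(inner_again)
print(result)  # 6
```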