##// END OF EJS Templates
Merge pull request #2255 from minrk/arwait...
Merge pull request #2255 from minrk/arwait better flush iopub with AsyncResults; requesting metadata (e.g. ar.data or ar.stdout) will result in flushing iopub if the outputs are incomplete, so separate wait(0) need not be called.

File last commit:

r8081:4bca5d9a
r8149:99eddce3 merge
Show More
pickleutil.py
281 lines | 7.6 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 # encoding: utf-8
"""Pickle related utilities. Perhaps this should be called 'can'."""
__docformat__ = "restructuredtext en"
#-------------------------------------------------------------------------------
Matthias BUSSONNIER
update copyright to 2011/20xx-2011...
r5390 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
prep newparallel for rebase...
r3539 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
MinRK
Improvements to dependency handling...
r3607 import copy
MinRK
use logger for canning import error
r8034 import logging
MinRK
update API after sagedays29...
r3664 import sys
from types import FunctionType
MinRK
Improvements to dependency handling...
r3607
MinRK
better serialization for parallel code...
r7967 try:
import cPickle as pickle
except ImportError:
import pickle
try:
import numpy
except:
numpy = None
MinRK
codeutil into zmq, to prevent IPython.kernel import
r3557 import codeutil
MinRK
better serialization for parallel code...
r7967 import py3compat
from importstring import import_item
MinRK
use logger for canning import error
r8034 from IPython.config import Application
MinRK
better serialization for parallel code...
r7967 if py3compat.PY3:
buffer = memoryview
MinRK
prep newparallel for rebase...
r3539
MinRK
Improvements to dependency handling...
r3607 #-------------------------------------------------------------------------------
# Classes
#-------------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
class CannedObject(object):
    """Base wrapper holding a shallow copy of an object with some attributes canned.

    On construction the wrapped object is shallow-copied and every attribute
    named in ``keys`` is replaced, on the copy, by ``can(<attribute>)``.
    ``get_object`` reverses this by running ``uncan`` over the same attributes.
    """

    def __init__(self, obj, keys=None):
        """Wrap ``obj``.

        Parameters
        ----------
        obj : object
            The object to wrap; it is copied with ``copy.copy`` and the
            original is left untouched.
        keys : sequence of str, optional
            Names of attributes of ``obj`` that must themselves be canned.
            Defaults to a new empty list.
        """
        # A fresh list per instance: a literal ``[]`` default would be one
        # shared object, so mutating ``self.keys`` on one instance would
        # leak into every other default-constructed instance.
        self.keys = [] if keys is None else keys
        self.obj = copy.copy(obj)
        for key in self.keys:
            # read from the original, write the canned value onto the copy
            setattr(self.obj, key, can(getattr(obj, key)))

        self.buffers = []

    def get_object(self, g=None):
        """Uncan the attributes listed in ``self.keys`` and return the wrapped object.

        ``g`` is the namespace dict handed to ``uncan``; an empty dict is
        used when it is None.
        """
        if g is None:
            g = {}
        for key in self.keys:
            setattr(self.obj, key, uncan(getattr(self.obj, key), g))
        return self.obj
MinRK
better serialization for parallel code...
r7967
MinRK
added dependency decorator
r3546
MinRK
add Reference object
class Reference(CannedObject):
    """object for wrapping a remote reference by name."""

    def __init__(self, name):
        """Store ``name``; raise TypeError if it is not a string."""
        if not isinstance(name, basestring):
            # Wrap in a 1-tuple: with a bare ``% name`` a tuple argument is
            # unpacked as the format arguments, so the message would show the
            # wrong value (or fail with an unrelated formatting error).
            raise TypeError("illegal name: %r" % (name,))
        self.name = name
        self.buffers = []

    def __repr__(self):
        return "<Reference: %r>" % self.name

    def get_object(self, g=None):
        """Resolve the reference by evaluating ``self.name`` in namespace ``g``.

        An empty dict is used as the namespace when ``g`` is None.
        """
        if g is None:
            g = {}

        # NOTE(review): eval executes an arbitrary expression; this is only
        # safe if Reference objects come from a trusted peer -- confirm.
        return eval(self.name, g)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
added dependency decorator
r3546
MinRK
prep newparallel for rebase...
class CannedFunction(CannedObject):
    """Canned form of a plain Python function.

    Keeps the function's code object, its (canned) default arguments, the
    name of its defining module and its ``__name__`` so that an equivalent
    function can be rebuilt by ``get_object``.
    """

    def __init__(self, f):
        self._check_type(f)
        self.code = f.func_code
        # defaults may hold objects that need canning themselves
        self.defaults = [can(fd) for fd in f.func_defaults] if f.func_defaults else None
        self.module = f.__module__ or '__main__'
        self.__name__ = f.__name__
        self.buffers = []

    def _check_type(self, obj):
        """Assert that ``obj`` is a FunctionType instance."""
        assert isinstance(obj, FunctionType), "Not a function type"

    def get_object(self, g=None):
        """Rebuild and return the function.

        If the recorded module name does not start with ``__`` and can be
        imported, that module's ``__dict__`` is used as the function globals
        in place of ``g``. Otherwise ``g`` is used, or an empty dict when
        ``g`` is None.
        """
        module_name = self.module
        # try to load function back into its module:
        if not module_name.startswith('__'):
            try:
                __import__(module_name)
            except ImportError:
                pass
            else:
                g = sys.modules[module_name].__dict__

        namespace = {} if g is None else g

        if self.defaults:
            restored = tuple(uncan(cfd, namespace) for cfd in self.defaults)
        else:
            restored = None
        return FunctionType(self.code, namespace, self.__name__, restored)
MinRK
better serialization for parallel code...
r7967
class CannedArray(CannedObject):
    """Canned numpy array: shape and dtype are kept as attributes, data in ``buffers[0]``."""

    def __init__(self, obj):
        self.shape = obj.shape
        dt = obj.dtype
        # brief dtype: field description for structured dtypes, else the type string
        self.dtype = dt.descr if dt.fields else dt.str
        if sum(obj.shape) != 0:
            # expose the data through a buffer over a contiguous array
            contiguous = numpy.ascontiguousarray(obj, dtype=None)
            self.buffers = [buffer(contiguous)]
        else:
            # shape sums to zero: just pickle it
            self.buffers = [pickle.dumps(obj, -1)]

    def get_object(self, g=None):
        """Rebuild the array from ``buffers[0]``; ``g`` is not used."""
        payload = self.buffers[0]
        if sum(self.shape) == 0:
            # no shape, we just pickled it
            return pickle.loads(payload)
        flat = numpy.frombuffer(payload, dtype=self.dtype)
        return flat.reshape(self.shape)
class CannedBytes(CannedObject):
    """Canned bytes: the object itself is stored as the single entry of ``buffers``."""

    # callable applied to the stored buffer when the object is restored
    wrap = bytes

    def __init__(self, obj):
        self.buffers = [obj]

    def get_object(self, g=None):
        """Return ``buffers[0]`` passed through ``wrap``; ``g`` is not used."""
        return self.wrap(self.buffers[0])
class CannedBuffer(CannedBytes):
    """CannedBytes variant that restores its data with ``buffer`` instead of ``bytes``."""

    # Must be a class, not a function: can_map calls CannedBuffer(obj) and
    # expects an instance back. Declared with ``def`` it returned None.
    wrap = buffer
MinRK
Improvements to dependency handling...
r3607 #-------------------------------------------------------------------------------
# Functions
#-------------------------------------------------------------------------------
MinRK
adjust how canning deals with import strings...
def _logger():
    """get the logger for the current Application

    the root logger will be used if no Application is running
    """
    if not Application.initialized():
        root = logging.getLogger()
        # give the root logger a default handler if it has none yet
        if not root.handlers:
            logging.basicConfig()
        return root

    return Application.instance().log
def _import_mapping(mapping, original=None):
    """import any string-keys in a type mapping

    Every string key is passed to ``import_item``. On success the entry is
    re-keyed, in place, by the imported object. On failure the entry is
    removed from ``mapping``.

    ``original`` is an optional mapping of reference entries: a failed import
    is only logged when ``original`` is non-empty and the key is not in it.
    """
    log = _logger()
    log.debug("Importing canning map")
    # Iterate over a snapshot of the keys: entries are popped and re-inserted
    # below, which must not happen while iterating the live dict.
    for key in list(mapping):
        if not isinstance(key, basestring):
            continue
        try:
            cls = import_item(key)
        except Exception:
            if original and key not in original:
                # only message on user-added classes
                log.error("cannning class not importable: %r", key, exc_info=True)
            mapping.pop(key)
        else:
            mapping[cls] = mapping.pop(key)
MinRK
prep newparallel for rebase...
r3539
MinRK
better serialization for parallel code...
def can(obj):
    """prepare an object for pickling

    Walks ``can_map`` and returns ``canner(obj)`` for the first class key that
    ``obj`` is an instance of. If a string key is met first, the map's import
    strings are resolved and the lookup is retried. With no match, ``obj`` is
    returned unchanged.
    """
    for klass, canner in can_map.iteritems():
        if isinstance(klass, basestring):
            # an unresolved import string: stop and resolve the map
            break
        if isinstance(obj, klass):
            return canner(obj)
    else:
        # whole map scanned, nothing matched
        return obj

    # perform can_map imports, then try again
    # this will usually only happen once
    _import_mapping(can_map, _original_can_map)
    return can(obj)
def can_dict(obj):
    """can the *values* of a dict

    Returns a new dict with the same keys; anything that is not a dict is
    returned unchanged.
    """
    if not isinstance(obj, dict):
        return obj
    return dict((key, can(value)) for key, value in obj.iteritems())
MinRK
better serialization for parallel code...
def can_sequence(obj):
    """can the elements of a sequence

    Lists and tuples are rebuilt with their own type; any other object is
    returned unchanged.
    """
    if not isinstance(obj, (list, tuple)):
        return obj
    canned = [can(item) for item in obj]
    return type(obj)(canned)
def uncan(obj, g=None):
    """invert canning

    Walks ``uncan_map`` and returns ``uncanner(obj, g)`` for the first class
    key that ``obj`` is an instance of. If a string key is met first, the
    map's import strings are resolved and the lookup is retried. With no
    match, ``obj`` is returned unchanged.
    """
    for klass, uncanner in uncan_map.iteritems():
        if isinstance(klass, basestring):
            # an unresolved import string: stop and resolve the map
            break
        if isinstance(obj, klass):
            return uncanner(obj, g)
    else:
        # whole map scanned, nothing matched
        return obj

    # perform uncan_map imports, then try again
    # this will usually only happen once
    _import_mapping(uncan_map, _original_uncan_map)
    return uncan(obj, g)
def uncan_dict(obj, g=None):
    """uncan the *values* of a dict

    Returns a new dict with the same keys, each value passed through
    ``uncan`` with namespace ``g``; a non-dict is returned unchanged.
    """
    if not isinstance(obj, dict):
        return obj
    return dict((key, uncan(value, g)) for key, value in obj.iteritems())
MinRK
better serialization for parallel code...
def uncan_sequence(obj, g=None):
    """uncan the elements of a sequence

    Lists and tuples are rebuilt with their own type, each element passed
    through ``uncan`` with namespace ``g``; any other object is returned
    unchanged.
    """
    if not isinstance(obj, (list, tuple)):
        return obj
    restored = [uncan(item, g) for item in obj]
    return type(obj)(restored)
MinRK
better serialization for parallel code...
r7967 #-------------------------------------------------------------------------------
MinRK
adjust how canning deals with import strings...
r8081 # API dictionaries
MinRK
better serialization for parallel code...
r7967 #-------------------------------------------------------------------------------
# These dicts can be extended for custom serialization of new objects
# can_map: key is a class, or a dotted import string naming one; value is a
# callable taking the object and returning its canned form. can() resolves
# string keys through _import_mapping the first time it meets one.
can_map = {
    'IPython.parallel.dependent' : lambda obj: CannedObject(obj, keys=('f','df')),
    'numpy.ndarray' : CannedArray,
    FunctionType : CannedFunction,
    bytes : CannedBytes,
    buffer : CannedBuffer,
}
# uncan_map: key is a class (or import string); value is a callable taking
# (obj, g) and returning the restored object.
uncan_map = {
    CannedObject : lambda obj, g: obj.get_object(g),
}
MinRK
adjust how canning deals with import strings...
r8081 # for use in _import_mapping:
# Shallow copies of the maps as defined above. _import_mapping receives them
# as `original` and only logs an import failure for keys not present here.
_original_can_map = can_map.copy()
_original_uncan_map = uncan_map.copy()