##// END OF EJS Templates
load header with engine id when engine dies in TaskScheduler...
Load header with engine id when engine dies in TaskScheduler. This ensures that the metadata dict on the *Client* has the engine_uuid of the engine on which the task failed. It is identical to the code used elsewhere (Hub, Client) for identifying when engines die.

File last commit:

r5390:c82649ea
r6068:f8f19148
Show More
newserialized.py
177 lines | 5.0 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 # encoding: utf-8
# -*- test-case-name: IPython.kernel.test.test_newserialized -*-
"""Refactored serialization classes and interfaces."""
__docformat__ = "restructuredtext en"
# Tell nose to skip this module
__test__ = {}
#-------------------------------------------------------------------------------
Matthias BUSSONNIER
update copyright to 2011/20xx-2011...
r5390 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
prep newparallel for rebase...
r3539 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
MinRK
update parallel code for py3k...
r4155 import sys
MinRK
prep newparallel for rebase...
r3539 import cPickle as pickle
try:
import numpy
except ImportError:
MinRK
update parallel code for py3k...
r4155 numpy = None
MinRK
prep newparallel for rebase...
r3539
MinRK
codeutil into zmq, to prevent IPython.kernel import
class SerializationError(Exception):
    """Raised when data carries a type descriptor this module cannot handle."""
MinRK
prep newparallel for rebase...
r3539
MinRK
update parallel code for py3k...
# Make both ``buffer`` and ``memoryview`` usable names on every interpreter
# this module targets, and record whether we are running on Python 3.
py3k = sys.version_info[0] >= 3
if py3k:
    # Python 3 dropped the ``buffer`` builtin; ``memoryview`` stands in for it.
    buffer = memoryview
elif sys.version_info[:2] <= (2, 6):
    # ``memoryview`` only became a builtin in Python 2.7, so alias it here.
    memoryview = buffer
MinRK
update parallel code for py3k...
r4155
MinRK
prep newparallel for rebase...
r3539 #-----------------------------------------------------------------------------
# Classes and functions
#-----------------------------------------------------------------------------
class ISerialized:
    """Interface description for serialized payloads.

    The methods are declarations only (no ``self``, no body); the concrete
    classes in this file carry an ``implements(ISerialized)`` comment and
    provide the real implementations.
    """

    def getData():
        """Return the serialized data."""

    def getDataSize(units=10.0**6):
        """Return the size of the data expressed in multiples of *units*."""

    def getTypeDescriptor():
        """Return the string naming how the data was serialized."""

    def getMetadata():
        """Return the metadata that accompanies the data."""
class IUnSerialized:
    """Interface description for wrappers around a live, unserialized object.

    The method is a declaration only (no ``self``, no body).
    """

    def getObject():
        """Return the wrapped object."""
class Serialized(object):
    """Plain container for already-serialized data.

    Parameters
    ----------
    data
        The serialized payload; must support ``len()`` for ``getDataSize``.
    typeDescriptor
        String naming how ``data`` was produced (this module uses
        ``'ndarray'``, ``'bytes'``, ``'buffer'`` and ``'pickle'``).
    metadata
        Optional dict of extra information needed to rebuild the object.
        When omitted, each instance gets its own fresh empty dict.
    """
    # implements(ISerialized)

    def __init__(self, data, typeDescriptor, metadata=None):
        self.data = data
        self.typeDescriptor = typeDescriptor
        # A ``{}`` default would be shared by every instance created without
        # metadata, so mutations on one would leak into all the others.
        self.metadata = {} if metadata is None else metadata

    def getData(self):
        """Return the serialized payload."""
        return self.data

    def getDataSize(self, units=10.0**6):
        """Return ``len(data) / units`` as a float."""
        # The 1.0 factor matches SerializeIt.getDataSize and keeps the
        # division from truncating when an integer ``units`` is passed on
        # Python 2.
        return 1.0 * len(self.data) / units

    def getTypeDescriptor(self):
        """Return the string naming how the data was serialized."""
        return self.typeDescriptor

    def getMetadata(self):
        """Return the metadata dict attached to this payload."""
        return self.metadata
class UnSerialized(object):
    """Wrapper that simply holds on to a live Python object."""
    # implements(IUnSerialized)

    def __init__(self, obj):
        # Keep a reference to the object exactly as it was handed in.
        self.obj = obj

    def getObject(self):
        """Return the object given to the constructor."""
        wrapped = self.obj
        return wrapped
class SerializeIt(object):
# implements(ISerialized)
def __init__(self, unSerialized):
self.data = None
self.obj = unSerialized.getObject()
MinRK
update parallel code for py3k...
r4155 if numpy is not None and isinstance(self.obj, numpy.ndarray):
MinRK
don't special case for py3k+numpy...
r4571 if len(self.obj.shape) == 0: # length 0 arrays are just pickled
MinRK
pickle length-0 arrays.
r3648 self.typeDescriptor = 'pickle'
self.metadata = {}
else:
MinRK
prep newparallel for rebase...
r3539 self.obj = numpy.ascontiguousarray(self.obj, dtype=None)
self.typeDescriptor = 'ndarray'
self.metadata = {'shape':self.obj.shape,
'dtype':self.obj.dtype.str}
MinRK
update parallel code for py3k...
r4155 elif isinstance(self.obj, bytes):
MinRK
prep newparallel for rebase...
r3539 self.typeDescriptor = 'bytes'
self.metadata = {}
elif isinstance(self.obj, buffer):
self.typeDescriptor = 'buffer'
self.metadata = {}
else:
self.typeDescriptor = 'pickle'
self.metadata = {}
MinRK
eliminate relative imports
r3642 self._generateData()
MinRK
prep newparallel for rebase...
r3539
def _generateData(self):
if self.typeDescriptor == 'ndarray':
MinRK
don't special case for py3k+numpy...
r4571 self.data = buffer(self.obj)
MinRK
prep newparallel for rebase...
r3539 elif self.typeDescriptor in ('bytes', 'buffer'):
self.data = self.obj
elif self.typeDescriptor == 'pickle':
MinRK
fixed buffer serialization for buffers below threshold
r3545 self.data = pickle.dumps(self.obj, -1)
MinRK
prep newparallel for rebase...
r3539 else:
raise SerializationError("Really wierd serialization error.")
del self.obj
def getData(self):
return self.data
def getDataSize(self, units=10.0**6):
return 1.0*len(self.data)/units
def getTypeDescriptor(self):
return self.typeDescriptor
def getMetadata(self):
return self.metadata
class UnSerializeIt(UnSerialized):
    """Rebuild a Python object from a serialized payload.

    ``serialized`` is expected to provide ``getTypeDescriptor()``,
    ``getData()`` and ``getMetadata()``.  The base-class constructor is
    deliberately not called: this class stores the serialized payload in
    ``self.serialized`` and reconstructs the object on demand.
    """
    # implements(IUnSerialized)

    def __init__(self, serialized):
        self.serialized = serialized

    def getObject(self):
        """Return the object described by the serialized payload.

        Raises
        ------
        TypeError
            If an ``'ndarray'`` payload's data is not bytes, a buffer or a
            memoryview.
        SerializationError
            If the type descriptor is unknown, or it is ``'ndarray'`` but
            numpy is not available.
        """
        typeDescriptor = self.serialized.getTypeDescriptor()
        if typeDescriptor == 'ndarray':
            if numpy is None:
                # Same exception type as before, but say what is wrong
                # instead of falling through to the generic error below.
                raise SerializationError(
                    "numpy is required to unserialize an ndarray.")
            buf = self.serialized.getData()
            if not isinstance(buf, (bytes, buffer, memoryview)):
                raise TypeError("Expected bytes or buffer/memoryview, but got %r"%type(buf))
            # Go through the interface accessor rather than reaching for a
            # ``metadata`` attribute the interface does not promise.
            metadata = self.serialized.getMetadata()
            result = numpy.frombuffer(buf, dtype=metadata['dtype'])
            result.shape = metadata['shape']
        elif typeDescriptor == 'pickle':
            # NOTE(security): pickle.loads can execute arbitrary code; this
            # must only ever be fed data from a trusted peer.
            result = pickle.loads(self.serialized.getData())
        elif typeDescriptor in ('bytes', 'buffer'):
            result = self.serialized.getData()
        else:
            raise SerializationError("Really wierd serialization error.")
        return result
def serialize(obj):
    """Wrap *obj* in ``UnSerialized`` and return its ``SerializeIt`` form."""
    wrapped = UnSerialized(obj)
    return SerializeIt(wrapped)
def unserialize(serialized):
    """Rebuild and return the object described by *serialized*."""
    unpacker = UnSerializeIt(serialized)
    return unpacker.getObject()