diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 79d54a4..4571023 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -49,6 +49,7 @@ from IPython.parallel import error from IPython.parallel import util from IPython.zmq.session import Session, Message +from IPython.zmq import serialize from .asyncresult import AsyncResult, AsyncHubResult from .view import DirectView, LoadBalancedView @@ -184,6 +185,7 @@ class Metadata(dict): 'stdout' : '', 'stderr' : '', 'outputs' : [], + 'data': {}, 'outputs_ready' : False, } self.update(md) @@ -768,7 +770,7 @@ class Client(HasTraits): # construct result: if content['status'] == 'ok': - self.results[msg_id] = util.unserialize_object(msg['buffers'])[0] + self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0] elif content['status'] == 'aborted': self.results[msg_id] = error.TaskAborted(msg_id) elif content['status'] == 'resubmitted': @@ -864,6 +866,9 @@ class Client(HasTraits): md['outputs'].append(content) elif msg_type == 'pyout': md['pyout'] = content + elif msg_type == 'data_message': + data, remainder = serialize.unserialize_object(msg['buffers']) + md['data'].update(data) elif msg_type == 'status': # idle message comes after all outputs if content['execution_state'] == 'idle': @@ -1209,7 +1214,7 @@ class Client(HasTraits): if not isinstance(metadata, dict): raise TypeError("metadata must be dict, not %s"%type(metadata)) - bufs = util.pack_apply_message(f, args, kwargs, + bufs = serialize.pack_apply_message(f, args, kwargs, buffer_threshold=self.session.buffer_threshold, item_threshold=self.session.item_threshold, ) @@ -1538,7 +1543,7 @@ class Client(HasTraits): if rcontent['status'] == 'ok': if header['msg_type'] == 'apply_reply': - res,buffers = util.unserialize_object(buffers) + res,buffers = serialize.unserialize_object(buffers) elif header['msg_type'] == 'execute_reply': res = ExecuteReply(msg_id, rcontent, md) else: