##// END OF EJS Templates
handle data_pub in parallel Client
MinRK -
Show More
@@ -49,6 +49,7 b' from IPython.parallel import error'
49 49 from IPython.parallel import util
50 50
51 51 from IPython.zmq.session import Session, Message
52 from IPython.zmq import serialize
52 53
53 54 from .asyncresult import AsyncResult, AsyncHubResult
54 55 from .view import DirectView, LoadBalancedView
@@ -184,6 +185,7 b' class Metadata(dict):'
184 185 'stdout' : '',
185 186 'stderr' : '',
186 187 'outputs' : [],
188 'data': {},
187 189 'outputs_ready' : False,
188 190 }
189 191 self.update(md)
@@ -768,7 +770,7 b' class Client(HasTraits):'
768 770
769 771 # construct result:
770 772 if content['status'] == 'ok':
771 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
773 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
772 774 elif content['status'] == 'aborted':
773 775 self.results[msg_id] = error.TaskAborted(msg_id)
774 776 elif content['status'] == 'resubmitted':
@@ -864,6 +866,9 b' class Client(HasTraits):'
864 866 md['outputs'].append(content)
865 867 elif msg_type == 'pyout':
866 868 md['pyout'] = content
869 elif msg_type == 'data_message':
870 data, remainder = serialize.unserialize_object(msg['buffers'])
871 md['data'].update(data)
867 872 elif msg_type == 'status':
868 873 # idle message comes after all outputs
869 874 if content['execution_state'] == 'idle':
@@ -1209,7 +1214,7 b' class Client(HasTraits):'
1209 1214 if not isinstance(metadata, dict):
1210 1215 raise TypeError("metadata must be dict, not %s"%type(metadata))
1211 1216
1212 bufs = util.pack_apply_message(f, args, kwargs,
1217 bufs = serialize.pack_apply_message(f, args, kwargs,
1213 1218 buffer_threshold=self.session.buffer_threshold,
1214 1219 item_threshold=self.session.item_threshold,
1215 1220 )
@@ -1538,7 +1543,7 b' class Client(HasTraits):'
1538 1543
1539 1544 if rcontent['status'] == 'ok':
1540 1545 if header['msg_type'] == 'apply_reply':
1541 res,buffers = util.unserialize_object(buffers)
1546 res,buffers = serialize.unserialize_object(buffers)
1542 1547 elif header['msg_type'] == 'execute_reply':
1543 1548 res = ExecuteReply(msg_id, rcontent, md)
1544 1549 else:
General Comments 0
You need to be logged in to leave comments. Login now