##// END OF EJS Templates
handle data_pub in parallel Client
MinRK -
Show More
@@ -49,6 +49,7 b' from IPython.parallel import error'
49 from IPython.parallel import util
49 from IPython.parallel import util
50
50
51 from IPython.zmq.session import Session, Message
51 from IPython.zmq.session import Session, Message
52 from IPython.zmq import serialize
52
53
53 from .asyncresult import AsyncResult, AsyncHubResult
54 from .asyncresult import AsyncResult, AsyncHubResult
54 from .view import DirectView, LoadBalancedView
55 from .view import DirectView, LoadBalancedView
@@ -184,6 +185,7 b' class Metadata(dict):'
184 'stdout' : '',
185 'stdout' : '',
185 'stderr' : '',
186 'stderr' : '',
186 'outputs' : [],
187 'outputs' : [],
188 'data': {},
187 'outputs_ready' : False,
189 'outputs_ready' : False,
188 }
190 }
189 self.update(md)
191 self.update(md)
@@ -768,7 +770,7 b' class Client(HasTraits):'
768
770
769 # construct result:
771 # construct result:
770 if content['status'] == 'ok':
772 if content['status'] == 'ok':
771 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
773 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
772 elif content['status'] == 'aborted':
774 elif content['status'] == 'aborted':
773 self.results[msg_id] = error.TaskAborted(msg_id)
775 self.results[msg_id] = error.TaskAborted(msg_id)
774 elif content['status'] == 'resubmitted':
776 elif content['status'] == 'resubmitted':
@@ -864,6 +866,9 b' class Client(HasTraits):'
864 md['outputs'].append(content)
866 md['outputs'].append(content)
865 elif msg_type == 'pyout':
867 elif msg_type == 'pyout':
866 md['pyout'] = content
868 md['pyout'] = content
869 elif msg_type == 'data_message':
870 data, remainder = serialize.unserialize_object(msg['buffers'])
871 md['data'].update(data)
867 elif msg_type == 'status':
872 elif msg_type == 'status':
868 # idle message comes after all outputs
873 # idle message comes after all outputs
869 if content['execution_state'] == 'idle':
874 if content['execution_state'] == 'idle':
@@ -1209,7 +1214,7 b' class Client(HasTraits):'
1209 if not isinstance(metadata, dict):
1214 if not isinstance(metadata, dict):
1210 raise TypeError("metadata must be dict, not %s"%type(metadata))
1215 raise TypeError("metadata must be dict, not %s"%type(metadata))
1211
1216
1212 bufs = util.pack_apply_message(f, args, kwargs,
1217 bufs = serialize.pack_apply_message(f, args, kwargs,
1213 buffer_threshold=self.session.buffer_threshold,
1218 buffer_threshold=self.session.buffer_threshold,
1214 item_threshold=self.session.item_threshold,
1219 item_threshold=self.session.item_threshold,
1215 )
1220 )
@@ -1538,7 +1543,7 b' class Client(HasTraits):'
1538
1543
1539 if rcontent['status'] == 'ok':
1544 if rcontent['status'] == 'ok':
1540 if header['msg_type'] == 'apply_reply':
1545 if header['msg_type'] == 'apply_reply':
1541 res,buffers = util.unserialize_object(buffers)
1546 res,buffers = serialize.unserialize_object(buffers)
1542 elif header['msg_type'] == 'execute_reply':
1547 elif header['msg_type'] == 'execute_reply':
1543 res = ExecuteReply(msg_id, rcontent, md)
1548 res = ExecuteReply(msg_id, rcontent, md)
1544 else:
1549 else:
General Comments 0
You need to be logged in to leave comments. Login now