From 367cb43916cbeb51dfc7d1191c1b4be91e80b44f 2012-06-12 16:08:14 From: MinRK Date: 2012-06-12 16:08:14 Subject: [PATCH] fix & test HubResults from execute requests --- diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 16313f9..09903e0 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -1543,11 +1543,15 @@ class Client(HasTraits): if rec.get('received'): md['received'] = rec['received'] md.update(iodict) - + if rcontent['status'] == 'ok': - res,buffers = util.unserialize_object(buffers) + if header['msg_type'] == 'apply_reply': + res,buffers = util.unserialize_object(buffers) + elif header['msg_type'] == 'execute_reply': + res = ExecuteReply(msg_id, rcontent, md) + else: + raise KeyError("unhandled msg type: %r" % header[msg_type]) else: - print rcontent res = self._unwrap_exception(rcontent) failures.append(res) @@ -1555,7 +1559,7 @@ class Client(HasTraits): content[msg_id] = res if len(theids) == 1 and failures: - raise failures[0] + raise failures[0] error.collect_exceptions(failures, "result_status") return content diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 9c8f5a3..fa3c52e 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -824,8 +824,15 @@ class Hub(SessionFactory): d['pyerr'] = content elif msg_type == 'pyin': d['pyin'] = content['code'] + elif msg_type in ('display_data', 'pyout'): + d[msg_type] = content + elif msg_type == 'status': + pass else: - d[msg_type] = content.get('data', '') + self.log.warn("unhandled iopub msg_type: %r", msg_type) + + if not d: + return try: self.db.update_record(msg_id, d) diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index 6f108e5..8733503 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -159,6 +159,25 @@ class TestClient(ClusterTestCase): self.assertFalse(isinstance(ar2, AsyncHubResult)) c.close() + def test_get_execute_result(self): + """test getting execute results from the Hub.""" + c = clientmod.Client(profile='iptest') + t = c.ids[-1] + cell = '\n'.join([ + 'import time', + 'time.sleep(0.25)', + '5' + ]) + ar = c[t].execute("import time; time.sleep(1)", silent=False) + # give the monitor time to notice the message + time.sleep(.25) + ahr = self.client.get_result(ar.msg_ids) + self.assertTrue(isinstance(ahr, AsyncHubResult)) + self.assertEquals(ahr.get().pyout, ar.get().pyout) + ar2 = self.client.get_result(ar.msg_ids) + self.assertFalse(isinstance(ar2, AsyncHubResult)) + c.close() + def test_ids_list(self): """test client.ids""" ids = self.client.ids