From 367cb43916cbeb51dfc7d1191c1b4be91e80b44f 2012-06-12 16:08:14
From: MinRK <benjaminrk@gmail.com>
Date: 2012-06-12 16:08:14
Subject: [PATCH] fix & test HubResults from execute requests
---

diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index 16313f9..09903e0 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -1543,11 +1543,15 @@ class Client(HasTraits):
                 if rec.get('received'):
                     md['received'] = rec['received']
                 md.update(iodict)
-
+                
                 if rcontent['status'] == 'ok':
-                    res,buffers = util.unserialize_object(buffers)
+                    if header['msg_type'] == 'apply_reply':
+                        res,buffers = util.unserialize_object(buffers)
+                    elif header['msg_type'] == 'execute_reply':
+                        res = ExecuteReply(msg_id, rcontent, md)
+                    else:
+                        raise KeyError("unhandled msg type: %r" % header[msg_type])
                 else:
-                    print rcontent
                     res = self._unwrap_exception(rcontent)
                     failures.append(res)
 
@@ -1555,7 +1559,7 @@ class Client(HasTraits):
                 content[msg_id] = res
 
         if len(theids) == 1 and failures:
-                raise failures[0]
+            raise failures[0]
 
         error.collect_exceptions(failures, "result_status")
         return content
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index 9c8f5a3..fa3c52e 100644
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -824,8 +824,15 @@ class Hub(SessionFactory):
             d['pyerr'] = content
         elif msg_type == 'pyin':
             d['pyin'] = content['code']
+        elif msg_type in ('display_data', 'pyout'):
+            d[msg_type] = content
+        elif msg_type == 'status':
+            pass
         else:
-            d[msg_type] = content.get('data', '')
+            self.log.warn("unhandled iopub msg_type: %r", msg_type)
+
+        if not d:
+            return
 
         try:
             self.db.update_record(msg_id, d)
diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py
index 6f108e5..8733503 100644
--- a/IPython/parallel/tests/test_client.py
+++ b/IPython/parallel/tests/test_client.py
@@ -159,6 +159,25 @@ class TestClient(ClusterTestCase):
         self.assertFalse(isinstance(ar2, AsyncHubResult))
         c.close()
     
+    def test_get_execute_result(self):
+        """test getting execute results from the Hub."""
+        c = clientmod.Client(profile='iptest')
+        t = c.ids[-1]
+        cell = '\n'.join([
+            'import time',
+            'time.sleep(0.25)',
+            '5'
+        ])
+        ar = c[t].execute("import time; time.sleep(1)", silent=False)
+        # give the monitor time to notice the message
+        time.sleep(.25)
+        ahr = self.client.get_result(ar.msg_ids)
+        self.assertTrue(isinstance(ahr, AsyncHubResult))
+        self.assertEquals(ahr.get().pyout, ar.get().pyout)
+        ar2 = self.client.get_result(ar.msg_ids)
+        self.assertFalse(isinstance(ar2, AsyncHubResult))
+        c.close()
+    
     def test_ids_list(self):
         """test client.ids"""
         ids = self.client.ids