From 8bfca1015f0617e4f1b564fb16c0d11ae85e3daf 2012-04-27 19:40:39
From: Min RK <benjaminrk@gmail.com>
Date: 2012-04-27 19:40:39
Subject: [PATCH] Merge pull request #1656 from minrk/embed_kernel_tests

Ensure kernels are cleaned up in embed_kernel tests

Use a context manager to ensure that the kernel manager's channels are stopped (and their sockets closed) at the end of each test, preventing delayed 'could not close sockets' errors in later tests.

closes #1653
---

diff --git a/IPython/zmq/tests/test_embed_kernel.py b/IPython/zmq/tests/test_embed_kernel.py
index e63e9ac..6357c78 100644
--- a/IPython/zmq/tests/test_embed_kernel.py
+++ b/IPython/zmq/tests/test_embed_kernel.py
@@ -17,6 +17,7 @@ import sys
 import tempfile
 import time
 
+from contextlib import contextmanager
 from subprocess import Popen, PIPE
 
 import nose.tools as nt
@@ -51,12 +52,13 @@ def teardown():
         pass
 
 
-def _launch_kernel(cmd):
+@contextmanager
+def setup_kernel(cmd):
     """start an embedded kernel in a subprocess, and wait for it to be ready
     
     Returns
     -------
-    kernel, kernel_manager: Popen instance and connected KernelManager
+    kernel_manager: connected KernelManager instance
     """
     kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
     connection_file = os.path.join(IPYTHONDIR,
@@ -81,7 +83,10 @@ def _launch_kernel(cmd):
     km.load_connection_file()
     km.start_channels()
     
-    return kernel, km
+    try:
+        yield km
+    finally:
+        km.stop_channels()
 
 def test_embed_kernel_basic():
     """IPython.embed_kernel() is basically functional"""
@@ -95,26 +100,26 @@ def test_embed_kernel_basic():
         '',
     ])
     
-    kernel, km = _launch_kernel(cmd)
-    shell = km.shell_channel
+    with setup_kernel(cmd) as km:
+        shell = km.shell_channel
     
-    # oinfo a (int)
-    msg_id = shell.object_info('a')
-    msg = shell.get_msg(block=True, timeout=2)
-    content = msg['content']
-    nt.assert_true(content['found'])
+        # oinfo a (int)
+        msg_id = shell.object_info('a')
+        msg = shell.get_msg(block=True, timeout=2)
+        content = msg['content']
+        nt.assert_true(content['found'])
     
-    msg_id = shell.execute("c=a*2")
-    msg = shell.get_msg(block=True, timeout=2)
-    content = msg['content']
-    nt.assert_equals(content['status'], u'ok')
-
-    # oinfo c (should be 10)
-    msg_id = shell.object_info('c')
-    msg = shell.get_msg(block=True, timeout=2)
-    content = msg['content']
-    nt.assert_true(content['found'])
-    nt.assert_equals(content['string_form'], u'10')
+        msg_id = shell.execute("c=a*2")
+        msg = shell.get_msg(block=True, timeout=2)
+        content = msg['content']
+        nt.assert_equals(content['status'], u'ok')
+
+        # oinfo c (should be 10)
+        msg_id = shell.object_info('c')
+        msg = shell.get_msg(block=True, timeout=2)
+        content = msg['content']
+        nt.assert_true(content['found'])
+        nt.assert_equals(content['string_form'], u'10')
 
 def test_embed_kernel_namespace():
     """IPython.embed_kernel() inherits calling namespace"""
@@ -128,26 +133,26 @@ def test_embed_kernel_namespace():
         '',
     ])
     
-    kernel, km = _launch_kernel(cmd)
-    shell = km.shell_channel
+    with setup_kernel(cmd) as km:
+        shell = km.shell_channel
     
-    # oinfo a (int)
-    msg_id = shell.object_info('a')
-    msg = shell.get_msg(block=True, timeout=2)
-    content = msg['content']
-    nt.assert_true(content['found'])
-    nt.assert_equals(content['string_form'], u'5')
-
-    # oinfo b (str)
-    msg_id = shell.object_info('b')
-    msg = shell.get_msg(block=True, timeout=2)
-    content = msg['content']
-    nt.assert_true(content['found'])
-    nt.assert_equals(content['string_form'], u'hi there')
-
-    # oinfo c (undefined)
-    msg_id = shell.object_info('c')
-    msg = shell.get_msg(block=True, timeout=2)
-    content = msg['content']
-    nt.assert_false(content['found'])
+        # oinfo a (int)
+        msg_id = shell.object_info('a')
+        msg = shell.get_msg(block=True, timeout=2)
+        content = msg['content']
+        nt.assert_true(content['found'])
+        nt.assert_equals(content['string_form'], u'5')
+
+        # oinfo b (str)
+        msg_id = shell.object_info('b')
+        msg = shell.get_msg(block=True, timeout=2)
+        content = msg['content']
+        nt.assert_true(content['found'])
+        nt.assert_equals(content['string_form'], u'hi there')
+
+        # oinfo c (undefined)
+        msg_id = shell.object_info('c')
+        msg = shell.get_msg(block=True, timeout=2)
+        content = msg['content']
+        nt.assert_false(content['found'])