##// END OF EJS Templates
Merge pull request #4149 from minrk/fewer-kernels...
Paul Ivanov -
r12449:cd4bd811 merge
parent child Browse files
Show More
@@ -0,0 +1,167 b''
1 """utilities for testing IPython kernels"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14 import atexit
15
16 from contextlib import contextmanager
17 from subprocess import PIPE
18 from Queue import Empty
19
20 import nose.tools as nt
21
22 from IPython.kernel import KernelManager
23
24 #-------------------------------------------------------------------------------
25 # Globals
26 #-------------------------------------------------------------------------------
27
28 STARTUP_TIMEOUT = 60
29 TIMEOUT = 15
30
31 KM = None
32 KC = None
33
34 #-------------------------------------------------------------------------------
35 # code
36 #-------------------------------------------------------------------------------
37
38
39 def start_new_kernel():
40 """start a new kernel, and return its Manager and Client"""
41 km = KernelManager()
42 km.start_kernel(stdout=PIPE, stderr=PIPE)
43 kc = km.client()
44 kc.start_channels()
45
46 msg_id = kc.kernel_info()
47 kc.get_shell_msg(block=True, timeout=STARTUP_TIMEOUT)
48 flush_channels(kc)
49 return km, kc
50
51 def flush_channels(kc=None):
52 """flush any messages waiting on the queue"""
53 from .test_message_spec import validate_message
54
55 if kc is None:
56 kc = KC
57 for channel in (kc.shell_channel, kc.iopub_channel):
58 while True:
59 try:
60 msg = channel.get_msg(block=True, timeout=0.1)
61 except Empty:
62 break
63 else:
64 validate_message(msg)
65
66
67 def execute(code='', kc=None, **kwargs):
68 """wrapper for doing common steps for validating an execution request"""
69 from .test_message_spec import validate_message
70 if kc is None:
71 kc = KC
72 msg_id = kc.execute(code=code, **kwargs)
73 reply = kc.get_shell_msg(timeout=TIMEOUT)
74 validate_message(reply, 'execute_reply', msg_id)
75 busy = kc.get_iopub_msg(timeout=TIMEOUT)
76 validate_message(busy, 'status', msg_id)
77 nt.assert_equal(busy['content']['execution_state'], 'busy')
78
79 if not kwargs.get('silent'):
80 pyin = kc.get_iopub_msg(timeout=TIMEOUT)
81 validate_message(pyin, 'pyin', msg_id)
82 nt.assert_equal(pyin['content']['code'], code)
83
84 return msg_id, reply['content']
85
86 def start_global_kernel():
87 """start the global kernel (if it isn't running) and return its client"""
88 global KM, KC
89 if KM is None:
90 KM, KC = start_new_kernel()
91 atexit.register(stop_global_kernel)
92 return KC
93
94 @contextmanager
95 def kernel():
96 """Context manager for the global kernel instance
97
98 Should be used for most kernel tests
99
100 Returns
101 -------
102 kernel_client: connected KernelClient instance
103 """
104 yield start_global_kernel()
105
106 def uses_kernel(test_f):
107 """Decorator for tests that use the global kernel"""
108 def wrapped_test():
109 with kernel() as kc:
110 test_f(kc)
111 wrapped_test.__doc__ = test_f.__doc__
112 wrapped_test.__name__ = test_f.__name__
113 return wrapped_test
114
115 def stop_global_kernel():
116 """Stop the global shared kernel instance, if it exists"""
117 global KM, KC
118 KC.stop_channels()
119 KC = None
120 if KM is None:
121 return
122 KM.shutdown_kernel(now=True)
123 KM = None
124
125 @contextmanager
126 def new_kernel():
127 """Context manager for a new kernel in a subprocess
128
129 Should only be used for tests where the kernel must not be re-used.
130
131 Returns
132 -------
133 kernel_client: connected KernelClient instance
134 """
135 km, kc = start_new_kernel()
136 try:
137 yield kc
138 finally:
139 kc.stop_channels()
140 km.shutdown_kernel(now=True)
141
142
143 def assemble_output(iopub):
144 """assemble stdout/err from an execution"""
145 stdout = ''
146 stderr = ''
147 while True:
148 msg = iopub.get_msg(block=True, timeout=1)
149 msg_type = msg['msg_type']
150 content = msg['content']
151 if msg_type == 'status' and content['execution_state'] == 'idle':
152 # idle message signals end of output
153 break
154 elif msg['msg_type'] == 'stream':
155 if content['name'] == 'stdout':
156 stdout += content['data']
157 elif content['name'] == 'stderr':
158 stderr += content['data']
159 else:
160 raise KeyError("bad stream: %r" % content['name'])
161 else:
162 # other output, ignored
163 pass
164 return stdout, stderr
165
166
167
@@ -171,7 +171,7 b' class MultiKernelManager(LoggingConfigurable):'
171 self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum))
171 self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum))
172
172
173 @kernel_method
173 @kernel_method
174 def restart_kernel(self, kernel_id):
174 def restart_kernel(self, kernel_id, now=False):
175 """Restart a kernel by its uuid, keeping the same ports.
175 """Restart a kernel by its uuid, keeping the same ports.
176
176
177 Parameters
177 Parameters
@@ -11,104 +11,18 b''
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
15 import shutil
16 import sys
14 import sys
17 import tempfile
18
19 from contextlib import contextmanager
20 from subprocess import PIPE
21
15
22 import nose.tools as nt
16 import nose.tools as nt
23
17
24 from IPython.kernel import KernelManager
25 from IPython.kernel.tests.test_message_spec import execute, flush_channels
26 from IPython.testing import decorators as dec, tools as tt
18 from IPython.testing import decorators as dec, tools as tt
27 from IPython.utils import path, py3compat
19 from IPython.utils import py3compat
20
21 from .utils import new_kernel, kernel, TIMEOUT, assemble_output, execute, flush_channels
28
22
29 #-------------------------------------------------------------------------------
23 #-------------------------------------------------------------------------------
30 # Tests
24 # Tests
31 #-------------------------------------------------------------------------------
25 #-------------------------------------------------------------------------------
32 IPYTHONDIR = None
33 save_env = None
34 save_get_ipython_dir = None
35
36 STARTUP_TIMEOUT = 60
37 TIMEOUT = 15
38
39 def setup():
40 """setup temporary IPYTHONDIR for tests"""
41 global IPYTHONDIR
42 global save_env
43 global save_get_ipython_dir
44
45 IPYTHONDIR = tempfile.mkdtemp()
46
47 save_env = os.environ.copy()
48 os.environ["IPYTHONDIR"] = IPYTHONDIR
49
50 save_get_ipython_dir = path.get_ipython_dir
51 path.get_ipython_dir = lambda : IPYTHONDIR
52
53
54 def teardown():
55 path.get_ipython_dir = save_get_ipython_dir
56 os.environ = save_env
57
58 try:
59 shutil.rmtree(IPYTHONDIR)
60 except (OSError, IOError):
61 # no such file
62 pass
63
64
65 @contextmanager
66 def new_kernel():
67 """start a kernel in a subprocess, and wait for it to be ready
68
69 Returns
70 -------
71 kernel_manager: connected KernelManager instance
72 """
73 KM = KernelManager()
74
75 KM.start_kernel(stdout=PIPE, stderr=PIPE)
76 KC = KM.client()
77 KC.start_channels()
78
79 # wait for kernel to be ready
80 KC.shell_channel.execute("import sys")
81 KC.shell_channel.get_msg(block=True, timeout=STARTUP_TIMEOUT)
82 flush_channels(KC)
83 try:
84 yield KC
85 finally:
86 KC.stop_channels()
87 KM.shutdown_kernel()
88
89
90 def assemble_output(iopub):
91 """assemble stdout/err from an execution"""
92 stdout = ''
93 stderr = ''
94 while True:
95 msg = iopub.get_msg(block=True, timeout=1)
96 msg_type = msg['msg_type']
97 content = msg['content']
98 if msg_type == 'status' and content['execution_state'] == 'idle':
99 # idle message signals end of output
100 break
101 elif msg['msg_type'] == 'stream':
102 if content['name'] == 'stdout':
103 stdout = stdout + content['data']
104 elif content['name'] == 'stderr':
105 stderr = stderr + content['data']
106 else:
107 raise KeyError("bad stream: %r" % content['name'])
108 else:
109 # other output, ignored
110 pass
111 return stdout, stderr
112
26
113
27
114 def _check_mp_mode(kc, expected=False, stream="stdout"):
28 def _check_mp_mode(kc, expected=False, stream="stdout"):
@@ -123,7 +37,7 b' def _check_mp_mode(kc, expected=False, stream="stdout"):'
123
37
124 def test_simple_print():
38 def test_simple_print():
125 """simple print statement in kernel"""
39 """simple print statement in kernel"""
126 with new_kernel() as kc:
40 with kernel() as kc:
127 iopub = kc.iopub_channel
41 iopub = kc.iopub_channel
128 msg_id, content = execute(kc=kc, code="print ('hi')")
42 msg_id, content = execute(kc=kc, code="print ('hi')")
129 stdout, stderr = assemble_output(iopub)
43 stdout, stderr = assemble_output(iopub)
@@ -165,7 +79,7 b' def test_subprocess_print():'
165
79
166 def test_subprocess_noprint():
80 def test_subprocess_noprint():
167 """mp.Process without print doesn't trigger iostream mp_mode"""
81 """mp.Process without print doesn't trigger iostream mp_mode"""
168 with new_kernel() as kc:
82 with kernel() as kc:
169 iopub = kc.iopub_channel
83 iopub = kc.iopub_channel
170
84
171 np = 5
85 np = 5
@@ -210,7 +124,7 b' def test_subprocess_error():'
210
124
211 def test_raw_input():
125 def test_raw_input():
212 """test [raw_]input"""
126 """test [raw_]input"""
213 with new_kernel() as kc:
127 with kernel() as kc:
214 iopub = kc.iopub_channel
128 iopub = kc.iopub_channel
215
129
216 input_f = "input" if py3compat.PY3 else "raw_input"
130 input_f = "input" if py3compat.PY3 else "raw_input"
@@ -232,7 +146,7 b' def test_raw_input():'
232 @dec.skipif(py3compat.PY3)
146 @dec.skipif(py3compat.PY3)
233 def test_eval_input():
147 def test_eval_input():
234 """test input() on Python 2"""
148 """test input() on Python 2"""
235 with new_kernel() as kc:
149 with kernel() as kc:
236 iopub = kc.iopub_channel
150 iopub = kc.iopub_channel
237
151
238 input_f = "input" if py3compat.PY3 else "raw_input"
152 input_f = "input" if py3compat.PY3 else "raw_input"
@@ -26,18 +26,11 b' class TestKernelManager(TestCase):'
26 def _run_lifecycle(self, km):
26 def _run_lifecycle(self, km):
27 km.start_kernel(stdout=PIPE, stderr=PIPE)
27 km.start_kernel(stdout=PIPE, stderr=PIPE)
28 self.assertTrue(km.is_alive())
28 self.assertTrue(km.is_alive())
29 km.restart_kernel()
29 km.restart_kernel(now=True)
30 self.assertTrue(km.is_alive())
30 self.assertTrue(km.is_alive())
31 # We need a delay here to give the restarting kernel a chance to
32 # restart. Otherwise, the interrupt will kill it, causing the test
33 # suite to hang. The reason it *hangs* is that the shutdown
34 # message for the restart sometimes hasn't been sent to the kernel.
35 # Because linger is oo on the shell channel, the context can't
36 # close until the message is sent to the kernel, which is not dead.
37 time.sleep(1.0)
38 km.interrupt_kernel()
31 km.interrupt_kernel()
39 self.assertTrue(isinstance(km, KernelManager))
32 self.assertTrue(isinstance(km, KernelManager))
40 km.shutdown_kernel()
33 km.shutdown_kernel(now=True)
41
34
42 def test_tcp_lifecycle(self):
35 def test_tcp_lifecycle(self):
43 km = self._get_tcp_km()
36 km = self._get_tcp_km()
@@ -1,7 +1,7 b''
1 """Test suite for our zeromq-based messaging specification.
1 """Test suite for our zeromq-based message specification.
2 """
2 """
3 #-----------------------------------------------------------------------------
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2010-2011 The IPython Development Team
4 # Copyright (C) 2010 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING.txt, distributed as part of this software.
7 # the file COPYING.txt, distributed as part of this software.
@@ -19,72 +19,21 b' from IPython.utils.traitlets import ('
19 HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any,
19 HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any,
20 )
20 )
21
21
22 from .utils import TIMEOUT, start_global_kernel, flush_channels, execute
23
22 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
23 # Global setup and utilities
25 # Globals
24 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
25
27 KC = None
26 STARTUP_TIMEOUT = 60
27 TIMEOUT = 15
28
28
29 def setup():
29 def setup():
30 global KM, KC
30 global KC
31 KM = KernelManager()
31 KC = start_global_kernel()
32 KM.start_kernel(stdout=PIPE, stderr=PIPE)
33 KC = KM.client()
34 KC.start_channels()
35
36 # wait for kernel to be ready
37 try:
38 msg = KC.iopub_channel.get_msg(block=True, timeout=STARTUP_TIMEOUT)
39 except Empty:
40 pass
41 msg_id = KC.kernel_info()
42 KC.get_shell_msg(block=True, timeout=STARTUP_TIMEOUT)
43 flush_channels()
44
45
46 def teardown():
47 KC.stop_channels()
48 KM.shutdown_kernel()
49
50
51 def flush_channels(kc=None):
52 """flush any messages waiting on the queue"""
53 if kc is None:
54 kc = KC
55 for channel in (kc.shell_channel, kc.iopub_channel):
56 while True:
57 try:
58 msg = channel.get_msg(block=True, timeout=0.1)
59 except Empty:
60 break
61 else:
62 validate_message(msg)
63
64
65 def execute(code='', kc=None, **kwargs):
66 """wrapper for doing common steps for validating an execution request"""
67 if kc is None:
68 kc = KC
69 msg_id = kc.execute(code=code, **kwargs)
70 reply = kc.get_shell_msg(timeout=TIMEOUT)
71 validate_message(reply, 'execute_reply', msg_id)
72 busy = kc.get_iopub_msg(timeout=TIMEOUT)
73 validate_message(busy, 'status', msg_id)
74 nt.assert_equal(busy['content']['execution_state'], 'busy')
75
76 if not kwargs.get('silent'):
77 pyin = kc.get_iopub_msg(timeout=TIMEOUT)
78 validate_message(pyin, 'pyin', msg_id)
79 nt.assert_equal(pyin['content']['code'], code)
80
81 return msg_id, reply['content']
82
32
83 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
84 # MSG Spec References
34 # Message Spec References
85 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
86
36
87
88 class Reference(HasTraits):
37 class Reference(HasTraits):
89
38
90 """
39 """
@@ -31,20 +31,13 b' class TestKernelManager(TestCase):'
31 self.assertTrue(kid in km)
31 self.assertTrue(kid in km)
32 self.assertTrue(kid in km.list_kernel_ids())
32 self.assertTrue(kid in km.list_kernel_ids())
33 self.assertEqual(len(km),1)
33 self.assertEqual(len(km),1)
34 km.restart_kernel(kid)
34 km.restart_kernel(kid, now=True)
35 self.assertTrue(km.is_alive(kid))
35 self.assertTrue(km.is_alive(kid))
36 self.assertTrue(kid in km.list_kernel_ids())
36 self.assertTrue(kid in km.list_kernel_ids())
37 # We need a delay here to give the restarting kernel a chance to
38 # restart. Otherwise, the interrupt will kill it, causing the test
39 # suite to hang. The reason it *hangs* is that the shutdown
40 # message for the restart sometimes hasn't been sent to the kernel.
41 # Because linger is oo on the shell channel, the context can't
42 # close until the message is sent to the kernel, which is not dead.
43 time.sleep(1.0)
44 km.interrupt_kernel(kid)
37 km.interrupt_kernel(kid)
45 k = km.get_kernel(kid)
38 k = km.get_kernel(kid)
46 self.assertTrue(isinstance(k, KernelManager))
39 self.assertTrue(isinstance(k, KernelManager))
47 km.shutdown_kernel(kid)
40 km.shutdown_kernel(kid, now=True)
48 self.assertTrue(not kid in km)
41 self.assertTrue(not kid in km)
49
42
50 def _run_cinfo(self, km, transport, ip):
43 def _run_cinfo(self, km, transport, ip):
@@ -63,7 +56,7 b' class TestKernelManager(TestCase):'
63 self.assertTrue('hb_port' in cinfo)
56 self.assertTrue('hb_port' in cinfo)
64 stream = km.connect_hb(kid)
57 stream = km.connect_hb(kid)
65 stream.close()
58 stream.close()
66 km.shutdown_kernel(kid)
59 km.shutdown_kernel(kid, now=True)
67
60
68 def test_tcp_lifecycle(self):
61 def test_tcp_lifecycle(self):
69 km = self._get_tcp_km()
62 km = self._get_tcp_km()
General Comments 0
You need to be logged in to leave comments. Login now