##// END OF EJS Templates
cleanup in kernel.tests.utils
MinRK -
Show More
@@ -1,168 +1,167 b''
1 1 """utilities for testing IPython kernels"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2013 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import atexit
15 15
16 16 from contextlib import contextmanager
17 17 from subprocess import PIPE
18 18 from Queue import Empty
19 19
20 20 import nose.tools as nt
21 21
22 22 from IPython.kernel import KernelManager
23 23
24 24 #-------------------------------------------------------------------------------
25 25 # Globals
26 26 #-------------------------------------------------------------------------------
27 27
28 28 STARTUP_TIMEOUT = 60
29 29 TIMEOUT = 15
30 30
31 31 KM = None
32 32 KC = None
33 33
34 34 #-------------------------------------------------------------------------------
35 35 # code
36 36 #-------------------------------------------------------------------------------
37 37
38 38
39 39 def start_new_kernel():
40 40 """start a new kernel, and return its Manager and Client"""
41 41 km = KernelManager()
42 42 km.start_kernel(stdout=PIPE, stderr=PIPE)
43 43 kc = km.client()
44 44 kc.start_channels()
45 45
46 46 msg_id = kc.kernel_info()
47 47 kc.get_shell_msg(block=True, timeout=STARTUP_TIMEOUT)
48 48 flush_channels(kc)
49 49 return km, kc
50 50
51 51 def flush_channels(kc=None):
52 52 """flush any messages waiting on the queue"""
53 53 from .test_message_spec import validate_message
54 54
55 55 if kc is None:
56 56 kc = KC
57 57 for channel in (kc.shell_channel, kc.iopub_channel):
58 58 while True:
59 59 try:
60 60 msg = channel.get_msg(block=True, timeout=0.1)
61 61 except Empty:
62 62 break
63 63 else:
64 64 validate_message(msg)
65 65
66 66
67 67 def execute(code='', kc=None, **kwargs):
68 68 """wrapper for doing common steps for validating an execution request"""
69 69 from .test_message_spec import validate_message
70 70 if kc is None:
71 71 kc = KC
72 72 msg_id = kc.execute(code=code, **kwargs)
73 73 reply = kc.get_shell_msg(timeout=TIMEOUT)
74 74 validate_message(reply, 'execute_reply', msg_id)
75 75 busy = kc.get_iopub_msg(timeout=TIMEOUT)
76 76 validate_message(busy, 'status', msg_id)
77 77 nt.assert_equal(busy['content']['execution_state'], 'busy')
78 78
79 79 if not kwargs.get('silent'):
80 80 pyin = kc.get_iopub_msg(timeout=TIMEOUT)
81 81 validate_message(pyin, 'pyin', msg_id)
82 82 nt.assert_equal(pyin['content']['code'], code)
83 83
84 84 return msg_id, reply['content']
85 85
86 86 def start_global_kernel():
87 87 """start the global kernel (if it isn't running) and return its client"""
88 88 global KM, KC
89 print KM, KC
90 89 if KM is None:
91 90 KM, KC = start_new_kernel()
92 91 atexit.register(stop_global_kernel)
93 92 return KC
94 93
95 94 @contextmanager
96 95 def kernel():
97 96 """Context manager for the global kernel instance
98 97
99 98 Should be used for most kernel tests
100 99
101 100 Returns
102 101 -------
103 102 kernel_client: connected KernelClient instance
104 103 """
105 104 yield start_global_kernel()
106 105
107 106 def uses_kernel(test_f):
108 107 """Decorator for tests that use the global kernel"""
109 108 def wrapped_test():
110 109 with kernel() as kc:
111 110 test_f(kc)
112 111 wrapped_test.__doc__ = test_f.__doc__
113 112 wrapped_test.__name__ = test_f.__name__
114 113 return wrapped_test
115 114
116 115 def stop_global_kernel():
117 116 """Stop the global shared kernel instance, if it exists"""
118 117 global KM, KC
119 118 KC.stop_channels()
120 119 KC = None
121 120 if KM is None:
122 121 return
123 122 KM.shutdown_kernel(now=True)
124 123 KM = None
125 124
126 125 @contextmanager
127 126 def new_kernel():
128 127 """Context manager for a new kernel in a subprocess
129 128
130 129 Should only be used for tests where the kernel must not be re-used.
131 130
132 131 Returns
133 132 -------
134 133 kernel_client: connected KernelClient instance
135 134 """
136 135 km, kc = start_new_kernel()
137 136 try:
138 137 yield kc
139 138 finally:
140 139 kc.stop_channels()
141 km.shutdown_kernel()
140 km.shutdown_kernel(now=True)
142 141
143 142
144 143 def assemble_output(iopub):
145 144 """assemble stdout/err from an execution"""
146 145 stdout = ''
147 146 stderr = ''
148 147 while True:
149 148 msg = iopub.get_msg(block=True, timeout=1)
150 149 msg_type = msg['msg_type']
151 150 content = msg['content']
152 151 if msg_type == 'status' and content['execution_state'] == 'idle':
153 152 # idle message signals end of output
154 153 break
155 154 elif msg['msg_type'] == 'stream':
156 155 if content['name'] == 'stdout':
157 stdout = stdout + content['data']
156 stdout += content['data']
158 157 elif content['name'] == 'stderr':
159 stderr = stderr + content['data']
158 stderr += content['data']
160 159 else:
161 160 raise KeyError("bad stream: %r" % content['name'])
162 161 else:
163 162 # other output, ignored
164 163 pass
165 164 return stdout, stderr
166 165
167 166
168 167
General Comments 0
You need to be logged in to leave comments. Login now