##// END OF EJS Templates
Fixing last few things with the test suite for kernel managers.
Brian E. Granger -
Show More
@@ -1,357 +1,362 b''
1 1 """A kernel manager for multiple kernels.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import os
20 20 import uuid
21 21
22 22 import zmq
23 23 from zmq.eventloop.zmqstream import ZMQStream
24 24
25 25 from tornado import web
26 26
27 27 from IPython.config.configurable import LoggingConfigurable
28 28 from IPython.utils.importstring import import_item
29 29 from IPython.utils.traitlets import (
30 30 Instance, Dict, List, Unicode, Float, Integer, Any, DottedObjectName,
31 31 )
32 32 #-----------------------------------------------------------------------------
33 33 # Classes
34 34 #-----------------------------------------------------------------------------
35 35
36 36 class DuplicateKernelError(Exception):
37 37 pass
38 38
39 39
40 40 class MultiKernelManager(LoggingConfigurable):
41 41 """A class for managing multiple kernels."""
42 42
43 43 kernel_manager_class = DottedObjectName(
44 44 "IPython.zmq.blockingkernelmanager.BlockingKernelManager", config=True,
45 45 help="""The kernel manager class. This is configurable to allow
46 46 subclassing of the KernelManager for customized behavior.
47 47 """
48 48 )
49 49 def _kernel_manager_class_changed(self, name, old, new):
50 50 self.kernel_manager_factory = import_item(new)
51 51
52 52 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
53 53 def _kernel_manager_factory_default(self):
54 54 return import_item(self.kernel_manager_class)
55 55
56 56 context = Instance('zmq.Context')
57 57 def _context_default(self):
58 58 return zmq.Context.instance()
59 59
60 60 connection_dir = Unicode('')
61 61
62 62 _kernels = Dict()
63 63
64 64 def list_kernel_ids(self):
65 65 """Return a list of the kernel ids of the active kernels."""
66 66 # Create a copy so we can iterate over kernels in operations
67 67 # that delete keys.
68 68 return list(self._kernels.keys())
69 69
70 70 def __len__(self):
71 71 """Return the number of running kernels."""
72 72 return len(self.list_kernel_ids())
73 73
74 74 def __contains__(self, kernel_id):
75 75 return kernel_id in self._kernels
76 76
77 77 def start_kernel(self, **kwargs):
78 78 """Start a new kernel.
79 79
80 80 The caller can pick a kernel_id by passing one in as a keyword arg,
81 81 otherwise one will be picked using a uuid.
82
83 To silence the kernel's stdout/stderr, call this using::
84
85 km.start_kernel(stdout=PIPE, stderr=PIPE)
86
82 87 """
83 88 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
84 89 if kernel_id in self:
85 90 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
86 91 # kernel_manager_factory is the constructor for the KernelManager
87 92 # subclass we are using. It can be configured as any Configurable,
88 93 # including things like its transport and ip.
89 94 km = self.kernel_manager_factory(connection_file=os.path.join(
90 95 self.connection_dir, "kernel-%s.json" % kernel_id),
91 96 config=self.config,
92 97 )
93 98 km.start_kernel(**kwargs)
94 99 # start just the shell channel, needed for graceful restart
95 100 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
96 101 self._kernels[kernel_id] = km
97 102 return kernel_id
98 103
99 104 def shutdown_kernel(self, kernel_id, now=False):
100 105 """Shutdown a kernel by its kernel uuid.
101 106
102 107 Parameters
103 108 ==========
104 109 kernel_id : uuid
105 110 The id of the kernel to shutdown.
106 111 now : bool
107 112 Should the kernel be shutdown forcibly using a signal.
108 113 """
109 114 k = self.get_kernel(kernel_id)
110 115 k.shutdown_kernel(now=now)
111 116 k.shell_channel.stop()
112 117 del self._kernels[kernel_id]
113 118
114 119 def shutdown_all(self, now=False):
115 120 """Shutdown all kernels."""
116 121 for kid in self.list_kernel_ids():
117 122 self.shutdown_kernel(kid, now=now)
118 123
119 124 def interrupt_kernel(self, kernel_id):
120 125 """Interrupt (SIGINT) the kernel by its uuid.
121 126
122 127 Parameters
123 128 ==========
124 129 kernel_id : uuid
125 130 The id of the kernel to interrupt.
126 131 """
127 132 return self.get_kernel(kernel_id).interrupt_kernel()
128 133
129 134 def signal_kernel(self, kernel_id, signum):
130 135 """Sends a signal to the kernel by its uuid.
131 136
132 137 Note that since only SIGTERM is supported on Windows, this function
133 138 is only useful on Unix systems.
134 139
135 140 Parameters
136 141 ==========
137 142 kernel_id : uuid
138 143 The id of the kernel to signal.
139 144 """
140 145 return self.get_kernel(kernel_id).signal_kernel(signum)
141 146
142 147 def restart_kernel(self, kernel_id):
143 148 """Restart a kernel by its uuid, keeping the same ports.
144 149
145 150 Parameters
146 151 ==========
147 152 kernel_id : uuid
148 153 The id of the kernel to interrupt.
149 154 """
150 155 return self.get_kernel(kernel_id).restart_kernel()
151 156
152 157 def get_kernel(self, kernel_id):
153 158 """Get the single KernelManager object for a kernel by its uuid.
154 159
155 160 Parameters
156 161 ==========
157 162 kernel_id : uuid
158 163 The id of the kernel.
159 164 """
160 165 km = self._kernels.get(kernel_id)
161 166 if km is not None:
162 167 return km
163 168 else:
164 169 raise KeyError("Kernel with id not found: %s" % kernel_id)
165 170
166 171 def get_connection_info(self, kernel_id):
167 172 """Return a dictionary of connection data for a kernel.
168 173
169 174 Parameters
170 175 ==========
171 176 kernel_id : uuid
172 177 The id of the kernel.
173 178
174 179 Returns
175 180 =======
176 181 connection_dict : dict
177 182 A dict of the information needed to connect to a kernel.
178 183 This includes the ip address and the integer port
179 184 numbers of the different channels (stdin_port, iopub_port,
180 185 shell_port, hb_port).
181 186 """
182 187 km = self.get_kernel(kernel_id)
183 188 return dict(transport=km.transport,
184 189 ip=km.ip,
185 190 shell_port=km.shell_port,
186 191 iopub_port=km.iopub_port,
187 192 stdin_port=km.stdin_port,
188 193 hb_port=km.hb_port,
189 194 )
190 195
191 196 def _make_url(self, transport, ip, port):
192 197 """Make a ZeroMQ URL for a given transport, ip and port."""
193 198 if transport == 'tcp':
194 199 return "tcp://%s:%i" % (ip, port)
195 200 else:
196 201 return "%s://%s-%s" % (transport, ip, port)
197 202
198 203 def _create_connected_stream(self, kernel_id, socket_type):
199 204 """Create a connected ZMQStream for a kernel."""
200 205 cinfo = self.get_connection_info(kernel_id)
201 206 url = self._make_url(cinfo['transport'], cinfo['ip'], cinfo['port'])
202 207 sock = self.context.socket(socket_type)
203 208 self.log.info("Connecting to: %s" % url)
204 209 sock.connect(url)
205 210 return ZMQStream(sock)
206 211
207 212 def create_iopub_stream(self, kernel_id):
208 213 """Return a ZMQStream object connected to the iopub channel.
209 214
210 215 Parameters
211 216 ==========
212 217 kernel_id : uuid
213 218 The id of the kernel.
214 219
215 220 Returns
216 221 =======
217 222 stream : ZMQStream
218 223 """
219 224 iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB)
220 225 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
221 226 return iopub_stream
222 227
223 228 def create_shell_stream(self, kernel_id):
224 229 """Return a ZMQStream object connected to the shell channel.
225 230
226 231 Parameters
227 232 ==========
228 233 kernel_id : uuid
229 234 The id of the kernel.
230 235
231 236 Returns
232 237 =======
233 238 stream : ZMQStream
234 239 """
235 240 shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER)
236 241 return shell_stream
237 242
238 243 def create_hb_stream(self, kernel_id):
239 244 """Return a ZMQStream object connected to the hb channel.
240 245
241 246 Parameters
242 247 ==========
243 248 kernel_id : uuid
244 249 The id of the kernel.
245 250
246 251 Returns
247 252 =======
248 253 stream : ZMQStream
249 254 """
250 255 hb_stream = self._create_connected_stream(kernel_id, zmq.REQ)
251 256 return hb_stream
252 257
253 258
254 259 class MappingKernelManager(MultiKernelManager):
255 260 """A KernelManager that handles notebok mapping and HTTP error handling"""
256 261
257 262 kernel_argv = List(Unicode)
258 263
259 264 time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
260 265 first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")
261 266
262 267 max_msg_size = Integer(65536, config=True, help="""
263 268 The max raw message size accepted from the browser
264 269 over a WebSocket connection.
265 270 """)
266 271
267 272 _notebook_mapping = Dict()
268 273
269 274 #-------------------------------------------------------------------------
270 275 # Methods for managing kernels and sessions
271 276 #-------------------------------------------------------------------------
272 277
273 278 def kernel_for_notebook(self, notebook_id):
274 279 """Return the kernel_id for a notebook_id or None."""
275 280 return self._notebook_mapping.get(notebook_id)
276 281
277 282 def set_kernel_for_notebook(self, notebook_id, kernel_id):
278 283 """Associate a notebook with a kernel."""
279 284 if notebook_id is not None:
280 285 self._notebook_mapping[notebook_id] = kernel_id
281 286
282 287 def notebook_for_kernel(self, kernel_id):
283 288 """Return the notebook_id for a kernel_id or None."""
284 289 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
285 290 if len(notebook_ids) == 1:
286 291 return notebook_ids[0]
287 292 else:
288 293 return None
289 294
290 295 def delete_mapping_for_kernel(self, kernel_id):
291 296 """Remove the kernel/notebook mapping for kernel_id."""
292 297 notebook_id = self.notebook_for_kernel(kernel_id)
293 298 if notebook_id is not None:
294 299 del self._notebook_mapping[notebook_id]
295 300
296 301 def start_kernel(self, notebook_id=None, **kwargs):
297 302 """Start a kernel for a notebok an return its kernel_id.
298 303
299 304 Parameters
300 305 ----------
301 306 notebook_id : uuid
302 307 The uuid of the notebook to associate the new kernel with. If this
303 308 is not None, this kernel will be persistent whenever the notebook
304 309 requests a kernel.
305 310 """
306 311 kernel_id = self.kernel_for_notebook(notebook_id)
307 312 if kernel_id is None:
308 313 kwargs['extra_arguments'] = self.kernel_argv
309 314 kernel_id = super(MappingKernelManager, self).start_kernel(**kwargs)
310 315 self.set_kernel_for_notebook(notebook_id, kernel_id)
311 316 self.log.info("Kernel started: %s" % kernel_id)
312 317 self.log.debug("Kernel args: %r" % kwargs)
313 318 else:
314 319 self.log.info("Using existing kernel: %s" % kernel_id)
315 320 return kernel_id
316 321
317 322 def shutdown_kernel(self, kernel_id, now=False):
318 323 """Shutdown a kernel and remove its notebook association."""
319 324 self._check_kernel_id(kernel_id)
320 325 super(MappingKernelManager, self).shutdown_kernel(
321 326 kernel_id, now=now
322 327 )
323 328 self.delete_mapping_for_kernel(kernel_id)
324 329 self.log.info("Kernel shutdown: %s" % kernel_id)
325 330
326 331 def interrupt_kernel(self, kernel_id):
327 332 """Interrupt a kernel."""
328 333 self._check_kernel_id(kernel_id)
329 334 super(MappingKernelManager, self).interrupt_kernel(kernel_id)
330 335 self.log.info("Kernel interrupted: %s" % kernel_id)
331 336
332 337 def restart_kernel(self, kernel_id):
333 338 """Restart a kernel while keeping clients connected."""
334 339 self._check_kernel_id(kernel_id)
335 340 super(MappingKernelManager, self).restart_kernel(kernel_id)
336 341 self.log.info("Kernel restarted: %s" % kernel_id)
337 342
338 343 def create_iopub_stream(self, kernel_id):
339 344 """Create a new iopub stream."""
340 345 self._check_kernel_id(kernel_id)
341 346 return super(MappingKernelManager, self).create_iopub_stream(kernel_id)
342 347
343 348 def create_shell_stream(self, kernel_id):
344 349 """Create a new shell stream."""
345 350 self._check_kernel_id(kernel_id)
346 351 return super(MappingKernelManager, self).create_shell_stream(kernel_id)
347 352
348 353 def create_hb_stream(self, kernel_id):
349 354 """Create a new hb stream."""
350 355 self._check_kernel_id(kernel_id)
351 356 return super(MappingKernelManager, self).create_hb_stream(kernel_id)
352 357
353 358 def _check_kernel_id(self, kernel_id):
354 359 """Check a that a kernel_id exists and raise 404 if not."""
355 360 if kernel_id not in self:
356 361 raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
357 362
@@ -1,69 +1,70 b''
1 1 """Tests for the notebook kernel and session manager."""
2 2
3 from subprocess import PIPE
3 4 import time
4 5 from unittest import TestCase
5 6
6 7 from IPython.config.loader import Config
7 8 from IPython.frontend.html.notebook.kernelmanager import MultiKernelManager
8 9 from IPython.zmq.kernelmanager import KernelManager
9 10
10 11 class TestKernelManager(TestCase):
11 12
12 13 def _get_tcp_km(self):
13 14 return MultiKernelManager()
14 15
15 16 def _get_ipc_km(self):
16 17 c = Config()
17 18 c.KernelManager.transport = 'ipc'
18 19 c.KernelManager.ip = 'test'
19 20 km = MultiKernelManager(config=c)
20 21 return km
21 22
22 23 def _run_lifecycle(self, km):
23 kid = km.start_kernel()
24 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
24 25 self.assertTrue(kid in km)
25 26 self.assertTrue(kid in km.list_kernel_ids())
26 27 self.assertEqual(len(km),1)
27 28 km.restart_kernel(kid)
28 29 self.assertTrue(kid in km.list_kernel_ids())
29 30 # We need a delay here to give the restarting kernel a chance to
30 31 # restart. Otherwise, the interrupt will kill it, causing the test
31 32 # suite to hang. The reason it *hangs* is that the shutdown
32 33 # message for the restart sometimes hasn't been sent to the kernel.
33 34 # Because linger is oo on the shell channel, the context can't
34 35 # close until the message is sent to the kernel, which is not dead.
35 36 time.sleep(1.0)
36 37 km.interrupt_kernel(kid)
37 38 k = km.get_kernel(kid)
38 39 self.assertTrue(isinstance(k, KernelManager))
39 40 km.shutdown_kernel(kid)
40 41 self.assertTrue(not kid in km)
41 42
42 43 def _run_cinfo(self, km, transport, ip):
43 kid = km.start_kernel()
44 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
44 45 k = km.get_kernel(kid)
45 46 cinfo = km.get_connection_info(kid)
46 47 self.assertEqual(transport, cinfo['transport'])
47 48 self.assertEqual(ip, cinfo['ip'])
48 49 self.assertTrue('stdin_port' in cinfo)
49 50 self.assertTrue('iopub_port' in cinfo)
50 51 self.assertTrue('shell_port' in cinfo)
51 52 self.assertTrue('hb_port' in cinfo)
52 53 km.shutdown_kernel(kid)
53 54
54 55 def test_tcp_lifecycle(self):
55 56 km = self._get_tcp_km()
56 57 self._run_lifecycle(km)
57 58
58 59 def test_tcp_cinfo(self):
59 60 km = self._get_tcp_km()
60 61 self._run_cinfo(km, 'tcp', '127.0.0.1')
61 62
62 63 def test_ipc_lifecycle(self):
63 64 km = self._get_ipc_km()
64 65 self._run_lifecycle(km)
65 66
66 67 def test_ipc_cinfo(self):
67 68 km = self._get_ipc_km()
68 69 self._run_cinfo(km, 'ipc', 'test')
69 70
@@ -1,44 +1,45 b''
1 1 """Tests for the notebook kernel and session manager."""
2 2
3 from subprocess import PIPE
3 4 import time
4 5 from unittest import TestCase
5 6
6 7 from IPython.config.loader import Config
7 8 from IPython.zmq.kernelmanager import KernelManager
8 9
9 10 class TestKernelManager(TestCase):
10 11
11 12 def _get_tcp_km(self):
12 13 return KernelManager()
13 14
14 15 def _get_ipc_km(self):
15 16 c = Config()
16 17 c.KernelManager.transport = 'ipc'
17 18 c.KernelManager.ip = 'test'
18 19 km = KernelManager(config=c)
19 20 return km
20 21
21 22 def _run_lifecycle(self, km):
22 km.start_kernel()
23 km.start_kernel(stdout=PIPE, stderr=PIPE)
23 24 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
24 25 km.restart_kernel()
25 26 # We need a delay here to give the restarting kernel a chance to
26 27 # restart. Otherwise, the interrupt will kill it, causing the test
27 28 # suite to hang. The reason it *hangs* is that the shutdown
28 29 # message for the restart sometimes hasn't been sent to the kernel.
29 30 # Because linger is oo on the shell channel, the context can't
30 31 # close until the message is sent to the kernel, which is not dead.
31 time.sleep()
32 time.sleep(1.0)
32 33 km.interrupt_kernel()
33 34 self.assertTrue(isinstance(km, KernelManager))
34 35 km.shutdown_kernel()
35 36 km.shell_channel.stop()
36 37
37 38 def test_tcp_lifecycle(self):
38 39 km = self._get_tcp_km()
39 40 self._run_lifecycle(km)
40 41
41 42 def testipc_lifecycle(self):
42 43 km = self._get_ipc_km()
43 44 self._run_lifecycle(km)
44 45
General Comments 0
You need to be logged in to leave comments. Login now