##// END OF EJS Templates
Fixing bug in kernelmanager tests that was causing tests to hang....
Brian E. Granger -
Show More
@@ -1,60 +1,69 b''
1 1 """Tests for the notebook kernel and session manager."""
2 2
3 import time
3 4 from unittest import TestCase
4 5
5 6 from IPython.config.loader import Config
6 7 from IPython.frontend.html.notebook.kernelmanager import MultiKernelManager
7 8 from IPython.zmq.kernelmanager import KernelManager
8 9
9 10 class TestKernelManager(TestCase):
10 11
11 12 def _get_tcp_km(self):
12 13 return MultiKernelManager()
13 14
14 15 def _get_ipc_km(self):
15 16 c = Config()
16 17 c.KernelManager.transport = 'ipc'
17 18 c.KernelManager.ip = 'test'
18 19 km = MultiKernelManager(config=c)
19 20 return km
20 21
21 22 def _run_lifecycle(self, km):
22 23 kid = km.start_kernel()
23 24 self.assertTrue(kid in km)
24 25 self.assertTrue(kid in km.list_kernel_ids())
25 26 self.assertEqual(len(km),1)
26 27 km.restart_kernel(kid)
27 28 self.assertTrue(kid in km.list_kernel_ids())
29 # We need a delay here to give the restarting kernel a chance to
30 # restart. Otherwise, the interrupt will kill it, causing the test
31 # suite to hang. The reason it *hangs* is that the shutdown
32 # message for the restart sometimes hasn't been sent to the kernel.
33 # Because linger is oo on the shell channel, the context can't
34 # close until the message is sent to the kernel, which is not dead.
35 time.sleep(1.0)
28 36 km.interrupt_kernel(kid)
29 37 k = km.get_kernel(kid)
30 38 self.assertTrue(isinstance(k, KernelManager))
31 39 km.shutdown_kernel(kid)
32 40 self.assertTrue(not kid in km)
33 41
34 42 def _run_cinfo(self, km, transport, ip):
35 43 kid = km.start_kernel()
36 44 k = km.get_kernel(kid)
37 45 cinfo = km.get_connection_info(kid)
38 46 self.assertEqual(transport, cinfo['transport'])
39 47 self.assertEqual(ip, cinfo['ip'])
40 48 self.assertTrue('stdin_port' in cinfo)
41 49 self.assertTrue('iopub_port' in cinfo)
42 50 self.assertTrue('shell_port' in cinfo)
43 51 self.assertTrue('hb_port' in cinfo)
44 52 km.shutdown_kernel(kid)
45 53
46 def test_km_tcp(self):
54 def test_tcp_lifecycle(self):
47 55 km = self._get_tcp_km()
48 56 self._run_lifecycle(km)
49 57
50 58 def test_tcp_cinfo(self):
51 59 km = self._get_tcp_km()
52 60 self._run_cinfo(km, 'tcp', '127.0.0.1')
53 61
54 def test_km_ipc(self):
62 def test_ipc_lifecycle(self):
55 63 km = self._get_ipc_km()
56 64 self._run_lifecycle(km)
57 65
58 66 def test_ipc_cinfo(self):
59 67 km = self._get_ipc_km()
60 68 self._run_cinfo(km, 'ipc', 'test')
69
@@ -1,1083 +1,1082 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45 from IPython.zmq.kernelmanagerabc import (
46 46 ShellChannelABC, IOPubChannelABC,
47 47 HBChannelABC, StdInChannelABC,
48 48 KernelManagerABC
49 49 )
50 50
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Constants and exceptions
54 54 #-----------------------------------------------------------------------------
55 55
56 56 class InvalidPortNumber(Exception):
57 57 pass
58 58
59 59 #-----------------------------------------------------------------------------
60 60 # Utility functions
61 61 #-----------------------------------------------------------------------------
62 62
63 63 # some utilities to validate message structure, these might get moved elsewhere
64 64 # if they prove to have more generic utility
65 65
66 66 def validate_string_list(lst):
67 67 """Validate that the input is a list of strings.
68 68
69 69 Raises ValueError if not."""
70 70 if not isinstance(lst, list):
71 71 raise ValueError('input %r must be a list' % lst)
72 72 for x in lst:
73 73 if not isinstance(x, basestring):
74 74 raise ValueError('element %r in list must be a string' % x)
75 75
76 76
77 77 def validate_string_dict(dct):
78 78 """Validate that the input is a dict with string keys and values.
79 79
80 80 Raises ValueError if not."""
81 81 for k,v in dct.iteritems():
82 82 if not isinstance(k, basestring):
83 83 raise ValueError('key %r in dict must be a string' % k)
84 84 if not isinstance(v, basestring):
85 85 raise ValueError('value %r in dict must be a string' % v)
86 86
87 87
88 88 #-----------------------------------------------------------------------------
89 89 # ZMQ Socket Channel classes
90 90 #-----------------------------------------------------------------------------
91 91
92 92 class ZMQSocketChannel(Thread):
93 93 """The base class for the channels that use ZMQ sockets."""
94 94 context = None
95 95 session = None
96 96 socket = None
97 97 ioloop = None
98 98 stream = None
99 99 _address = None
100 100 _exiting = False
101 101
102 102 def __init__(self, context, session, address):
103 103 """Create a channel.
104 104
105 105 Parameters
106 106 ----------
107 107 context : :class:`zmq.Context`
108 108 The ZMQ context to use.
109 109 session : :class:`session.Session`
110 110 The session to use.
111 111 address : zmq url
112 112 Standard (ip, port) tuple that the kernel is listening on.
113 113 """
114 114 super(ZMQSocketChannel, self).__init__()
115 115 self.daemon = True
116 116
117 117 self.context = context
118 118 self.session = session
119 119 if isinstance(address, tuple):
120 120 if address[1] == 0:
121 121 message = 'The port number for a channel cannot be 0.'
122 122 raise InvalidPortNumber(message)
123 123 address = "tcp://%s:%i" % address
124 124 self._address = address
125 125 atexit.register(self._notice_exit)
126 126
127 127 def _notice_exit(self):
128 128 self._exiting = True
129 129
130 130 def _run_loop(self):
131 131 """Run my loop, ignoring EINTR events in the poller"""
132 132 while True:
133 133 try:
134 134 self.ioloop.start()
135 135 except ZMQError as e:
136 136 if e.errno == errno.EINTR:
137 137 continue
138 138 else:
139 139 raise
140 140 except Exception:
141 141 if self._exiting:
142 142 break
143 143 else:
144 144 raise
145 145 else:
146 146 break
147 147
148 148 def stop(self):
149 149 """Stop the channel's event loop and join its thread.
150 150
151 151 This calls :method:`Thread.join` and returns when the thread
152 152 terminates. :class:`RuntimeError` will be raised if
153 153 :method:`self.start` is called again.
154 154 """
155 155 self.join()
156 156
157 157 @property
158 158 def address(self):
159 159 """Get the channel's address as a zmq url string.
160 160
161 161 These URLS have the form: 'tcp://127.0.0.1:5555'.
162 162 """
163 163 return self._address
164 164
165 165 def _queue_send(self, msg):
166 166 """Queue a message to be sent from the IOLoop's thread.
167 167
168 168 Parameters
169 169 ----------
170 170 msg : message to send
171 171
172 172 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
173 173 thread control of the action.
174 174 """
175 175 def thread_send():
176 176 self.session.send(self.stream, msg)
177 177 self.ioloop.add_callback(thread_send)
178 178
179 179 def _handle_recv(self, msg):
180 180 """Callback for stream.on_recv.
181 181
182 182 Unpacks message, and calls handlers with it.
183 183 """
184 184 ident,smsg = self.session.feed_identities(msg)
185 185 self.call_handlers(self.session.unserialize(smsg))
186 186
187 187
188 188
189 189 class ShellChannel(ZMQSocketChannel):
190 190 """The shell channel for issuing request/replies to the kernel."""
191 191
192 192 command_queue = None
193 193 # flag for whether execute requests should be allowed to call raw_input:
194 194 allow_stdin = True
195 195
196 196 def __init__(self, context, session, address):
197 197 super(ShellChannel, self).__init__(context, session, address)
198 198 self.ioloop = ioloop.IOLoop()
199 199
200 200 def run(self):
201 201 """The thread's main activity. Call start() instead."""
202 202 self.socket = self.context.socket(zmq.DEALER)
203 203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 self.socket.linger = 0
205 204 self.socket.connect(self.address)
206 205 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
207 206 self.stream.on_recv(self._handle_recv)
208 207 self._run_loop()
209 208 try:
210 209 self.socket.close()
211 210 except:
212 211 pass
213 212
214 213 def stop(self):
215 214 """Stop the channel's event loop and join its thread."""
216 215 self.ioloop.stop()
217 216 super(ShellChannel, self).stop()
218 217
219 218 def call_handlers(self, msg):
220 219 """This method is called in the ioloop thread when a message arrives.
221 220
222 221 Subclasses should override this method to handle incoming messages.
223 222 It is important to remember that this method is called in the thread
224 223 so that some logic must be done to ensure that the application leve
225 224 handlers are called in the application thread.
226 225 """
227 226 raise NotImplementedError('call_handlers must be defined in a subclass.')
228 227
229 228 def execute(self, code, silent=False, store_history=True,
230 229 user_variables=None, user_expressions=None, allow_stdin=None):
231 230 """Execute code in the kernel.
232 231
233 232 Parameters
234 233 ----------
235 234 code : str
236 235 A string of Python code.
237 236
238 237 silent : bool, optional (default False)
239 238 If set, the kernel will execute the code as quietly possible, and
240 239 will force store_history to be False.
241 240
242 241 store_history : bool, optional (default True)
243 242 If set, the kernel will store command history. This is forced
244 243 to be False if silent is True.
245 244
246 245 user_variables : list, optional
247 246 A list of variable names to pull from the user's namespace. They
248 247 will come back as a dict with these names as keys and their
249 248 :func:`repr` as values.
250 249
251 250 user_expressions : dict, optional
252 251 A dict mapping names to expressions to be evaluated in the user's
253 252 dict. The expression values are returned as strings formatted using
254 253 :func:`repr`.
255 254
256 255 allow_stdin : bool, optional (default self.allow_stdin)
257 256 Flag for whether the kernel can send stdin requests to frontends.
258 257
259 258 Some frontends (e.g. the Notebook) do not support stdin requests.
260 259 If raw_input is called from code executed from such a frontend, a
261 260 StdinNotImplementedError will be raised.
262 261
263 262 Returns
264 263 -------
265 264 The msg_id of the message sent.
266 265 """
267 266 if user_variables is None:
268 267 user_variables = []
269 268 if user_expressions is None:
270 269 user_expressions = {}
271 270 if allow_stdin is None:
272 271 allow_stdin = self.allow_stdin
273 272
274 273
275 274 # Don't waste network traffic if inputs are invalid
276 275 if not isinstance(code, basestring):
277 276 raise ValueError('code %r must be a string' % code)
278 277 validate_string_list(user_variables)
279 278 validate_string_dict(user_expressions)
280 279
281 280 # Create class for content/msg creation. Related to, but possibly
282 281 # not in Session.
283 282 content = dict(code=code, silent=silent, store_history=store_history,
284 283 user_variables=user_variables,
285 284 user_expressions=user_expressions,
286 285 allow_stdin=allow_stdin,
287 286 )
288 287 msg = self.session.msg('execute_request', content)
289 288 self._queue_send(msg)
290 289 return msg['header']['msg_id']
291 290
292 291 def complete(self, text, line, cursor_pos, block=None):
293 292 """Tab complete text in the kernel's namespace.
294 293
295 294 Parameters
296 295 ----------
297 296 text : str
298 297 The text to complete.
299 298 line : str
300 299 The full line of text that is the surrounding context for the
301 300 text to complete.
302 301 cursor_pos : int
303 302 The position of the cursor in the line where the completion was
304 303 requested.
305 304 block : str, optional
306 305 The full block of code in which the completion is being requested.
307 306
308 307 Returns
309 308 -------
310 309 The msg_id of the message sent.
311 310 """
312 311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
313 312 msg = self.session.msg('complete_request', content)
314 313 self._queue_send(msg)
315 314 return msg['header']['msg_id']
316 315
317 316 def object_info(self, oname, detail_level=0):
318 317 """Get metadata information about an object in the kernel's namespace.
319 318
320 319 Parameters
321 320 ----------
322 321 oname : str
323 322 A string specifying the object name.
324 323 detail_level : int, optional
325 324 The level of detail for the introspection (0-2)
326 325
327 326 Returns
328 327 -------
329 328 The msg_id of the message sent.
330 329 """
331 330 content = dict(oname=oname, detail_level=detail_level)
332 331 msg = self.session.msg('object_info_request', content)
333 332 self._queue_send(msg)
334 333 return msg['header']['msg_id']
335 334
336 335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
337 336 """Get entries from the kernel's history list.
338 337
339 338 Parameters
340 339 ----------
341 340 raw : bool
342 341 If True, return the raw input.
343 342 output : bool
344 343 If True, then return the output as well.
345 344 hist_access_type : str
346 345 'range' (fill in session, start and stop params), 'tail' (fill in n)
347 346 or 'search' (fill in pattern param).
348 347
349 348 session : int
350 349 For a range request, the session from which to get lines. Session
351 350 numbers are positive integers; negative ones count back from the
352 351 current session.
353 352 start : int
354 353 The first line number of a history range.
355 354 stop : int
356 355 The final (excluded) line number of a history range.
357 356
358 357 n : int
359 358 The number of lines of history to get for a tail request.
360 359
361 360 pattern : str
362 361 The glob-syntax pattern for a search request.
363 362
364 363 Returns
365 364 -------
366 365 The msg_id of the message sent.
367 366 """
368 367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
369 368 **kwargs)
370 369 msg = self.session.msg('history_request', content)
371 370 self._queue_send(msg)
372 371 return msg['header']['msg_id']
373 372
374 373 def kernel_info(self):
375 374 """Request kernel info."""
376 375 msg = self.session.msg('kernel_info_request')
377 376 self._queue_send(msg)
378 377 return msg['header']['msg_id']
379 378
380 379 def shutdown(self, restart=False):
381 380 """Request an immediate kernel shutdown.
382 381
383 382 Upon receipt of the (empty) reply, client code can safely assume that
384 383 the kernel has shut down and it's safe to forcefully terminate it if
385 384 it's still alive.
386 385
387 386 The kernel will send the reply via a function registered with Python's
388 387 atexit module, ensuring it's truly done as the kernel is done with all
389 388 normal operation.
390 389 """
391 390 # Send quit message to kernel. Once we implement kernel-side setattr,
392 391 # this should probably be done that way, but for now this will do.
393 392 msg = self.session.msg('shutdown_request', {'restart':restart})
394 393 self._queue_send(msg)
395 394 return msg['header']['msg_id']
396 395
397 396
398 397
399 398 class IOPubChannel(ZMQSocketChannel):
400 399 """The iopub channel which listens for messages that the kernel publishes.
401 400
402 401 This channel is where all output is published to frontends.
403 402 """
404 403
405 404 def __init__(self, context, session, address):
406 405 super(IOPubChannel, self).__init__(context, session, address)
407 406 self.ioloop = ioloop.IOLoop()
408 407
409 408 def run(self):
410 409 """The thread's main activity. Call start() instead."""
411 410 self.socket = self.context.socket(zmq.SUB)
412 411 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
413 412 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
414 413 self.socket.connect(self.address)
415 414 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
416 415 self.stream.on_recv(self._handle_recv)
417 416 self._run_loop()
418 417 try:
419 418 self.socket.close()
420 419 except:
421 420 pass
422 421
423 422 def stop(self):
424 423 """Stop the channel's event loop and join its thread."""
425 424 self.ioloop.stop()
426 425 super(IOPubChannel, self).stop()
427 426
428 427 def call_handlers(self, msg):
429 428 """This method is called in the ioloop thread when a message arrives.
430 429
431 430 Subclasses should override this method to handle incoming messages.
432 431 It is important to remember that this method is called in the thread
433 432 so that some logic must be done to ensure that the application leve
434 433 handlers are called in the application thread.
435 434 """
436 435 raise NotImplementedError('call_handlers must be defined in a subclass.')
437 436
438 437 def flush(self, timeout=1.0):
439 438 """Immediately processes all pending messages on the iopub channel.
440 439
441 440 Callers should use this method to ensure that :method:`call_handlers`
442 441 has been called for all messages that have been received on the
443 442 0MQ SUB socket of this channel.
444 443
445 444 This method is thread safe.
446 445
447 446 Parameters
448 447 ----------
449 448 timeout : float, optional
450 449 The maximum amount of time to spend flushing, in seconds. The
451 450 default is one second.
452 451 """
453 452 # We do the IOLoop callback process twice to ensure that the IOLoop
454 453 # gets to perform at least one full poll.
455 454 stop_time = time.time() + timeout
456 455 for i in xrange(2):
457 456 self._flushed = False
458 457 self.ioloop.add_callback(self._flush)
459 458 while not self._flushed and time.time() < stop_time:
460 459 time.sleep(0.01)
461 460
462 461 def _flush(self):
463 462 """Callback for :method:`self.flush`."""
464 463 self.stream.flush()
465 464 self._flushed = True
466 465
467 466
468 467 class StdInChannel(ZMQSocketChannel):
469 468 """The stdin channel to handle raw_input requests that the kernel makes."""
470 469
471 470 msg_queue = None
472 471
473 472 def __init__(self, context, session, address):
474 473 super(StdInChannel, self).__init__(context, session, address)
475 474 self.ioloop = ioloop.IOLoop()
476 475
477 476 def run(self):
478 477 """The thread's main activity. Call start() instead."""
479 478 self.socket = self.context.socket(zmq.DEALER)
480 479 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
481 480 self.socket.connect(self.address)
482 481 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
483 482 self.stream.on_recv(self._handle_recv)
484 483 self._run_loop()
485 484 try:
486 485 self.socket.close()
487 486 except:
488 487 pass
489 488
490 489 def stop(self):
491 490 """Stop the channel's event loop and join its thread."""
492 491 self.ioloop.stop()
493 492 super(StdInChannel, self).stop()
494 493
495 494 def call_handlers(self, msg):
496 495 """This method is called in the ioloop thread when a message arrives.
497 496
498 497 Subclasses should override this method to handle incoming messages.
499 498 It is important to remember that this method is called in the thread
500 499 so that some logic must be done to ensure that the application leve
501 500 handlers are called in the application thread.
502 501 """
503 502 raise NotImplementedError('call_handlers must be defined in a subclass.')
504 503
505 504 def input(self, string):
506 505 """Send a string of raw input to the kernel."""
507 506 content = dict(value=string)
508 507 msg = self.session.msg('input_reply', content)
509 508 self._queue_send(msg)
510 509
511 510
512 511 class HBChannel(ZMQSocketChannel):
513 512 """The heartbeat channel which monitors the kernel heartbeat.
514 513
515 514 Note that the heartbeat channel is paused by default. As long as you start
516 515 this channel, the kernel manager will ensure that it is paused and un-paused
517 516 as appropriate.
518 517 """
519 518
520 519 time_to_dead = 3.0
521 520 socket = None
522 521 poller = None
523 522 _running = None
524 523 _pause = None
525 524 _beating = None
526 525
527 526 def __init__(self, context, session, address):
528 527 super(HBChannel, self).__init__(context, session, address)
529 528 self._running = False
530 529 self._pause =True
531 530 self.poller = zmq.Poller()
532 531
533 532 def _create_socket(self):
534 533 if self.socket is not None:
535 534 # close previous socket, before opening a new one
536 535 self.poller.unregister(self.socket)
537 536 self.socket.close()
538 537 self.socket = self.context.socket(zmq.REQ)
539 538 self.socket.setsockopt(zmq.LINGER, 0)
540 539 self.socket.connect(self.address)
541 540
542 541 self.poller.register(self.socket, zmq.POLLIN)
543 542
544 543 def _poll(self, start_time):
545 544 """poll for heartbeat replies until we reach self.time_to_dead.
546 545
547 546 Ignores interrupts, and returns the result of poll(), which
548 547 will be an empty list if no messages arrived before the timeout,
549 548 or the event tuple if there is a message to receive.
550 549 """
551 550
552 551 until_dead = self.time_to_dead - (time.time() - start_time)
553 552 # ensure poll at least once
554 553 until_dead = max(until_dead, 1e-3)
555 554 events = []
556 555 while True:
557 556 try:
558 557 events = self.poller.poll(1000 * until_dead)
559 558 except ZMQError as e:
560 559 if e.errno == errno.EINTR:
561 560 # ignore interrupts during heartbeat
562 561 # this may never actually happen
563 562 until_dead = self.time_to_dead - (time.time() - start_time)
564 563 until_dead = max(until_dead, 1e-3)
565 564 pass
566 565 else:
567 566 raise
568 567 except Exception:
569 568 if self._exiting:
570 569 break
571 570 else:
572 571 raise
573 572 else:
574 573 break
575 574 return events
576 575
577 576 def run(self):
578 577 """The thread's main activity. Call start() instead."""
579 578 self._create_socket()
580 579 self._running = True
581 580 self._beating = True
582 581
583 582 while self._running:
584 583 if self._pause:
585 584 # just sleep, and skip the rest of the loop
586 585 time.sleep(self.time_to_dead)
587 586 continue
588 587
589 588 since_last_heartbeat = 0.0
590 589 # io.rprint('Ping from HB channel') # dbg
591 590 # no need to catch EFSM here, because the previous event was
592 591 # either a recv or connect, which cannot be followed by EFSM
593 592 self.socket.send(b'ping')
594 593 request_time = time.time()
595 594 ready = self._poll(request_time)
596 595 if ready:
597 596 self._beating = True
598 597 # the poll above guarantees we have something to recv
599 598 self.socket.recv()
600 599 # sleep the remainder of the cycle
601 600 remainder = self.time_to_dead - (time.time() - request_time)
602 601 if remainder > 0:
603 602 time.sleep(remainder)
604 603 continue
605 604 else:
606 605 # nothing was received within the time limit, signal heart failure
607 606 self._beating = False
608 607 since_last_heartbeat = time.time() - request_time
609 608 self.call_handlers(since_last_heartbeat)
610 609 # and close/reopen the socket, because the REQ/REP cycle has been broken
611 610 self._create_socket()
612 611 continue
613 612 try:
614 613 self.socket.close()
615 614 except:
616 615 pass
617 616
618 617 def pause(self):
619 618 """Pause the heartbeat."""
620 619 self._pause = True
621 620
622 621 def unpause(self):
623 622 """Unpause the heartbeat."""
624 623 self._pause = False
625 624
626 625 def is_beating(self):
627 626 """Is the heartbeat running and responsive (and not paused)."""
628 627 if self.is_alive() and not self._pause and self._beating:
629 628 return True
630 629 else:
631 630 return False
632 631
633 632 def stop(self):
634 633 """Stop the channel's event loop and join its thread."""
635 634 self._running = False
636 635 super(HBChannel, self).stop()
637 636
638 637 def call_handlers(self, since_last_heartbeat):
639 638 """This method is called in the ioloop thread when a message arrives.
640 639
641 640 Subclasses should override this method to handle incoming messages.
642 641 It is important to remember that this method is called in the thread
643 642 so that some logic must be done to ensure that the application level
644 643 handlers are called in the application thread.
645 644 """
646 645 raise NotImplementedError('call_handlers must be defined in a subclass.')
647 646
648 647
649 648 #-----------------------------------------------------------------------------
650 649 # Main kernel manager class
651 650 #-----------------------------------------------------------------------------
652 651
653 652 class KernelManager(Configurable):
654 653 """Manages a single kernel on this host along with its channels.
655 654
656 655 There are four channels associated with each kernel:
657 656
658 657 * shell: for request/reply calls to the kernel.
659 658 * iopub: for the kernel to publish results to frontends.
660 659 * hb: for monitoring the kernel's heartbeat.
661 660 * stdin: for frontends to reply to raw_input calls in the kernel.
662 661
663 662 The usage of the channels that this class manages is optional. It is
664 663 entirely possible to connect to the kernels directly using ZeroMQ
665 664 sockets. These channels are useful primarily for talking to a kernel
666 665 whose :class:`KernelManager` is in the same process.
667 666
668 667 This version manages kernels started using Popen.
669 668 """
670 669 # The PyZMQ Context to use for communication with the kernel.
671 670 context = Instance(zmq.Context)
672 671 def _context_default(self):
673 672 return zmq.Context.instance()
674 673
675 674 # The Session to use for communication with the kernel.
676 675 session = Instance(Session)
677 676 def _session_default(self):
678 677 return Session(config=self.config)
679 678
680 679 # The kernel process with which the KernelManager is communicating.
681 680 kernel = Instance(Popen)
682 681
683 682 # The addresses for the communication channels.
684 683 connection_file = Unicode('')
685 684
686 685 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
687 686
688 687 ip = Unicode(LOCALHOST, config=True)
689 688 def _ip_changed(self, name, old, new):
690 689 if new == '*':
691 690 self.ip = '0.0.0.0'
692 691 shell_port = Integer(0)
693 692 iopub_port = Integer(0)
694 693 stdin_port = Integer(0)
695 694 hb_port = Integer(0)
696 695
697 696 # The classes to use for the various channels.
698 697 shell_channel_class = Type(ShellChannel)
699 698 iopub_channel_class = Type(IOPubChannel)
700 699 stdin_channel_class = Type(StdInChannel)
701 700 hb_channel_class = Type(HBChannel)
702 701
703 702 # Protected traits.
704 703 _launch_args = Any
705 704 _shell_channel = Any
706 705 _iopub_channel = Any
707 706 _stdin_channel = Any
708 707 _hb_channel = Any
709 708 _connection_file_written=Bool(False)
710 709
711 710 def __del__(self):
712 711 self.cleanup_connection_file()
713 712
714 713 #--------------------------------------------------------------------------
715 714 # Channel management methods:
716 715 #--------------------------------------------------------------------------
717 716
718 717 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
719 718 """Starts the channels for this kernel.
720 719
721 720 This will create the channels if they do not exist and then start
722 721 them (their activity runs in a thread). If port numbers of 0 are
723 722 being used (random ports) then you must first call
724 723 :method:`start_kernel`. If the channels have been stopped and you
725 724 call this, :class:`RuntimeError` will be raised.
726 725 """
727 726 if shell:
728 727 self.shell_channel.start()
729 728 if iopub:
730 729 self.iopub_channel.start()
731 730 if stdin:
732 731 self.stdin_channel.start()
733 732 self.shell_channel.allow_stdin = True
734 733 else:
735 734 self.shell_channel.allow_stdin = False
736 735 if hb:
737 736 self.hb_channel.start()
738 737
739 738 def stop_channels(self):
740 739 """Stops all the running channels for this kernel.
741 740
742 741 This stops their event loops and joins their threads.
743 742 """
744 743 if self.shell_channel.is_alive():
745 744 self.shell_channel.stop()
746 745 if self.iopub_channel.is_alive():
747 746 self.iopub_channel.stop()
748 747 if self.stdin_channel.is_alive():
749 748 self.stdin_channel.stop()
750 749 if self.hb_channel.is_alive():
751 750 self.hb_channel.stop()
752 751
753 752 @property
754 753 def channels_running(self):
755 754 """Are any of the channels created and running?"""
756 755 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
757 756 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
758 757
759 758 def _make_url(self, port):
760 759 """Make a zmq url with a port.
761 760
762 761 There are two cases that this handles:
763 762
764 763 * tcp: tcp://ip:port
765 764 * ipc: ipc://ip-port
766 765 """
767 766 if self.transport == 'tcp':
768 767 return "tcp://%s:%i" % (self.ip, port)
769 768 else:
770 769 return "%s://%s-%s" % (self.transport, self.ip, port)
771 770
772 771 @property
773 772 def shell_channel(self):
774 773 """Get the shell channel object for this kernel."""
775 774 if self._shell_channel is None:
776 775 self._shell_channel = self.shell_channel_class(
777 776 self.context, self.session, self._make_url(self.shell_port)
778 777 )
779 778 return self._shell_channel
780 779
781 780 @property
782 781 def iopub_channel(self):
783 782 """Get the iopub channel object for this kernel."""
784 783 if self._iopub_channel is None:
785 784 self._iopub_channel = self.iopub_channel_class(
786 785 self.context, self.session, self._make_url(self.iopub_port)
787 786 )
788 787 return self._iopub_channel
789 788
790 789 @property
791 790 def stdin_channel(self):
792 791 """Get the stdin channel object for this kernel."""
793 792 if self._stdin_channel is None:
794 793 self._stdin_channel = self.stdin_channel_class(
795 794 self.context, self.session, self._make_url(self.stdin_port)
796 795 )
797 796 return self._stdin_channel
798 797
799 798 @property
800 799 def hb_channel(self):
801 800 """Get the hb channel object for this kernel."""
802 801 if self._hb_channel is None:
803 802 self._hb_channel = self.hb_channel_class(
804 803 self.context, self.session, self._make_url(self.hb_port)
805 804 )
806 805 return self._hb_channel
807 806
808 807 #--------------------------------------------------------------------------
809 808 # Connection and ipc file management.
810 809 #--------------------------------------------------------------------------
811 810
812 811 def cleanup_connection_file(self):
813 812 """Cleanup connection file *if we wrote it*
814 813
815 814 Will not raise if the connection file was already removed somehow.
816 815 """
817 816 if self._connection_file_written:
818 817 # cleanup connection files on full shutdown of kernel we started
819 818 self._connection_file_written = False
820 819 try:
821 820 os.remove(self.connection_file)
822 821 except (IOError, OSError):
823 822 pass
824 823
825 824 self.cleanup_ipc_files()
826 825
827 826 def cleanup_ipc_files(self):
828 827 """Cleanup ipc files if we wrote them."""
829 828 if self.transport != 'ipc':
830 829 return
831 830 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
832 831 ipcfile = "%s-%i" % (self.ip, port)
833 832 try:
834 833 os.remove(ipcfile)
835 834 except (IOError, OSError):
836 835 pass
837 836
838 837 def load_connection_file(self):
839 838 """Load connection info from JSON dict in self.connection_file."""
840 839 with open(self.connection_file) as f:
841 840 cfg = json.loads(f.read())
842 841
843 842 from pprint import pprint
844 843 pprint(cfg)
845 844 self.transport = cfg.get('transport', 'tcp')
846 845 self.ip = cfg['ip']
847 846 self.shell_port = cfg['shell_port']
848 847 self.stdin_port = cfg['stdin_port']
849 848 self.iopub_port = cfg['iopub_port']
850 849 self.hb_port = cfg['hb_port']
851 850 self.session.key = str_to_bytes(cfg['key'])
852 851
853 852 def write_connection_file(self):
854 853 """Write connection info to JSON dict in self.connection_file."""
855 854 if self._connection_file_written:
856 855 return
857 856 self.connection_file,cfg = write_connection_file(self.connection_file,
858 857 transport=self.transport, ip=self.ip, key=self.session.key,
859 858 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
860 859 shell_port=self.shell_port, hb_port=self.hb_port)
861 860 # write_connection_file also sets default ports:
862 861 self.shell_port = cfg['shell_port']
863 862 self.stdin_port = cfg['stdin_port']
864 863 self.iopub_port = cfg['iopub_port']
865 864 self.hb_port = cfg['hb_port']
866 865
867 866 self._connection_file_written = True
868 867
869 868 #--------------------------------------------------------------------------
870 869 # Kernel management.
871 870 #--------------------------------------------------------------------------
872 871
873 872 def start_kernel(self, **kw):
874 873 """Starts a kernel on this host in a separate process.
875 874
876 875 If random ports (port=0) are being used, this method must be called
877 876 before the channels are created.
878 877
879 878 Parameters:
880 879 -----------
881 880 launcher : callable, optional (default None)
882 881 A custom function for launching the kernel process (generally a
883 882 wrapper around ``entry_point.base_launch_kernel``). In most cases,
884 883 it should not be necessary to use this parameter.
885 884
886 885 **kw : optional
887 886 keyword arguments that are passed down into the launcher
888 887 callable.
889 888 """
890 889 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
891 890 raise RuntimeError("Can only launch a kernel on a local interface. "
892 891 "Make sure that the '*_address' attributes are "
893 892 "configured properly. "
894 893 "Currently valid addresses are: %s"%LOCAL_IPS
895 894 )
896 895
897 896 # write connection file / get default ports
898 897 self.write_connection_file()
899 898
900 899 self._launch_args = kw.copy()
901 900 launch_kernel = kw.pop('launcher', None)
902 901 if launch_kernel is None:
903 902 from ipkernel import launch_kernel
904 903 self.kernel = launch_kernel(fname=self.connection_file, **kw)
905 904
906 905 def shutdown_kernel(self, now=False, restart=False):
907 906 """Attempts to the stop the kernel process cleanly.
908 907
909 908 This attempts to shutdown the kernels cleanly by:
910 909
911 910 1. Sending it a shutdown message over the shell channel.
912 911 2. If that fails, the kernel is shutdown forcibly by sending it
913 912 a signal.
914 913
915 914 Parameters:
916 915 -----------
917 916 now : bool
918 917 Should the kernel be forcible killed *now*. This skips the
919 918 first, nice shutdown attempt.
920 919 restart: bool
921 920 Will this kernel be restarted after it is shutdown. When this
922 921 is True, connection files will not be cleaned up.
923 922 """
924 923 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
925 924 if sys.platform == 'win32':
926 925 self._kill_kernel()
927 926 return
928 927
929 928 # Pause the heart beat channel if it exists.
930 929 if self._hb_channel is not None:
931 930 self._hb_channel.pause()
932 931
933 932 if now:
934 933 if self.has_kernel:
935 934 self._kill_kernel()
936 935 else:
937 936 # Don't send any additional kernel kill messages immediately, to give
938 937 # the kernel a chance to properly execute shutdown actions. Wait for at
939 938 # most 1s, checking every 0.1s.
940 939 self.shell_channel.shutdown(restart=restart)
941 940 for i in range(10):
942 941 if self.is_alive:
943 942 time.sleep(0.1)
944 943 else:
945 944 break
946 945 else:
947 946 # OK, we've waited long enough.
948 947 if self.has_kernel:
949 948 self._kill_kernel()
950 949
951 950 if not restart:
952 951 self.cleanup_connection_file()
953 952 else:
954 953 self.cleanup_ipc_files()
955 954
956 955 def restart_kernel(self, now=False, **kw):
957 956 """Restarts a kernel with the arguments that were used to launch it.
958 957
959 958 If the old kernel was launched with random ports, the same ports will be
960 959 used for the new kernel. The same connection file is used again.
961 960
962 961 Parameters
963 962 ----------
964 963 now : bool, optional
965 964 If True, the kernel is forcefully restarted *immediately*, without
966 965 having a chance to do any cleanup action. Otherwise the kernel is
967 966 given 1s to clean up before a forceful restart is issued.
968 967
969 968 In all cases the kernel is restarted, the only difference is whether
970 969 it is given a chance to perform a clean shutdown or not.
971 970
972 971 **kw : optional
973 972 Any options specified here will overwrite those used to launch the
974 973 kernel.
975 974 """
976 975 if self._launch_args is None:
977 976 raise RuntimeError("Cannot restart the kernel. "
978 977 "No previous call to 'start_kernel'.")
979 978 else:
980 979 # Stop currently running kernel.
981 980 self.shutdown_kernel(now=now, restart=True)
982 981
983 982 # Start new kernel.
984 983 self._launch_args.update(kw)
985 984 self.start_kernel(**self._launch_args)
986 985
987 986 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
988 987 # unless there is some delay here.
989 988 if sys.platform == 'win32':
990 989 time.sleep(0.2)
991 990
992 991 @property
993 992 def has_kernel(self):
994 993 """Has a kernel been started that we are managing."""
995 994 return self.kernel is not None
996 995
997 996 def _kill_kernel(self):
998 997 """Kill the running kernel.
999 998
1000 999 This is a private method, callers should use shutdown_kernel(now=True).
1001 1000 """
1002 1001 if self.has_kernel:
1003 1002 # Pause the heart beat channel if it exists.
1004 1003 if self._hb_channel is not None:
1005 1004 self._hb_channel.pause()
1006 1005
1007 1006 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1008 1007 # TerminateProcess() on Win32).
1009 1008 try:
1010 1009 self.kernel.kill()
1011 1010 except OSError as e:
1012 1011 # In Windows, we will get an Access Denied error if the process
1013 1012 # has already terminated. Ignore it.
1014 1013 if sys.platform == 'win32':
1015 1014 if e.winerror != 5:
1016 1015 raise
1017 1016 # On Unix, we may get an ESRCH error if the process has already
1018 1017 # terminated. Ignore it.
1019 1018 else:
1020 1019 from errno import ESRCH
1021 1020 if e.errno != ESRCH:
1022 1021 raise
1023 1022
1024 1023 # Block until the kernel terminates.
1025 1024 self.kernel.wait()
1026 1025 self.kernel = None
1027 1026 else:
1028 1027 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1029 1028
1030 1029 def interrupt_kernel(self):
1031 1030 """Interrupts the kernel by sending it a signal.
1032 1031
1033 1032 Unlike ``signal_kernel``, this operation is well supported on all
1034 1033 platforms.
1035 1034 """
1036 1035 if self.has_kernel:
1037 1036 if sys.platform == 'win32':
1038 1037 from parentpoller import ParentPollerWindows as Poller
1039 1038 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1040 1039 else:
1041 1040 self.kernel.send_signal(signal.SIGINT)
1042 1041 else:
1043 1042 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1044 1043
1045 1044 def signal_kernel(self, signum):
1046 1045 """Sends a signal to the kernel.
1047 1046
1048 1047 Note that since only SIGTERM is supported on Windows, this function is
1049 1048 only useful on Unix systems.
1050 1049 """
1051 1050 if self.has_kernel:
1052 1051 self.kernel.send_signal(signum)
1053 1052 else:
1054 1053 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1055 1054
1056 1055 @property
1057 1056 def is_alive(self):
1058 1057 """Is the kernel process still running?"""
1059 1058 if self.has_kernel:
1060 1059 if self.kernel.poll() is None:
1061 1060 return True
1062 1061 else:
1063 1062 return False
1064 1063 elif self._hb_channel is not None:
1065 1064 # We didn't start the kernel with this KernelManager so we
1066 1065 # use the heartbeat.
1067 1066 return self._hb_channel.is_beating()
1068 1067 else:
1069 1068 # no heartbeat and not local, we can't tell if it's running,
1070 1069 # so naively return True
1071 1070 return True
1072 1071
1073 1072
1074 1073 #-----------------------------------------------------------------------------
1075 1074 # ABC Registration
1076 1075 #-----------------------------------------------------------------------------
1077 1076
1078 1077 ShellChannelABC.register(ShellChannel)
1079 1078 IOPubChannelABC.register(IOPubChannel)
1080 1079 HBChannelABC.register(HBChannel)
1081 1080 StdInChannelABC.register(StdInChannel)
1082 1081 KernelManagerABC.register(KernelManager)
1083 1082
@@ -1,36 +1,44 b''
1 1 """Tests for the notebook kernel and session manager."""
2 2
3 import time
3 4 from unittest import TestCase
4 5
5 6 from IPython.config.loader import Config
6 7 from IPython.zmq.kernelmanager import KernelManager
7 8
8 9 class TestKernelManager(TestCase):
9 10
10 11 def _get_tcp_km(self):
11 12 return KernelManager()
12 13
13 14 def _get_ipc_km(self):
14 15 c = Config()
15 16 c.KernelManager.transport = 'ipc'
16 17 c.KernelManager.ip = 'test'
17 18 km = KernelManager(config=c)
18 19 return km
19 20
20 21 def _run_lifecycle(self, km):
21 22 km.start_kernel()
22 23 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
23 # km.shell_channel.start()
24 24 km.restart_kernel()
25 # We need a delay here to give the restarting kernel a chance to
26 # restart. Otherwise, the interrupt will kill it, causing the test
27 # suite to hang. The reason it *hangs* is that the shutdown
28 # message for the restart sometimes hasn't been sent to the kernel.
29 # Because linger is oo on the shell channel, the context can't
30 # close until the message is sent to the kernel, which is not dead.
31 time.sleep()
25 32 km.interrupt_kernel()
26 33 self.assertTrue(isinstance(km, KernelManager))
27 34 km.shutdown_kernel()
28 35 km.shell_channel.stop()
29 36
30 def test_km_tcp(self):
37 def test_tcp_lifecycle(self):
31 38 km = self._get_tcp_km()
32 39 self._run_lifecycle(km)
33 40
34 def test_km_ipc(self):
41 def testipc_lifecycle(self):
35 42 km = self._get_ipc_km()
36 43 self._run_lifecycle(km)
44
General Comments 0
You need to be logged in to leave comments. Login now