##// END OF EJS Templates
Cleaning up MultiKernelManager test and adding linger=0 to shell....
Brian Granger -
Show More
@@ -1,67 +1,60
1 1 """Tests for the notebook kernel and session manager."""
2 2
3 3 from unittest import TestCase
4 4
5 5 from IPython.config.loader import Config
6 6 from IPython.frontend.html.notebook.kernelmanager import MultiKernelManager
7 7 from IPython.zmq.kernelmanager import KernelManager
8 8
9 9 class TestKernelManager(TestCase):
10 10
11 11 def _get_tcp_km(self):
12 12 return MultiKernelManager()
13 13
14 14 def _get_ipc_km(self):
15 15 c = Config()
16 16 c.KernelManager.transport = 'ipc'
17 17 c.KernelManager.ip = 'test'
18 18 km = MultiKernelManager(config=c)
19 19 return km
20 20
21 21 def _run_lifecycle(self, km):
22 22 kid = km.start_kernel()
23 23 self.assertTrue(kid in km)
24 24 self.assertTrue(kid in km.list_kernel_ids())
25 25 self.assertEqual(len(km),1)
26 26 km.restart_kernel(kid)
27 27 self.assertTrue(kid in km.list_kernel_ids())
28 28 km.interrupt_kernel(kid)
29 29 k = km.get_kernel(kid)
30 30 self.assertTrue(isinstance(k, KernelManager))
31 31 km.shutdown_kernel(kid)
32 32 self.assertTrue(not kid in km)
33 33
34 def test_km_tcp(self):
35 km = self._get_tcp_km()
36 self._run_lifecycle(km)
37
38 def test_km_ipc(self):
39 km = self._get_ipc_km()
40 self._run_lifecycle(km)
41
42 def test_tcp_cinfo(self):
43 km = self._get_tcp_km()
34 def _run_cinfo(self, km, transport, ip):
44 35 kid = km.start_kernel()
45 36 k = km.get_kernel(kid)
46 37 cinfo = km.get_connection_info(kid)
47 self.assertEqual('tcp', cinfo['transport'])
48 self.assertEqual('127.0.0.1', cinfo['ip'])
38 self.assertEqual(transport, cinfo['transport'])
39 self.assertEqual(ip, cinfo['ip'])
49 40 self.assertTrue('stdin_port' in cinfo)
50 41 self.assertTrue('iopub_port' in cinfo)
51 42 self.assertTrue('shell_port' in cinfo)
52 43 self.assertTrue('hb_port' in cinfo)
53 44 km.shutdown_kernel(kid)
54 45
46 def test_km_tcp(self):
47 km = self._get_tcp_km()
48 self._run_lifecycle(km)
49
50 def test_tcp_cinfo(self):
51 km = self._get_tcp_km()
52 self._run_cinfo(km, 'tcp', '127.0.0.1')
53
54 def test_km_ipc(self):
55 km = self._get_ipc_km()
56 self._run_lifecycle(km)
57
55 58 def test_ipc_cinfo(self):
56 59 km = self._get_ipc_km()
57 kid = km.start_kernel()
58 k = km.get_kernel(kid)
59 cinfo = km.get_connection_info(kid)
60 self.assertEqual('ipc', cinfo['transport'])
61 self.assertEqual('test', cinfo['ip'])
62 self.assertTrue('stdin_port' in cinfo)
63 self.assertTrue('iopub_port' in cinfo)
64 self.assertTrue('shell_port' in cinfo)
65 self.assertTrue('hb_port' in cinfo)
66 km.shutdown_kernel(kid)
67
60 self._run_cinfo(km, 'ipc', 'test')
@@ -1,1054 +1,1055
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45 from IPython.zmq.kernelmanagerabc import (
46 46 ShellChannelABC, IOPubChannelABC,
47 47 HBChannelABC, StdInChannelABC,
48 48 KernelManagerABC
49 49 )
50 50
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Constants and exceptions
54 54 #-----------------------------------------------------------------------------
55 55
56 56 class InvalidPortNumber(Exception):
57 57 pass
58 58
59 59 #-----------------------------------------------------------------------------
60 60 # Utility functions
61 61 #-----------------------------------------------------------------------------
62 62
63 63 # some utilities to validate message structure, these might get moved elsewhere
64 64 # if they prove to have more generic utility
65 65
66 66 def validate_string_list(lst):
67 67 """Validate that the input is a list of strings.
68 68
69 69 Raises ValueError if not."""
70 70 if not isinstance(lst, list):
71 71 raise ValueError('input %r must be a list' % lst)
72 72 for x in lst:
73 73 if not isinstance(x, basestring):
74 74 raise ValueError('element %r in list must be a string' % x)
75 75
76 76
77 77 def validate_string_dict(dct):
78 78 """Validate that the input is a dict with string keys and values.
79 79
80 80 Raises ValueError if not."""
81 81 for k,v in dct.iteritems():
82 82 if not isinstance(k, basestring):
83 83 raise ValueError('key %r in dict must be a string' % k)
84 84 if not isinstance(v, basestring):
85 85 raise ValueError('value %r in dict must be a string' % v)
86 86
87 87
88 88 #-----------------------------------------------------------------------------
89 89 # ZMQ Socket Channel classes
90 90 #-----------------------------------------------------------------------------
91 91
92 92 class ZMQSocketChannel(Thread):
93 93 """The base class for the channels that use ZMQ sockets.
94 94 """
95 95 context = None
96 96 session = None
97 97 socket = None
98 98 ioloop = None
99 99 stream = None
100 100 _address = None
101 101 _exiting = False
102 102
103 103 def __init__(self, context, session, address):
104 104 """Create a channel
105 105
106 106 Parameters
107 107 ----------
108 108 context : :class:`zmq.Context`
109 109 The ZMQ context to use.
110 110 session : :class:`session.Session`
111 111 The session to use.
112 112 address : zmq url
113 113 Standard (ip, port) tuple that the kernel is listening on.
114 114 """
115 115 super(ZMQSocketChannel, self).__init__()
116 116 self.daemon = True
117 117
118 118 self.context = context
119 119 self.session = session
120 120 if isinstance(address, tuple):
121 121 if address[1] == 0:
122 122 message = 'The port number for a channel cannot be 0.'
123 123 raise InvalidPortNumber(message)
124 124 address = "tcp://%s:%i" % address
125 125 self._address = address
126 126 atexit.register(self._notice_exit)
127 127
128 128 def _notice_exit(self):
129 129 self._exiting = True
130 130
131 131 def _run_loop(self):
132 132 """Run my loop, ignoring EINTR events in the poller"""
133 133 while True:
134 134 try:
135 135 self.ioloop.start()
136 136 except ZMQError as e:
137 137 if e.errno == errno.EINTR:
138 138 continue
139 139 else:
140 140 raise
141 141 except Exception:
142 142 if self._exiting:
143 143 break
144 144 else:
145 145 raise
146 146 else:
147 147 break
148 148
149 149 def stop(self):
150 150 """Stop the channel's activity.
151 151
152 152 This calls :method:`Thread.join` and returns when the thread
153 153 terminates. :class:`RuntimeError` will be raised if
154 154 :method:`self.start` is called again.
155 155 """
156 156 self.join()
157 157
158 158 @property
159 159 def address(self):
160 160 """Get the channel's address as a zmq url string ('tcp://127.0.0.1:5555').
161 161 """
162 162 return self._address
163 163
164 164 def _queue_send(self, msg):
165 165 """Queue a message to be sent from the IOLoop's thread.
166 166
167 167 Parameters
168 168 ----------
169 169 msg : message to send
170 170
171 171 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
172 172 thread control of the action.
173 173 """
174 174 def thread_send():
175 175 self.session.send(self.stream, msg)
176 176 self.ioloop.add_callback(thread_send)
177 177
178 178 def _handle_recv(self, msg):
179 179 """callback for stream.on_recv
180 180
181 181 unpacks message, and calls handlers with it.
182 182 """
183 183 ident,smsg = self.session.feed_identities(msg)
184 184 self.call_handlers(self.session.unserialize(smsg))
185 185
186 186
187 187
188 188 class ShellChannel(ZMQSocketChannel):
189 189 """The DEALER channel for issues request/replies to the kernel.
190 190 """
191 191
192 192 command_queue = None
193 193 # flag for whether execute requests should be allowed to call raw_input:
194 194 allow_stdin = True
195 195
196 196 def __init__(self, context, session, address):
197 197 super(ShellChannel, self).__init__(context, session, address)
198 198 self.ioloop = ioloop.IOLoop()
199 199
200 200 def run(self):
201 201 """The thread's main activity. Call start() instead."""
202 202 self.socket = self.context.socket(zmq.DEALER)
203 203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 self.socket.linger = 0
204 205 self.socket.connect(self.address)
205 206 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 207 self.stream.on_recv(self._handle_recv)
207 208 self._run_loop()
208 209 try:
209 210 self.socket.close()
210 211 except:
211 212 pass
212 213
213 214 def stop(self):
214 215 self.ioloop.stop()
215 216 super(ShellChannel, self).stop()
216 217
217 218 def call_handlers(self, msg):
218 219 """This method is called in the ioloop thread when a message arrives.
219 220
220 221 Subclasses should override this method to handle incoming messages.
221 222 It is important to remember that this method is called in the thread
222 223 so that some logic must be done to ensure that the application leve
223 224 handlers are called in the application thread.
224 225 """
225 226 raise NotImplementedError('call_handlers must be defined in a subclass.')
226 227
227 228 def execute(self, code, silent=False, store_history=True,
228 229 user_variables=None, user_expressions=None, allow_stdin=None):
229 230 """Execute code in the kernel.
230 231
231 232 Parameters
232 233 ----------
233 234 code : str
234 235 A string of Python code.
235 236
236 237 silent : bool, optional (default False)
237 238 If set, the kernel will execute the code as quietly possible, and
238 239 will force store_history to be False.
239 240
240 241 store_history : bool, optional (default True)
241 242 If set, the kernel will store command history. This is forced
242 243 to be False if silent is True.
243 244
244 245 user_variables : list, optional
245 246 A list of variable names to pull from the user's namespace. They
246 247 will come back as a dict with these names as keys and their
247 248 :func:`repr` as values.
248 249
249 250 user_expressions : dict, optional
250 251 A dict mapping names to expressions to be evaluated in the user's
251 252 dict. The expression values are returned as strings formatted using
252 253 :func:`repr`.
253 254
254 255 allow_stdin : bool, optional (default self.allow_stdin)
255 256 Flag for whether the kernel can send stdin requests to frontends.
256 257
257 258 Some frontends (e.g. the Notebook) do not support stdin requests.
258 259 If raw_input is called from code executed from such a frontend, a
259 260 StdinNotImplementedError will be raised.
260 261
261 262 Returns
262 263 -------
263 264 The msg_id of the message sent.
264 265 """
265 266 if user_variables is None:
266 267 user_variables = []
267 268 if user_expressions is None:
268 269 user_expressions = {}
269 270 if allow_stdin is None:
270 271 allow_stdin = self.allow_stdin
271 272
272 273
273 274 # Don't waste network traffic if inputs are invalid
274 275 if not isinstance(code, basestring):
275 276 raise ValueError('code %r must be a string' % code)
276 277 validate_string_list(user_variables)
277 278 validate_string_dict(user_expressions)
278 279
279 280 # Create class for content/msg creation. Related to, but possibly
280 281 # not in Session.
281 282 content = dict(code=code, silent=silent, store_history=store_history,
282 283 user_variables=user_variables,
283 284 user_expressions=user_expressions,
284 285 allow_stdin=allow_stdin,
285 286 )
286 287 msg = self.session.msg('execute_request', content)
287 288 self._queue_send(msg)
288 289 return msg['header']['msg_id']
289 290
290 291 def complete(self, text, line, cursor_pos, block=None):
291 292 """Tab complete text in the kernel's namespace.
292 293
293 294 Parameters
294 295 ----------
295 296 text : str
296 297 The text to complete.
297 298 line : str
298 299 The full line of text that is the surrounding context for the
299 300 text to complete.
300 301 cursor_pos : int
301 302 The position of the cursor in the line where the completion was
302 303 requested.
303 304 block : str, optional
304 305 The full block of code in which the completion is being requested.
305 306
306 307 Returns
307 308 -------
308 309 The msg_id of the message sent.
309 310 """
310 311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
311 312 msg = self.session.msg('complete_request', content)
312 313 self._queue_send(msg)
313 314 return msg['header']['msg_id']
314 315
315 316 def object_info(self, oname, detail_level=0):
316 317 """Get metadata information about an object.
317 318
318 319 Parameters
319 320 ----------
320 321 oname : str
321 322 A string specifying the object name.
322 323 detail_level : int, optional
323 324 The level of detail for the introspection (0-2)
324 325
325 326 Returns
326 327 -------
327 328 The msg_id of the message sent.
328 329 """
329 330 content = dict(oname=oname, detail_level=detail_level)
330 331 msg = self.session.msg('object_info_request', content)
331 332 self._queue_send(msg)
332 333 return msg['header']['msg_id']
333 334
334 335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
335 336 """Get entries from the history list.
336 337
337 338 Parameters
338 339 ----------
339 340 raw : bool
340 341 If True, return the raw input.
341 342 output : bool
342 343 If True, then return the output as well.
343 344 hist_access_type : str
344 345 'range' (fill in session, start and stop params), 'tail' (fill in n)
345 346 or 'search' (fill in pattern param).
346 347
347 348 session : int
348 349 For a range request, the session from which to get lines. Session
349 350 numbers are positive integers; negative ones count back from the
350 351 current session.
351 352 start : int
352 353 The first line number of a history range.
353 354 stop : int
354 355 The final (excluded) line number of a history range.
355 356
356 357 n : int
357 358 The number of lines of history to get for a tail request.
358 359
359 360 pattern : str
360 361 The glob-syntax pattern for a search request.
361 362
362 363 Returns
363 364 -------
364 365 The msg_id of the message sent.
365 366 """
366 367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
367 368 **kwargs)
368 369 msg = self.session.msg('history_request', content)
369 370 self._queue_send(msg)
370 371 return msg['header']['msg_id']
371 372
372 373 def kernel_info(self):
373 374 """Request kernel info."""
374 375 msg = self.session.msg('kernel_info_request')
375 376 self._queue_send(msg)
376 377 return msg['header']['msg_id']
377 378
378 379 def shutdown(self, restart=False):
379 380 """Request an immediate kernel shutdown.
380 381
381 382 Upon receipt of the (empty) reply, client code can safely assume that
382 383 the kernel has shut down and it's safe to forcefully terminate it if
383 384 it's still alive.
384 385
385 386 The kernel will send the reply via a function registered with Python's
386 387 atexit module, ensuring it's truly done as the kernel is done with all
387 388 normal operation.
388 389 """
389 390 # Send quit message to kernel. Once we implement kernel-side setattr,
390 391 # this should probably be done that way, but for now this will do.
391 392 msg = self.session.msg('shutdown_request', {'restart':restart})
392 393 self._queue_send(msg)
393 394 return msg['header']['msg_id']
394 395
395 396
396 397
397 398 class IOPubChannel(ZMQSocketChannel):
398 399 """The SUB channel which listens for messages that the kernel publishes.
399 400 """
400 401
401 402 def __init__(self, context, session, address):
402 403 super(IOPubChannel, self).__init__(context, session, address)
403 404 self.ioloop = ioloop.IOLoop()
404 405
405 406 def run(self):
406 407 """The thread's main activity. Call start() instead."""
407 408 self.socket = self.context.socket(zmq.SUB)
408 409 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
409 410 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
410 411 self.socket.connect(self.address)
411 412 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
412 413 self.stream.on_recv(self._handle_recv)
413 414 self._run_loop()
414 415 try:
415 416 self.socket.close()
416 417 except:
417 418 pass
418 419
419 420 def stop(self):
420 421 self.ioloop.stop()
421 422 super(IOPubChannel, self).stop()
422 423
423 424 def call_handlers(self, msg):
424 425 """This method is called in the ioloop thread when a message arrives.
425 426
426 427 Subclasses should override this method to handle incoming messages.
427 428 It is important to remember that this method is called in the thread
428 429 so that some logic must be done to ensure that the application leve
429 430 handlers are called in the application thread.
430 431 """
431 432 raise NotImplementedError('call_handlers must be defined in a subclass.')
432 433
433 434 def flush(self, timeout=1.0):
434 435 """Immediately processes all pending messages on the SUB channel.
435 436
436 437 Callers should use this method to ensure that :method:`call_handlers`
437 438 has been called for all messages that have been received on the
438 439 0MQ SUB socket of this channel.
439 440
440 441 This method is thread safe.
441 442
442 443 Parameters
443 444 ----------
444 445 timeout : float, optional
445 446 The maximum amount of time to spend flushing, in seconds. The
446 447 default is one second.
447 448 """
448 449 # We do the IOLoop callback process twice to ensure that the IOLoop
449 450 # gets to perform at least one full poll.
450 451 stop_time = time.time() + timeout
451 452 for i in xrange(2):
452 453 self._flushed = False
453 454 self.ioloop.add_callback(self._flush)
454 455 while not self._flushed and time.time() < stop_time:
455 456 time.sleep(0.01)
456 457
457 458 def _flush(self):
458 459 """Callback for :method:`self.flush`."""
459 460 self.stream.flush()
460 461 self._flushed = True
461 462
462 463
463 464 class StdInChannel(ZMQSocketChannel):
464 465 """A reply channel to handle raw_input requests that the kernel makes."""
465 466
466 467 msg_queue = None
467 468
468 469 def __init__(self, context, session, address):
469 470 super(StdInChannel, self).__init__(context, session, address)
470 471 self.ioloop = ioloop.IOLoop()
471 472
472 473 def run(self):
473 474 """The thread's main activity. Call start() instead."""
474 475 self.socket = self.context.socket(zmq.DEALER)
475 476 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
476 477 self.socket.connect(self.address)
477 478 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
478 479 self.stream.on_recv(self._handle_recv)
479 480 self._run_loop()
480 481 try:
481 482 self.socket.close()
482 483 except:
483 484 pass
484 485
485 486
486 487 def stop(self):
487 488 self.ioloop.stop()
488 489 super(StdInChannel, self).stop()
489 490
490 491 def call_handlers(self, msg):
491 492 """This method is called in the ioloop thread when a message arrives.
492 493
493 494 Subclasses should override this method to handle incoming messages.
494 495 It is important to remember that this method is called in the thread
495 496 so that some logic must be done to ensure that the application leve
496 497 handlers are called in the application thread.
497 498 """
498 499 raise NotImplementedError('call_handlers must be defined in a subclass.')
499 500
500 501 def input(self, string):
501 502 """Send a string of raw input to the kernel."""
502 503 content = dict(value=string)
503 504 msg = self.session.msg('input_reply', content)
504 505 self._queue_send(msg)
505 506
506 507
507 508 class HBChannel(ZMQSocketChannel):
508 509 """The heartbeat channel which monitors the kernel heartbeat.
509 510
510 511 Note that the heartbeat channel is paused by default. As long as you start
511 512 this channel, the kernel manager will ensure that it is paused and un-paused
512 513 as appropriate.
513 514 """
514 515
515 516 time_to_dead = 3.0
516 517 socket = None
517 518 poller = None
518 519 _running = None
519 520 _pause = None
520 521 _beating = None
521 522
522 523 def __init__(self, context, session, address):
523 524 super(HBChannel, self).__init__(context, session, address)
524 525 self._running = False
525 526 self._pause =True
526 527 self.poller = zmq.Poller()
527 528
528 529 def _create_socket(self):
529 530 if self.socket is not None:
530 531 # close previous socket, before opening a new one
531 532 self.poller.unregister(self.socket)
532 533 self.socket.close()
533 534 self.socket = self.context.socket(zmq.REQ)
534 535 self.socket.setsockopt(zmq.LINGER, 0)
535 536 self.socket.connect(self.address)
536 537
537 538 self.poller.register(self.socket, zmq.POLLIN)
538 539
539 540 def _poll(self, start_time):
540 541 """poll for heartbeat replies until we reach self.time_to_dead
541 542
542 543 Ignores interrupts, and returns the result of poll(), which
543 544 will be an empty list if no messages arrived before the timeout,
544 545 or the event tuple if there is a message to receive.
545 546 """
546 547
547 548 until_dead = self.time_to_dead - (time.time() - start_time)
548 549 # ensure poll at least once
549 550 until_dead = max(until_dead, 1e-3)
550 551 events = []
551 552 while True:
552 553 try:
553 554 events = self.poller.poll(1000 * until_dead)
554 555 except ZMQError as e:
555 556 if e.errno == errno.EINTR:
556 557 # ignore interrupts during heartbeat
557 558 # this may never actually happen
558 559 until_dead = self.time_to_dead - (time.time() - start_time)
559 560 until_dead = max(until_dead, 1e-3)
560 561 pass
561 562 else:
562 563 raise
563 564 except Exception:
564 565 if self._exiting:
565 566 break
566 567 else:
567 568 raise
568 569 else:
569 570 break
570 571 return events
571 572
572 573 def run(self):
573 574 """The thread's main activity. Call start() instead."""
574 575 self._create_socket()
575 576 self._running = True
576 577 self._beating = True
577 578
578 579 while self._running:
579 580 if self._pause:
580 581 # just sleep, and skip the rest of the loop
581 582 time.sleep(self.time_to_dead)
582 583 continue
583 584
584 585 since_last_heartbeat = 0.0
585 586 # io.rprint('Ping from HB channel') # dbg
586 587 # no need to catch EFSM here, because the previous event was
587 588 # either a recv or connect, which cannot be followed by EFSM
588 589 self.socket.send(b'ping')
589 590 request_time = time.time()
590 591 ready = self._poll(request_time)
591 592 if ready:
592 593 self._beating = True
593 594 # the poll above guarantees we have something to recv
594 595 self.socket.recv()
595 596 # sleep the remainder of the cycle
596 597 remainder = self.time_to_dead - (time.time() - request_time)
597 598 if remainder > 0:
598 599 time.sleep(remainder)
599 600 continue
600 601 else:
601 602 # nothing was received within the time limit, signal heart failure
602 603 self._beating = False
603 604 since_last_heartbeat = time.time() - request_time
604 605 self.call_handlers(since_last_heartbeat)
605 606 # and close/reopen the socket, because the REQ/REP cycle has been broken
606 607 self._create_socket()
607 608 continue
608 609 try:
609 610 self.socket.close()
610 611 except:
611 612 pass
612 613
613 614 def pause(self):
614 615 """Pause the heartbeat."""
615 616 self._pause = True
616 617
617 618 def unpause(self):
618 619 """Unpause the heartbeat."""
619 620 self._pause = False
620 621
621 622 def is_beating(self):
622 623 """Is the heartbeat running and responsive (and not paused)."""
623 624 if self.is_alive() and not self._pause and self._beating:
624 625 return True
625 626 else:
626 627 return False
627 628
628 629 def stop(self):
629 630 self._running = False
630 631 super(HBChannel, self).stop()
631 632
632 633 def call_handlers(self, since_last_heartbeat):
633 634 """This method is called in the ioloop thread when a message arrives.
634 635
635 636 Subclasses should override this method to handle incoming messages.
636 637 It is important to remember that this method is called in the thread
637 638 so that some logic must be done to ensure that the application level
638 639 handlers are called in the application thread.
639 640 """
640 641 raise NotImplementedError('call_handlers must be defined in a subclass.')
641 642
642 643
643 644 #-----------------------------------------------------------------------------
644 645 # Main kernel manager class
645 646 #-----------------------------------------------------------------------------
646 647
647 648 class KernelManager(Configurable):
648 649 """ Manages a kernel for a frontend.
649 650
650 651 The SUB channel is for the frontend to receive messages published by the
651 652 kernel.
652 653
653 654 The REQ channel is for the frontend to make requests of the kernel.
654 655
655 656 The REP channel is for the kernel to request stdin (raw_input) from the
656 657 frontend.
657 658 """
658 659 # The PyZMQ Context to use for communication with the kernel.
659 660 context = Instance(zmq.Context)
660 661 def _context_default(self):
661 662 return zmq.Context.instance()
662 663
663 664 # The Session to use for communication with the kernel.
664 665 session = Instance(Session)
665 666 def _session_default(self):
666 667 return Session(config=self.config)
667 668
668 669 # The kernel process with which the KernelManager is communicating.
669 670 kernel = Instance(Popen)
670 671
671 672 # The addresses for the communication channels.
672 673 connection_file = Unicode('')
673 674
674 675 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
675 676
676 677 ip = Unicode(LOCALHOST, config=True)
677 678 def _ip_changed(self, name, old, new):
678 679 if new == '*':
679 680 self.ip = '0.0.0.0'
680 681 shell_port = Integer(0)
681 682 iopub_port = Integer(0)
682 683 stdin_port = Integer(0)
683 684 hb_port = Integer(0)
684 685
685 686 # The classes to use for the various channels.
686 687 shell_channel_class = Type(ShellChannel)
687 688 iopub_channel_class = Type(IOPubChannel)
688 689 stdin_channel_class = Type(StdInChannel)
689 690 hb_channel_class = Type(HBChannel)
690 691
691 692 # Protected traits.
692 693 _launch_args = Any
693 694 _shell_channel = Any
694 695 _iopub_channel = Any
695 696 _stdin_channel = Any
696 697 _hb_channel = Any
697 698 _connection_file_written=Bool(False)
698 699
699 700 def __del__(self):
700 701 self.cleanup_connection_file()
701 702
702 703 #--------------------------------------------------------------------------
703 704 # Channel management methods:
704 705 #--------------------------------------------------------------------------
705 706
706 707 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
707 708 """Starts the channels for this kernel.
708 709
709 710 This will create the channels if they do not exist and then start
710 711 them. If port numbers of 0 are being used (random ports) then you
711 712 must first call :method:`start_kernel`. If the channels have been
712 713 stopped and you call this, :class:`RuntimeError` will be raised.
713 714 """
714 715 if shell:
715 716 self.shell_channel.start()
716 717 if iopub:
717 718 self.iopub_channel.start()
718 719 if stdin:
719 720 self.stdin_channel.start()
720 721 self.shell_channel.allow_stdin = True
721 722 else:
722 723 self.shell_channel.allow_stdin = False
723 724 if hb:
724 725 self.hb_channel.start()
725 726
726 727 def stop_channels(self):
727 728 """Stops all the running channels for this kernel.
728 729 """
729 730 if self.shell_channel.is_alive():
730 731 self.shell_channel.stop()
731 732 if self.iopub_channel.is_alive():
732 733 self.iopub_channel.stop()
733 734 if self.stdin_channel.is_alive():
734 735 self.stdin_channel.stop()
735 736 if self.hb_channel.is_alive():
736 737 self.hb_channel.stop()
737 738
738 739 @property
739 740 def channels_running(self):
740 741 """Are any of the channels created and running?"""
741 742 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
742 743 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
743 744
744 745 def _make_url(self, port):
745 746 """make a zmq url with a port"""
746 747 if self.transport == 'tcp':
747 748 return "tcp://%s:%i" % (self.ip, port)
748 749 else:
749 750 return "%s://%s-%s" % (self.transport, self.ip, port)
750 751
751 752 @property
752 753 def shell_channel(self):
753 754 """Get the REQ socket channel object to make requests of the kernel."""
754 755 if self._shell_channel is None:
755 756 self._shell_channel = self.shell_channel_class(self.context,
756 757 self.session,
757 758 self._make_url(self.shell_port),
758 759 )
759 760 return self._shell_channel
760 761
761 762 @property
762 763 def iopub_channel(self):
763 764 """Get the SUB socket channel object."""
764 765 if self._iopub_channel is None:
765 766 self._iopub_channel = self.iopub_channel_class(self.context,
766 767 self.session,
767 768 self._make_url(self.iopub_port),
768 769 )
769 770 return self._iopub_channel
770 771
771 772 @property
772 773 def stdin_channel(self):
773 774 """Get the REP socket channel object to handle stdin (raw_input)."""
774 775 if self._stdin_channel is None:
775 776 self._stdin_channel = self.stdin_channel_class(self.context,
776 777 self.session,
777 778 self._make_url(self.stdin_port),
778 779 )
779 780 return self._stdin_channel
780 781
781 782 @property
782 783 def hb_channel(self):
783 784 """Get the heartbeat socket channel object to check that the
784 785 kernel is alive."""
785 786 if self._hb_channel is None:
786 787 self._hb_channel = self.hb_channel_class(self.context,
787 788 self.session,
788 789 self._make_url(self.hb_port),
789 790 )
790 791 return self._hb_channel
791 792
792 793 #--------------------------------------------------------------------------
793 794 # Connection and ipc file management.
794 795 #--------------------------------------------------------------------------
795 796
796 797 def cleanup_connection_file(self):
797 798 """cleanup connection file *if we wrote it*
798 799
799 800 Will not raise if the connection file was already removed somehow.
800 801 """
801 802 if self._connection_file_written:
802 803 # cleanup connection files on full shutdown of kernel we started
803 804 self._connection_file_written = False
804 805 try:
805 806 os.remove(self.connection_file)
806 807 except (IOError, OSError):
807 808 pass
808 809
809 810 self.cleanup_ipc_files()
810 811
811 812 def cleanup_ipc_files(self):
812 813 """cleanup ipc files if we wrote them"""
813 814 if self.transport != 'ipc':
814 815 return
815 816 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
816 817 ipcfile = "%s-%i" % (self.ip, port)
817 818 try:
818 819 os.remove(ipcfile)
819 820 except (IOError, OSError):
820 821 pass
821 822
822 823 def load_connection_file(self):
823 824 """load connection info from JSON dict in self.connection_file"""
824 825 with open(self.connection_file) as f:
825 826 cfg = json.loads(f.read())
826 827
827 828 from pprint import pprint
828 829 pprint(cfg)
829 830 self.transport = cfg.get('transport', 'tcp')
830 831 self.ip = cfg['ip']
831 832 self.shell_port = cfg['shell_port']
832 833 self.stdin_port = cfg['stdin_port']
833 834 self.iopub_port = cfg['iopub_port']
834 835 self.hb_port = cfg['hb_port']
835 836 self.session.key = str_to_bytes(cfg['key'])
836 837
837 838 def write_connection_file(self):
838 839 """write connection info to JSON dict in self.connection_file"""
839 840 if self._connection_file_written:
840 841 return
841 842 self.connection_file,cfg = write_connection_file(self.connection_file,
842 843 transport=self.transport, ip=self.ip, key=self.session.key,
843 844 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
844 845 shell_port=self.shell_port, hb_port=self.hb_port)
845 846 # write_connection_file also sets default ports:
846 847 self.shell_port = cfg['shell_port']
847 848 self.stdin_port = cfg['stdin_port']
848 849 self.iopub_port = cfg['iopub_port']
849 850 self.hb_port = cfg['hb_port']
850 851
851 852 self._connection_file_written = True
852 853
853 854 #--------------------------------------------------------------------------
854 855 # Kernel management.
855 856 #--------------------------------------------------------------------------
856 857
857 858 def start_kernel(self, **kw):
858 859 """Starts a kernel process and configures the manager to use it.
859 860
860 861 If random ports (port=0) are being used, this method must be called
861 862 before the channels are created.
862 863
863 864 Parameters:
864 865 -----------
865 866 launcher : callable, optional (default None)
866 867 A custom function for launching the kernel process (generally a
867 868 wrapper around ``entry_point.base_launch_kernel``). In most cases,
868 869 it should not be necessary to use this parameter.
869 870
870 871 **kw : optional
871 872 See respective options for IPython and Python kernels.
872 873 """
873 874 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
874 875 raise RuntimeError("Can only launch a kernel on a local interface. "
875 876 "Make sure that the '*_address' attributes are "
876 877 "configured properly. "
877 878 "Currently valid addresses are: %s"%LOCAL_IPS
878 879 )
879 880
880 881 # write connection file / get default ports
881 882 self.write_connection_file()
882 883
883 884 self._launch_args = kw.copy()
884 885 launch_kernel = kw.pop('launcher', None)
885 886 if launch_kernel is None:
886 887 from ipkernel import launch_kernel
887 888 self.kernel = launch_kernel(fname=self.connection_file, **kw)
888 889
889 890 def shutdown_kernel(self, now=False, restart=False):
890 891 """ Attempts to the stop the kernel process cleanly.
891 892
892 893 If the kernel cannot be stopped and the kernel is local, it is killed.
893 894 """
894 895 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
895 896 if sys.platform == 'win32':
896 897 self.kill_kernel()
897 898 return
898 899
899 900 # Pause the heart beat channel if it exists.
900 901 if self._hb_channel is not None:
901 902 self._hb_channel.pause()
902 903
903 904 if now:
904 905 if self.has_kernel:
905 906 self.kill_kernel()
906 907 else:
907 908 # Don't send any additional kernel kill messages immediately, to give
908 909 # the kernel a chance to properly execute shutdown actions. Wait for at
909 910 # most 1s, checking every 0.1s.
910 911 self.shell_channel.shutdown(restart=restart)
911 912 for i in range(10):
912 913 if self.is_alive:
913 914 time.sleep(0.1)
914 915 else:
915 916 break
916 917 else:
917 918 # OK, we've waited long enough.
918 919 if self.has_kernel:
919 920 self.kill_kernel()
920 921
921 922 if not restart:
922 923 self.cleanup_connection_file()
923 924 else:
924 925 self.cleanup_ipc_files()
925 926
926 927 def restart_kernel(self, now=False, **kw):
927 928 """Restarts a kernel with the arguments that were used to launch it.
928 929
929 930 If the old kernel was launched with random ports, the same ports will be
930 931 used for the new kernel.
931 932
932 933 Parameters
933 934 ----------
934 935 now : bool, optional
935 936 If True, the kernel is forcefully restarted *immediately*, without
936 937 having a chance to do any cleanup action. Otherwise the kernel is
937 938 given 1s to clean up before a forceful restart is issued.
938 939
939 940 In all cases the kernel is restarted, the only difference is whether
940 941 it is given a chance to perform a clean shutdown or not.
941 942
942 943 **kw : optional
943 944 Any options specified here will replace those used to launch the
944 945 kernel.
945 946 """
946 947 if self._launch_args is None:
947 948 raise RuntimeError("Cannot restart the kernel. "
948 949 "No previous call to 'start_kernel'.")
949 950 else:
950 951 # Stop currently running kernel.
951 952 self.shutdown_kernel(now=now, restart=True)
952 953
953 954 # Start new kernel.
954 955 self._launch_args.update(kw)
955 956 self.start_kernel(**self._launch_args)
956 957
957 958 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
958 959 # unless there is some delay here.
959 960 if sys.platform == 'win32':
960 961 time.sleep(0.2)
961 962
962 963 @property
963 964 def has_kernel(self):
964 965 """Returns whether a kernel process has been specified for the kernel
965 966 manager.
966 967 """
967 968 return self.kernel is not None
968 969
969 970 def kill_kernel(self):
970 971 """ Kill the running kernel.
971 972
972 973 This method blocks until the kernel process has terminated.
973 974 """
974 975 if self.has_kernel:
975 976 # Pause the heart beat channel if it exists.
976 977 if self._hb_channel is not None:
977 978 self._hb_channel.pause()
978 979
979 980 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
980 981 # TerminateProcess() on Win32).
981 982 try:
982 983 self.kernel.kill()
983 984 except OSError as e:
984 985 # In Windows, we will get an Access Denied error if the process
985 986 # has already terminated. Ignore it.
986 987 if sys.platform == 'win32':
987 988 if e.winerror != 5:
988 989 raise
989 990 # On Unix, we may get an ESRCH error if the process has already
990 991 # terminated. Ignore it.
991 992 else:
992 993 from errno import ESRCH
993 994 if e.errno != ESRCH:
994 995 raise
995 996
996 997 # Block until the kernel terminates.
997 998 self.kernel.wait()
998 999 self.kernel = None
999 1000 else:
1000 1001 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1001 1002
1002 1003 def interrupt_kernel(self):
1003 1004 """ Interrupts the kernel.
1004 1005
1005 1006 Unlike ``signal_kernel``, this operation is well supported on all
1006 1007 platforms.
1007 1008 """
1008 1009 if self.has_kernel:
1009 1010 if sys.platform == 'win32':
1010 1011 from parentpoller import ParentPollerWindows as Poller
1011 1012 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1012 1013 else:
1013 1014 self.kernel.send_signal(signal.SIGINT)
1014 1015 else:
1015 1016 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1016 1017
1017 1018 def signal_kernel(self, signum):
1018 1019 """ Sends a signal to the kernel.
1019 1020
1020 1021 Note that since only SIGTERM is supported on Windows, this function is
1021 1022 only useful on Unix systems.
1022 1023 """
1023 1024 if self.has_kernel:
1024 1025 self.kernel.send_signal(signum)
1025 1026 else:
1026 1027 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1027 1028
1028 1029 @property
1029 1030 def is_alive(self):
1030 1031 """Is the kernel process still running?"""
1031 1032 if self.has_kernel:
1032 1033 if self.kernel.poll() is None:
1033 1034 return True
1034 1035 else:
1035 1036 return False
1036 1037 elif self._hb_channel is not None:
1037 1038 # We didn't start the kernel with this KernelManager so we
1038 1039 # use the heartbeat.
1039 1040 return self._hb_channel.is_beating()
1040 1041 else:
1041 1042 # no heartbeat and not local, we can't tell if it's running,
1042 1043 # so naively return True
1043 1044 return True
1044 1045
1045 1046
1046 1047 #-----------------------------------------------------------------------------
1047 1048 # ABC Registration
1048 1049 #-----------------------------------------------------------------------------
1049 1050
1050 1051 ShellChannelABC.register(ShellChannel)
1051 1052 IOPubChannelABC.register(IOPubChannel)
1052 1053 HBChannelABC.register(HBChannel)
1053 1054 StdInChannelABC.register(StdInChannel)
1054 1055 KernelManagerABC.register(KernelManager)
General Comments 0
You need to be logged in to leave comments. Login now