##// END OF EJS Templates
Final cleanup of kernelmanager...
Brian E. Granger -
Show More
@@ -1,1082 +1,1082 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45 from IPython.zmq.kernelmanagerabc import (
46 46 ShellChannelABC, IOPubChannelABC,
47 47 HBChannelABC, StdInChannelABC,
48 48 KernelManagerABC
49 49 )
50 50
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Constants and exceptions
54 54 #-----------------------------------------------------------------------------
55 55
56 56 class InvalidPortNumber(Exception):
57 57 pass
58 58
59 59 #-----------------------------------------------------------------------------
60 60 # Utility functions
61 61 #-----------------------------------------------------------------------------
62 62
63 63 # some utilities to validate message structure, these might get moved elsewhere
64 64 # if they prove to have more generic utility
65 65
66 66 def validate_string_list(lst):
67 67 """Validate that the input is a list of strings.
68 68
69 69 Raises ValueError if not."""
70 70 if not isinstance(lst, list):
71 71 raise ValueError('input %r must be a list' % lst)
72 72 for x in lst:
73 73 if not isinstance(x, basestring):
74 74 raise ValueError('element %r in list must be a string' % x)
75 75
76 76
77 77 def validate_string_dict(dct):
78 78 """Validate that the input is a dict with string keys and values.
79 79
80 80 Raises ValueError if not."""
81 81 for k,v in dct.iteritems():
82 82 if not isinstance(k, basestring):
83 83 raise ValueError('key %r in dict must be a string' % k)
84 84 if not isinstance(v, basestring):
85 85 raise ValueError('value %r in dict must be a string' % v)
86 86
87 87
88 88 #-----------------------------------------------------------------------------
89 89 # ZMQ Socket Channel classes
90 90 #-----------------------------------------------------------------------------
91 91
92 92 class ZMQSocketChannel(Thread):
93 93 """The base class for the channels that use ZMQ sockets."""
94 94 context = None
95 95 session = None
96 96 socket = None
97 97 ioloop = None
98 98 stream = None
99 99 _address = None
100 100 _exiting = False
101 101
102 102 def __init__(self, context, session, address):
103 103 """Create a channel.
104 104
105 105 Parameters
106 106 ----------
107 107 context : :class:`zmq.Context`
108 108 The ZMQ context to use.
109 109 session : :class:`session.Session`
110 110 The session to use.
111 111 address : zmq url
112 112 Standard (ip, port) tuple that the kernel is listening on.
113 113 """
114 114 super(ZMQSocketChannel, self).__init__()
115 115 self.daemon = True
116 116
117 117 self.context = context
118 118 self.session = session
119 119 if isinstance(address, tuple):
120 120 if address[1] == 0:
121 121 message = 'The port number for a channel cannot be 0.'
122 122 raise InvalidPortNumber(message)
123 123 address = "tcp://%s:%i" % address
124 124 self._address = address
125 125 atexit.register(self._notice_exit)
126 126
127 127 def _notice_exit(self):
128 128 self._exiting = True
129 129
130 130 def _run_loop(self):
131 131 """Run my loop, ignoring EINTR events in the poller"""
132 132 while True:
133 133 try:
134 134 self.ioloop.start()
135 135 except ZMQError as e:
136 136 if e.errno == errno.EINTR:
137 137 continue
138 138 else:
139 139 raise
140 140 except Exception:
141 141 if self._exiting:
142 142 break
143 143 else:
144 144 raise
145 145 else:
146 146 break
147 147
148 148 def stop(self):
149 149 """Stop the channel's event loop and join its thread.
150 150
151 151 This calls :method:`Thread.join` and returns when the thread
152 152 terminates. :class:`RuntimeError` will be raised if
153 153 :method:`self.start` is called again.
154 154 """
155 155 self.join()
156 156
157 157 @property
158 158 def address(self):
159 159 """Get the channel's address as a zmq url string.
160 160
161 161 These URLS have the form: 'tcp://127.0.0.1:5555'.
162 162 """
163 163 return self._address
164 164
165 165 def _queue_send(self, msg):
166 166 """Queue a message to be sent from the IOLoop's thread.
167 167
168 168 Parameters
169 169 ----------
170 170 msg : message to send
171 171
172 172 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
173 173 thread control of the action.
174 174 """
175 175 def thread_send():
176 176 self.session.send(self.stream, msg)
177 177 self.ioloop.add_callback(thread_send)
178 178
179 179 def _handle_recv(self, msg):
180 180 """Callback for stream.on_recv.
181 181
182 182 Unpacks message, and calls handlers with it.
183 183 """
184 184 ident,smsg = self.session.feed_identities(msg)
185 185 self.call_handlers(self.session.unserialize(smsg))
186 186
187 187
188 188
189 189 class ShellChannel(ZMQSocketChannel):
190 190 """The shell channel for issuing request/replies to the kernel."""
191 191
192 192 command_queue = None
193 193 # flag for whether execute requests should be allowed to call raw_input:
194 194 allow_stdin = True
195 195
196 196 def __init__(self, context, session, address):
197 197 super(ShellChannel, self).__init__(context, session, address)
198 198 self.ioloop = ioloop.IOLoop()
199 199
200 200 def run(self):
201 201 """The thread's main activity. Call start() instead."""
202 202 self.socket = self.context.socket(zmq.DEALER)
203 203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 204 self.socket.connect(self.address)
205 205 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 206 self.stream.on_recv(self._handle_recv)
207 207 self._run_loop()
208 208 try:
209 209 self.socket.close()
210 210 except:
211 211 pass
212 212
213 213 def stop(self):
214 214 """Stop the channel's event loop and join its thread."""
215 215 self.ioloop.stop()
216 216 super(ShellChannel, self).stop()
217 217
218 218 def call_handlers(self, msg):
219 219 """This method is called in the ioloop thread when a message arrives.
220 220
221 221 Subclasses should override this method to handle incoming messages.
222 222 It is important to remember that this method is called in the thread
223 223 so that some logic must be done to ensure that the application leve
224 224 handlers are called in the application thread.
225 225 """
226 226 raise NotImplementedError('call_handlers must be defined in a subclass.')
227 227
228 228 def execute(self, code, silent=False, store_history=True,
229 229 user_variables=None, user_expressions=None, allow_stdin=None):
230 230 """Execute code in the kernel.
231 231
232 232 Parameters
233 233 ----------
234 234 code : str
235 235 A string of Python code.
236 236
237 237 silent : bool, optional (default False)
238 238 If set, the kernel will execute the code as quietly possible, and
239 239 will force store_history to be False.
240 240
241 241 store_history : bool, optional (default True)
242 242 If set, the kernel will store command history. This is forced
243 243 to be False if silent is True.
244 244
245 245 user_variables : list, optional
246 246 A list of variable names to pull from the user's namespace. They
247 247 will come back as a dict with these names as keys and their
248 248 :func:`repr` as values.
249 249
250 250 user_expressions : dict, optional
251 251 A dict mapping names to expressions to be evaluated in the user's
252 252 dict. The expression values are returned as strings formatted using
253 253 :func:`repr`.
254 254
255 255 allow_stdin : bool, optional (default self.allow_stdin)
256 256 Flag for whether the kernel can send stdin requests to frontends.
257 257
258 258 Some frontends (e.g. the Notebook) do not support stdin requests.
259 259 If raw_input is called from code executed from such a frontend, a
260 260 StdinNotImplementedError will be raised.
261 261
262 262 Returns
263 263 -------
264 264 The msg_id of the message sent.
265 265 """
266 266 if user_variables is None:
267 267 user_variables = []
268 268 if user_expressions is None:
269 269 user_expressions = {}
270 270 if allow_stdin is None:
271 271 allow_stdin = self.allow_stdin
272 272
273 273
274 274 # Don't waste network traffic if inputs are invalid
275 275 if not isinstance(code, basestring):
276 276 raise ValueError('code %r must be a string' % code)
277 277 validate_string_list(user_variables)
278 278 validate_string_dict(user_expressions)
279 279
280 280 # Create class for content/msg creation. Related to, but possibly
281 281 # not in Session.
282 282 content = dict(code=code, silent=silent, store_history=store_history,
283 283 user_variables=user_variables,
284 284 user_expressions=user_expressions,
285 285 allow_stdin=allow_stdin,
286 286 )
287 287 msg = self.session.msg('execute_request', content)
288 288 self._queue_send(msg)
289 289 return msg['header']['msg_id']
290 290
291 291 def complete(self, text, line, cursor_pos, block=None):
292 292 """Tab complete text in the kernel's namespace.
293 293
294 294 Parameters
295 295 ----------
296 296 text : str
297 297 The text to complete.
298 298 line : str
299 299 The full line of text that is the surrounding context for the
300 300 text to complete.
301 301 cursor_pos : int
302 302 The position of the cursor in the line where the completion was
303 303 requested.
304 304 block : str, optional
305 305 The full block of code in which the completion is being requested.
306 306
307 307 Returns
308 308 -------
309 309 The msg_id of the message sent.
310 310 """
311 311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
312 312 msg = self.session.msg('complete_request', content)
313 313 self._queue_send(msg)
314 314 return msg['header']['msg_id']
315 315
316 316 def object_info(self, oname, detail_level=0):
317 317 """Get metadata information about an object in the kernel's namespace.
318 318
319 319 Parameters
320 320 ----------
321 321 oname : str
322 322 A string specifying the object name.
323 323 detail_level : int, optional
324 324 The level of detail for the introspection (0-2)
325 325
326 326 Returns
327 327 -------
328 328 The msg_id of the message sent.
329 329 """
330 330 content = dict(oname=oname, detail_level=detail_level)
331 331 msg = self.session.msg('object_info_request', content)
332 332 self._queue_send(msg)
333 333 return msg['header']['msg_id']
334 334
335 335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
336 336 """Get entries from the kernel's history list.
337 337
338 338 Parameters
339 339 ----------
340 340 raw : bool
341 341 If True, return the raw input.
342 342 output : bool
343 343 If True, then return the output as well.
344 344 hist_access_type : str
345 345 'range' (fill in session, start and stop params), 'tail' (fill in n)
346 346 or 'search' (fill in pattern param).
347 347
348 348 session : int
349 349 For a range request, the session from which to get lines. Session
350 350 numbers are positive integers; negative ones count back from the
351 351 current session.
352 352 start : int
353 353 The first line number of a history range.
354 354 stop : int
355 355 The final (excluded) line number of a history range.
356 356
357 357 n : int
358 358 The number of lines of history to get for a tail request.
359 359
360 360 pattern : str
361 361 The glob-syntax pattern for a search request.
362 362
363 363 Returns
364 364 -------
365 365 The msg_id of the message sent.
366 366 """
367 367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
368 368 **kwargs)
369 369 msg = self.session.msg('history_request', content)
370 370 self._queue_send(msg)
371 371 return msg['header']['msg_id']
372 372
373 373 def kernel_info(self):
374 374 """Request kernel info."""
375 375 msg = self.session.msg('kernel_info_request')
376 376 self._queue_send(msg)
377 377 return msg['header']['msg_id']
378 378
379 379 def shutdown(self, restart=False):
380 380 """Request an immediate kernel shutdown.
381 381
382 382 Upon receipt of the (empty) reply, client code can safely assume that
383 383 the kernel has shut down and it's safe to forcefully terminate it if
384 384 it's still alive.
385 385
386 386 The kernel will send the reply via a function registered with Python's
387 387 atexit module, ensuring it's truly done as the kernel is done with all
388 388 normal operation.
389 389 """
390 390 # Send quit message to kernel. Once we implement kernel-side setattr,
391 391 # this should probably be done that way, but for now this will do.
392 392 msg = self.session.msg('shutdown_request', {'restart':restart})
393 393 self._queue_send(msg)
394 394 return msg['header']['msg_id']
395 395
396 396
397 397
398 398 class IOPubChannel(ZMQSocketChannel):
399 399 """The iopub channel which listens for messages that the kernel publishes.
400 400
401 401 This channel is where all output is published to frontends.
402 402 """
403 403
404 404 def __init__(self, context, session, address):
405 405 super(IOPubChannel, self).__init__(context, session, address)
406 406 self.ioloop = ioloop.IOLoop()
407 407
408 408 def run(self):
409 409 """The thread's main activity. Call start() instead."""
410 410 self.socket = self.context.socket(zmq.SUB)
411 411 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 412 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 413 self.socket.connect(self.address)
414 414 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
415 415 self.stream.on_recv(self._handle_recv)
416 416 self._run_loop()
417 417 try:
418 418 self.socket.close()
419 419 except:
420 420 pass
421 421
422 422 def stop(self):
423 423 """Stop the channel's event loop and join its thread."""
424 424 self.ioloop.stop()
425 425 super(IOPubChannel, self).stop()
426 426
427 427 def call_handlers(self, msg):
428 428 """This method is called in the ioloop thread when a message arrives.
429 429
430 430 Subclasses should override this method to handle incoming messages.
431 431 It is important to remember that this method is called in the thread
432 432 so that some logic must be done to ensure that the application leve
433 433 handlers are called in the application thread.
434 434 """
435 435 raise NotImplementedError('call_handlers must be defined in a subclass.')
436 436
437 437 def flush(self, timeout=1.0):
438 438 """Immediately processes all pending messages on the iopub channel.
439 439
440 440 Callers should use this method to ensure that :method:`call_handlers`
441 441 has been called for all messages that have been received on the
442 442 0MQ SUB socket of this channel.
443 443
444 444 This method is thread safe.
445 445
446 446 Parameters
447 447 ----------
448 448 timeout : float, optional
449 449 The maximum amount of time to spend flushing, in seconds. The
450 450 default is one second.
451 451 """
452 452 # We do the IOLoop callback process twice to ensure that the IOLoop
453 453 # gets to perform at least one full poll.
454 454 stop_time = time.time() + timeout
455 455 for i in xrange(2):
456 456 self._flushed = False
457 457 self.ioloop.add_callback(self._flush)
458 458 while not self._flushed and time.time() < stop_time:
459 459 time.sleep(0.01)
460 460
461 461 def _flush(self):
462 462 """Callback for :method:`self.flush`."""
463 463 self.stream.flush()
464 464 self._flushed = True
465 465
466 466
467 467 class StdInChannel(ZMQSocketChannel):
468 468 """The stdin channel to handle raw_input requests that the kernel makes."""
469 469
470 470 msg_queue = None
471 471
472 472 def __init__(self, context, session, address):
473 473 super(StdInChannel, self).__init__(context, session, address)
474 474 self.ioloop = ioloop.IOLoop()
475 475
476 476 def run(self):
477 477 """The thread's main activity. Call start() instead."""
478 478 self.socket = self.context.socket(zmq.DEALER)
479 479 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
480 480 self.socket.connect(self.address)
481 481 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
482 482 self.stream.on_recv(self._handle_recv)
483 483 self._run_loop()
484 484 try:
485 485 self.socket.close()
486 486 except:
487 487 pass
488 488
489 489 def stop(self):
490 490 """Stop the channel's event loop and join its thread."""
491 491 self.ioloop.stop()
492 492 super(StdInChannel, self).stop()
493 493
494 494 def call_handlers(self, msg):
495 495 """This method is called in the ioloop thread when a message arrives.
496 496
497 497 Subclasses should override this method to handle incoming messages.
498 498 It is important to remember that this method is called in the thread
499 499 so that some logic must be done to ensure that the application leve
500 500 handlers are called in the application thread.
501 501 """
502 502 raise NotImplementedError('call_handlers must be defined in a subclass.')
503 503
504 504 def input(self, string):
505 505 """Send a string of raw input to the kernel."""
506 506 content = dict(value=string)
507 507 msg = self.session.msg('input_reply', content)
508 508 self._queue_send(msg)
509 509
510 510
511 511 class HBChannel(ZMQSocketChannel):
512 512 """The heartbeat channel which monitors the kernel heartbeat.
513 513
514 514 Note that the heartbeat channel is paused by default. As long as you start
515 515 this channel, the kernel manager will ensure that it is paused and un-paused
516 516 as appropriate.
517 517 """
518 518
519 519 time_to_dead = 3.0
520 520 socket = None
521 521 poller = None
522 522 _running = None
523 523 _pause = None
524 524 _beating = None
525 525
526 526 def __init__(self, context, session, address):
527 527 super(HBChannel, self).__init__(context, session, address)
528 528 self._running = False
529 529 self._pause =True
530 530 self.poller = zmq.Poller()
531 531
532 532 def _create_socket(self):
533 533 if self.socket is not None:
534 534 # close previous socket, before opening a new one
535 535 self.poller.unregister(self.socket)
536 536 self.socket.close()
537 537 self.socket = self.context.socket(zmq.REQ)
538 538 self.socket.setsockopt(zmq.LINGER, 0)
539 539 self.socket.connect(self.address)
540 540
541 541 self.poller.register(self.socket, zmq.POLLIN)
542 542
543 543 def _poll(self, start_time):
544 544 """poll for heartbeat replies until we reach self.time_to_dead.
545 545
546 546 Ignores interrupts, and returns the result of poll(), which
547 547 will be an empty list if no messages arrived before the timeout,
548 548 or the event tuple if there is a message to receive.
549 549 """
550 550
551 551 until_dead = self.time_to_dead - (time.time() - start_time)
552 552 # ensure poll at least once
553 553 until_dead = max(until_dead, 1e-3)
554 554 events = []
555 555 while True:
556 556 try:
557 557 events = self.poller.poll(1000 * until_dead)
558 558 except ZMQError as e:
559 559 if e.errno == errno.EINTR:
560 560 # ignore interrupts during heartbeat
561 561 # this may never actually happen
562 562 until_dead = self.time_to_dead - (time.time() - start_time)
563 563 until_dead = max(until_dead, 1e-3)
564 564 pass
565 565 else:
566 566 raise
567 567 except Exception:
568 568 if self._exiting:
569 569 break
570 570 else:
571 571 raise
572 572 else:
573 573 break
574 574 return events
575 575
576 576 def run(self):
577 577 """The thread's main activity. Call start() instead."""
578 578 self._create_socket()
579 579 self._running = True
580 580 self._beating = True
581 581
582 582 while self._running:
583 583 if self._pause:
584 584 # just sleep, and skip the rest of the loop
585 585 time.sleep(self.time_to_dead)
586 586 continue
587 587
588 588 since_last_heartbeat = 0.0
589 589 # io.rprint('Ping from HB channel') # dbg
590 590 # no need to catch EFSM here, because the previous event was
591 591 # either a recv or connect, which cannot be followed by EFSM
592 592 self.socket.send(b'ping')
593 593 request_time = time.time()
594 594 ready = self._poll(request_time)
595 595 if ready:
596 596 self._beating = True
597 597 # the poll above guarantees we have something to recv
598 598 self.socket.recv()
599 599 # sleep the remainder of the cycle
600 600 remainder = self.time_to_dead - (time.time() - request_time)
601 601 if remainder > 0:
602 602 time.sleep(remainder)
603 603 continue
604 604 else:
605 605 # nothing was received within the time limit, signal heart failure
606 606 self._beating = False
607 607 since_last_heartbeat = time.time() - request_time
608 608 self.call_handlers(since_last_heartbeat)
609 609 # and close/reopen the socket, because the REQ/REP cycle has been broken
610 610 self._create_socket()
611 611 continue
612 612 try:
613 613 self.socket.close()
614 614 except:
615 615 pass
616 616
617 617 def pause(self):
618 618 """Pause the heartbeat."""
619 619 self._pause = True
620 620
621 621 def unpause(self):
622 622 """Unpause the heartbeat."""
623 623 self._pause = False
624 624
625 625 def is_beating(self):
626 626 """Is the heartbeat running and responsive (and not paused)."""
627 627 if self.is_alive() and not self._pause and self._beating:
628 628 return True
629 629 else:
630 630 return False
631 631
632 632 def stop(self):
633 633 """Stop the channel's event loop and join its thread."""
634 634 self._running = False
635 635 super(HBChannel, self).stop()
636 636
637 637 def call_handlers(self, since_last_heartbeat):
638 638 """This method is called in the ioloop thread when a message arrives.
639 639
640 640 Subclasses should override this method to handle incoming messages.
641 641 It is important to remember that this method is called in the thread
642 642 so that some logic must be done to ensure that the application level
643 643 handlers are called in the application thread.
644 644 """
645 645 raise NotImplementedError('call_handlers must be defined in a subclass.')
646 646
647 647
648 648 #-----------------------------------------------------------------------------
649 649 # Main kernel manager class
650 650 #-----------------------------------------------------------------------------
651 651
652 652 class KernelManager(Configurable):
653 653 """Manages a single kernel on this host along with its channels.
654 654
655 655 There are four channels associated with each kernel:
656 656
657 657 * shell: for request/reply calls to the kernel.
658 658 * iopub: for the kernel to publish results to frontends.
659 659 * hb: for monitoring the kernel's heartbeat.
660 660 * stdin: for frontends to reply to raw_input calls in the kernel.
661 661
662 662 The usage of the channels that this class manages is optional. It is
663 663 entirely possible to connect to the kernels directly using ZeroMQ
664 664 sockets. These channels are useful primarily for talking to a kernel
665 665 whose :class:`KernelManager` is in the same process.
666 666
667 667 This version manages kernels started using Popen.
668 668 """
669 669 # The PyZMQ Context to use for communication with the kernel.
670 670 context = Instance(zmq.Context)
671 671 def _context_default(self):
672 672 return zmq.Context.instance()
673 673
674 674 # The Session to use for communication with the kernel.
675 675 session = Instance(Session)
676 676 def _session_default(self):
677 677 return Session(config=self.config)
678 678
679 679 # The kernel process with which the KernelManager is communicating.
680 680 kernel = Instance(Popen)
681 681
682 682 # The addresses for the communication channels.
683 683 connection_file = Unicode('')
684 684
685 685 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
686 686
687 687 ip = Unicode(LOCALHOST, config=True)
688 688 def _ip_changed(self, name, old, new):
689 689 if new == '*':
690 690 self.ip = '0.0.0.0'
691 691 shell_port = Integer(0)
692 692 iopub_port = Integer(0)
693 693 stdin_port = Integer(0)
694 694 hb_port = Integer(0)
695 695
696 696 # The classes to use for the various channels.
697 697 shell_channel_class = Type(ShellChannel)
698 698 iopub_channel_class = Type(IOPubChannel)
699 699 stdin_channel_class = Type(StdInChannel)
700 700 hb_channel_class = Type(HBChannel)
701 701
702 702 # Protected traits.
703 703 _launch_args = Any
704 704 _shell_channel = Any
705 705 _iopub_channel = Any
706 706 _stdin_channel = Any
707 707 _hb_channel = Any
708 708 _connection_file_written=Bool(False)
709 709
710 710 def __del__(self):
711 711 self.cleanup_connection_file()
712 self.cleanup_ipc_files()
712 713
713 714 #--------------------------------------------------------------------------
714 715 # Channel management methods:
715 716 #--------------------------------------------------------------------------
716 717
717 718 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
718 719 """Starts the channels for this kernel.
719 720
720 721 This will create the channels if they do not exist and then start
721 722 them (their activity runs in a thread). If port numbers of 0 are
722 723 being used (random ports) then you must first call
723 724 :method:`start_kernel`. If the channels have been stopped and you
724 725 call this, :class:`RuntimeError` will be raised.
725 726 """
726 727 if shell:
727 728 self.shell_channel.start()
728 729 if iopub:
729 730 self.iopub_channel.start()
730 731 if stdin:
731 732 self.stdin_channel.start()
732 733 self.shell_channel.allow_stdin = True
733 734 else:
734 735 self.shell_channel.allow_stdin = False
735 736 if hb:
736 737 self.hb_channel.start()
737 738
738 739 def stop_channels(self):
739 740 """Stops all the running channels for this kernel.
740 741
741 742 This stops their event loops and joins their threads.
742 743 """
743 744 if self.shell_channel.is_alive():
744 745 self.shell_channel.stop()
745 746 if self.iopub_channel.is_alive():
746 747 self.iopub_channel.stop()
747 748 if self.stdin_channel.is_alive():
748 749 self.stdin_channel.stop()
749 750 if self.hb_channel.is_alive():
750 751 self.hb_channel.stop()
751 752
752 753 @property
753 754 def channels_running(self):
754 755 """Are any of the channels created and running?"""
755 756 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
756 757 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
757 758
758 759 def _make_url(self, port):
759 760 """Make a zmq url with a port.
760 761
761 762 There are two cases that this handles:
762 763
763 764 * tcp: tcp://ip:port
764 765 * ipc: ipc://ip-port
765 766 """
766 767 if self.transport == 'tcp':
767 768 return "tcp://%s:%i" % (self.ip, port)
768 769 else:
769 770 return "%s://%s-%s" % (self.transport, self.ip, port)
770 771
771 772 @property
772 773 def shell_channel(self):
773 774 """Get the shell channel object for this kernel."""
774 775 if self._shell_channel is None:
775 776 self._shell_channel = self.shell_channel_class(
776 777 self.context, self.session, self._make_url(self.shell_port)
777 778 )
778 779 return self._shell_channel
779 780
780 781 @property
781 782 def iopub_channel(self):
782 783 """Get the iopub channel object for this kernel."""
783 784 if self._iopub_channel is None:
784 785 self._iopub_channel = self.iopub_channel_class(
785 786 self.context, self.session, self._make_url(self.iopub_port)
786 787 )
787 788 return self._iopub_channel
788 789
789 790 @property
790 791 def stdin_channel(self):
791 792 """Get the stdin channel object for this kernel."""
792 793 if self._stdin_channel is None:
793 794 self._stdin_channel = self.stdin_channel_class(
794 795 self.context, self.session, self._make_url(self.stdin_port)
795 796 )
796 797 return self._stdin_channel
797 798
798 799 @property
799 800 def hb_channel(self):
800 801 """Get the hb channel object for this kernel."""
801 802 if self._hb_channel is None:
802 803 self._hb_channel = self.hb_channel_class(
803 804 self.context, self.session, self._make_url(self.hb_port)
804 805 )
805 806 return self._hb_channel
806 807
807 808 #--------------------------------------------------------------------------
808 # Connection and ipc file management.
809 # Connection and ipc file management
809 810 #--------------------------------------------------------------------------
810 811
811 812 def cleanup_connection_file(self):
812 813 """Cleanup connection file *if we wrote it*
813 814
814 815 Will not raise if the connection file was already removed somehow.
815 816 """
816 817 if self._connection_file_written:
817 818 # cleanup connection files on full shutdown of kernel we started
818 819 self._connection_file_written = False
819 820 try:
820 821 os.remove(self.connection_file)
821 822 except (IOError, OSError):
822 823 pass
823
824 self.cleanup_ipc_files()
825 824
826 825 def cleanup_ipc_files(self):
827 826 """Cleanup ipc files if we wrote them."""
828 827 if self.transport != 'ipc':
829 828 return
830 829 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
831 830 ipcfile = "%s-%i" % (self.ip, port)
832 831 try:
833 832 os.remove(ipcfile)
834 833 except (IOError, OSError):
835 834 pass
836 835
837 836 def load_connection_file(self):
838 837 """Load connection info from JSON dict in self.connection_file."""
839 838 with open(self.connection_file) as f:
840 839 cfg = json.loads(f.read())
841 840
842 841 from pprint import pprint
843 842 pprint(cfg)
844 843 self.transport = cfg.get('transport', 'tcp')
845 844 self.ip = cfg['ip']
846 845 self.shell_port = cfg['shell_port']
847 846 self.stdin_port = cfg['stdin_port']
848 847 self.iopub_port = cfg['iopub_port']
849 848 self.hb_port = cfg['hb_port']
850 849 self.session.key = str_to_bytes(cfg['key'])
851 850
852 851 def write_connection_file(self):
853 852 """Write connection info to JSON dict in self.connection_file."""
854 853 if self._connection_file_written:
855 854 return
856 855 self.connection_file,cfg = write_connection_file(self.connection_file,
857 856 transport=self.transport, ip=self.ip, key=self.session.key,
858 857 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
859 858 shell_port=self.shell_port, hb_port=self.hb_port)
860 859 # write_connection_file also sets default ports:
861 860 self.shell_port = cfg['shell_port']
862 861 self.stdin_port = cfg['stdin_port']
863 862 self.iopub_port = cfg['iopub_port']
864 863 self.hb_port = cfg['hb_port']
865 864
866 865 self._connection_file_written = True
867 866
868 867 #--------------------------------------------------------------------------
869 # Kernel management.
868 # Kernel management
870 869 #--------------------------------------------------------------------------
871 870
872 871 def start_kernel(self, **kw):
873 872 """Starts a kernel on this host in a separate process.
874 873
875 874 If random ports (port=0) are being used, this method must be called
876 875 before the channels are created.
877 876
878 877 Parameters:
879 878 -----------
880 879 launcher : callable, optional (default None)
881 880 A custom function for launching the kernel process (generally a
882 881 wrapper around ``entry_point.base_launch_kernel``). In most cases,
883 882 it should not be necessary to use this parameter.
884 883
885 884 **kw : optional
886 885 keyword arguments that are passed down into the launcher
887 886 callable.
888 887 """
889 888 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
890 889 raise RuntimeError("Can only launch a kernel on a local interface. "
891 890 "Make sure that the '*_address' attributes are "
892 891 "configured properly. "
893 892 "Currently valid addresses are: %s"%LOCAL_IPS
894 893 )
895 894
896 895 # write connection file / get default ports
897 896 self.write_connection_file()
898 897
899 898 self._launch_args = kw.copy()
900 899 launch_kernel = kw.pop('launcher', None)
901 900 if launch_kernel is None:
902 901 from ipkernel import launch_kernel
903 902 self.kernel = launch_kernel(fname=self.connection_file, **kw)
904 903
905 904 def shutdown_kernel(self, now=False, restart=False):
906 905 """Attempts to the stop the kernel process cleanly.
907 906
908 907 This attempts to shutdown the kernels cleanly by:
909 908
910 909 1. Sending it a shutdown message over the shell channel.
911 910 2. If that fails, the kernel is shutdown forcibly by sending it
912 911 a signal.
913 912
914 913 Parameters:
915 914 -----------
916 915 now : bool
917 916 Should the kernel be forcible killed *now*. This skips the
918 917 first, nice shutdown attempt.
919 918 restart: bool
920 919 Will this kernel be restarted after it is shutdown. When this
921 920 is True, connection files will not be cleaned up.
922 921 """
923 922 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
924 923 if sys.platform == 'win32':
925 924 self._kill_kernel()
926 925 return
927 926
928 927 # Pause the heart beat channel if it exists.
929 928 if self._hb_channel is not None:
930 929 self._hb_channel.pause()
931 930
932 931 if now:
933 932 if self.has_kernel:
934 933 self._kill_kernel()
935 934 else:
936 935 # Don't send any additional kernel kill messages immediately, to give
937 936 # the kernel a chance to properly execute shutdown actions. Wait for at
938 937 # most 1s, checking every 0.1s.
939 938 self.shell_channel.shutdown(restart=restart)
940 939 for i in range(10):
941 940 if self.is_alive:
942 941 time.sleep(0.1)
943 942 else:
944 943 break
945 944 else:
946 945 # OK, we've waited long enough.
947 946 if self.has_kernel:
948 947 self._kill_kernel()
949 948
950 949 if not restart:
951 950 self.cleanup_connection_file()
951 self.cleanup_ipc_files()
952 952 else:
953 953 self.cleanup_ipc_files()
954 954
955 955 def restart_kernel(self, now=False, **kw):
956 956 """Restarts a kernel with the arguments that were used to launch it.
957 957
958 958 If the old kernel was launched with random ports, the same ports will be
959 959 used for the new kernel. The same connection file is used again.
960 960
961 961 Parameters
962 962 ----------
963 963 now : bool, optional
964 964 If True, the kernel is forcefully restarted *immediately*, without
965 965 having a chance to do any cleanup action. Otherwise the kernel is
966 966 given 1s to clean up before a forceful restart is issued.
967 967
968 968 In all cases the kernel is restarted, the only difference is whether
969 969 it is given a chance to perform a clean shutdown or not.
970 970
971 971 **kw : optional
972 972 Any options specified here will overwrite those used to launch the
973 973 kernel.
974 974 """
975 975 if self._launch_args is None:
976 976 raise RuntimeError("Cannot restart the kernel. "
977 977 "No previous call to 'start_kernel'.")
978 978 else:
979 979 # Stop currently running kernel.
980 980 self.shutdown_kernel(now=now, restart=True)
981 981
982 982 # Start new kernel.
983 983 self._launch_args.update(kw)
984 984 self.start_kernel(**self._launch_args)
985 985
986 986 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
987 987 # unless there is some delay here.
988 988 if sys.platform == 'win32':
989 989 time.sleep(0.2)
990 990
991 991 @property
992 992 def has_kernel(self):
993 993 """Has a kernel been started that we are managing."""
994 994 return self.kernel is not None
995 995
996 996 def _kill_kernel(self):
997 997 """Kill the running kernel.
998 998
999 999 This is a private method, callers should use shutdown_kernel(now=True).
1000 1000 """
1001 1001 if self.has_kernel:
1002 1002 # Pause the heart beat channel if it exists.
1003 1003 if self._hb_channel is not None:
1004 1004 self._hb_channel.pause()
1005 1005
1006 1006 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1007 1007 # TerminateProcess() on Win32).
1008 1008 try:
1009 1009 self.kernel.kill()
1010 1010 except OSError as e:
1011 1011 # In Windows, we will get an Access Denied error if the process
1012 1012 # has already terminated. Ignore it.
1013 1013 if sys.platform == 'win32':
1014 1014 if e.winerror != 5:
1015 1015 raise
1016 1016 # On Unix, we may get an ESRCH error if the process has already
1017 1017 # terminated. Ignore it.
1018 1018 else:
1019 1019 from errno import ESRCH
1020 1020 if e.errno != ESRCH:
1021 1021 raise
1022 1022
1023 1023 # Block until the kernel terminates.
1024 1024 self.kernel.wait()
1025 1025 self.kernel = None
1026 1026 else:
1027 1027 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1028 1028
1029 1029 def interrupt_kernel(self):
1030 1030 """Interrupts the kernel by sending it a signal.
1031 1031
1032 1032 Unlike ``signal_kernel``, this operation is well supported on all
1033 1033 platforms.
1034 1034 """
1035 1035 if self.has_kernel:
1036 1036 if sys.platform == 'win32':
1037 1037 from parentpoller import ParentPollerWindows as Poller
1038 1038 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1039 1039 else:
1040 1040 self.kernel.send_signal(signal.SIGINT)
1041 1041 else:
1042 1042 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1043 1043
1044 1044 def signal_kernel(self, signum):
1045 1045 """Sends a signal to the kernel.
1046 1046
1047 1047 Note that since only SIGTERM is supported on Windows, this function is
1048 1048 only useful on Unix systems.
1049 1049 """
1050 1050 if self.has_kernel:
1051 1051 self.kernel.send_signal(signum)
1052 1052 else:
1053 1053 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1054 1054
1055 1055 @property
1056 1056 def is_alive(self):
1057 1057 """Is the kernel process still running?"""
1058 1058 if self.has_kernel:
1059 1059 if self.kernel.poll() is None:
1060 1060 return True
1061 1061 else:
1062 1062 return False
1063 1063 elif self._hb_channel is not None:
1064 1064 # We didn't start the kernel with this KernelManager so we
1065 1065 # use the heartbeat.
1066 1066 return self._hb_channel.is_beating()
1067 1067 else:
1068 1068 # no heartbeat and not local, we can't tell if it's running,
1069 1069 # so naively return True
1070 1070 return True
1071 1071
1072 1072
1073 1073 #-----------------------------------------------------------------------------
1074 1074 # ABC Registration
1075 1075 #-----------------------------------------------------------------------------
1076 1076
1077 1077 ShellChannelABC.register(ShellChannel)
1078 1078 IOPubChannelABC.register(IOPubChannel)
1079 1079 HBChannelABC.register(HBChannel)
1080 1080 StdInChannelABC.register(StdInChannel)
1081 1081 KernelManagerABC.register(KernelManager)
1082 1082
@@ -1,225 +1,226 b''
1 1 """Abstract base classes for kernel manager and channels."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2013 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 # Standard library imports.
15 15 import abc
16 16
17 17 #-----------------------------------------------------------------------------
18 18 # Channels
19 19 #-----------------------------------------------------------------------------
20 20
21 21
22 22 class ChannelABC(object):
23 23 """A base class for all channel ABCs."""
24 24
25 25 __metaclass__ = abc.ABCMeta
26 26
27 27 @abc.abstractmethod
28 28 def start(self):
29 29 pass
30 30
31 31 @abc.abstractmethod
32 32 def stop(self):
33 33 pass
34 34
35 35 @abc.abstractmethod
36 36 def is_alive(self):
37 37 pass
38 38
39 39
40 40 class ShellChannelABC(ChannelABC):
41 41 """ShellChannel ABC.
42 42
43 43 The docstrings for this class can be found in the base implementation:
44 44
45 45 `IPython.zmq.kernelmanager.ShellChannel`
46 46 """
47 47
48 48 @abc.abstractproperty
49 49 def allow_stdin(self):
50 50 pass
51 51
52 52 @abc.abstractmethod
53 53 def execute(self, code, silent=False, store_history=True,
54 54 user_variables=None, user_expressions=None, allow_stdin=None):
55 55 pass
56 56
57 57 @abc.abstractmethod
58 58 def complete(self, text, line, cursor_pos, block=None):
59 59 pass
60 60
61 61 @abc.abstractmethod
62 62 def object_info(self, oname, detail_level=0):
63 63 pass
64 64
65 65 @abc.abstractmethod
66 66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 67 pass
68 68
69 69 @abc.abstractmethod
70 70 def kernel_info(self):
71 71 pass
72 72
73 73 @abc.abstractmethod
74 74 def shutdown(self, restart=False):
75 75 pass
76 76
77 77
78 78 class IOPubChannelABC(ChannelABC):
79 79 """IOPubChannel ABC.
80 80
81 81 The docstrings for this class can be found in the base implementation:
82 82
83 83 `IPython.zmq.kernelmanager.IOPubChannel`
84 84 """
85 85
86 86 @abc.abstractmethod
87 87 def flush(self, timeout=1.0):
88 88 pass
89 89
90 90
91 91 class StdInChannelABC(ChannelABC):
92 92 """StdInChannel ABC.
93 93
94 94 The docstrings for this class can be found in the base implementation:
95 95
96 96 `IPython.zmq.kernelmanager.StdInChannel`
97 97 """
98 98
99 99 @abc.abstractmethod
100 100 def input(self, string):
101 101 pass
102 102
103 103
104 104 class HBChannelABC(ChannelABC):
105 105 """HBChannel ABC.
106 106
107 107 The docstrings for this class can be found in the base implementation:
108 108
109 109 `IPython.zmq.kernelmanager.HBChannel`
110 110 """
111 111
112 112 @abc.abstractproperty
113 113 def time_to_dead(self):
114 114 pass
115 115
116 116 @abc.abstractmethod
117 117 def pause(self):
118 118 pass
119 119
120 120 @abc.abstractmethod
121 121 def unpause(self):
122 122 pass
123 123
124 124 @abc.abstractmethod
125 125 def is_beating(self):
126 126 pass
127 127
128 128
129 129 #-----------------------------------------------------------------------------
130 130 # Main kernel manager class
131 131 #-----------------------------------------------------------------------------
132 132
133 133 class KernelManagerABC(object):
134 134 """KernelManager ABC.
135 135
136 136 The docstrings for this class can be found in the base implementation:
137 137
138 138 `IPython.zmq.kernelmanager.KernelManager`
139 139 """
140 140
141 141 __metaclass__ = abc.ABCMeta
142 142
143 143 @abc.abstractproperty
144 144 def kernel(self):
145 145 pass
146 146
147 147 @abc.abstractproperty
148 148 def shell_channel_class(self):
149 149 pass
150 150
151 151 @abc.abstractproperty
152 152 def iopub_channel_class(self):
153 153 pass
154 154
155 155 @abc.abstractproperty
156 156 def hb_channel_class(self):
157 157 pass
158 158
159 159 @abc.abstractproperty
160 160 def stdin_channel_class(self):
161 161 pass
162 162
163 163 #--------------------------------------------------------------------------
164 # Channel management methods:
164 # Channel management methods
165 165 #--------------------------------------------------------------------------
166 166
167 167 @abc.abstractmethod
168 168 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
169 169 pass
170 170
171 171 @abc.abstractmethod
172 172 def stop_channels(self):
173 173 pass
174 174
175 175 @abc.abstractproperty
176 176 def channels_running(self):
177 177 pass
178 178
179 179 @abc.abstractproperty
180 180 def shell_channel(self):
181 181 pass
182 182
183 183 @abc.abstractproperty
184 184 def iopub_channel(self):
185 185 pass
186 186
187 187 @abc.abstractproperty
188 188 def stdin_channel(self):
189 189 pass
190 190
191 191 @abc.abstractproperty
192 192 def hb_channel(self):
193 193 pass
194 194
195 195 #--------------------------------------------------------------------------
196 # Kernel management.
196 # Kernel management
197 197 #--------------------------------------------------------------------------
198 198
199 199 @abc.abstractmethod
200 200 def start_kernel(self, **kw):
201 201 pass
202 202
203 203 @abc.abstractmethod
204 204 def shutdown_kernel(self, now=False, restart=False):
205 205 pass
206 206
207 207 @abc.abstractmethod
208 208 def restart_kernel(self, now=False, **kw):
209 209 pass
210 210
211 211 @abc.abstractproperty
212 212 def has_kernel(self):
213 213 pass
214 214
215 215 @abc.abstractmethod
216 216 def interrupt_kernel(self):
217 217 pass
218 218
219 219 @abc.abstractmethod
220 220 def signal_kernel(self, signum):
221 221 pass
222 222
223 223 @abc.abstractproperty
224 224 def is_alive(self):
225 225 pass
226
@@ -1,45 +1,45 b''
1 """Tests for the notebook kernel and session manager."""
1 """Tests for the notebook kernel and session manager"""
2 2
3 3 from subprocess import PIPE
4 4 import time
5 5 from unittest import TestCase
6 6
7 7 from IPython.config.loader import Config
8 8 from IPython.zmq.kernelmanager import KernelManager
9 9
10 10 class TestKernelManager(TestCase):
11 11
12 12 def _get_tcp_km(self):
13 13 return KernelManager()
14 14
15 15 def _get_ipc_km(self):
16 16 c = Config()
17 17 c.KernelManager.transport = 'ipc'
18 18 c.KernelManager.ip = 'test'
19 19 km = KernelManager(config=c)
20 20 return km
21 21
22 22 def _run_lifecycle(self, km):
23 23 km.start_kernel(stdout=PIPE, stderr=PIPE)
24 24 km.start_channels(shell=True, iopub=False, stdin=False, hb=False)
25 25 km.restart_kernel()
26 26 # We need a delay here to give the restarting kernel a chance to
27 27 # restart. Otherwise, the interrupt will kill it, causing the test
28 28 # suite to hang. The reason it *hangs* is that the shutdown
29 29 # message for the restart sometimes hasn't been sent to the kernel.
30 30 # Because linger is oo on the shell channel, the context can't
31 31 # close until the message is sent to the kernel, which is not dead.
32 32 time.sleep(1.0)
33 33 km.interrupt_kernel()
34 34 self.assertTrue(isinstance(km, KernelManager))
35 35 km.shutdown_kernel()
36 36 km.shell_channel.stop()
37 37
38 38 def test_tcp_lifecycle(self):
39 39 km = self._get_tcp_km()
40 40 self._run_lifecycle(km)
41 41
42 42 def testipc_lifecycle(self):
43 43 km = self._get_ipc_km()
44 44 self._run_lifecycle(km)
45 45
General Comments 0
You need to be logged in to leave comments. Login now