##// END OF EJS Templates
Merge pull request #2369 from epatters/block-after-kill...
Fernando Perez -
r8492:0e7b3a21 merge
parent child Browse files
Show More
@@ -1,1000 +1,1012
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Constants and exceptions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 class InvalidPortNumber(Exception):
51 51 pass
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Utility functions
55 55 #-----------------------------------------------------------------------------
56 56
57 57 # some utilities to validate message structure, these might get moved elsewhere
58 58 # if they prove to have more generic utility
59 59
60 60 def validate_string_list(lst):
61 61 """Validate that the input is a list of strings.
62 62
63 63 Raises ValueError if not."""
64 64 if not isinstance(lst, list):
65 65 raise ValueError('input %r must be a list' % lst)
66 66 for x in lst:
67 67 if not isinstance(x, basestring):
68 68 raise ValueError('element %r in list must be a string' % x)
69 69
70 70
71 71 def validate_string_dict(dct):
72 72 """Validate that the input is a dict with string keys and values.
73 73
74 74 Raises ValueError if not."""
75 75 for k,v in dct.iteritems():
76 76 if not isinstance(k, basestring):
77 77 raise ValueError('key %r in dict must be a string' % k)
78 78 if not isinstance(v, basestring):
79 79 raise ValueError('value %r in dict must be a string' % v)
80 80
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # ZMQ Socket Channel classes
84 84 #-----------------------------------------------------------------------------
85 85
86 86 class ZMQSocketChannel(Thread):
87 87 """The base class for the channels that use ZMQ sockets.
88 88 """
89 89 context = None
90 90 session = None
91 91 socket = None
92 92 ioloop = None
93 93 stream = None
94 94 _address = None
95 95 _exiting = False
96 96
97 97 def __init__(self, context, session, address):
98 98 """Create a channel
99 99
100 100 Parameters
101 101 ----------
102 102 context : :class:`zmq.Context`
103 103 The ZMQ context to use.
104 104 session : :class:`session.Session`
105 105 The session to use.
106 106 address : tuple
107 107 Standard (ip, port) tuple that the kernel is listening on.
108 108 """
109 109 super(ZMQSocketChannel, self).__init__()
110 110 self.daemon = True
111 111
112 112 self.context = context
113 113 self.session = session
114 114 if address[1] == 0:
115 115 message = 'The port number for a channel cannot be 0.'
116 116 raise InvalidPortNumber(message)
117 117 self._address = address
118 118 atexit.register(self._notice_exit)
119 119
120 120 def _notice_exit(self):
121 121 self._exiting = True
122 122
123 123 def _run_loop(self):
124 124 """Run my loop, ignoring EINTR events in the poller"""
125 125 while True:
126 126 try:
127 127 self.ioloop.start()
128 128 except ZMQError as e:
129 129 if e.errno == errno.EINTR:
130 130 continue
131 131 else:
132 132 raise
133 133 except Exception:
134 134 if self._exiting:
135 135 break
136 136 else:
137 137 raise
138 138 else:
139 139 break
140 140
141 141 def stop(self):
142 142 """Stop the channel's activity.
143 143
144 144 This calls :method:`Thread.join` and returns when the thread
145 145 terminates. :class:`RuntimeError` will be raised if
146 146 :method:`self.start` is called again.
147 147 """
148 148 self.join()
149 149
150 150 @property
151 151 def address(self):
152 152 """Get the channel's address as an (ip, port) tuple.
153 153
154 154 By the default, the address is (localhost, 0), where 0 means a random
155 155 port.
156 156 """
157 157 return self._address
158 158
159 159 def _queue_send(self, msg):
160 160 """Queue a message to be sent from the IOLoop's thread.
161 161
162 162 Parameters
163 163 ----------
164 164 msg : message to send
165 165
166 166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 167 thread control of the action.
168 168 """
169 169 def thread_send():
170 170 self.session.send(self.stream, msg)
171 171 self.ioloop.add_callback(thread_send)
172 172
173 173 def _handle_recv(self, msg):
174 174 """callback for stream.on_recv
175 175
176 176 unpacks message, and calls handlers with it.
177 177 """
178 178 ident,smsg = self.session.feed_identities(msg)
179 179 self.call_handlers(self.session.unserialize(smsg))
180 180
181 181
182 182
183 183 class ShellSocketChannel(ZMQSocketChannel):
184 184 """The DEALER channel for issues request/replies to the kernel.
185 185 """
186 186
187 187 command_queue = None
188 188 # flag for whether execute requests should be allowed to call raw_input:
189 189 allow_stdin = True
190 190
191 191 def __init__(self, context, session, address):
192 192 super(ShellSocketChannel, self).__init__(context, session, address)
193 193 self.ioloop = ioloop.IOLoop()
194 194
195 195 def run(self):
196 196 """The thread's main activity. Call start() instead."""
197 197 self.socket = self.context.socket(zmq.DEALER)
198 198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 199 self.socket.connect('tcp://%s:%i' % self.address)
200 200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 201 self.stream.on_recv(self._handle_recv)
202 202 self._run_loop()
203 203 try:
204 204 self.socket.close()
205 205 except:
206 206 pass
207 207
208 208 def stop(self):
209 209 self.ioloop.stop()
210 210 super(ShellSocketChannel, self).stop()
211 211
212 212 def call_handlers(self, msg):
213 213 """This method is called in the ioloop thread when a message arrives.
214 214
215 215 Subclasses should override this method to handle incoming messages.
216 216 It is important to remember that this method is called in the thread
217 217 so that some logic must be done to ensure that the application leve
218 218 handlers are called in the application thread.
219 219 """
220 220 raise NotImplementedError('call_handlers must be defined in a subclass.')
221 221
222 222 def execute(self, code, silent=False, store_history=True,
223 223 user_variables=None, user_expressions=None, allow_stdin=None):
224 224 """Execute code in the kernel.
225 225
226 226 Parameters
227 227 ----------
228 228 code : str
229 229 A string of Python code.
230 230
231 231 silent : bool, optional (default False)
232 232 If set, the kernel will execute the code as quietly possible, and
233 233 will force store_history to be False.
234 234
235 235 store_history : bool, optional (default True)
236 236 If set, the kernel will store command history. This is forced
237 237 to be False if silent is True.
238 238
239 239 user_variables : list, optional
240 240 A list of variable names to pull from the user's namespace. They
241 241 will come back as a dict with these names as keys and their
242 242 :func:`repr` as values.
243 243
244 244 user_expressions : dict, optional
245 245 A dict mapping names to expressions to be evaluated in the user's
246 246 dict. The expression values are returned as strings formatted using
247 247 :func:`repr`.
248 248
249 249 allow_stdin : bool, optional (default self.allow_stdin)
250 250 Flag for whether the kernel can send stdin requests to frontends.
251 251
252 252 Some frontends (e.g. the Notebook) do not support stdin requests.
253 253 If raw_input is called from code executed from such a frontend, a
254 254 StdinNotImplementedError will be raised.
255 255
256 256 Returns
257 257 -------
258 258 The msg_id of the message sent.
259 259 """
260 260 if user_variables is None:
261 261 user_variables = []
262 262 if user_expressions is None:
263 263 user_expressions = {}
264 264 if allow_stdin is None:
265 265 allow_stdin = self.allow_stdin
266 266
267 267
268 268 # Don't waste network traffic if inputs are invalid
269 269 if not isinstance(code, basestring):
270 270 raise ValueError('code %r must be a string' % code)
271 271 validate_string_list(user_variables)
272 272 validate_string_dict(user_expressions)
273 273
274 274 # Create class for content/msg creation. Related to, but possibly
275 275 # not in Session.
276 276 content = dict(code=code, silent=silent, store_history=store_history,
277 277 user_variables=user_variables,
278 278 user_expressions=user_expressions,
279 279 allow_stdin=allow_stdin,
280 280 )
281 281 msg = self.session.msg('execute_request', content)
282 282 self._queue_send(msg)
283 283 return msg['header']['msg_id']
284 284
285 285 def complete(self, text, line, cursor_pos, block=None):
286 286 """Tab complete text in the kernel's namespace.
287 287
288 288 Parameters
289 289 ----------
290 290 text : str
291 291 The text to complete.
292 292 line : str
293 293 The full line of text that is the surrounding context for the
294 294 text to complete.
295 295 cursor_pos : int
296 296 The position of the cursor in the line where the completion was
297 297 requested.
298 298 block : str, optional
299 299 The full block of code in which the completion is being requested.
300 300
301 301 Returns
302 302 -------
303 303 The msg_id of the message sent.
304 304 """
305 305 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
306 306 msg = self.session.msg('complete_request', content)
307 307 self._queue_send(msg)
308 308 return msg['header']['msg_id']
309 309
310 310 def object_info(self, oname, detail_level=0):
311 311 """Get metadata information about an object.
312 312
313 313 Parameters
314 314 ----------
315 315 oname : str
316 316 A string specifying the object name.
317 317 detail_level : int, optional
318 318 The level of detail for the introspection (0-2)
319 319
320 320 Returns
321 321 -------
322 322 The msg_id of the message sent.
323 323 """
324 324 content = dict(oname=oname, detail_level=detail_level)
325 325 msg = self.session.msg('object_info_request', content)
326 326 self._queue_send(msg)
327 327 return msg['header']['msg_id']
328 328
329 329 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
330 330 """Get entries from the history list.
331 331
332 332 Parameters
333 333 ----------
334 334 raw : bool
335 335 If True, return the raw input.
336 336 output : bool
337 337 If True, then return the output as well.
338 338 hist_access_type : str
339 339 'range' (fill in session, start and stop params), 'tail' (fill in n)
340 340 or 'search' (fill in pattern param).
341 341
342 342 session : int
343 343 For a range request, the session from which to get lines. Session
344 344 numbers are positive integers; negative ones count back from the
345 345 current session.
346 346 start : int
347 347 The first line number of a history range.
348 348 stop : int
349 349 The final (excluded) line number of a history range.
350 350
351 351 n : int
352 352 The number of lines of history to get for a tail request.
353 353
354 354 pattern : str
355 355 The glob-syntax pattern for a search request.
356 356
357 357 Returns
358 358 -------
359 359 The msg_id of the message sent.
360 360 """
361 361 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
362 362 **kwargs)
363 363 msg = self.session.msg('history_request', content)
364 364 self._queue_send(msg)
365 365 return msg['header']['msg_id']
366 366
367 367 def shutdown(self, restart=False):
368 368 """Request an immediate kernel shutdown.
369 369
370 370 Upon receipt of the (empty) reply, client code can safely assume that
371 371 the kernel has shut down and it's safe to forcefully terminate it if
372 372 it's still alive.
373 373
374 374 The kernel will send the reply via a function registered with Python's
375 375 atexit module, ensuring it's truly done as the kernel is done with all
376 376 normal operation.
377 377 """
378 378 # Send quit message to kernel. Once we implement kernel-side setattr,
379 379 # this should probably be done that way, but for now this will do.
380 380 msg = self.session.msg('shutdown_request', {'restart':restart})
381 381 self._queue_send(msg)
382 382 return msg['header']['msg_id']
383 383
384 384
385 385
386 386 class SubSocketChannel(ZMQSocketChannel):
387 387 """The SUB channel which listens for messages that the kernel publishes.
388 388 """
389 389
390 390 def __init__(self, context, session, address):
391 391 super(SubSocketChannel, self).__init__(context, session, address)
392 392 self.ioloop = ioloop.IOLoop()
393 393
394 394 def run(self):
395 395 """The thread's main activity. Call start() instead."""
396 396 self.socket = self.context.socket(zmq.SUB)
397 397 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
398 398 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
399 399 self.socket.connect('tcp://%s:%i' % self.address)
400 400 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
401 401 self.stream.on_recv(self._handle_recv)
402 402 self._run_loop()
403 403 try:
404 404 self.socket.close()
405 405 except:
406 406 pass
407 407
408 408 def stop(self):
409 409 self.ioloop.stop()
410 410 super(SubSocketChannel, self).stop()
411 411
412 412 def call_handlers(self, msg):
413 413 """This method is called in the ioloop thread when a message arrives.
414 414
415 415 Subclasses should override this method to handle incoming messages.
416 416 It is important to remember that this method is called in the thread
417 417 so that some logic must be done to ensure that the application leve
418 418 handlers are called in the application thread.
419 419 """
420 420 raise NotImplementedError('call_handlers must be defined in a subclass.')
421 421
422 422 def flush(self, timeout=1.0):
423 423 """Immediately processes all pending messages on the SUB channel.
424 424
425 425 Callers should use this method to ensure that :method:`call_handlers`
426 426 has been called for all messages that have been received on the
427 427 0MQ SUB socket of this channel.
428 428
429 429 This method is thread safe.
430 430
431 431 Parameters
432 432 ----------
433 433 timeout : float, optional
434 434 The maximum amount of time to spend flushing, in seconds. The
435 435 default is one second.
436 436 """
437 437 # We do the IOLoop callback process twice to ensure that the IOLoop
438 438 # gets to perform at least one full poll.
439 439 stop_time = time.time() + timeout
440 440 for i in xrange(2):
441 441 self._flushed = False
442 442 self.ioloop.add_callback(self._flush)
443 443 while not self._flushed and time.time() < stop_time:
444 444 time.sleep(0.01)
445 445
446 446 def _flush(self):
447 447 """Callback for :method:`self.flush`."""
448 448 self.stream.flush()
449 449 self._flushed = True
450 450
451 451
452 452 class StdInSocketChannel(ZMQSocketChannel):
453 453 """A reply channel to handle raw_input requests that the kernel makes."""
454 454
455 455 msg_queue = None
456 456
457 457 def __init__(self, context, session, address):
458 458 super(StdInSocketChannel, self).__init__(context, session, address)
459 459 self.ioloop = ioloop.IOLoop()
460 460
461 461 def run(self):
462 462 """The thread's main activity. Call start() instead."""
463 463 self.socket = self.context.socket(zmq.DEALER)
464 464 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
465 465 self.socket.connect('tcp://%s:%i' % self.address)
466 466 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
467 467 self.stream.on_recv(self._handle_recv)
468 468 self._run_loop()
469 469 try:
470 470 self.socket.close()
471 471 except:
472 472 pass
473 473
474 474
475 475 def stop(self):
476 476 self.ioloop.stop()
477 477 super(StdInSocketChannel, self).stop()
478 478
479 479 def call_handlers(self, msg):
480 480 """This method is called in the ioloop thread when a message arrives.
481 481
482 482 Subclasses should override this method to handle incoming messages.
483 483 It is important to remember that this method is called in the thread
484 484 so that some logic must be done to ensure that the application leve
485 485 handlers are called in the application thread.
486 486 """
487 487 raise NotImplementedError('call_handlers must be defined in a subclass.')
488 488
489 489 def input(self, string):
490 490 """Send a string of raw input to the kernel."""
491 491 content = dict(value=string)
492 492 msg = self.session.msg('input_reply', content)
493 493 self._queue_send(msg)
494 494
495 495
496 496 class HBSocketChannel(ZMQSocketChannel):
497 497 """The heartbeat channel which monitors the kernel heartbeat.
498 498
499 499 Note that the heartbeat channel is paused by default. As long as you start
500 500 this channel, the kernel manager will ensure that it is paused and un-paused
501 501 as appropriate.
502 502 """
503 503
504 504 time_to_dead = 3.0
505 505 socket = None
506 506 poller = None
507 507 _running = None
508 508 _pause = None
509 509 _beating = None
510 510
511 511 def __init__(self, context, session, address):
512 512 super(HBSocketChannel, self).__init__(context, session, address)
513 513 self._running = False
514 514 self._pause =True
515 515 self.poller = zmq.Poller()
516 516
517 517 def _create_socket(self):
518 518 if self.socket is not None:
519 519 # close previous socket, before opening a new one
520 520 self.poller.unregister(self.socket)
521 521 self.socket.close()
522 522 self.socket = self.context.socket(zmq.REQ)
523 523 self.socket.setsockopt(zmq.LINGER, 0)
524 524 self.socket.connect('tcp://%s:%i' % self.address)
525 525
526 526 self.poller.register(self.socket, zmq.POLLIN)
527 527
528 528 def _poll(self, start_time):
529 529 """poll for heartbeat replies until we reach self.time_to_dead
530 530
531 531 Ignores interrupts, and returns the result of poll(), which
532 532 will be an empty list if no messages arrived before the timeout,
533 533 or the event tuple if there is a message to receive.
534 534 """
535 535
536 536 until_dead = self.time_to_dead - (time.time() - start_time)
537 537 # ensure poll at least once
538 538 until_dead = max(until_dead, 1e-3)
539 539 events = []
540 540 while True:
541 541 try:
542 542 events = self.poller.poll(1000 * until_dead)
543 543 except ZMQError as e:
544 544 if e.errno == errno.EINTR:
545 545 # ignore interrupts during heartbeat
546 546 # this may never actually happen
547 547 until_dead = self.time_to_dead - (time.time() - start_time)
548 548 until_dead = max(until_dead, 1e-3)
549 549 pass
550 550 else:
551 551 raise
552 552 except Exception:
553 553 if self._exiting:
554 554 break
555 555 else:
556 556 raise
557 557 else:
558 558 break
559 559 return events
560 560
561 561 def run(self):
562 562 """The thread's main activity. Call start() instead."""
563 563 self._create_socket()
564 564 self._running = True
565 565 self._beating = True
566 566
567 567 while self._running:
568 568 if self._pause:
569 569 # just sleep, and skip the rest of the loop
570 570 time.sleep(self.time_to_dead)
571 571 continue
572 572
573 573 since_last_heartbeat = 0.0
574 574 # io.rprint('Ping from HB channel') # dbg
575 575 # no need to catch EFSM here, because the previous event was
576 576 # either a recv or connect, which cannot be followed by EFSM
577 577 self.socket.send(b'ping')
578 578 request_time = time.time()
579 579 ready = self._poll(request_time)
580 580 if ready:
581 581 self._beating = True
582 582 # the poll above guarantees we have something to recv
583 583 self.socket.recv()
584 584 # sleep the remainder of the cycle
585 585 remainder = self.time_to_dead - (time.time() - request_time)
586 586 if remainder > 0:
587 587 time.sleep(remainder)
588 588 continue
589 589 else:
590 590 # nothing was received within the time limit, signal heart failure
591 591 self._beating = False
592 592 since_last_heartbeat = time.time() - request_time
593 593 self.call_handlers(since_last_heartbeat)
594 594 # and close/reopen the socket, because the REQ/REP cycle has been broken
595 595 self._create_socket()
596 596 continue
597 597 try:
598 598 self.socket.close()
599 599 except:
600 600 pass
601 601
602 602 def pause(self):
603 603 """Pause the heartbeat."""
604 604 self._pause = True
605 605
606 606 def unpause(self):
607 607 """Unpause the heartbeat."""
608 608 self._pause = False
609 609
610 610 def is_beating(self):
611 611 """Is the heartbeat running and responsive (and not paused)."""
612 612 if self.is_alive() and not self._pause and self._beating:
613 613 return True
614 614 else:
615 615 return False
616 616
617 617 def stop(self):
618 618 self._running = False
619 619 super(HBSocketChannel, self).stop()
620 620
621 621 def call_handlers(self, since_last_heartbeat):
622 622 """This method is called in the ioloop thread when a message arrives.
623 623
624 624 Subclasses should override this method to handle incoming messages.
625 625 It is important to remember that this method is called in the thread
626 626 so that some logic must be done to ensure that the application level
627 627 handlers are called in the application thread.
628 628 """
629 629 raise NotImplementedError('call_handlers must be defined in a subclass.')
630 630
631 631
632 632 #-----------------------------------------------------------------------------
633 633 # Main kernel manager class
634 634 #-----------------------------------------------------------------------------
635 635
636 636 class KernelManager(HasTraits):
637 637 """ Manages a kernel for a frontend.
638 638
639 639 The SUB channel is for the frontend to receive messages published by the
640 640 kernel.
641 641
642 642 The REQ channel is for the frontend to make requests of the kernel.
643 643
644 644 The REP channel is for the kernel to request stdin (raw_input) from the
645 645 frontend.
646 646 """
647 647 # config object for passing to child configurables
648 648 config = Instance(Config)
649 649
650 650 # The PyZMQ Context to use for communication with the kernel.
651 651 context = Instance(zmq.Context)
652 652 def _context_default(self):
653 653 return zmq.Context.instance()
654 654
655 655 # The Session to use for communication with the kernel.
656 656 session = Instance(Session)
657 657
658 658 # The kernel process with which the KernelManager is communicating.
659 659 kernel = Instance(Popen)
660 660
661 661 # The addresses for the communication channels.
662 662 connection_file = Unicode('')
663 663 ip = Unicode(LOCALHOST)
664 664 def _ip_changed(self, name, old, new):
665 665 if new == '*':
666 666 self.ip = '0.0.0.0'
667 667 shell_port = Integer(0)
668 668 iopub_port = Integer(0)
669 669 stdin_port = Integer(0)
670 670 hb_port = Integer(0)
671 671
672 672 # The classes to use for the various channels.
673 673 shell_channel_class = Type(ShellSocketChannel)
674 674 sub_channel_class = Type(SubSocketChannel)
675 675 stdin_channel_class = Type(StdInSocketChannel)
676 676 hb_channel_class = Type(HBSocketChannel)
677 677
678 678 # Protected traits.
679 679 _launch_args = Any
680 680 _shell_channel = Any
681 681 _sub_channel = Any
682 682 _stdin_channel = Any
683 683 _hb_channel = Any
684 684 _connection_file_written=Bool(False)
685 685
686 686 def __init__(self, **kwargs):
687 687 super(KernelManager, self).__init__(**kwargs)
688 688 if self.session is None:
689 689 self.session = Session(config=self.config)
690 690
691 691 def __del__(self):
692 692 self.cleanup_connection_file()
693 693
694 694
695 695 #--------------------------------------------------------------------------
696 696 # Channel management methods:
697 697 #--------------------------------------------------------------------------
698 698
699 699 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
700 700 """Starts the channels for this kernel.
701 701
702 702 This will create the channels if they do not exist and then start
703 703 them. If port numbers of 0 are being used (random ports) then you
704 704 must first call :method:`start_kernel`. If the channels have been
705 705 stopped and you call this, :class:`RuntimeError` will be raised.
706 706 """
707 707 if shell:
708 708 self.shell_channel.start()
709 709 if sub:
710 710 self.sub_channel.start()
711 711 if stdin:
712 712 self.stdin_channel.start()
713 713 self.shell_channel.allow_stdin = True
714 714 else:
715 715 self.shell_channel.allow_stdin = False
716 716 if hb:
717 717 self.hb_channel.start()
718 718
719 719 def stop_channels(self):
720 720 """Stops all the running channels for this kernel.
721 721 """
722 722 if self.shell_channel.is_alive():
723 723 self.shell_channel.stop()
724 724 if self.sub_channel.is_alive():
725 725 self.sub_channel.stop()
726 726 if self.stdin_channel.is_alive():
727 727 self.stdin_channel.stop()
728 728 if self.hb_channel.is_alive():
729 729 self.hb_channel.stop()
730 730
731 731 @property
732 732 def channels_running(self):
733 733 """Are any of the channels created and running?"""
734 734 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
735 735 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
736 736
737 737 #--------------------------------------------------------------------------
738 738 # Kernel process management methods:
739 739 #--------------------------------------------------------------------------
740 740
741 741 def cleanup_connection_file(self):
742 742 """cleanup connection file *if we wrote it*
743 743
744 744 Will not raise if the connection file was already removed somehow.
745 745 """
746 746 if self._connection_file_written:
747 747 # cleanup connection files on full shutdown of kernel we started
748 748 self._connection_file_written = False
749 749 try:
750 750 os.remove(self.connection_file)
751 751 except OSError:
752 752 pass
753 753
754 754 def load_connection_file(self):
755 755 """load connection info from JSON dict in self.connection_file"""
756 756 with open(self.connection_file) as f:
757 757 cfg = json.loads(f.read())
758 758
759 759 self.ip = cfg['ip']
760 760 self.shell_port = cfg['shell_port']
761 761 self.stdin_port = cfg['stdin_port']
762 762 self.iopub_port = cfg['iopub_port']
763 763 self.hb_port = cfg['hb_port']
764 764 self.session.key = str_to_bytes(cfg['key'])
765 765
766 766 def write_connection_file(self):
767 767 """write connection info to JSON dict in self.connection_file"""
768 768 if self._connection_file_written:
769 769 return
770 770 self.connection_file,cfg = write_connection_file(self.connection_file,
771 771 ip=self.ip, key=self.session.key,
772 772 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
773 773 shell_port=self.shell_port, hb_port=self.hb_port)
774 774 # write_connection_file also sets default ports:
775 775 self.shell_port = cfg['shell_port']
776 776 self.stdin_port = cfg['stdin_port']
777 777 self.iopub_port = cfg['iopub_port']
778 778 self.hb_port = cfg['hb_port']
779 779
780 780 self._connection_file_written = True
781 781
782 782 def start_kernel(self, **kw):
783 783 """Starts a kernel process and configures the manager to use it.
784 784
785 785 If random ports (port=0) are being used, this method must be called
786 786 before the channels are created.
787 787
788 788 Parameters:
789 789 -----------
790 790 launcher : callable, optional (default None)
791 791 A custom function for launching the kernel process (generally a
792 792 wrapper around ``entry_point.base_launch_kernel``). In most cases,
793 793 it should not be necessary to use this parameter.
794 794
795 795 **kw : optional
796 796 See respective options for IPython and Python kernels.
797 797 """
798 798 if self.ip not in LOCAL_IPS:
799 799 raise RuntimeError("Can only launch a kernel on a local interface. "
800 800 "Make sure that the '*_address' attributes are "
801 801 "configured properly. "
802 802 "Currently valid addresses are: %s"%LOCAL_IPS
803 803 )
804 804
805 805 # write connection file / get default ports
806 806 self.write_connection_file()
807 807
808 808 self._launch_args = kw.copy()
809 809 launch_kernel = kw.pop('launcher', None)
810 810 if launch_kernel is None:
811 811 from ipkernel import launch_kernel
812 812 self.kernel = launch_kernel(fname=self.connection_file, **kw)
813 813
814 814 def shutdown_kernel(self, restart=False):
815 """ Attempts to the stop the kernel process cleanly. If the kernel
816 cannot be stopped, it is killed, if possible.
815 """ Attempts to the stop the kernel process cleanly.
816
817 If the kernel cannot be stopped and the kernel is local, it is killed.
817 818 """
818 819 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
819 820 if sys.platform == 'win32':
820 821 self.kill_kernel()
821 822 return
822 823
823 824 # Pause the heart beat channel if it exists.
824 825 if self._hb_channel is not None:
825 826 self._hb_channel.pause()
826 827
827 828 # Don't send any additional kernel kill messages immediately, to give
828 829 # the kernel a chance to properly execute shutdown actions. Wait for at
829 830 # most 1s, checking every 0.1s.
830 831 self.shell_channel.shutdown(restart=restart)
831 832 for i in range(10):
832 833 if self.is_alive:
833 834 time.sleep(0.1)
834 835 else:
835 836 break
836 837 else:
837 838 # OK, we've waited long enough.
838 839 if self.has_kernel:
839 840 self.kill_kernel()
840 841
841 842 if not restart and self._connection_file_written:
842 843 # cleanup connection files on full shutdown of kernel we started
843 844 self._connection_file_written = False
844 845 try:
845 846 os.remove(self.connection_file)
846 847 except IOError:
847 848 pass
848 849
849 850 def restart_kernel(self, now=False, **kw):
850 851 """Restarts a kernel with the arguments that were used to launch it.
851 852
852 853 If the old kernel was launched with random ports, the same ports will be
853 854 used for the new kernel.
854 855
855 856 Parameters
856 857 ----------
857 858 now : bool, optional
858 859 If True, the kernel is forcefully restarted *immediately*, without
859 860 having a chance to do any cleanup action. Otherwise the kernel is
860 861 given 1s to clean up before a forceful restart is issued.
861 862
862 863 In all cases the kernel is restarted, the only difference is whether
863 864 it is given a chance to perform a clean shutdown or not.
864 865
865 866 **kw : optional
866 867 Any options specified here will replace those used to launch the
867 868 kernel.
868 869 """
869 870 if self._launch_args is None:
870 871 raise RuntimeError("Cannot restart the kernel. "
871 872 "No previous call to 'start_kernel'.")
872 873 else:
873 874 # Stop currently running kernel.
874 875 if self.has_kernel:
875 876 if now:
876 877 self.kill_kernel()
877 878 else:
878 879 self.shutdown_kernel(restart=True)
879 880
880 881 # Start new kernel.
881 882 self._launch_args.update(kw)
882 883 self.start_kernel(**self._launch_args)
883 884
884 885 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
885 886 # unless there is some delay here.
886 887 if sys.platform == 'win32':
887 888 time.sleep(0.2)
888 889
889 890 @property
890 891 def has_kernel(self):
891 892 """Returns whether a kernel process has been specified for the kernel
892 893 manager.
893 894 """
894 895 return self.kernel is not None
895 896
896 897 def kill_kernel(self):
897 """ Kill the running kernel. """
898 """ Kill the running kernel.
899
900 This method blocks until the kernel process has terminated.
901 """
898 902 if self.has_kernel:
899 903 # Pause the heart beat channel if it exists.
900 904 if self._hb_channel is not None:
901 905 self._hb_channel.pause()
902 906
903 # Attempt to kill the kernel.
907 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
908 # TerminateProcess() on Win32).
904 909 try:
905 910 self.kernel.kill()
906 911 except OSError as e:
907 912 # In Windows, we will get an Access Denied error if the process
908 913 # has already terminated. Ignore it.
909 914 if sys.platform == 'win32':
910 915 if e.winerror != 5:
911 916 raise
912 917 # On Unix, we may get an ESRCH error if the process has already
913 918 # terminated. Ignore it.
914 919 else:
915 920 from errno import ESRCH
916 921 if e.errno != ESRCH:
917 922 raise
923
924 # Block until the kernel terminates.
925 self.kernel.wait()
918 926 self.kernel = None
919 927 else:
920 928 raise RuntimeError("Cannot kill kernel. No kernel is running!")
921 929
922 930 def interrupt_kernel(self):
923 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
924 well supported on all platforms.
931 """ Interrupts the kernel.
932
933 Unlike ``signal_kernel``, this operation is well supported on all
934 platforms.
925 935 """
926 936 if self.has_kernel:
927 937 if sys.platform == 'win32':
928 938 from parentpoller import ParentPollerWindows as Poller
929 939 Poller.send_interrupt(self.kernel.win32_interrupt_event)
930 940 else:
931 941 self.kernel.send_signal(signal.SIGINT)
932 942 else:
933 943 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
934 944
935 945 def signal_kernel(self, signum):
936 """ Sends a signal to the kernel. Note that since only SIGTERM is
937 supported on Windows, this function is only useful on Unix systems.
946 """ Sends a signal to the kernel.
947
948 Note that since only SIGTERM is supported on Windows, this function is
949 only useful on Unix systems.
938 950 """
939 951 if self.has_kernel:
940 952 self.kernel.send_signal(signum)
941 953 else:
942 954 raise RuntimeError("Cannot signal kernel. No kernel is running!")
943 955
944 956 @property
945 957 def is_alive(self):
946 958 """Is the kernel process still running?"""
947 959 if self.has_kernel:
948 960 if self.kernel.poll() is None:
949 961 return True
950 962 else:
951 963 return False
952 964 elif self._hb_channel is not None:
953 965 # We didn't start the kernel with this KernelManager so we
954 966 # use the heartbeat.
955 967 return self._hb_channel.is_beating()
956 968 else:
957 969 # no heartbeat and not local, we can't tell if it's running,
958 970 # so naively return True
959 971 return True
960 972
961 973 #--------------------------------------------------------------------------
962 974 # Channels used for communication with the kernel:
963 975 #--------------------------------------------------------------------------
964 976
965 977 @property
966 978 def shell_channel(self):
967 979 """Get the REQ socket channel object to make requests of the kernel."""
968 980 if self._shell_channel is None:
969 981 self._shell_channel = self.shell_channel_class(self.context,
970 982 self.session,
971 983 (self.ip, self.shell_port))
972 984 return self._shell_channel
973 985
974 986 @property
975 987 def sub_channel(self):
976 988 """Get the SUB socket channel object."""
977 989 if self._sub_channel is None:
978 990 self._sub_channel = self.sub_channel_class(self.context,
979 991 self.session,
980 992 (self.ip, self.iopub_port))
981 993 return self._sub_channel
982 994
983 995 @property
984 996 def stdin_channel(self):
985 997 """Get the REP socket channel object to handle stdin (raw_input)."""
986 998 if self._stdin_channel is None:
987 999 self._stdin_channel = self.stdin_channel_class(self.context,
988 1000 self.session,
989 1001 (self.ip, self.stdin_port))
990 1002 return self._stdin_channel
991 1003
992 1004 @property
993 1005 def hb_channel(self):
994 1006 """Get the heartbeat socket channel object to check that the
995 1007 kernel is alive."""
996 1008 if self._hb_channel is None:
997 1009 self._hb_channel = self.hb_channel_class(self.context,
998 1010 self.session,
999 1011 (self.ip, self.hb_port))
1000 1012 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now