##// END OF EJS Templates
fix typo in docs
Jason Grout -
Show More
@@ -1,999 +1,999 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Constants and exceptions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 class InvalidPortNumber(Exception):
51 51 pass
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Utility functions
55 55 #-----------------------------------------------------------------------------
56 56
57 57 # some utilities to validate message structure, these might get moved elsewhere
58 58 # if they prove to have more generic utility
59 59
60 60 def validate_string_list(lst):
61 61 """Validate that the input is a list of strings.
62 62
63 63 Raises ValueError if not."""
64 64 if not isinstance(lst, list):
65 65 raise ValueError('input %r must be a list' % lst)
66 66 for x in lst:
67 67 if not isinstance(x, basestring):
68 68 raise ValueError('element %r in list must be a string' % x)
69 69
70 70
71 71 def validate_string_dict(dct):
72 72 """Validate that the input is a dict with string keys and values.
73 73
74 74 Raises ValueError if not."""
75 75 for k,v in dct.iteritems():
76 76 if not isinstance(k, basestring):
77 77 raise ValueError('key %r in dict must be a string' % k)
78 78 if not isinstance(v, basestring):
79 79 raise ValueError('value %r in dict must be a string' % v)
80 80
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # ZMQ Socket Channel classes
84 84 #-----------------------------------------------------------------------------
85 85
86 86 class ZMQSocketChannel(Thread):
87 87 """The base class for the channels that use ZMQ sockets.
88 88 """
89 89 context = None
90 90 session = None
91 91 socket = None
92 92 ioloop = None
93 93 stream = None
94 94 _address = None
95 95 _exiting = False
96 96
97 97 def __init__(self, context, session, address):
98 98 """Create a channel
99 99
100 100 Parameters
101 101 ----------
102 102 context : :class:`zmq.Context`
103 103 The ZMQ context to use.
104 104 session : :class:`session.Session`
105 105 The session to use.
106 106 address : tuple
107 107 Standard (ip, port) tuple that the kernel is listening on.
108 108 """
109 109 super(ZMQSocketChannel, self).__init__()
110 110 self.daemon = True
111 111
112 112 self.context = context
113 113 self.session = session
114 114 if address[1] == 0:
115 115 message = 'The port number for a channel cannot be 0.'
116 116 raise InvalidPortNumber(message)
117 117 self._address = address
118 118 atexit.register(self._notice_exit)
119 119
120 120 def _notice_exit(self):
121 121 self._exiting = True
122 122
123 123 def _run_loop(self):
124 124 """Run my loop, ignoring EINTR events in the poller"""
125 125 while True:
126 126 try:
127 127 self.ioloop.start()
128 128 except ZMQError as e:
129 129 if e.errno == errno.EINTR:
130 130 continue
131 131 else:
132 132 raise
133 133 except Exception:
134 134 if self._exiting:
135 135 break
136 136 else:
137 137 raise
138 138 else:
139 139 break
140 140
141 141 def stop(self):
142 142 """Stop the channel's activity.
143 143
144 144 This calls :method:`Thread.join` and returns when the thread
145 145 terminates. :class:`RuntimeError` will be raised if
146 146 :method:`self.start` is called again.
147 147 """
148 148 self.join()
149 149
150 150 @property
151 151 def address(self):
152 152 """Get the channel's address as an (ip, port) tuple.
153 153
154 154 By the default, the address is (localhost, 0), where 0 means a random
155 155 port.
156 156 """
157 157 return self._address
158 158
159 159 def _queue_send(self, msg):
160 160 """Queue a message to be sent from the IOLoop's thread.
161 161
162 162 Parameters
163 163 ----------
164 164 msg : message to send
165 165
166 166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 167 thread control of the action.
168 168 """
169 169 def thread_send():
170 170 self.session.send(self.stream, msg)
171 171 self.ioloop.add_callback(thread_send)
172 172
173 173 def _handle_recv(self, msg):
174 174 """callback for stream.on_recv
175 175
176 176 unpacks message, and calls handlers with it.
177 177 """
178 178 ident,smsg = self.session.feed_identities(msg)
179 179 self.call_handlers(self.session.unserialize(smsg))
180 180
181 181
182 182
183 183 class ShellSocketChannel(ZMQSocketChannel):
184 184 """The DEALER channel for issues request/replies to the kernel.
185 185 """
186 186
187 187 command_queue = None
188 188 # flag for whether execute requests should be allowed to call raw_input:
189 189 allow_stdin = True
190 190
191 191 def __init__(self, context, session, address):
192 192 super(ShellSocketChannel, self).__init__(context, session, address)
193 193 self.ioloop = ioloop.IOLoop()
194 194
195 195 def run(self):
196 196 """The thread's main activity. Call start() instead."""
197 197 self.socket = self.context.socket(zmq.DEALER)
198 198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 199 self.socket.connect('tcp://%s:%i' % self.address)
200 200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 201 self.stream.on_recv(self._handle_recv)
202 202 self._run_loop()
203 203 try:
204 204 self.socket.close()
205 205 except:
206 206 pass
207 207
208 208 def stop(self):
209 209 self.ioloop.stop()
210 210 super(ShellSocketChannel, self).stop()
211 211
212 212 def call_handlers(self, msg):
213 213 """This method is called in the ioloop thread when a message arrives.
214 214
215 215 Subclasses should override this method to handle incoming messages.
216 216 It is important to remember that this method is called in the thread
217 217 so that some logic must be done to ensure that the application leve
218 218 handlers are called in the application thread.
219 219 """
220 220 raise NotImplementedError('call_handlers must be defined in a subclass.')
221 221
222 222 def execute(self, code, silent=False, store_history=True,
223 223 user_variables=None, user_expressions=None, allow_stdin=None):
224 224 """Execute code in the kernel.
225 225
226 226 Parameters
227 227 ----------
228 228 code : str
229 229 A string of Python code.
230 230
231 231 silent : bool, optional (default False)
232 232 If set, the kernel will execute the code as quietly possible, and
233 233 will force store_history to be False.
234 234
235 235 store_history : bool, optional (default True)
236 If set, the kernel will execute store command history. This is forced
236 If set, the kernel will store command history. This is forced
237 237 to be False if silent is True.
238 238
239 239 user_variables : list, optional
240 240 A list of variable names to pull from the user's namespace. They
241 241 will come back as a dict with these names as keys and their
242 242 :func:`repr` as values.
243 243
244 244 user_expressions : dict, optional
245 245 A dict with string keys and to pull from the user's
246 246 namespace. They will come back as a dict with these names as keys
247 247 and their :func:`repr` as values.
248 248
249 249 allow_stdin : bool, optional
250 250 Flag for
251 251 A dict with string keys and to pull from the user's
252 252 namespace. They will come back as a dict with these names as keys
253 253 and their :func:`repr` as values.
254 254
255 255 Returns
256 256 -------
257 257 The msg_id of the message sent.
258 258 """
259 259 if user_variables is None:
260 260 user_variables = []
261 261 if user_expressions is None:
262 262 user_expressions = {}
263 263 if allow_stdin is None:
264 264 allow_stdin = self.allow_stdin
265 265
266 266
267 267 # Don't waste network traffic if inputs are invalid
268 268 if not isinstance(code, basestring):
269 269 raise ValueError('code %r must be a string' % code)
270 270 validate_string_list(user_variables)
271 271 validate_string_dict(user_expressions)
272 272
273 273 # Create class for content/msg creation. Related to, but possibly
274 274 # not in Session.
275 275 content = dict(code=code, silent=silent, store_history=store_history,
276 276 user_variables=user_variables,
277 277 user_expressions=user_expressions,
278 278 allow_stdin=allow_stdin,
279 279 )
280 280 msg = self.session.msg('execute_request', content)
281 281 self._queue_send(msg)
282 282 return msg['header']['msg_id']
283 283
284 284 def complete(self, text, line, cursor_pos, block=None):
285 285 """Tab complete text in the kernel's namespace.
286 286
287 287 Parameters
288 288 ----------
289 289 text : str
290 290 The text to complete.
291 291 line : str
292 292 The full line of text that is the surrounding context for the
293 293 text to complete.
294 294 cursor_pos : int
295 295 The position of the cursor in the line where the completion was
296 296 requested.
297 297 block : str, optional
298 298 The full block of code in which the completion is being requested.
299 299
300 300 Returns
301 301 -------
302 302 The msg_id of the message sent.
303 303 """
304 304 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
305 305 msg = self.session.msg('complete_request', content)
306 306 self._queue_send(msg)
307 307 return msg['header']['msg_id']
308 308
309 309 def object_info(self, oname, detail_level=0):
310 310 """Get metadata information about an object.
311 311
312 312 Parameters
313 313 ----------
314 314 oname : str
315 315 A string specifying the object name.
316 316 detail_level : int, optional
317 317 The level of detail for the introspection (0-2)
318 318
319 319 Returns
320 320 -------
321 321 The msg_id of the message sent.
322 322 """
323 323 content = dict(oname=oname, detail_level=detail_level)
324 324 msg = self.session.msg('object_info_request', content)
325 325 self._queue_send(msg)
326 326 return msg['header']['msg_id']
327 327
328 328 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
329 329 """Get entries from the history list.
330 330
331 331 Parameters
332 332 ----------
333 333 raw : bool
334 334 If True, return the raw input.
335 335 output : bool
336 336 If True, then return the output as well.
337 337 hist_access_type : str
338 338 'range' (fill in session, start and stop params), 'tail' (fill in n)
339 339 or 'search' (fill in pattern param).
340 340
341 341 session : int
342 342 For a range request, the session from which to get lines. Session
343 343 numbers are positive integers; negative ones count back from the
344 344 current session.
345 345 start : int
346 346 The first line number of a history range.
347 347 stop : int
348 348 The final (excluded) line number of a history range.
349 349
350 350 n : int
351 351 The number of lines of history to get for a tail request.
352 352
353 353 pattern : str
354 354 The glob-syntax pattern for a search request.
355 355
356 356 Returns
357 357 -------
358 358 The msg_id of the message sent.
359 359 """
360 360 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
361 361 **kwargs)
362 362 msg = self.session.msg('history_request', content)
363 363 self._queue_send(msg)
364 364 return msg['header']['msg_id']
365 365
366 366 def shutdown(self, restart=False):
367 367 """Request an immediate kernel shutdown.
368 368
369 369 Upon receipt of the (empty) reply, client code can safely assume that
370 370 the kernel has shut down and it's safe to forcefully terminate it if
371 371 it's still alive.
372 372
373 373 The kernel will send the reply via a function registered with Python's
374 374 atexit module, ensuring it's truly done as the kernel is done with all
375 375 normal operation.
376 376 """
377 377 # Send quit message to kernel. Once we implement kernel-side setattr,
378 378 # this should probably be done that way, but for now this will do.
379 379 msg = self.session.msg('shutdown_request', {'restart':restart})
380 380 self._queue_send(msg)
381 381 return msg['header']['msg_id']
382 382
383 383
384 384
385 385 class SubSocketChannel(ZMQSocketChannel):
386 386 """The SUB channel which listens for messages that the kernel publishes.
387 387 """
388 388
389 389 def __init__(self, context, session, address):
390 390 super(SubSocketChannel, self).__init__(context, session, address)
391 391 self.ioloop = ioloop.IOLoop()
392 392
393 393 def run(self):
394 394 """The thread's main activity. Call start() instead."""
395 395 self.socket = self.context.socket(zmq.SUB)
396 396 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
397 397 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
398 398 self.socket.connect('tcp://%s:%i' % self.address)
399 399 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
400 400 self.stream.on_recv(self._handle_recv)
401 401 self._run_loop()
402 402 try:
403 403 self.socket.close()
404 404 except:
405 405 pass
406 406
407 407 def stop(self):
408 408 self.ioloop.stop()
409 409 super(SubSocketChannel, self).stop()
410 410
411 411 def call_handlers(self, msg):
412 412 """This method is called in the ioloop thread when a message arrives.
413 413
414 414 Subclasses should override this method to handle incoming messages.
415 415 It is important to remember that this method is called in the thread
416 416 so that some logic must be done to ensure that the application leve
417 417 handlers are called in the application thread.
418 418 """
419 419 raise NotImplementedError('call_handlers must be defined in a subclass.')
420 420
421 421 def flush(self, timeout=1.0):
422 422 """Immediately processes all pending messages on the SUB channel.
423 423
424 424 Callers should use this method to ensure that :method:`call_handlers`
425 425 has been called for all messages that have been received on the
426 426 0MQ SUB socket of this channel.
427 427
428 428 This method is thread safe.
429 429
430 430 Parameters
431 431 ----------
432 432 timeout : float, optional
433 433 The maximum amount of time to spend flushing, in seconds. The
434 434 default is one second.
435 435 """
436 436 # We do the IOLoop callback process twice to ensure that the IOLoop
437 437 # gets to perform at least one full poll.
438 438 stop_time = time.time() + timeout
439 439 for i in xrange(2):
440 440 self._flushed = False
441 441 self.ioloop.add_callback(self._flush)
442 442 while not self._flushed and time.time() < stop_time:
443 443 time.sleep(0.01)
444 444
445 445 def _flush(self):
446 446 """Callback for :method:`self.flush`."""
447 447 self.stream.flush()
448 448 self._flushed = True
449 449
450 450
451 451 class StdInSocketChannel(ZMQSocketChannel):
452 452 """A reply channel to handle raw_input requests that the kernel makes."""
453 453
454 454 msg_queue = None
455 455
456 456 def __init__(self, context, session, address):
457 457 super(StdInSocketChannel, self).__init__(context, session, address)
458 458 self.ioloop = ioloop.IOLoop()
459 459
460 460 def run(self):
461 461 """The thread's main activity. Call start() instead."""
462 462 self.socket = self.context.socket(zmq.DEALER)
463 463 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
464 464 self.socket.connect('tcp://%s:%i' % self.address)
465 465 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
466 466 self.stream.on_recv(self._handle_recv)
467 467 self._run_loop()
468 468 try:
469 469 self.socket.close()
470 470 except:
471 471 pass
472 472
473 473
474 474 def stop(self):
475 475 self.ioloop.stop()
476 476 super(StdInSocketChannel, self).stop()
477 477
478 478 def call_handlers(self, msg):
479 479 """This method is called in the ioloop thread when a message arrives.
480 480
481 481 Subclasses should override this method to handle incoming messages.
482 482 It is important to remember that this method is called in the thread
483 483 so that some logic must be done to ensure that the application leve
484 484 handlers are called in the application thread.
485 485 """
486 486 raise NotImplementedError('call_handlers must be defined in a subclass.')
487 487
488 488 def input(self, string):
489 489 """Send a string of raw input to the kernel."""
490 490 content = dict(value=string)
491 491 msg = self.session.msg('input_reply', content)
492 492 self._queue_send(msg)
493 493
494 494
495 495 class HBSocketChannel(ZMQSocketChannel):
496 496 """The heartbeat channel which monitors the kernel heartbeat.
497 497
498 498 Note that the heartbeat channel is paused by default. As long as you start
499 499 this channel, the kernel manager will ensure that it is paused and un-paused
500 500 as appropriate.
501 501 """
502 502
503 503 time_to_dead = 3.0
504 504 socket = None
505 505 poller = None
506 506 _running = None
507 507 _pause = None
508 508 _beating = None
509 509
510 510 def __init__(self, context, session, address):
511 511 super(HBSocketChannel, self).__init__(context, session, address)
512 512 self._running = False
513 513 self._pause =True
514 514 self.poller = zmq.Poller()
515 515
516 516 def _create_socket(self):
517 517 if self.socket is not None:
518 518 # close previous socket, before opening a new one
519 519 self.poller.unregister(self.socket)
520 520 self.socket.close()
521 521 self.socket = self.context.socket(zmq.REQ)
522 522 self.socket.setsockopt(zmq.LINGER, 0)
523 523 self.socket.connect('tcp://%s:%i' % self.address)
524 524
525 525 self.poller.register(self.socket, zmq.POLLIN)
526 526
527 527 def _poll(self, start_time):
528 528 """poll for heartbeat replies until we reach self.time_to_dead
529 529
530 530 Ignores interrupts, and returns the result of poll(), which
531 531 will be an empty list if no messages arrived before the timeout,
532 532 or the event tuple if there is a message to receive.
533 533 """
534 534
535 535 until_dead = self.time_to_dead - (time.time() - start_time)
536 536 # ensure poll at least once
537 537 until_dead = max(until_dead, 1e-3)
538 538 events = []
539 539 while True:
540 540 try:
541 541 events = self.poller.poll(1000 * until_dead)
542 542 except ZMQError as e:
543 543 if e.errno == errno.EINTR:
544 544 # ignore interrupts during heartbeat
545 545 # this may never actually happen
546 546 until_dead = self.time_to_dead - (time.time() - start_time)
547 547 until_dead = max(until_dead, 1e-3)
548 548 pass
549 549 else:
550 550 raise
551 551 except Exception:
552 552 if self._exiting:
553 553 break
554 554 else:
555 555 raise
556 556 else:
557 557 break
558 558 return events
559 559
560 560 def run(self):
561 561 """The thread's main activity. Call start() instead."""
562 562 self._create_socket()
563 563 self._running = True
564 564 self._beating = True
565 565
566 566 while self._running:
567 567 if self._pause:
568 568 # just sleep, and skip the rest of the loop
569 569 time.sleep(self.time_to_dead)
570 570 continue
571 571
572 572 since_last_heartbeat = 0.0
573 573 # io.rprint('Ping from HB channel') # dbg
574 574 # no need to catch EFSM here, because the previous event was
575 575 # either a recv or connect, which cannot be followed by EFSM
576 576 self.socket.send(b'ping')
577 577 request_time = time.time()
578 578 ready = self._poll(request_time)
579 579 if ready:
580 580 self._beating = True
581 581 # the poll above guarantees we have something to recv
582 582 self.socket.recv()
583 583 # sleep the remainder of the cycle
584 584 remainder = self.time_to_dead - (time.time() - request_time)
585 585 if remainder > 0:
586 586 time.sleep(remainder)
587 587 continue
588 588 else:
589 589 # nothing was received within the time limit, signal heart failure
590 590 self._beating = False
591 591 since_last_heartbeat = time.time() - request_time
592 592 self.call_handlers(since_last_heartbeat)
593 593 # and close/reopen the socket, because the REQ/REP cycle has been broken
594 594 self._create_socket()
595 595 continue
596 596 try:
597 597 self.socket.close()
598 598 except:
599 599 pass
600 600
601 601 def pause(self):
602 602 """Pause the heartbeat."""
603 603 self._pause = True
604 604
605 605 def unpause(self):
606 606 """Unpause the heartbeat."""
607 607 self._pause = False
608 608
609 609 def is_beating(self):
610 610 """Is the heartbeat running and responsive (and not paused)."""
611 611 if self.is_alive() and not self._pause and self._beating:
612 612 return True
613 613 else:
614 614 return False
615 615
616 616 def stop(self):
617 617 self._running = False
618 618 super(HBSocketChannel, self).stop()
619 619
620 620 def call_handlers(self, since_last_heartbeat):
621 621 """This method is called in the ioloop thread when a message arrives.
622 622
623 623 Subclasses should override this method to handle incoming messages.
624 624 It is important to remember that this method is called in the thread
625 625 so that some logic must be done to ensure that the application level
626 626 handlers are called in the application thread.
627 627 """
628 628 raise NotImplementedError('call_handlers must be defined in a subclass.')
629 629
630 630
631 631 #-----------------------------------------------------------------------------
632 632 # Main kernel manager class
633 633 #-----------------------------------------------------------------------------
634 634
635 635 class KernelManager(HasTraits):
636 636 """ Manages a kernel for a frontend.
637 637
638 638 The SUB channel is for the frontend to receive messages published by the
639 639 kernel.
640 640
641 641 The REQ channel is for the frontend to make requests of the kernel.
642 642
643 643 The REP channel is for the kernel to request stdin (raw_input) from the
644 644 frontend.
645 645 """
646 646 # config object for passing to child configurables
647 647 config = Instance(Config)
648 648
649 649 # The PyZMQ Context to use for communication with the kernel.
650 650 context = Instance(zmq.Context)
651 651 def _context_default(self):
652 652 return zmq.Context.instance()
653 653
654 654 # The Session to use for communication with the kernel.
655 655 session = Instance(Session)
656 656
657 657 # The kernel process with which the KernelManager is communicating.
658 658 kernel = Instance(Popen)
659 659
660 660 # The addresses for the communication channels.
661 661 connection_file = Unicode('')
662 662 ip = Unicode(LOCALHOST)
663 663 def _ip_changed(self, name, old, new):
664 664 if new == '*':
665 665 self.ip = '0.0.0.0'
666 666 shell_port = Integer(0)
667 667 iopub_port = Integer(0)
668 668 stdin_port = Integer(0)
669 669 hb_port = Integer(0)
670 670
671 671 # The classes to use for the various channels.
672 672 shell_channel_class = Type(ShellSocketChannel)
673 673 sub_channel_class = Type(SubSocketChannel)
674 674 stdin_channel_class = Type(StdInSocketChannel)
675 675 hb_channel_class = Type(HBSocketChannel)
676 676
677 677 # Protected traits.
678 678 _launch_args = Any
679 679 _shell_channel = Any
680 680 _sub_channel = Any
681 681 _stdin_channel = Any
682 682 _hb_channel = Any
683 683 _connection_file_written=Bool(False)
684 684
685 685 def __init__(self, **kwargs):
686 686 super(KernelManager, self).__init__(**kwargs)
687 687 if self.session is None:
688 688 self.session = Session(config=self.config)
689 689
690 690 def __del__(self):
691 691 self.cleanup_connection_file()
692 692
693 693
694 694 #--------------------------------------------------------------------------
695 695 # Channel management methods:
696 696 #--------------------------------------------------------------------------
697 697
698 698 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
699 699 """Starts the channels for this kernel.
700 700
701 701 This will create the channels if they do not exist and then start
702 702 them. If port numbers of 0 are being used (random ports) then you
703 703 must first call :method:`start_kernel`. If the channels have been
704 704 stopped and you call this, :class:`RuntimeError` will be raised.
705 705 """
706 706 if shell:
707 707 self.shell_channel.start()
708 708 if sub:
709 709 self.sub_channel.start()
710 710 if stdin:
711 711 self.stdin_channel.start()
712 712 self.shell_channel.allow_stdin = True
713 713 else:
714 714 self.shell_channel.allow_stdin = False
715 715 if hb:
716 716 self.hb_channel.start()
717 717
718 718 def stop_channels(self):
719 719 """Stops all the running channels for this kernel.
720 720 """
721 721 if self.shell_channel.is_alive():
722 722 self.shell_channel.stop()
723 723 if self.sub_channel.is_alive():
724 724 self.sub_channel.stop()
725 725 if self.stdin_channel.is_alive():
726 726 self.stdin_channel.stop()
727 727 if self.hb_channel.is_alive():
728 728 self.hb_channel.stop()
729 729
730 730 @property
731 731 def channels_running(self):
732 732 """Are any of the channels created and running?"""
733 733 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
734 734 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
735 735
736 736 #--------------------------------------------------------------------------
737 737 # Kernel process management methods:
738 738 #--------------------------------------------------------------------------
739 739
740 740 def cleanup_connection_file(self):
741 741 """cleanup connection file *if we wrote it*
742 742
743 743 Will not raise if the connection file was already removed somehow.
744 744 """
745 745 if self._connection_file_written:
746 746 # cleanup connection files on full shutdown of kernel we started
747 747 self._connection_file_written = False
748 748 try:
749 749 os.remove(self.connection_file)
750 750 except OSError:
751 751 pass
752 752
753 753 def load_connection_file(self):
754 754 """load connection info from JSON dict in self.connection_file"""
755 755 with open(self.connection_file) as f:
756 756 cfg = json.loads(f.read())
757 757
758 758 self.ip = cfg['ip']
759 759 self.shell_port = cfg['shell_port']
760 760 self.stdin_port = cfg['stdin_port']
761 761 self.iopub_port = cfg['iopub_port']
762 762 self.hb_port = cfg['hb_port']
763 763 self.session.key = str_to_bytes(cfg['key'])
764 764
765 765 def write_connection_file(self):
766 766 """write connection info to JSON dict in self.connection_file"""
767 767 if self._connection_file_written:
768 768 return
769 769 self.connection_file,cfg = write_connection_file(self.connection_file,
770 770 ip=self.ip, key=self.session.key,
771 771 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
772 772 shell_port=self.shell_port, hb_port=self.hb_port)
773 773 # write_connection_file also sets default ports:
774 774 self.shell_port = cfg['shell_port']
775 775 self.stdin_port = cfg['stdin_port']
776 776 self.iopub_port = cfg['iopub_port']
777 777 self.hb_port = cfg['hb_port']
778 778
779 779 self._connection_file_written = True
780 780
781 781 def start_kernel(self, **kw):
782 782 """Starts a kernel process and configures the manager to use it.
783 783
784 784 If random ports (port=0) are being used, this method must be called
785 785 before the channels are created.
786 786
787 787 Parameters:
788 788 -----------
789 789 launcher : callable, optional (default None)
790 790 A custom function for launching the kernel process (generally a
791 791 wrapper around ``entry_point.base_launch_kernel``). In most cases,
792 792 it should not be necessary to use this parameter.
793 793
794 794 **kw : optional
795 795 See respective options for IPython and Python kernels.
796 796 """
797 797 if self.ip not in LOCAL_IPS:
798 798 raise RuntimeError("Can only launch a kernel on a local interface. "
799 799 "Make sure that the '*_address' attributes are "
800 800 "configured properly. "
801 801 "Currently valid addresses are: %s"%LOCAL_IPS
802 802 )
803 803
804 804 # write connection file / get default ports
805 805 self.write_connection_file()
806 806
807 807 self._launch_args = kw.copy()
808 808 launch_kernel = kw.pop('launcher', None)
809 809 if launch_kernel is None:
810 810 from ipkernel import launch_kernel
811 811 self.kernel = launch_kernel(fname=self.connection_file, **kw)
812 812
813 813 def shutdown_kernel(self, restart=False):
814 814 """ Attempts to the stop the kernel process cleanly. If the kernel
815 815 cannot be stopped, it is killed, if possible.
816 816 """
817 817 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
818 818 if sys.platform == 'win32':
819 819 self.kill_kernel()
820 820 return
821 821
822 822 # Pause the heart beat channel if it exists.
823 823 if self._hb_channel is not None:
824 824 self._hb_channel.pause()
825 825
826 826 # Don't send any additional kernel kill messages immediately, to give
827 827 # the kernel a chance to properly execute shutdown actions. Wait for at
828 828 # most 1s, checking every 0.1s.
829 829 self.shell_channel.shutdown(restart=restart)
830 830 for i in range(10):
831 831 if self.is_alive:
832 832 time.sleep(0.1)
833 833 else:
834 834 break
835 835 else:
836 836 # OK, we've waited long enough.
837 837 if self.has_kernel:
838 838 self.kill_kernel()
839 839
840 840 if not restart and self._connection_file_written:
841 841 # cleanup connection files on full shutdown of kernel we started
842 842 self._connection_file_written = False
843 843 try:
844 844 os.remove(self.connection_file)
845 845 except IOError:
846 846 pass
847 847
848 848 def restart_kernel(self, now=False, **kw):
849 849 """Restarts a kernel with the arguments that were used to launch it.
850 850
851 851 If the old kernel was launched with random ports, the same ports will be
852 852 used for the new kernel.
853 853
854 854 Parameters
855 855 ----------
856 856 now : bool, optional
857 857 If True, the kernel is forcefully restarted *immediately*, without
858 858 having a chance to do any cleanup action. Otherwise the kernel is
859 859 given 1s to clean up before a forceful restart is issued.
860 860
861 861 In all cases the kernel is restarted, the only difference is whether
862 862 it is given a chance to perform a clean shutdown or not.
863 863
864 864 **kw : optional
865 865 Any options specified here will replace those used to launch the
866 866 kernel.
867 867 """
868 868 if self._launch_args is None:
869 869 raise RuntimeError("Cannot restart the kernel. "
870 870 "No previous call to 'start_kernel'.")
871 871 else:
872 872 # Stop currently running kernel.
873 873 if self.has_kernel:
874 874 if now:
875 875 self.kill_kernel()
876 876 else:
877 877 self.shutdown_kernel(restart=True)
878 878
879 879 # Start new kernel.
880 880 self._launch_args.update(kw)
881 881 self.start_kernel(**self._launch_args)
882 882
883 883 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
884 884 # unless there is some delay here.
885 885 if sys.platform == 'win32':
886 886 time.sleep(0.2)
887 887
888 888 @property
889 889 def has_kernel(self):
890 890 """Returns whether a kernel process has been specified for the kernel
891 891 manager.
892 892 """
893 893 return self.kernel is not None
894 894
895 895 def kill_kernel(self):
896 896 """ Kill the running kernel. """
897 897 if self.has_kernel:
898 898 # Pause the heart beat channel if it exists.
899 899 if self._hb_channel is not None:
900 900 self._hb_channel.pause()
901 901
902 902 # Attempt to kill the kernel.
903 903 try:
904 904 self.kernel.kill()
905 905 except OSError as e:
906 906 # In Windows, we will get an Access Denied error if the process
907 907 # has already terminated. Ignore it.
908 908 if sys.platform == 'win32':
909 909 if e.winerror != 5:
910 910 raise
911 911 # On Unix, we may get an ESRCH error if the process has already
912 912 # terminated. Ignore it.
913 913 else:
914 914 from errno import ESRCH
915 915 if e.errno != ESRCH:
916 916 raise
917 917 self.kernel = None
918 918 else:
919 919 raise RuntimeError("Cannot kill kernel. No kernel is running!")
920 920
921 921 def interrupt_kernel(self):
922 922 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
923 923 well supported on all platforms.
924 924 """
925 925 if self.has_kernel:
926 926 if sys.platform == 'win32':
927 927 from parentpoller import ParentPollerWindows as Poller
928 928 Poller.send_interrupt(self.kernel.win32_interrupt_event)
929 929 else:
930 930 self.kernel.send_signal(signal.SIGINT)
931 931 else:
932 932 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
933 933
934 934 def signal_kernel(self, signum):
935 935 """ Sends a signal to the kernel. Note that since only SIGTERM is
936 936 supported on Windows, this function is only useful on Unix systems.
937 937 """
938 938 if self.has_kernel:
939 939 self.kernel.send_signal(signum)
940 940 else:
941 941 raise RuntimeError("Cannot signal kernel. No kernel is running!")
942 942
943 943 @property
944 944 def is_alive(self):
945 945 """Is the kernel process still running?"""
946 946 if self.has_kernel:
947 947 if self.kernel.poll() is None:
948 948 return True
949 949 else:
950 950 return False
951 951 elif self._hb_channel is not None:
952 952 # We didn't start the kernel with this KernelManager so we
953 953 # use the heartbeat.
954 954 return self._hb_channel.is_beating()
955 955 else:
956 956 # no heartbeat and not local, we can't tell if it's running,
957 957 # so naively return True
958 958 return True
959 959
960 960 #--------------------------------------------------------------------------
961 961 # Channels used for communication with the kernel:
962 962 #--------------------------------------------------------------------------
963 963
964 964 @property
965 965 def shell_channel(self):
966 966 """Get the REQ socket channel object to make requests of the kernel."""
967 967 if self._shell_channel is None:
968 968 self._shell_channel = self.shell_channel_class(self.context,
969 969 self.session,
970 970 (self.ip, self.shell_port))
971 971 return self._shell_channel
972 972
973 973 @property
974 974 def sub_channel(self):
975 975 """Get the SUB socket channel object."""
976 976 if self._sub_channel is None:
977 977 self._sub_channel = self.sub_channel_class(self.context,
978 978 self.session,
979 979 (self.ip, self.iopub_port))
980 980 return self._sub_channel
981 981
982 982 @property
983 983 def stdin_channel(self):
984 984 """Get the REP socket channel object to handle stdin (raw_input)."""
985 985 if self._stdin_channel is None:
986 986 self._stdin_channel = self.stdin_channel_class(self.context,
987 987 self.session,
988 988 (self.ip, self.stdin_port))
989 989 return self._stdin_channel
990 990
991 991 @property
992 992 def hb_channel(self):
993 993 """Get the heartbeat socket channel object to check that the
994 994 kernel is alive."""
995 995 if self._hb_channel is None:
996 996 self._hb_channel = self.hb_channel_class(self.context,
997 997 self.session,
998 998 (self.ip, self.hb_port))
999 999 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now