##// END OF EJS Templates
Push store_history parameter up to kernelmanager.
Jason Grout -
Show More
@@ -1,994 +1,999 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Constants and exceptions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 class InvalidPortNumber(Exception):
51 51 pass
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Utility functions
55 55 #-----------------------------------------------------------------------------
56 56
57 57 # some utilities to validate message structure, these might get moved elsewhere
58 58 # if they prove to have more generic utility
59 59
60 60 def validate_string_list(lst):
61 61 """Validate that the input is a list of strings.
62 62
63 63 Raises ValueError if not."""
64 64 if not isinstance(lst, list):
65 65 raise ValueError('input %r must be a list' % lst)
66 66 for x in lst:
67 67 if not isinstance(x, basestring):
68 68 raise ValueError('element %r in list must be a string' % x)
69 69
70 70
71 71 def validate_string_dict(dct):
72 72 """Validate that the input is a dict with string keys and values.
73 73
74 74 Raises ValueError if not."""
75 75 for k,v in dct.iteritems():
76 76 if not isinstance(k, basestring):
77 77 raise ValueError('key %r in dict must be a string' % k)
78 78 if not isinstance(v, basestring):
79 79 raise ValueError('value %r in dict must be a string' % v)
80 80
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # ZMQ Socket Channel classes
84 84 #-----------------------------------------------------------------------------
85 85
86 86 class ZMQSocketChannel(Thread):
87 87 """The base class for the channels that use ZMQ sockets.
88 88 """
89 89 context = None
90 90 session = None
91 91 socket = None
92 92 ioloop = None
93 93 stream = None
94 94 _address = None
95 95 _exiting = False
96 96
97 97 def __init__(self, context, session, address):
98 98 """Create a channel
99 99
100 100 Parameters
101 101 ----------
102 102 context : :class:`zmq.Context`
103 103 The ZMQ context to use.
104 104 session : :class:`session.Session`
105 105 The session to use.
106 106 address : tuple
107 107 Standard (ip, port) tuple that the kernel is listening on.
108 108 """
109 109 super(ZMQSocketChannel, self).__init__()
110 110 self.daemon = True
111 111
112 112 self.context = context
113 113 self.session = session
114 114 if address[1] == 0:
115 115 message = 'The port number for a channel cannot be 0.'
116 116 raise InvalidPortNumber(message)
117 117 self._address = address
118 118 atexit.register(self._notice_exit)
119 119
120 120 def _notice_exit(self):
121 121 self._exiting = True
122 122
123 123 def _run_loop(self):
124 124 """Run my loop, ignoring EINTR events in the poller"""
125 125 while True:
126 126 try:
127 127 self.ioloop.start()
128 128 except ZMQError as e:
129 129 if e.errno == errno.EINTR:
130 130 continue
131 131 else:
132 132 raise
133 133 except Exception:
134 134 if self._exiting:
135 135 break
136 136 else:
137 137 raise
138 138 else:
139 139 break
140 140
141 141 def stop(self):
142 142 """Stop the channel's activity.
143 143
144 144 This calls :method:`Thread.join` and returns when the thread
145 145 terminates. :class:`RuntimeError` will be raised if
146 146 :method:`self.start` is called again.
147 147 """
148 148 self.join()
149 149
150 150 @property
151 151 def address(self):
152 152 """Get the channel's address as an (ip, port) tuple.
153 153
154 154 By the default, the address is (localhost, 0), where 0 means a random
155 155 port.
156 156 """
157 157 return self._address
158 158
159 159 def _queue_send(self, msg):
160 160 """Queue a message to be sent from the IOLoop's thread.
161 161
162 162 Parameters
163 163 ----------
164 164 msg : message to send
165 165
166 166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 167 thread control of the action.
168 168 """
169 169 def thread_send():
170 170 self.session.send(self.stream, msg)
171 171 self.ioloop.add_callback(thread_send)
172 172
173 173 def _handle_recv(self, msg):
174 174 """callback for stream.on_recv
175 175
176 176 unpacks message, and calls handlers with it.
177 177 """
178 178 ident,smsg = self.session.feed_identities(msg)
179 179 self.call_handlers(self.session.unserialize(smsg))
180 180
181 181
182 182
183 183 class ShellSocketChannel(ZMQSocketChannel):
184 184 """The DEALER channel for issues request/replies to the kernel.
185 185 """
186 186
187 187 command_queue = None
188 188 # flag for whether execute requests should be allowed to call raw_input:
189 189 allow_stdin = True
190 190
191 191 def __init__(self, context, session, address):
192 192 super(ShellSocketChannel, self).__init__(context, session, address)
193 193 self.ioloop = ioloop.IOLoop()
194 194
195 195 def run(self):
196 196 """The thread's main activity. Call start() instead."""
197 197 self.socket = self.context.socket(zmq.DEALER)
198 198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 199 self.socket.connect('tcp://%s:%i' % self.address)
200 200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 201 self.stream.on_recv(self._handle_recv)
202 202 self._run_loop()
203 203 try:
204 204 self.socket.close()
205 205 except:
206 206 pass
207 207
208 208 def stop(self):
209 209 self.ioloop.stop()
210 210 super(ShellSocketChannel, self).stop()
211 211
212 212 def call_handlers(self, msg):
213 213 """This method is called in the ioloop thread when a message arrives.
214 214
215 215 Subclasses should override this method to handle incoming messages.
216 216 It is important to remember that this method is called in the thread
217 217 so that some logic must be done to ensure that the application leve
218 218 handlers are called in the application thread.
219 219 """
220 220 raise NotImplementedError('call_handlers must be defined in a subclass.')
221 221
222 def execute(self, code, silent=False,
222 def execute(self, code, silent=False, store_history=True,
223 223 user_variables=None, user_expressions=None, allow_stdin=None):
224 224 """Execute code in the kernel.
225 225
226 226 Parameters
227 227 ----------
228 228 code : str
229 229 A string of Python code.
230 230
231 231 silent : bool, optional (default False)
232 If set, the kernel will execute the code as quietly possible.
232 If set, the kernel will execute the code as quietly possible, and
233 will force store_history to be False.
234
235 store_history : bool, optional (default True)
236 If set, the kernel will execute store command history. This is forced
237 to be False if silent is True.
233 238
234 239 user_variables : list, optional
235 240 A list of variable names to pull from the user's namespace. They
236 241 will come back as a dict with these names as keys and their
237 242 :func:`repr` as values.
238 243
239 244 user_expressions : dict, optional
240 245 A dict with string keys and to pull from the user's
241 246 namespace. They will come back as a dict with these names as keys
242 247 and their :func:`repr` as values.
243 248
244 249 allow_stdin : bool, optional
245 250 Flag for
246 251 A dict with string keys and to pull from the user's
247 252 namespace. They will come back as a dict with these names as keys
248 253 and their :func:`repr` as values.
249 254
250 255 Returns
251 256 -------
252 257 The msg_id of the message sent.
253 258 """
254 259 if user_variables is None:
255 260 user_variables = []
256 261 if user_expressions is None:
257 262 user_expressions = {}
258 263 if allow_stdin is None:
259 264 allow_stdin = self.allow_stdin
260 265
261 266
262 267 # Don't waste network traffic if inputs are invalid
263 268 if not isinstance(code, basestring):
264 269 raise ValueError('code %r must be a string' % code)
265 270 validate_string_list(user_variables)
266 271 validate_string_dict(user_expressions)
267 272
268 273 # Create class for content/msg creation. Related to, but possibly
269 274 # not in Session.
270 content = dict(code=code, silent=silent,
275 content = dict(code=code, silent=silent, store_history=store_history,
271 276 user_variables=user_variables,
272 277 user_expressions=user_expressions,
273 278 allow_stdin=allow_stdin,
274 279 )
275 280 msg = self.session.msg('execute_request', content)
276 281 self._queue_send(msg)
277 282 return msg['header']['msg_id']
278 283
279 284 def complete(self, text, line, cursor_pos, block=None):
280 285 """Tab complete text in the kernel's namespace.
281 286
282 287 Parameters
283 288 ----------
284 289 text : str
285 290 The text to complete.
286 291 line : str
287 292 The full line of text that is the surrounding context for the
288 293 text to complete.
289 294 cursor_pos : int
290 295 The position of the cursor in the line where the completion was
291 296 requested.
292 297 block : str, optional
293 298 The full block of code in which the completion is being requested.
294 299
295 300 Returns
296 301 -------
297 302 The msg_id of the message sent.
298 303 """
299 304 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
300 305 msg = self.session.msg('complete_request', content)
301 306 self._queue_send(msg)
302 307 return msg['header']['msg_id']
303 308
304 309 def object_info(self, oname, detail_level=0):
305 310 """Get metadata information about an object.
306 311
307 312 Parameters
308 313 ----------
309 314 oname : str
310 315 A string specifying the object name.
311 316 detail_level : int, optional
312 317 The level of detail for the introspection (0-2)
313 318
314 319 Returns
315 320 -------
316 321 The msg_id of the message sent.
317 322 """
318 323 content = dict(oname=oname, detail_level=detail_level)
319 324 msg = self.session.msg('object_info_request', content)
320 325 self._queue_send(msg)
321 326 return msg['header']['msg_id']
322 327
323 328 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
324 329 """Get entries from the history list.
325 330
326 331 Parameters
327 332 ----------
328 333 raw : bool
329 334 If True, return the raw input.
330 335 output : bool
331 336 If True, then return the output as well.
332 337 hist_access_type : str
333 338 'range' (fill in session, start and stop params), 'tail' (fill in n)
334 339 or 'search' (fill in pattern param).
335 340
336 341 session : int
337 342 For a range request, the session from which to get lines. Session
338 343 numbers are positive integers; negative ones count back from the
339 344 current session.
340 345 start : int
341 346 The first line number of a history range.
342 347 stop : int
343 348 The final (excluded) line number of a history range.
344 349
345 350 n : int
346 351 The number of lines of history to get for a tail request.
347 352
348 353 pattern : str
349 354 The glob-syntax pattern for a search request.
350 355
351 356 Returns
352 357 -------
353 358 The msg_id of the message sent.
354 359 """
355 360 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
356 361 **kwargs)
357 362 msg = self.session.msg('history_request', content)
358 363 self._queue_send(msg)
359 364 return msg['header']['msg_id']
360 365
361 366 def shutdown(self, restart=False):
362 367 """Request an immediate kernel shutdown.
363 368
364 369 Upon receipt of the (empty) reply, client code can safely assume that
365 370 the kernel has shut down and it's safe to forcefully terminate it if
366 371 it's still alive.
367 372
368 373 The kernel will send the reply via a function registered with Python's
369 374 atexit module, ensuring it's truly done as the kernel is done with all
370 375 normal operation.
371 376 """
372 377 # Send quit message to kernel. Once we implement kernel-side setattr,
373 378 # this should probably be done that way, but for now this will do.
374 379 msg = self.session.msg('shutdown_request', {'restart':restart})
375 380 self._queue_send(msg)
376 381 return msg['header']['msg_id']
377 382
378 383
379 384
380 385 class SubSocketChannel(ZMQSocketChannel):
381 386 """The SUB channel which listens for messages that the kernel publishes.
382 387 """
383 388
384 389 def __init__(self, context, session, address):
385 390 super(SubSocketChannel, self).__init__(context, session, address)
386 391 self.ioloop = ioloop.IOLoop()
387 392
388 393 def run(self):
389 394 """The thread's main activity. Call start() instead."""
390 395 self.socket = self.context.socket(zmq.SUB)
391 396 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
392 397 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
393 398 self.socket.connect('tcp://%s:%i' % self.address)
394 399 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
395 400 self.stream.on_recv(self._handle_recv)
396 401 self._run_loop()
397 402 try:
398 403 self.socket.close()
399 404 except:
400 405 pass
401 406
402 407 def stop(self):
403 408 self.ioloop.stop()
404 409 super(SubSocketChannel, self).stop()
405 410
406 411 def call_handlers(self, msg):
407 412 """This method is called in the ioloop thread when a message arrives.
408 413
409 414 Subclasses should override this method to handle incoming messages.
410 415 It is important to remember that this method is called in the thread
411 416 so that some logic must be done to ensure that the application leve
412 417 handlers are called in the application thread.
413 418 """
414 419 raise NotImplementedError('call_handlers must be defined in a subclass.')
415 420
416 421 def flush(self, timeout=1.0):
417 422 """Immediately processes all pending messages on the SUB channel.
418 423
419 424 Callers should use this method to ensure that :method:`call_handlers`
420 425 has been called for all messages that have been received on the
421 426 0MQ SUB socket of this channel.
422 427
423 428 This method is thread safe.
424 429
425 430 Parameters
426 431 ----------
427 432 timeout : float, optional
428 433 The maximum amount of time to spend flushing, in seconds. The
429 434 default is one second.
430 435 """
431 436 # We do the IOLoop callback process twice to ensure that the IOLoop
432 437 # gets to perform at least one full poll.
433 438 stop_time = time.time() + timeout
434 439 for i in xrange(2):
435 440 self._flushed = False
436 441 self.ioloop.add_callback(self._flush)
437 442 while not self._flushed and time.time() < stop_time:
438 443 time.sleep(0.01)
439 444
440 445 def _flush(self):
441 446 """Callback for :method:`self.flush`."""
442 447 self.stream.flush()
443 448 self._flushed = True
444 449
445 450
446 451 class StdInSocketChannel(ZMQSocketChannel):
447 452 """A reply channel to handle raw_input requests that the kernel makes."""
448 453
449 454 msg_queue = None
450 455
451 456 def __init__(self, context, session, address):
452 457 super(StdInSocketChannel, self).__init__(context, session, address)
453 458 self.ioloop = ioloop.IOLoop()
454 459
455 460 def run(self):
456 461 """The thread's main activity. Call start() instead."""
457 462 self.socket = self.context.socket(zmq.DEALER)
458 463 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
459 464 self.socket.connect('tcp://%s:%i' % self.address)
460 465 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
461 466 self.stream.on_recv(self._handle_recv)
462 467 self._run_loop()
463 468 try:
464 469 self.socket.close()
465 470 except:
466 471 pass
467 472
468 473
469 474 def stop(self):
470 475 self.ioloop.stop()
471 476 super(StdInSocketChannel, self).stop()
472 477
473 478 def call_handlers(self, msg):
474 479 """This method is called in the ioloop thread when a message arrives.
475 480
476 481 Subclasses should override this method to handle incoming messages.
477 482 It is important to remember that this method is called in the thread
478 483 so that some logic must be done to ensure that the application leve
479 484 handlers are called in the application thread.
480 485 """
481 486 raise NotImplementedError('call_handlers must be defined in a subclass.')
482 487
483 488 def input(self, string):
484 489 """Send a string of raw input to the kernel."""
485 490 content = dict(value=string)
486 491 msg = self.session.msg('input_reply', content)
487 492 self._queue_send(msg)
488 493
489 494
490 495 class HBSocketChannel(ZMQSocketChannel):
491 496 """The heartbeat channel which monitors the kernel heartbeat.
492 497
493 498 Note that the heartbeat channel is paused by default. As long as you start
494 499 this channel, the kernel manager will ensure that it is paused and un-paused
495 500 as appropriate.
496 501 """
497 502
498 503 time_to_dead = 3.0
499 504 socket = None
500 505 poller = None
501 506 _running = None
502 507 _pause = None
503 508 _beating = None
504 509
505 510 def __init__(self, context, session, address):
506 511 super(HBSocketChannel, self).__init__(context, session, address)
507 512 self._running = False
508 513 self._pause =True
509 514 self.poller = zmq.Poller()
510 515
511 516 def _create_socket(self):
512 517 if self.socket is not None:
513 518 # close previous socket, before opening a new one
514 519 self.poller.unregister(self.socket)
515 520 self.socket.close()
516 521 self.socket = self.context.socket(zmq.REQ)
517 522 self.socket.setsockopt(zmq.LINGER, 0)
518 523 self.socket.connect('tcp://%s:%i' % self.address)
519 524
520 525 self.poller.register(self.socket, zmq.POLLIN)
521 526
522 527 def _poll(self, start_time):
523 528 """poll for heartbeat replies until we reach self.time_to_dead
524 529
525 530 Ignores interrupts, and returns the result of poll(), which
526 531 will be an empty list if no messages arrived before the timeout,
527 532 or the event tuple if there is a message to receive.
528 533 """
529 534
530 535 until_dead = self.time_to_dead - (time.time() - start_time)
531 536 # ensure poll at least once
532 537 until_dead = max(until_dead, 1e-3)
533 538 events = []
534 539 while True:
535 540 try:
536 541 events = self.poller.poll(1000 * until_dead)
537 542 except ZMQError as e:
538 543 if e.errno == errno.EINTR:
539 544 # ignore interrupts during heartbeat
540 545 # this may never actually happen
541 546 until_dead = self.time_to_dead - (time.time() - start_time)
542 547 until_dead = max(until_dead, 1e-3)
543 548 pass
544 549 else:
545 550 raise
546 551 except Exception:
547 552 if self._exiting:
548 553 break
549 554 else:
550 555 raise
551 556 else:
552 557 break
553 558 return events
554 559
555 560 def run(self):
556 561 """The thread's main activity. Call start() instead."""
557 562 self._create_socket()
558 563 self._running = True
559 564 self._beating = True
560 565
561 566 while self._running:
562 567 if self._pause:
563 568 # just sleep, and skip the rest of the loop
564 569 time.sleep(self.time_to_dead)
565 570 continue
566 571
567 572 since_last_heartbeat = 0.0
568 573 # io.rprint('Ping from HB channel') # dbg
569 574 # no need to catch EFSM here, because the previous event was
570 575 # either a recv or connect, which cannot be followed by EFSM
571 576 self.socket.send(b'ping')
572 577 request_time = time.time()
573 578 ready = self._poll(request_time)
574 579 if ready:
575 580 self._beating = True
576 581 # the poll above guarantees we have something to recv
577 582 self.socket.recv()
578 583 # sleep the remainder of the cycle
579 584 remainder = self.time_to_dead - (time.time() - request_time)
580 585 if remainder > 0:
581 586 time.sleep(remainder)
582 587 continue
583 588 else:
584 589 # nothing was received within the time limit, signal heart failure
585 590 self._beating = False
586 591 since_last_heartbeat = time.time() - request_time
587 592 self.call_handlers(since_last_heartbeat)
588 593 # and close/reopen the socket, because the REQ/REP cycle has been broken
589 594 self._create_socket()
590 595 continue
591 596 try:
592 597 self.socket.close()
593 598 except:
594 599 pass
595 600
596 601 def pause(self):
597 602 """Pause the heartbeat."""
598 603 self._pause = True
599 604
600 605 def unpause(self):
601 606 """Unpause the heartbeat."""
602 607 self._pause = False
603 608
604 609 def is_beating(self):
605 610 """Is the heartbeat running and responsive (and not paused)."""
606 611 if self.is_alive() and not self._pause and self._beating:
607 612 return True
608 613 else:
609 614 return False
610 615
611 616 def stop(self):
612 617 self._running = False
613 618 super(HBSocketChannel, self).stop()
614 619
615 620 def call_handlers(self, since_last_heartbeat):
616 621 """This method is called in the ioloop thread when a message arrives.
617 622
618 623 Subclasses should override this method to handle incoming messages.
619 624 It is important to remember that this method is called in the thread
620 625 so that some logic must be done to ensure that the application level
621 626 handlers are called in the application thread.
622 627 """
623 628 raise NotImplementedError('call_handlers must be defined in a subclass.')
624 629
625 630
626 631 #-----------------------------------------------------------------------------
627 632 # Main kernel manager class
628 633 #-----------------------------------------------------------------------------
629 634
630 635 class KernelManager(HasTraits):
631 636 """ Manages a kernel for a frontend.
632 637
633 638 The SUB channel is for the frontend to receive messages published by the
634 639 kernel.
635 640
636 641 The REQ channel is for the frontend to make requests of the kernel.
637 642
638 643 The REP channel is for the kernel to request stdin (raw_input) from the
639 644 frontend.
640 645 """
641 646 # config object for passing to child configurables
642 647 config = Instance(Config)
643 648
644 649 # The PyZMQ Context to use for communication with the kernel.
645 650 context = Instance(zmq.Context)
646 651 def _context_default(self):
647 652 return zmq.Context.instance()
648 653
649 654 # The Session to use for communication with the kernel.
650 655 session = Instance(Session)
651 656
652 657 # The kernel process with which the KernelManager is communicating.
653 658 kernel = Instance(Popen)
654 659
655 660 # The addresses for the communication channels.
656 661 connection_file = Unicode('')
657 662 ip = Unicode(LOCALHOST)
658 663 def _ip_changed(self, name, old, new):
659 664 if new == '*':
660 665 self.ip = '0.0.0.0'
661 666 shell_port = Integer(0)
662 667 iopub_port = Integer(0)
663 668 stdin_port = Integer(0)
664 669 hb_port = Integer(0)
665 670
666 671 # The classes to use for the various channels.
667 672 shell_channel_class = Type(ShellSocketChannel)
668 673 sub_channel_class = Type(SubSocketChannel)
669 674 stdin_channel_class = Type(StdInSocketChannel)
670 675 hb_channel_class = Type(HBSocketChannel)
671 676
672 677 # Protected traits.
673 678 _launch_args = Any
674 679 _shell_channel = Any
675 680 _sub_channel = Any
676 681 _stdin_channel = Any
677 682 _hb_channel = Any
678 683 _connection_file_written=Bool(False)
679 684
680 685 def __init__(self, **kwargs):
681 686 super(KernelManager, self).__init__(**kwargs)
682 687 if self.session is None:
683 688 self.session = Session(config=self.config)
684 689
685 690 def __del__(self):
686 691 self.cleanup_connection_file()
687 692
688 693
689 694 #--------------------------------------------------------------------------
690 695 # Channel management methods:
691 696 #--------------------------------------------------------------------------
692 697
693 698 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
694 699 """Starts the channels for this kernel.
695 700
696 701 This will create the channels if they do not exist and then start
697 702 them. If port numbers of 0 are being used (random ports) then you
698 703 must first call :method:`start_kernel`. If the channels have been
699 704 stopped and you call this, :class:`RuntimeError` will be raised.
700 705 """
701 706 if shell:
702 707 self.shell_channel.start()
703 708 if sub:
704 709 self.sub_channel.start()
705 710 if stdin:
706 711 self.stdin_channel.start()
707 712 self.shell_channel.allow_stdin = True
708 713 else:
709 714 self.shell_channel.allow_stdin = False
710 715 if hb:
711 716 self.hb_channel.start()
712 717
713 718 def stop_channels(self):
714 719 """Stops all the running channels for this kernel.
715 720 """
716 721 if self.shell_channel.is_alive():
717 722 self.shell_channel.stop()
718 723 if self.sub_channel.is_alive():
719 724 self.sub_channel.stop()
720 725 if self.stdin_channel.is_alive():
721 726 self.stdin_channel.stop()
722 727 if self.hb_channel.is_alive():
723 728 self.hb_channel.stop()
724 729
725 730 @property
726 731 def channels_running(self):
727 732 """Are any of the channels created and running?"""
728 733 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
729 734 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
730 735
731 736 #--------------------------------------------------------------------------
732 737 # Kernel process management methods:
733 738 #--------------------------------------------------------------------------
734 739
735 740 def cleanup_connection_file(self):
736 741 """cleanup connection file *if we wrote it*
737 742
738 743 Will not raise if the connection file was already removed somehow.
739 744 """
740 745 if self._connection_file_written:
741 746 # cleanup connection files on full shutdown of kernel we started
742 747 self._connection_file_written = False
743 748 try:
744 749 os.remove(self.connection_file)
745 750 except OSError:
746 751 pass
747 752
748 753 def load_connection_file(self):
749 754 """load connection info from JSON dict in self.connection_file"""
750 755 with open(self.connection_file) as f:
751 756 cfg = json.loads(f.read())
752 757
753 758 self.ip = cfg['ip']
754 759 self.shell_port = cfg['shell_port']
755 760 self.stdin_port = cfg['stdin_port']
756 761 self.iopub_port = cfg['iopub_port']
757 762 self.hb_port = cfg['hb_port']
758 763 self.session.key = str_to_bytes(cfg['key'])
759 764
760 765 def write_connection_file(self):
761 766 """write connection info to JSON dict in self.connection_file"""
762 767 if self._connection_file_written:
763 768 return
764 769 self.connection_file,cfg = write_connection_file(self.connection_file,
765 770 ip=self.ip, key=self.session.key,
766 771 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
767 772 shell_port=self.shell_port, hb_port=self.hb_port)
768 773 # write_connection_file also sets default ports:
769 774 self.shell_port = cfg['shell_port']
770 775 self.stdin_port = cfg['stdin_port']
771 776 self.iopub_port = cfg['iopub_port']
772 777 self.hb_port = cfg['hb_port']
773 778
774 779 self._connection_file_written = True
775 780
776 781 def start_kernel(self, **kw):
777 782 """Starts a kernel process and configures the manager to use it.
778 783
779 784 If random ports (port=0) are being used, this method must be called
780 785 before the channels are created.
781 786
782 787 Parameters:
783 788 -----------
784 789 launcher : callable, optional (default None)
785 790 A custom function for launching the kernel process (generally a
786 791 wrapper around ``entry_point.base_launch_kernel``). In most cases,
787 792 it should not be necessary to use this parameter.
788 793
789 794 **kw : optional
790 795 See respective options for IPython and Python kernels.
791 796 """
792 797 if self.ip not in LOCAL_IPS:
793 798 raise RuntimeError("Can only launch a kernel on a local interface. "
794 799 "Make sure that the '*_address' attributes are "
795 800 "configured properly. "
796 801 "Currently valid addresses are: %s"%LOCAL_IPS
797 802 )
798 803
799 804 # write connection file / get default ports
800 805 self.write_connection_file()
801 806
802 807 self._launch_args = kw.copy()
803 808 launch_kernel = kw.pop('launcher', None)
804 809 if launch_kernel is None:
805 810 from ipkernel import launch_kernel
806 811 self.kernel = launch_kernel(fname=self.connection_file, **kw)
807 812
808 813 def shutdown_kernel(self, restart=False):
809 814 """ Attempts to the stop the kernel process cleanly. If the kernel
810 815 cannot be stopped, it is killed, if possible.
811 816 """
812 817 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
813 818 if sys.platform == 'win32':
814 819 self.kill_kernel()
815 820 return
816 821
817 822 # Pause the heart beat channel if it exists.
818 823 if self._hb_channel is not None:
819 824 self._hb_channel.pause()
820 825
821 826 # Don't send any additional kernel kill messages immediately, to give
822 827 # the kernel a chance to properly execute shutdown actions. Wait for at
823 828 # most 1s, checking every 0.1s.
824 829 self.shell_channel.shutdown(restart=restart)
825 830 for i in range(10):
826 831 if self.is_alive:
827 832 time.sleep(0.1)
828 833 else:
829 834 break
830 835 else:
831 836 # OK, we've waited long enough.
832 837 if self.has_kernel:
833 838 self.kill_kernel()
834 839
835 840 if not restart and self._connection_file_written:
836 841 # cleanup connection files on full shutdown of kernel we started
837 842 self._connection_file_written = False
838 843 try:
839 844 os.remove(self.connection_file)
840 845 except IOError:
841 846 pass
842 847
843 848 def restart_kernel(self, now=False, **kw):
844 849 """Restarts a kernel with the arguments that were used to launch it.
845 850
846 851 If the old kernel was launched with random ports, the same ports will be
847 852 used for the new kernel.
848 853
849 854 Parameters
850 855 ----------
851 856 now : bool, optional
852 857 If True, the kernel is forcefully restarted *immediately*, without
853 858 having a chance to do any cleanup action. Otherwise the kernel is
854 859 given 1s to clean up before a forceful restart is issued.
855 860
856 861 In all cases the kernel is restarted, the only difference is whether
857 862 it is given a chance to perform a clean shutdown or not.
858 863
859 864 **kw : optional
860 865 Any options specified here will replace those used to launch the
861 866 kernel.
862 867 """
863 868 if self._launch_args is None:
864 869 raise RuntimeError("Cannot restart the kernel. "
865 870 "No previous call to 'start_kernel'.")
866 871 else:
867 872 # Stop currently running kernel.
868 873 if self.has_kernel:
869 874 if now:
870 875 self.kill_kernel()
871 876 else:
872 877 self.shutdown_kernel(restart=True)
873 878
874 879 # Start new kernel.
875 880 self._launch_args.update(kw)
876 881 self.start_kernel(**self._launch_args)
877 882
878 883 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
879 884 # unless there is some delay here.
880 885 if sys.platform == 'win32':
881 886 time.sleep(0.2)
882 887
883 888 @property
884 889 def has_kernel(self):
885 890 """Returns whether a kernel process has been specified for the kernel
886 891 manager.
887 892 """
888 893 return self.kernel is not None
889 894
890 895 def kill_kernel(self):
891 896 """ Kill the running kernel. """
892 897 if self.has_kernel:
893 898 # Pause the heart beat channel if it exists.
894 899 if self._hb_channel is not None:
895 900 self._hb_channel.pause()
896 901
897 902 # Attempt to kill the kernel.
898 903 try:
899 904 self.kernel.kill()
900 905 except OSError as e:
901 906 # In Windows, we will get an Access Denied error if the process
902 907 # has already terminated. Ignore it.
903 908 if sys.platform == 'win32':
904 909 if e.winerror != 5:
905 910 raise
906 911 # On Unix, we may get an ESRCH error if the process has already
907 912 # terminated. Ignore it.
908 913 else:
909 914 from errno import ESRCH
910 915 if e.errno != ESRCH:
911 916 raise
912 917 self.kernel = None
913 918 else:
914 919 raise RuntimeError("Cannot kill kernel. No kernel is running!")
915 920
916 921 def interrupt_kernel(self):
917 922 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
918 923 well supported on all platforms.
919 924 """
920 925 if self.has_kernel:
921 926 if sys.platform == 'win32':
922 927 from parentpoller import ParentPollerWindows as Poller
923 928 Poller.send_interrupt(self.kernel.win32_interrupt_event)
924 929 else:
925 930 self.kernel.send_signal(signal.SIGINT)
926 931 else:
927 932 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
928 933
929 934 def signal_kernel(self, signum):
930 935 """ Sends a signal to the kernel. Note that since only SIGTERM is
931 936 supported on Windows, this function is only useful on Unix systems.
932 937 """
933 938 if self.has_kernel:
934 939 self.kernel.send_signal(signum)
935 940 else:
936 941 raise RuntimeError("Cannot signal kernel. No kernel is running!")
937 942
938 943 @property
939 944 def is_alive(self):
940 945 """Is the kernel process still running?"""
941 946 if self.has_kernel:
942 947 if self.kernel.poll() is None:
943 948 return True
944 949 else:
945 950 return False
946 951 elif self._hb_channel is not None:
947 952 # We didn't start the kernel with this KernelManager so we
948 953 # use the heartbeat.
949 954 return self._hb_channel.is_beating()
950 955 else:
951 956 # no heartbeat and not local, we can't tell if it's running,
952 957 # so naively return True
953 958 return True
954 959
955 960 #--------------------------------------------------------------------------
956 961 # Channels used for communication with the kernel:
957 962 #--------------------------------------------------------------------------
958 963
959 964 @property
960 965 def shell_channel(self):
961 966 """Get the REQ socket channel object to make requests of the kernel."""
962 967 if self._shell_channel is None:
963 968 self._shell_channel = self.shell_channel_class(self.context,
964 969 self.session,
965 970 (self.ip, self.shell_port))
966 971 return self._shell_channel
967 972
968 973 @property
969 974 def sub_channel(self):
970 975 """Get the SUB socket channel object."""
971 976 if self._sub_channel is None:
972 977 self._sub_channel = self.sub_channel_class(self.context,
973 978 self.session,
974 979 (self.ip, self.iopub_port))
975 980 return self._sub_channel
976 981
977 982 @property
978 983 def stdin_channel(self):
979 984 """Get the REP socket channel object to handle stdin (raw_input)."""
980 985 if self._stdin_channel is None:
981 986 self._stdin_channel = self.stdin_channel_class(self.context,
982 987 self.session,
983 988 (self.ip, self.stdin_port))
984 989 return self._stdin_channel
985 990
986 991 @property
987 992 def hb_channel(self):
988 993 """Get the heartbeat socket channel object to check that the
989 994 kernel is alive."""
990 995 if self._hb_channel is None:
991 996 self._hb_channel = self.hb_channel_class(self.context,
992 997 self.session,
993 998 (self.ip, self.hb_port))
994 999 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now