##// END OF EJS Templates
remove debug print statement
MinRK -
Show More
@@ -1,1131 +1,1130 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import absolute_import
19 19
20 20 # Standard library imports
21 21 import atexit
22 22 import errno
23 23 import json
24 24 from subprocess import Popen
25 25 import os
26 26 import signal
27 27 import sys
28 28 from threading import Thread
29 29 import time
30 30
31 31 # System library imports
32 32 import zmq
33 33 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
34 34 # during garbage collection of threads at exit:
35 35 from zmq import ZMQError
36 36 from zmq.eventloop import ioloop, zmqstream
37 37
38 38 # Local imports
39 39 from IPython.config.configurable import Configurable
40 40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 41 from IPython.utils.traitlets import (
42 42 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
43 43 )
44 44 from IPython.utils.py3compat import str_to_bytes
45 45 from IPython.kernel import (
46 46 write_connection_file,
47 47 make_ipkernel_cmd,
48 48 launch_kernel,
49 49 )
50 50 from .zmq.session import Session
51 51 from .kernelmanagerabc import (
52 52 ShellChannelABC, IOPubChannelABC,
53 53 HBChannelABC, StdInChannelABC,
54 54 KernelManagerABC
55 55 )
56 56
57 57
58 58 #-----------------------------------------------------------------------------
59 59 # Constants and exceptions
60 60 #-----------------------------------------------------------------------------
61 61
62 62 class InvalidPortNumber(Exception):
63 63 pass
64 64
65 65 #-----------------------------------------------------------------------------
66 66 # Utility functions
67 67 #-----------------------------------------------------------------------------
68 68
69 69 # some utilities to validate message structure, these might get moved elsewhere
70 70 # if they prove to have more generic utility
71 71
72 72 def validate_string_list(lst):
73 73 """Validate that the input is a list of strings.
74 74
75 75 Raises ValueError if not."""
76 76 if not isinstance(lst, list):
77 77 raise ValueError('input %r must be a list' % lst)
78 78 for x in lst:
79 79 if not isinstance(x, basestring):
80 80 raise ValueError('element %r in list must be a string' % x)
81 81
82 82
83 83 def validate_string_dict(dct):
84 84 """Validate that the input is a dict with string keys and values.
85 85
86 86 Raises ValueError if not."""
87 87 for k,v in dct.iteritems():
88 88 if not isinstance(k, basestring):
89 89 raise ValueError('key %r in dict must be a string' % k)
90 90 if not isinstance(v, basestring):
91 91 raise ValueError('value %r in dict must be a string' % v)
92 92
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # ZMQ Socket Channel classes
96 96 #-----------------------------------------------------------------------------
97 97
98 98 class ZMQSocketChannel(Thread):
99 99 """The base class for the channels that use ZMQ sockets."""
100 100 context = None
101 101 session = None
102 102 socket = None
103 103 ioloop = None
104 104 stream = None
105 105 _address = None
106 106 _exiting = False
107 107
108 108 def __init__(self, context, session, address):
109 109 """Create a channel.
110 110
111 111 Parameters
112 112 ----------
113 113 context : :class:`zmq.Context`
114 114 The ZMQ context to use.
115 115 session : :class:`session.Session`
116 116 The session to use.
117 117 address : zmq url
118 118 Standard (ip, port) tuple that the kernel is listening on.
119 119 """
120 120 super(ZMQSocketChannel, self).__init__()
121 121 self.daemon = True
122 122
123 123 self.context = context
124 124 self.session = session
125 125 if isinstance(address, tuple):
126 126 if address[1] == 0:
127 127 message = 'The port number for a channel cannot be 0.'
128 128 raise InvalidPortNumber(message)
129 129 address = "tcp://%s:%i" % address
130 130 self._address = address
131 131 atexit.register(self._notice_exit)
132 132
133 133 def _notice_exit(self):
134 134 self._exiting = True
135 135
136 136 def _run_loop(self):
137 137 """Run my loop, ignoring EINTR events in the poller"""
138 138 while True:
139 139 try:
140 140 self.ioloop.start()
141 141 except ZMQError as e:
142 142 if e.errno == errno.EINTR:
143 143 continue
144 144 else:
145 145 raise
146 146 except Exception:
147 147 if self._exiting:
148 148 break
149 149 else:
150 150 raise
151 151 else:
152 152 break
153 153
154 154 def stop(self):
155 155 """Stop the channel's event loop and join its thread.
156 156
157 157 This calls :method:`Thread.join` and returns when the thread
158 158 terminates. :class:`RuntimeError` will be raised if
159 159 :method:`self.start` is called again.
160 160 """
161 161 self.join()
162 162
163 163 @property
164 164 def address(self):
165 165 """Get the channel's address as a zmq url string.
166 166
167 167 These URLS have the form: 'tcp://127.0.0.1:5555'.
168 168 """
169 169 return self._address
170 170
171 171 def _queue_send(self, msg):
172 172 """Queue a message to be sent from the IOLoop's thread.
173 173
174 174 Parameters
175 175 ----------
176 176 msg : message to send
177 177
178 178 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
179 179 thread control of the action.
180 180 """
181 181 def thread_send():
182 182 self.session.send(self.stream, msg)
183 183 self.ioloop.add_callback(thread_send)
184 184
185 185 def _handle_recv(self, msg):
186 186 """Callback for stream.on_recv.
187 187
188 188 Unpacks message, and calls handlers with it.
189 189 """
190 190 ident,smsg = self.session.feed_identities(msg)
191 191 self.call_handlers(self.session.unserialize(smsg))
192 192
193 193
194 194
195 195 class ShellChannel(ZMQSocketChannel):
196 196 """The shell channel for issuing request/replies to the kernel."""
197 197
198 198 command_queue = None
199 199 # flag for whether execute requests should be allowed to call raw_input:
200 200 allow_stdin = True
201 201
202 202 def __init__(self, context, session, address):
203 203 super(ShellChannel, self).__init__(context, session, address)
204 204 self.ioloop = ioloop.IOLoop()
205 205
206 206 def run(self):
207 207 """The thread's main activity. Call start() instead."""
208 208 self.socket = self.context.socket(zmq.DEALER)
209 209 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
210 210 self.socket.connect(self.address)
211 211 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
212 212 self.stream.on_recv(self._handle_recv)
213 213 self._run_loop()
214 214 try:
215 215 self.socket.close()
216 216 except:
217 217 pass
218 218
219 219 def stop(self):
220 220 """Stop the channel's event loop and join its thread."""
221 221 self.ioloop.stop()
222 222 super(ShellChannel, self).stop()
223 223
224 224 def call_handlers(self, msg):
225 225 """This method is called in the ioloop thread when a message arrives.
226 226
227 227 Subclasses should override this method to handle incoming messages.
228 228 It is important to remember that this method is called in the thread
229 229 so that some logic must be done to ensure that the application leve
230 230 handlers are called in the application thread.
231 231 """
232 232 raise NotImplementedError('call_handlers must be defined in a subclass.')
233 233
234 234 def execute(self, code, silent=False, store_history=True,
235 235 user_variables=None, user_expressions=None, allow_stdin=None):
236 236 """Execute code in the kernel.
237 237
238 238 Parameters
239 239 ----------
240 240 code : str
241 241 A string of Python code.
242 242
243 243 silent : bool, optional (default False)
244 244 If set, the kernel will execute the code as quietly possible, and
245 245 will force store_history to be False.
246 246
247 247 store_history : bool, optional (default True)
248 248 If set, the kernel will store command history. This is forced
249 249 to be False if silent is True.
250 250
251 251 user_variables : list, optional
252 252 A list of variable names to pull from the user's namespace. They
253 253 will come back as a dict with these names as keys and their
254 254 :func:`repr` as values.
255 255
256 256 user_expressions : dict, optional
257 257 A dict mapping names to expressions to be evaluated in the user's
258 258 dict. The expression values are returned as strings formatted using
259 259 :func:`repr`.
260 260
261 261 allow_stdin : bool, optional (default self.allow_stdin)
262 262 Flag for whether the kernel can send stdin requests to frontends.
263 263
264 264 Some frontends (e.g. the Notebook) do not support stdin requests.
265 265 If raw_input is called from code executed from such a frontend, a
266 266 StdinNotImplementedError will be raised.
267 267
268 268 Returns
269 269 -------
270 270 The msg_id of the message sent.
271 271 """
272 272 if user_variables is None:
273 273 user_variables = []
274 274 if user_expressions is None:
275 275 user_expressions = {}
276 276 if allow_stdin is None:
277 277 allow_stdin = self.allow_stdin
278 278
279 279
280 280 # Don't waste network traffic if inputs are invalid
281 281 if not isinstance(code, basestring):
282 282 raise ValueError('code %r must be a string' % code)
283 283 validate_string_list(user_variables)
284 284 validate_string_dict(user_expressions)
285 285
286 286 # Create class for content/msg creation. Related to, but possibly
287 287 # not in Session.
288 288 content = dict(code=code, silent=silent, store_history=store_history,
289 289 user_variables=user_variables,
290 290 user_expressions=user_expressions,
291 291 allow_stdin=allow_stdin,
292 292 )
293 293 msg = self.session.msg('execute_request', content)
294 294 self._queue_send(msg)
295 295 return msg['header']['msg_id']
296 296
297 297 def complete(self, text, line, cursor_pos, block=None):
298 298 """Tab complete text in the kernel's namespace.
299 299
300 300 Parameters
301 301 ----------
302 302 text : str
303 303 The text to complete.
304 304 line : str
305 305 The full line of text that is the surrounding context for the
306 306 text to complete.
307 307 cursor_pos : int
308 308 The position of the cursor in the line where the completion was
309 309 requested.
310 310 block : str, optional
311 311 The full block of code in which the completion is being requested.
312 312
313 313 Returns
314 314 -------
315 315 The msg_id of the message sent.
316 316 """
317 317 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
318 318 msg = self.session.msg('complete_request', content)
319 319 self._queue_send(msg)
320 320 return msg['header']['msg_id']
321 321
322 322 def object_info(self, oname, detail_level=0):
323 323 """Get metadata information about an object in the kernel's namespace.
324 324
325 325 Parameters
326 326 ----------
327 327 oname : str
328 328 A string specifying the object name.
329 329 detail_level : int, optional
330 330 The level of detail for the introspection (0-2)
331 331
332 332 Returns
333 333 -------
334 334 The msg_id of the message sent.
335 335 """
336 336 content = dict(oname=oname, detail_level=detail_level)
337 337 msg = self.session.msg('object_info_request', content)
338 338 self._queue_send(msg)
339 339 return msg['header']['msg_id']
340 340
341 341 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
342 342 """Get entries from the kernel's history list.
343 343
344 344 Parameters
345 345 ----------
346 346 raw : bool
347 347 If True, return the raw input.
348 348 output : bool
349 349 If True, then return the output as well.
350 350 hist_access_type : str
351 351 'range' (fill in session, start and stop params), 'tail' (fill in n)
352 352 or 'search' (fill in pattern param).
353 353
354 354 session : int
355 355 For a range request, the session from which to get lines. Session
356 356 numbers are positive integers; negative ones count back from the
357 357 current session.
358 358 start : int
359 359 The first line number of a history range.
360 360 stop : int
361 361 The final (excluded) line number of a history range.
362 362
363 363 n : int
364 364 The number of lines of history to get for a tail request.
365 365
366 366 pattern : str
367 367 The glob-syntax pattern for a search request.
368 368
369 369 Returns
370 370 -------
371 371 The msg_id of the message sent.
372 372 """
373 373 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
374 374 **kwargs)
375 375 msg = self.session.msg('history_request', content)
376 376 self._queue_send(msg)
377 377 return msg['header']['msg_id']
378 378
379 379 def kernel_info(self):
380 380 """Request kernel info."""
381 381 msg = self.session.msg('kernel_info_request')
382 382 self._queue_send(msg)
383 383 return msg['header']['msg_id']
384 384
385 385 def shutdown(self, restart=False):
386 386 """Request an immediate kernel shutdown.
387 387
388 388 Upon receipt of the (empty) reply, client code can safely assume that
389 389 the kernel has shut down and it's safe to forcefully terminate it if
390 390 it's still alive.
391 391
392 392 The kernel will send the reply via a function registered with Python's
393 393 atexit module, ensuring it's truly done as the kernel is done with all
394 394 normal operation.
395 395 """
396 396 # Send quit message to kernel. Once we implement kernel-side setattr,
397 397 # this should probably be done that way, but for now this will do.
398 398 msg = self.session.msg('shutdown_request', {'restart':restart})
399 399 self._queue_send(msg)
400 400 return msg['header']['msg_id']
401 401
402 402
403 403
404 404 class IOPubChannel(ZMQSocketChannel):
405 405 """The iopub channel which listens for messages that the kernel publishes.
406 406
407 407 This channel is where all output is published to frontends.
408 408 """
409 409
410 410 def __init__(self, context, session, address):
411 411 super(IOPubChannel, self).__init__(context, session, address)
412 412 self.ioloop = ioloop.IOLoop()
413 413
414 414 def run(self):
415 415 """The thread's main activity. Call start() instead."""
416 416 self.socket = self.context.socket(zmq.SUB)
417 417 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
418 418 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
419 419 self.socket.connect(self.address)
420 420 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
421 421 self.stream.on_recv(self._handle_recv)
422 422 self._run_loop()
423 423 try:
424 424 self.socket.close()
425 425 except:
426 426 pass
427 427
428 428 def stop(self):
429 429 """Stop the channel's event loop and join its thread."""
430 430 self.ioloop.stop()
431 431 super(IOPubChannel, self).stop()
432 432
433 433 def call_handlers(self, msg):
434 434 """This method is called in the ioloop thread when a message arrives.
435 435
436 436 Subclasses should override this method to handle incoming messages.
437 437 It is important to remember that this method is called in the thread
438 438 so that some logic must be done to ensure that the application leve
439 439 handlers are called in the application thread.
440 440 """
441 441 raise NotImplementedError('call_handlers must be defined in a subclass.')
442 442
443 443 def flush(self, timeout=1.0):
444 444 """Immediately processes all pending messages on the iopub channel.
445 445
446 446 Callers should use this method to ensure that :method:`call_handlers`
447 447 has been called for all messages that have been received on the
448 448 0MQ SUB socket of this channel.
449 449
450 450 This method is thread safe.
451 451
452 452 Parameters
453 453 ----------
454 454 timeout : float, optional
455 455 The maximum amount of time to spend flushing, in seconds. The
456 456 default is one second.
457 457 """
458 458 # We do the IOLoop callback process twice to ensure that the IOLoop
459 459 # gets to perform at least one full poll.
460 460 stop_time = time.time() + timeout
461 461 for i in xrange(2):
462 462 self._flushed = False
463 463 self.ioloop.add_callback(self._flush)
464 464 while not self._flushed and time.time() < stop_time:
465 465 time.sleep(0.01)
466 466
467 467 def _flush(self):
468 468 """Callback for :method:`self.flush`."""
469 469 self.stream.flush()
470 470 self._flushed = True
471 471
472 472
473 473 class StdInChannel(ZMQSocketChannel):
474 474 """The stdin channel to handle raw_input requests that the kernel makes."""
475 475
476 476 msg_queue = None
477 477
478 478 def __init__(self, context, session, address):
479 479 super(StdInChannel, self).__init__(context, session, address)
480 480 self.ioloop = ioloop.IOLoop()
481 481
482 482 def run(self):
483 483 """The thread's main activity. Call start() instead."""
484 484 self.socket = self.context.socket(zmq.DEALER)
485 485 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
486 486 self.socket.connect(self.address)
487 487 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
488 488 self.stream.on_recv(self._handle_recv)
489 489 self._run_loop()
490 490 try:
491 491 self.socket.close()
492 492 except:
493 493 pass
494 494
495 495 def stop(self):
496 496 """Stop the channel's event loop and join its thread."""
497 497 self.ioloop.stop()
498 498 super(StdInChannel, self).stop()
499 499
500 500 def call_handlers(self, msg):
501 501 """This method is called in the ioloop thread when a message arrives.
502 502
503 503 Subclasses should override this method to handle incoming messages.
504 504 It is important to remember that this method is called in the thread
505 505 so that some logic must be done to ensure that the application leve
506 506 handlers are called in the application thread.
507 507 """
508 508 raise NotImplementedError('call_handlers must be defined in a subclass.')
509 509
510 510 def input(self, string):
511 511 """Send a string of raw input to the kernel."""
512 512 content = dict(value=string)
513 513 msg = self.session.msg('input_reply', content)
514 514 self._queue_send(msg)
515 515
516 516
517 517 class HBChannel(ZMQSocketChannel):
518 518 """The heartbeat channel which monitors the kernel heartbeat.
519 519
520 520 Note that the heartbeat channel is paused by default. As long as you start
521 521 this channel, the kernel manager will ensure that it is paused and un-paused
522 522 as appropriate.
523 523 """
524 524
525 525 time_to_dead = 3.0
526 526 socket = None
527 527 poller = None
528 528 _running = None
529 529 _pause = None
530 530 _beating = None
531 531
532 532 def __init__(self, context, session, address):
533 533 super(HBChannel, self).__init__(context, session, address)
534 534 self._running = False
535 535 self._pause =True
536 536 self.poller = zmq.Poller()
537 537
538 538 def _create_socket(self):
539 539 if self.socket is not None:
540 540 # close previous socket, before opening a new one
541 541 self.poller.unregister(self.socket)
542 542 self.socket.close()
543 543 self.socket = self.context.socket(zmq.REQ)
544 544 self.socket.setsockopt(zmq.LINGER, 0)
545 545 self.socket.connect(self.address)
546 546
547 547 self.poller.register(self.socket, zmq.POLLIN)
548 548
549 549 def _poll(self, start_time):
550 550 """poll for heartbeat replies until we reach self.time_to_dead.
551 551
552 552 Ignores interrupts, and returns the result of poll(), which
553 553 will be an empty list if no messages arrived before the timeout,
554 554 or the event tuple if there is a message to receive.
555 555 """
556 556
557 557 until_dead = self.time_to_dead - (time.time() - start_time)
558 558 # ensure poll at least once
559 559 until_dead = max(until_dead, 1e-3)
560 560 events = []
561 561 while True:
562 562 try:
563 563 events = self.poller.poll(1000 * until_dead)
564 564 except ZMQError as e:
565 565 if e.errno == errno.EINTR:
566 566 # ignore interrupts during heartbeat
567 567 # this may never actually happen
568 568 until_dead = self.time_to_dead - (time.time() - start_time)
569 569 until_dead = max(until_dead, 1e-3)
570 570 pass
571 571 else:
572 572 raise
573 573 except Exception:
574 574 if self._exiting:
575 575 break
576 576 else:
577 577 raise
578 578 else:
579 579 break
580 580 return events
581 581
582 582 def run(self):
583 583 """The thread's main activity. Call start() instead."""
584 584 self._create_socket()
585 585 self._running = True
586 586 self._beating = True
587 587
588 588 while self._running:
589 589 if self._pause:
590 590 # just sleep, and skip the rest of the loop
591 591 time.sleep(self.time_to_dead)
592 592 continue
593 593
594 594 since_last_heartbeat = 0.0
595 595 # io.rprint('Ping from HB channel') # dbg
596 596 # no need to catch EFSM here, because the previous event was
597 597 # either a recv or connect, which cannot be followed by EFSM
598 598 self.socket.send(b'ping')
599 599 request_time = time.time()
600 600 ready = self._poll(request_time)
601 601 if ready:
602 602 self._beating = True
603 603 # the poll above guarantees we have something to recv
604 604 self.socket.recv()
605 605 # sleep the remainder of the cycle
606 606 remainder = self.time_to_dead - (time.time() - request_time)
607 607 if remainder > 0:
608 608 time.sleep(remainder)
609 609 continue
610 610 else:
611 611 # nothing was received within the time limit, signal heart failure
612 612 self._beating = False
613 613 since_last_heartbeat = time.time() - request_time
614 614 self.call_handlers(since_last_heartbeat)
615 615 # and close/reopen the socket, because the REQ/REP cycle has been broken
616 616 self._create_socket()
617 617 continue
618 618 try:
619 619 self.socket.close()
620 620 except:
621 621 pass
622 622
623 623 def pause(self):
624 624 """Pause the heartbeat."""
625 625 self._pause = True
626 626
627 627 def unpause(self):
628 628 """Unpause the heartbeat."""
629 629 self._pause = False
630 630
631 631 def is_beating(self):
632 632 """Is the heartbeat running and responsive (and not paused)."""
633 633 if self.is_alive() and not self._pause and self._beating:
634 634 return True
635 635 else:
636 636 return False
637 637
638 638 def stop(self):
639 639 """Stop the channel's event loop and join its thread."""
640 640 self._running = False
641 641 super(HBChannel, self).stop()
642 642
643 643 def call_handlers(self, since_last_heartbeat):
644 644 """This method is called in the ioloop thread when a message arrives.
645 645
646 646 Subclasses should override this method to handle incoming messages.
647 647 It is important to remember that this method is called in the thread
648 648 so that some logic must be done to ensure that the application level
649 649 handlers are called in the application thread.
650 650 """
651 651 raise NotImplementedError('call_handlers must be defined in a subclass.')
652 652
653 653
654 654 #-----------------------------------------------------------------------------
655 655 # Main kernel manager class
656 656 #-----------------------------------------------------------------------------
657 657
658 658 class KernelManager(Configurable):
659 659 """Manages a single kernel on this host along with its channels.
660 660
661 661 There are four channels associated with each kernel:
662 662
663 663 * shell: for request/reply calls to the kernel.
664 664 * iopub: for the kernel to publish results to frontends.
665 665 * hb: for monitoring the kernel's heartbeat.
666 666 * stdin: for frontends to reply to raw_input calls in the kernel.
667 667
668 668 The usage of the channels that this class manages is optional. It is
669 669 entirely possible to connect to the kernels directly using ZeroMQ
670 670 sockets. These channels are useful primarily for talking to a kernel
671 671 whose :class:`KernelManager` is in the same process.
672 672
673 673 This version manages kernels started using Popen.
674 674 """
675 675 # The PyZMQ Context to use for communication with the kernel.
676 676 context = Instance(zmq.Context)
677 677 def _context_default(self):
678 678 return zmq.Context.instance()
679 679
680 680 # The Session to use for communication with the kernel.
681 681 session = Instance(Session)
682 682 def _session_default(self):
683 683 return Session(config=self.config)
684 684
685 685 # The kernel process with which the KernelManager is communicating.
686 686 # generally a Popen instance
687 687 kernel = Any()
688 688
689 689 kernel_cmd = List(Unicode, config=True,
690 690 help="""The Popen Command to launch the kernel.
691 691 Override this if you have a custom
692 692 """
693 693 )
694 694 def _kernel_cmd_changed(self, name, old, new):
695 print 'kernel cmd changed', new
696 695 self.ipython_kernel = False
697 696
698 697 ipython_kernel = Bool(True)
699 698
700 699
701 700 # The addresses for the communication channels.
702 701 connection_file = Unicode('')
703 702
704 703 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
705 704
706 705 ip = Unicode(LOCALHOST, config=True,
707 706 help="""Set the kernel\'s IP address [default localhost].
708 707 If the IP address is something other than localhost, then
709 708 Consoles on other machines will be able to connect
710 709 to the Kernel, so be careful!"""
711 710 )
712 711 def _ip_default(self):
713 712 if self.transport == 'ipc':
714 713 if self.connection_file:
715 714 return os.path.splitext(self.connection_file)[0] + '-ipc'
716 715 else:
717 716 return 'kernel-ipc'
718 717 else:
719 718 return LOCALHOST
720 719 def _ip_changed(self, name, old, new):
721 720 if new == '*':
722 721 self.ip = '0.0.0.0'
723 722 shell_port = Integer(0)
724 723 iopub_port = Integer(0)
725 724 stdin_port = Integer(0)
726 725 hb_port = Integer(0)
727 726
728 727 # The classes to use for the various channels.
729 728 shell_channel_class = Type(ShellChannel)
730 729 iopub_channel_class = Type(IOPubChannel)
731 730 stdin_channel_class = Type(StdInChannel)
732 731 hb_channel_class = Type(HBChannel)
733 732
734 733 # Protected traits.
735 734 _launch_args = Any
736 735 _shell_channel = Any
737 736 _iopub_channel = Any
738 737 _stdin_channel = Any
739 738 _hb_channel = Any
740 739 _connection_file_written=Bool(False)
741 740
742 741 def __del__(self):
743 742 self.cleanup_connection_file()
744 743
745 744 #--------------------------------------------------------------------------
746 745 # Channel management methods:
747 746 #--------------------------------------------------------------------------
748 747
749 748 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
750 749 """Starts the channels for this kernel.
751 750
752 751 This will create the channels if they do not exist and then start
753 752 them (their activity runs in a thread). If port numbers of 0 are
754 753 being used (random ports) then you must first call
755 754 :method:`start_kernel`. If the channels have been stopped and you
756 755 call this, :class:`RuntimeError` will be raised.
757 756 """
758 757 if shell:
759 758 self.shell_channel.start()
760 759 if iopub:
761 760 self.iopub_channel.start()
762 761 if stdin:
763 762 self.stdin_channel.start()
764 763 self.shell_channel.allow_stdin = True
765 764 else:
766 765 self.shell_channel.allow_stdin = False
767 766 if hb:
768 767 self.hb_channel.start()
769 768
770 769 def stop_channels(self):
771 770 """Stops all the running channels for this kernel.
772 771
773 772 This stops their event loops and joins their threads.
774 773 """
775 774 if self.shell_channel.is_alive():
776 775 self.shell_channel.stop()
777 776 if self.iopub_channel.is_alive():
778 777 self.iopub_channel.stop()
779 778 if self.stdin_channel.is_alive():
780 779 self.stdin_channel.stop()
781 780 if self.hb_channel.is_alive():
782 781 self.hb_channel.stop()
783 782
784 783 @property
785 784 def channels_running(self):
786 785 """Are any of the channels created and running?"""
787 786 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
788 787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
789 788
790 789 def _make_url(self, port):
791 790 """Make a zmq url with a port.
792 791
793 792 There are two cases that this handles:
794 793
795 794 * tcp: tcp://ip:port
796 795 * ipc: ipc://ip-port
797 796 """
798 797 if self.transport == 'tcp':
799 798 return "tcp://%s:%i" % (self.ip, port)
800 799 else:
801 800 return "%s://%s-%s" % (self.transport, self.ip, port)
802 801
803 802 @property
804 803 def shell_channel(self):
805 804 """Get the shell channel object for this kernel."""
806 805 if self._shell_channel is None:
807 806 self._shell_channel = self.shell_channel_class(
808 807 self.context, self.session, self._make_url(self.shell_port)
809 808 )
810 809 return self._shell_channel
811 810
812 811 @property
813 812 def iopub_channel(self):
814 813 """Get the iopub channel object for this kernel."""
815 814 if self._iopub_channel is None:
816 815 self._iopub_channel = self.iopub_channel_class(
817 816 self.context, self.session, self._make_url(self.iopub_port)
818 817 )
819 818 return self._iopub_channel
820 819
821 820 @property
822 821 def stdin_channel(self):
823 822 """Get the stdin channel object for this kernel."""
824 823 if self._stdin_channel is None:
825 824 self._stdin_channel = self.stdin_channel_class(
826 825 self.context, self.session, self._make_url(self.stdin_port)
827 826 )
828 827 return self._stdin_channel
829 828
830 829 @property
831 830 def hb_channel(self):
832 831 """Get the hb channel object for this kernel."""
833 832 if self._hb_channel is None:
834 833 self._hb_channel = self.hb_channel_class(
835 834 self.context, self.session, self._make_url(self.hb_port)
836 835 )
837 836 return self._hb_channel
838 837
839 838 #--------------------------------------------------------------------------
840 839 # Connection and ipc file management
841 840 #--------------------------------------------------------------------------
842 841
843 842 def cleanup_connection_file(self):
844 843 """Cleanup connection file *if we wrote it*
845 844
846 845 Will not raise if the connection file was already removed somehow.
847 846 """
848 847 if self._connection_file_written:
849 848 # cleanup connection files on full shutdown of kernel we started
850 849 self._connection_file_written = False
851 850 try:
852 851 os.remove(self.connection_file)
853 852 except (IOError, OSError):
854 853 pass
855 854
856 855 def cleanup_ipc_files(self):
857 856 """Cleanup ipc files if we wrote them."""
858 857 if self.transport != 'ipc':
859 858 return
860 859 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
861 860 ipcfile = "%s-%i" % (self.ip, port)
862 861 try:
863 862 os.remove(ipcfile)
864 863 except (IOError, OSError):
865 864 pass
866 865
867 866 def load_connection_file(self):
868 867 """Load connection info from JSON dict in self.connection_file."""
869 868 with open(self.connection_file) as f:
870 869 cfg = json.loads(f.read())
871 870
872 871 from pprint import pprint
873 872 pprint(cfg)
874 873 self.transport = cfg.get('transport', 'tcp')
875 874 self.ip = cfg['ip']
876 875 self.shell_port = cfg['shell_port']
877 876 self.stdin_port = cfg['stdin_port']
878 877 self.iopub_port = cfg['iopub_port']
879 878 self.hb_port = cfg['hb_port']
880 879 self.session.key = str_to_bytes(cfg['key'])
881 880
882 881 def write_connection_file(self):
883 882 """Write connection info to JSON dict in self.connection_file."""
884 883 if self._connection_file_written:
885 884 return
886 885 self.connection_file,cfg = write_connection_file(self.connection_file,
887 886 transport=self.transport, ip=self.ip, key=self.session.key,
888 887 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
889 888 shell_port=self.shell_port, hb_port=self.hb_port)
890 889 # write_connection_file also sets default ports:
891 890 self.shell_port = cfg['shell_port']
892 891 self.stdin_port = cfg['stdin_port']
893 892 self.iopub_port = cfg['iopub_port']
894 893 self.hb_port = cfg['hb_port']
895 894
896 895 self._connection_file_written = True
897 896
898 897 #--------------------------------------------------------------------------
899 898 # Kernel management
900 899 #--------------------------------------------------------------------------
901 900
902 901 def format_kernel_cmd(self, **kw):
903 902 """format templated args (e.g. {connection_file})"""
904 903 if self.kernel_cmd:
905 904 cmd = self.kernel_cmd
906 905 else:
907 906 cmd = make_ipkernel_cmd(
908 907 'from IPython.kernel.zmq.kernelapp import main; main()',
909 908 **kw
910 909 )
911 910 ns = dict(connection_file=self.connection_file)
912 911 ns.update(self._launch_args)
913 912 return [ c.format(**ns) for c in cmd ]
914 913
915 914 def _launch_kernel(self, kernel_cmd, **kw):
916 915 """actually launch the kernel
917 916
918 917 override in a subclass to launch kernel subprocesses differently
919 918 """
920 919 return launch_kernel(kernel_cmd, **kw)
921 920
922 921 def start_kernel(self, **kw):
923 922 """Starts a kernel on this host in a separate process.
924 923
925 924 If random ports (port=0) are being used, this method must be called
926 925 before the channels are created.
927 926
928 927 Parameters:
929 928 -----------
930 929 **kw : optional
931 930 keyword arguments that are passed down to build the kernel_cmd
932 931 and launching the kernel (e.g. Popen kwargs).
933 932 """
934 933 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
935 934 raise RuntimeError("Can only launch a kernel on a local interface. "
936 935 "Make sure that the '*_address' attributes are "
937 936 "configured properly. "
938 937 "Currently valid addresses are: %s"%LOCAL_IPS
939 938 )
940 939
941 940 # write connection file / get default ports
942 941 self.write_connection_file()
943 942
944 943 # save kwargs for use in restart
945 944 self._launch_args = kw.copy()
946 945 # build the Popen cmd
947 946 kernel_cmd = self.format_kernel_cmd(**kw)
948 947 # launch the kernel subprocess
949 948 self.kernel = self._launch_kernel(kernel_cmd,
950 949 ipython_kernel=self.ipython_kernel,
951 950 **kw)
952 951
953 952 def shutdown_kernel(self, now=False, restart=False):
954 953 """Attempts to the stop the kernel process cleanly.
955 954
956 955 This attempts to shutdown the kernels cleanly by:
957 956
958 957 1. Sending it a shutdown message over the shell channel.
959 958 2. If that fails, the kernel is shutdown forcibly by sending it
960 959 a signal.
961 960
962 961 Parameters:
963 962 -----------
964 963 now : bool
965 964 Should the kernel be forcible killed *now*. This skips the
966 965 first, nice shutdown attempt.
967 966 restart: bool
968 967 Will this kernel be restarted after it is shutdown. When this
969 968 is True, connection files will not be cleaned up.
970 969 """
971 970 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
972 971 if sys.platform == 'win32':
973 972 self._kill_kernel()
974 973 return
975 974
976 975 # Pause the heart beat channel if it exists.
977 976 if self._hb_channel is not None:
978 977 self._hb_channel.pause()
979 978
980 979 if now:
981 980 if self.has_kernel:
982 981 self._kill_kernel()
983 982 else:
984 983 # Don't send any additional kernel kill messages immediately, to give
985 984 # the kernel a chance to properly execute shutdown actions. Wait for at
986 985 # most 1s, checking every 0.1s.
987 986 self.shell_channel.shutdown(restart=restart)
988 987 for i in range(10):
989 988 if self.is_alive:
990 989 time.sleep(0.1)
991 990 else:
992 991 break
993 992 else:
994 993 # OK, we've waited long enough.
995 994 if self.has_kernel:
996 995 self._kill_kernel()
997 996
998 997 if not restart:
999 998 self.cleanup_connection_file()
1000 999 self.cleanup_ipc_files()
1001 1000 else:
1002 1001 self.cleanup_ipc_files()
1003 1002
1004 1003 def restart_kernel(self, now=False, **kw):
1005 1004 """Restarts a kernel with the arguments that were used to launch it.
1006 1005
1007 1006 If the old kernel was launched with random ports, the same ports will be
1008 1007 used for the new kernel. The same connection file is used again.
1009 1008
1010 1009 Parameters
1011 1010 ----------
1012 1011 now : bool, optional
1013 1012 If True, the kernel is forcefully restarted *immediately*, without
1014 1013 having a chance to do any cleanup action. Otherwise the kernel is
1015 1014 given 1s to clean up before a forceful restart is issued.
1016 1015
1017 1016 In all cases the kernel is restarted, the only difference is whether
1018 1017 it is given a chance to perform a clean shutdown or not.
1019 1018
1020 1019 **kw : optional
1021 1020 Any options specified here will overwrite those used to launch the
1022 1021 kernel.
1023 1022 """
1024 1023 if self._launch_args is None:
1025 1024 raise RuntimeError("Cannot restart the kernel. "
1026 1025 "No previous call to 'start_kernel'.")
1027 1026 else:
1028 1027 # Stop currently running kernel.
1029 1028 self.shutdown_kernel(now=now, restart=True)
1030 1029
1031 1030 # Start new kernel.
1032 1031 self._launch_args.update(kw)
1033 1032 self.start_kernel(**self._launch_args)
1034 1033
1035 1034 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1036 1035 # unless there is some delay here.
1037 1036 if sys.platform == 'win32':
1038 1037 time.sleep(0.2)
1039 1038
1040 1039 @property
1041 1040 def has_kernel(self):
1042 1041 """Has a kernel been started that we are managing."""
1043 1042 return self.kernel is not None
1044 1043
1045 1044 def _kill_kernel(self):
1046 1045 """Kill the running kernel.
1047 1046
1048 1047 This is a private method, callers should use shutdown_kernel(now=True).
1049 1048 """
1050 1049 if self.has_kernel:
1051 1050 # Pause the heart beat channel if it exists.
1052 1051 if self._hb_channel is not None:
1053 1052 self._hb_channel.pause()
1054 1053
1055 1054 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1056 1055 # TerminateProcess() on Win32).
1057 1056 try:
1058 1057 self.kernel.kill()
1059 1058 except OSError as e:
1060 1059 # In Windows, we will get an Access Denied error if the process
1061 1060 # has already terminated. Ignore it.
1062 1061 if sys.platform == 'win32':
1063 1062 if e.winerror != 5:
1064 1063 raise
1065 1064 # On Unix, we may get an ESRCH error if the process has already
1066 1065 # terminated. Ignore it.
1067 1066 else:
1068 1067 from errno import ESRCH
1069 1068 if e.errno != ESRCH:
1070 1069 raise
1071 1070
1072 1071 # Block until the kernel terminates.
1073 1072 self.kernel.wait()
1074 1073 self.kernel = None
1075 1074 else:
1076 1075 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1077 1076
1078 1077 def interrupt_kernel(self):
1079 1078 """Interrupts the kernel by sending it a signal.
1080 1079
1081 1080 Unlike ``signal_kernel``, this operation is well supported on all
1082 1081 platforms.
1083 1082 """
1084 1083 if self.has_kernel:
1085 1084 if sys.platform == 'win32':
1086 1085 from parentpoller import ParentPollerWindows as Poller
1087 1086 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1088 1087 else:
1089 1088 self.kernel.send_signal(signal.SIGINT)
1090 1089 else:
1091 1090 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1092 1091
1093 1092 def signal_kernel(self, signum):
1094 1093 """Sends a signal to the kernel.
1095 1094
1096 1095 Note that since only SIGTERM is supported on Windows, this function is
1097 1096 only useful on Unix systems.
1098 1097 """
1099 1098 if self.has_kernel:
1100 1099 self.kernel.send_signal(signum)
1101 1100 else:
1102 1101 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1103 1102
1104 1103 @property
1105 1104 def is_alive(self):
1106 1105 """Is the kernel process still running?"""
1107 1106 if self.has_kernel:
1108 1107 if self.kernel.poll() is None:
1109 1108 return True
1110 1109 else:
1111 1110 return False
1112 1111 elif self._hb_channel is not None:
1113 1112 # We didn't start the kernel with this KernelManager so we
1114 1113 # use the heartbeat.
1115 1114 return self._hb_channel.is_beating()
1116 1115 else:
1117 1116 # no heartbeat and not local, we can't tell if it's running,
1118 1117 # so naively return True
1119 1118 return True
1120 1119
1121 1120
1122 1121 #-----------------------------------------------------------------------------
1123 1122 # ABC Registration
1124 1123 #-----------------------------------------------------------------------------
1125 1124
1126 1125 ShellChannelABC.register(ShellChannel)
1127 1126 IOPubChannelABC.register(IOPubChannel)
1128 1127 HBChannelABC.register(HBChannel)
1129 1128 StdInChannelABC.register(StdInChannel)
1130 1129 KernelManagerABC.register(KernelManager)
1131 1130
General Comments 0
You need to be logged in to leave comments. Login now