##// END OF EJS Templates
Merge pull request #2904 from minrk/winipc...
Min RK -
r9525:f474d500 merge
parent child Browse files
Show More
@@ -1,1130 +1,1130 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import absolute_import
19 19
20 20 # Standard library imports
21 21 import atexit
22 22 import errno
23 23 import json
24 24 from subprocess import Popen
25 25 import os
26 26 import signal
27 27 import sys
28 28 from threading import Thread
29 29 import time
30 30
31 31 # System library imports
32 32 import zmq
33 33 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
34 34 # during garbage collection of threads at exit:
35 35 from zmq import ZMQError
36 36 from zmq.eventloop import ioloop, zmqstream
37 37
38 38 # Local imports
39 39 from IPython.config.configurable import Configurable
40 40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 41 from IPython.utils.traitlets import (
42 42 Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum
43 43 )
44 44 from IPython.utils.py3compat import str_to_bytes
45 45 from IPython.kernel import (
46 46 write_connection_file,
47 47 make_ipkernel_cmd,
48 48 launch_kernel,
49 49 )
50 50 from .zmq.session import Session
51 51 from .kernelmanagerabc import (
52 52 ShellChannelABC, IOPubChannelABC,
53 53 HBChannelABC, StdInChannelABC,
54 54 KernelManagerABC
55 55 )
56 56
57 57
58 58 #-----------------------------------------------------------------------------
59 59 # Constants and exceptions
60 60 #-----------------------------------------------------------------------------
61 61
62 62 class InvalidPortNumber(Exception):
63 63 pass
64 64
65 65 #-----------------------------------------------------------------------------
66 66 # Utility functions
67 67 #-----------------------------------------------------------------------------
68 68
69 69 # some utilities to validate message structure, these might get moved elsewhere
70 70 # if they prove to have more generic utility
71 71
72 72 def validate_string_list(lst):
73 73 """Validate that the input is a list of strings.
74 74
75 75 Raises ValueError if not."""
76 76 if not isinstance(lst, list):
77 77 raise ValueError('input %r must be a list' % lst)
78 78 for x in lst:
79 79 if not isinstance(x, basestring):
80 80 raise ValueError('element %r in list must be a string' % x)
81 81
82 82
83 83 def validate_string_dict(dct):
84 84 """Validate that the input is a dict with string keys and values.
85 85
86 86 Raises ValueError if not."""
87 87 for k,v in dct.iteritems():
88 88 if not isinstance(k, basestring):
89 89 raise ValueError('key %r in dict must be a string' % k)
90 90 if not isinstance(v, basestring):
91 91 raise ValueError('value %r in dict must be a string' % v)
92 92
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # ZMQ Socket Channel classes
96 96 #-----------------------------------------------------------------------------
97 97
98 98 class ZMQSocketChannel(Thread):
99 99 """The base class for the channels that use ZMQ sockets."""
100 100 context = None
101 101 session = None
102 102 socket = None
103 103 ioloop = None
104 104 stream = None
105 105 _address = None
106 106 _exiting = False
107 107
108 108 def __init__(self, context, session, address):
109 109 """Create a channel.
110 110
111 111 Parameters
112 112 ----------
113 113 context : :class:`zmq.Context`
114 114 The ZMQ context to use.
115 115 session : :class:`session.Session`
116 116 The session to use.
117 117 address : zmq url
118 118 Standard (ip, port) tuple that the kernel is listening on.
119 119 """
120 120 super(ZMQSocketChannel, self).__init__()
121 121 self.daemon = True
122 122
123 123 self.context = context
124 124 self.session = session
125 125 if isinstance(address, tuple):
126 126 if address[1] == 0:
127 127 message = 'The port number for a channel cannot be 0.'
128 128 raise InvalidPortNumber(message)
129 129 address = "tcp://%s:%i" % address
130 130 self._address = address
131 131 atexit.register(self._notice_exit)
132 132
133 133 def _notice_exit(self):
134 134 self._exiting = True
135 135
136 136 def _run_loop(self):
137 137 """Run my loop, ignoring EINTR events in the poller"""
138 138 while True:
139 139 try:
140 140 self.ioloop.start()
141 141 except ZMQError as e:
142 142 if e.errno == errno.EINTR:
143 143 continue
144 144 else:
145 145 raise
146 146 except Exception:
147 147 if self._exiting:
148 148 break
149 149 else:
150 150 raise
151 151 else:
152 152 break
153 153
154 154 def stop(self):
155 155 """Stop the channel's event loop and join its thread.
156 156
157 157 This calls :method:`Thread.join` and returns when the thread
158 158 terminates. :class:`RuntimeError` will be raised if
159 159 :method:`self.start` is called again.
160 160 """
161 161 self.join()
162 162
163 163 @property
164 164 def address(self):
165 165 """Get the channel's address as a zmq url string.
166 166
167 167 These URLS have the form: 'tcp://127.0.0.1:5555'.
168 168 """
169 169 return self._address
170 170
171 171 def _queue_send(self, msg):
172 172 """Queue a message to be sent from the IOLoop's thread.
173 173
174 174 Parameters
175 175 ----------
176 176 msg : message to send
177 177
178 178 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
179 179 thread control of the action.
180 180 """
181 181 def thread_send():
182 182 self.session.send(self.stream, msg)
183 183 self.ioloop.add_callback(thread_send)
184 184
185 185 def _handle_recv(self, msg):
186 186 """Callback for stream.on_recv.
187 187
188 188 Unpacks message, and calls handlers with it.
189 189 """
190 190 ident,smsg = self.session.feed_identities(msg)
191 191 self.call_handlers(self.session.unserialize(smsg))
192 192
193 193
194 194
195 195 class ShellChannel(ZMQSocketChannel):
196 196 """The shell channel for issuing request/replies to the kernel."""
197 197
198 198 command_queue = None
199 199 # flag for whether execute requests should be allowed to call raw_input:
200 200 allow_stdin = True
201 201
202 202 def __init__(self, context, session, address):
203 203 super(ShellChannel, self).__init__(context, session, address)
204 204 self.ioloop = ioloop.IOLoop()
205 205
206 206 def run(self):
207 207 """The thread's main activity. Call start() instead."""
208 208 self.socket = self.context.socket(zmq.DEALER)
209 209 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
210 210 self.socket.connect(self.address)
211 211 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
212 212 self.stream.on_recv(self._handle_recv)
213 213 self._run_loop()
214 214 try:
215 215 self.socket.close()
216 216 except:
217 217 pass
218 218
219 219 def stop(self):
220 220 """Stop the channel's event loop and join its thread."""
221 221 self.ioloop.stop()
222 222 super(ShellChannel, self).stop()
223 223
224 224 def call_handlers(self, msg):
225 225 """This method is called in the ioloop thread when a message arrives.
226 226
227 227 Subclasses should override this method to handle incoming messages.
228 228 It is important to remember that this method is called in the thread
229 229 so that some logic must be done to ensure that the application leve
230 230 handlers are called in the application thread.
231 231 """
232 232 raise NotImplementedError('call_handlers must be defined in a subclass.')
233 233
234 234 def execute(self, code, silent=False, store_history=True,
235 235 user_variables=None, user_expressions=None, allow_stdin=None):
236 236 """Execute code in the kernel.
237 237
238 238 Parameters
239 239 ----------
240 240 code : str
241 241 A string of Python code.
242 242
243 243 silent : bool, optional (default False)
244 244 If set, the kernel will execute the code as quietly possible, and
245 245 will force store_history to be False.
246 246
247 247 store_history : bool, optional (default True)
248 248 If set, the kernel will store command history. This is forced
249 249 to be False if silent is True.
250 250
251 251 user_variables : list, optional
252 252 A list of variable names to pull from the user's namespace. They
253 253 will come back as a dict with these names as keys and their
254 254 :func:`repr` as values.
255 255
256 256 user_expressions : dict, optional
257 257 A dict mapping names to expressions to be evaluated in the user's
258 258 dict. The expression values are returned as strings formatted using
259 259 :func:`repr`.
260 260
261 261 allow_stdin : bool, optional (default self.allow_stdin)
262 262 Flag for whether the kernel can send stdin requests to frontends.
263 263
264 264 Some frontends (e.g. the Notebook) do not support stdin requests.
265 265 If raw_input is called from code executed from such a frontend, a
266 266 StdinNotImplementedError will be raised.
267 267
268 268 Returns
269 269 -------
270 270 The msg_id of the message sent.
271 271 """
272 272 if user_variables is None:
273 273 user_variables = []
274 274 if user_expressions is None:
275 275 user_expressions = {}
276 276 if allow_stdin is None:
277 277 allow_stdin = self.allow_stdin
278 278
279 279
280 280 # Don't waste network traffic if inputs are invalid
281 281 if not isinstance(code, basestring):
282 282 raise ValueError('code %r must be a string' % code)
283 283 validate_string_list(user_variables)
284 284 validate_string_dict(user_expressions)
285 285
286 286 # Create class for content/msg creation. Related to, but possibly
287 287 # not in Session.
288 288 content = dict(code=code, silent=silent, store_history=store_history,
289 289 user_variables=user_variables,
290 290 user_expressions=user_expressions,
291 291 allow_stdin=allow_stdin,
292 292 )
293 293 msg = self.session.msg('execute_request', content)
294 294 self._queue_send(msg)
295 295 return msg['header']['msg_id']
296 296
297 297 def complete(self, text, line, cursor_pos, block=None):
298 298 """Tab complete text in the kernel's namespace.
299 299
300 300 Parameters
301 301 ----------
302 302 text : str
303 303 The text to complete.
304 304 line : str
305 305 The full line of text that is the surrounding context for the
306 306 text to complete.
307 307 cursor_pos : int
308 308 The position of the cursor in the line where the completion was
309 309 requested.
310 310 block : str, optional
311 311 The full block of code in which the completion is being requested.
312 312
313 313 Returns
314 314 -------
315 315 The msg_id of the message sent.
316 316 """
317 317 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
318 318 msg = self.session.msg('complete_request', content)
319 319 self._queue_send(msg)
320 320 return msg['header']['msg_id']
321 321
322 322 def object_info(self, oname, detail_level=0):
323 323 """Get metadata information about an object in the kernel's namespace.
324 324
325 325 Parameters
326 326 ----------
327 327 oname : str
328 328 A string specifying the object name.
329 329 detail_level : int, optional
330 330 The level of detail for the introspection (0-2)
331 331
332 332 Returns
333 333 -------
334 334 The msg_id of the message sent.
335 335 """
336 336 content = dict(oname=oname, detail_level=detail_level)
337 337 msg = self.session.msg('object_info_request', content)
338 338 self._queue_send(msg)
339 339 return msg['header']['msg_id']
340 340
341 341 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
342 342 """Get entries from the kernel's history list.
343 343
344 344 Parameters
345 345 ----------
346 346 raw : bool
347 347 If True, return the raw input.
348 348 output : bool
349 349 If True, then return the output as well.
350 350 hist_access_type : str
351 351 'range' (fill in session, start and stop params), 'tail' (fill in n)
352 352 or 'search' (fill in pattern param).
353 353
354 354 session : int
355 355 For a range request, the session from which to get lines. Session
356 356 numbers are positive integers; negative ones count back from the
357 357 current session.
358 358 start : int
359 359 The first line number of a history range.
360 360 stop : int
361 361 The final (excluded) line number of a history range.
362 362
363 363 n : int
364 364 The number of lines of history to get for a tail request.
365 365
366 366 pattern : str
367 367 The glob-syntax pattern for a search request.
368 368
369 369 Returns
370 370 -------
371 371 The msg_id of the message sent.
372 372 """
373 373 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
374 374 **kwargs)
375 375 msg = self.session.msg('history_request', content)
376 376 self._queue_send(msg)
377 377 return msg['header']['msg_id']
378 378
379 379 def kernel_info(self):
380 380 """Request kernel info."""
381 381 msg = self.session.msg('kernel_info_request')
382 382 self._queue_send(msg)
383 383 return msg['header']['msg_id']
384 384
385 385 def shutdown(self, restart=False):
386 386 """Request an immediate kernel shutdown.
387 387
388 388 Upon receipt of the (empty) reply, client code can safely assume that
389 389 the kernel has shut down and it's safe to forcefully terminate it if
390 390 it's still alive.
391 391
392 392 The kernel will send the reply via a function registered with Python's
393 393 atexit module, ensuring it's truly done as the kernel is done with all
394 394 normal operation.
395 395 """
396 396 # Send quit message to kernel. Once we implement kernel-side setattr,
397 397 # this should probably be done that way, but for now this will do.
398 398 msg = self.session.msg('shutdown_request', {'restart':restart})
399 399 self._queue_send(msg)
400 400 return msg['header']['msg_id']
401 401
402 402
403 403
404 404 class IOPubChannel(ZMQSocketChannel):
405 405 """The iopub channel which listens for messages that the kernel publishes.
406 406
407 407 This channel is where all output is published to frontends.
408 408 """
409 409
410 410 def __init__(self, context, session, address):
411 411 super(IOPubChannel, self).__init__(context, session, address)
412 412 self.ioloop = ioloop.IOLoop()
413 413
414 414 def run(self):
415 415 """The thread's main activity. Call start() instead."""
416 416 self.socket = self.context.socket(zmq.SUB)
417 417 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
418 418 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
419 419 self.socket.connect(self.address)
420 420 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
421 421 self.stream.on_recv(self._handle_recv)
422 422 self._run_loop()
423 423 try:
424 424 self.socket.close()
425 425 except:
426 426 pass
427 427
428 428 def stop(self):
429 429 """Stop the channel's event loop and join its thread."""
430 430 self.ioloop.stop()
431 431 super(IOPubChannel, self).stop()
432 432
433 433 def call_handlers(self, msg):
434 434 """This method is called in the ioloop thread when a message arrives.
435 435
436 436 Subclasses should override this method to handle incoming messages.
437 437 It is important to remember that this method is called in the thread
438 438 so that some logic must be done to ensure that the application leve
439 439 handlers are called in the application thread.
440 440 """
441 441 raise NotImplementedError('call_handlers must be defined in a subclass.')
442 442
443 443 def flush(self, timeout=1.0):
444 444 """Immediately processes all pending messages on the iopub channel.
445 445
446 446 Callers should use this method to ensure that :method:`call_handlers`
447 447 has been called for all messages that have been received on the
448 448 0MQ SUB socket of this channel.
449 449
450 450 This method is thread safe.
451 451
452 452 Parameters
453 453 ----------
454 454 timeout : float, optional
455 455 The maximum amount of time to spend flushing, in seconds. The
456 456 default is one second.
457 457 """
458 458 # We do the IOLoop callback process twice to ensure that the IOLoop
459 459 # gets to perform at least one full poll.
460 460 stop_time = time.time() + timeout
461 461 for i in xrange(2):
462 462 self._flushed = False
463 463 self.ioloop.add_callback(self._flush)
464 464 while not self._flushed and time.time() < stop_time:
465 465 time.sleep(0.01)
466 466
467 467 def _flush(self):
468 468 """Callback for :method:`self.flush`."""
469 469 self.stream.flush()
470 470 self._flushed = True
471 471
472 472
473 473 class StdInChannel(ZMQSocketChannel):
474 474 """The stdin channel to handle raw_input requests that the kernel makes."""
475 475
476 476 msg_queue = None
477 477
478 478 def __init__(self, context, session, address):
479 479 super(StdInChannel, self).__init__(context, session, address)
480 480 self.ioloop = ioloop.IOLoop()
481 481
482 482 def run(self):
483 483 """The thread's main activity. Call start() instead."""
484 484 self.socket = self.context.socket(zmq.DEALER)
485 485 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
486 486 self.socket.connect(self.address)
487 487 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
488 488 self.stream.on_recv(self._handle_recv)
489 489 self._run_loop()
490 490 try:
491 491 self.socket.close()
492 492 except:
493 493 pass
494 494
495 495 def stop(self):
496 496 """Stop the channel's event loop and join its thread."""
497 497 self.ioloop.stop()
498 498 super(StdInChannel, self).stop()
499 499
500 500 def call_handlers(self, msg):
501 501 """This method is called in the ioloop thread when a message arrives.
502 502
503 503 Subclasses should override this method to handle incoming messages.
504 504 It is important to remember that this method is called in the thread
505 505 so that some logic must be done to ensure that the application leve
506 506 handlers are called in the application thread.
507 507 """
508 508 raise NotImplementedError('call_handlers must be defined in a subclass.')
509 509
510 510 def input(self, string):
511 511 """Send a string of raw input to the kernel."""
512 512 content = dict(value=string)
513 513 msg = self.session.msg('input_reply', content)
514 514 self._queue_send(msg)
515 515
516 516
517 517 class HBChannel(ZMQSocketChannel):
518 518 """The heartbeat channel which monitors the kernel heartbeat.
519 519
520 520 Note that the heartbeat channel is paused by default. As long as you start
521 521 this channel, the kernel manager will ensure that it is paused and un-paused
522 522 as appropriate.
523 523 """
524 524
525 525 time_to_dead = 3.0
526 526 socket = None
527 527 poller = None
528 528 _running = None
529 529 _pause = None
530 530 _beating = None
531 531
532 532 def __init__(self, context, session, address):
533 533 super(HBChannel, self).__init__(context, session, address)
534 534 self._running = False
535 535 self._pause =True
536 536 self.poller = zmq.Poller()
537 537
538 538 def _create_socket(self):
539 539 if self.socket is not None:
540 540 # close previous socket, before opening a new one
541 541 self.poller.unregister(self.socket)
542 542 self.socket.close()
543 543 self.socket = self.context.socket(zmq.REQ)
544 544 self.socket.setsockopt(zmq.LINGER, 0)
545 545 self.socket.connect(self.address)
546 546
547 547 self.poller.register(self.socket, zmq.POLLIN)
548 548
549 549 def _poll(self, start_time):
550 550 """poll for heartbeat replies until we reach self.time_to_dead.
551 551
552 552 Ignores interrupts, and returns the result of poll(), which
553 553 will be an empty list if no messages arrived before the timeout,
554 554 or the event tuple if there is a message to receive.
555 555 """
556 556
557 557 until_dead = self.time_to_dead - (time.time() - start_time)
558 558 # ensure poll at least once
559 559 until_dead = max(until_dead, 1e-3)
560 560 events = []
561 561 while True:
562 562 try:
563 563 events = self.poller.poll(1000 * until_dead)
564 564 except ZMQError as e:
565 565 if e.errno == errno.EINTR:
566 566 # ignore interrupts during heartbeat
567 567 # this may never actually happen
568 568 until_dead = self.time_to_dead - (time.time() - start_time)
569 569 until_dead = max(until_dead, 1e-3)
570 570 pass
571 571 else:
572 572 raise
573 573 except Exception:
574 574 if self._exiting:
575 575 break
576 576 else:
577 577 raise
578 578 else:
579 579 break
580 580 return events
581 581
582 582 def run(self):
583 583 """The thread's main activity. Call start() instead."""
584 584 self._create_socket()
585 585 self._running = True
586 586 self._beating = True
587 587
588 588 while self._running:
589 589 if self._pause:
590 590 # just sleep, and skip the rest of the loop
591 591 time.sleep(self.time_to_dead)
592 592 continue
593 593
594 594 since_last_heartbeat = 0.0
595 595 # io.rprint('Ping from HB channel') # dbg
596 596 # no need to catch EFSM here, because the previous event was
597 597 # either a recv or connect, which cannot be followed by EFSM
598 598 self.socket.send(b'ping')
599 599 request_time = time.time()
600 600 ready = self._poll(request_time)
601 601 if ready:
602 602 self._beating = True
603 603 # the poll above guarantees we have something to recv
604 604 self.socket.recv()
605 605 # sleep the remainder of the cycle
606 606 remainder = self.time_to_dead - (time.time() - request_time)
607 607 if remainder > 0:
608 608 time.sleep(remainder)
609 609 continue
610 610 else:
611 611 # nothing was received within the time limit, signal heart failure
612 612 self._beating = False
613 613 since_last_heartbeat = time.time() - request_time
614 614 self.call_handlers(since_last_heartbeat)
615 615 # and close/reopen the socket, because the REQ/REP cycle has been broken
616 616 self._create_socket()
617 617 continue
618 618 try:
619 619 self.socket.close()
620 620 except:
621 621 pass
622 622
623 623 def pause(self):
624 624 """Pause the heartbeat."""
625 625 self._pause = True
626 626
627 627 def unpause(self):
628 628 """Unpause the heartbeat."""
629 629 self._pause = False
630 630
631 631 def is_beating(self):
632 632 """Is the heartbeat running and responsive (and not paused)."""
633 633 if self.is_alive() and not self._pause and self._beating:
634 634 return True
635 635 else:
636 636 return False
637 637
638 638 def stop(self):
639 639 """Stop the channel's event loop and join its thread."""
640 640 self._running = False
641 641 super(HBChannel, self).stop()
642 642
643 643 def call_handlers(self, since_last_heartbeat):
644 644 """This method is called in the ioloop thread when a message arrives.
645 645
646 646 Subclasses should override this method to handle incoming messages.
647 647 It is important to remember that this method is called in the thread
648 648 so that some logic must be done to ensure that the application level
649 649 handlers are called in the application thread.
650 650 """
651 651 raise NotImplementedError('call_handlers must be defined in a subclass.')
652 652
653 653
654 654 #-----------------------------------------------------------------------------
655 655 # Main kernel manager class
656 656 #-----------------------------------------------------------------------------
657 657
658 658 class KernelManager(Configurable):
659 659 """Manages a single kernel on this host along with its channels.
660 660
661 661 There are four channels associated with each kernel:
662 662
663 663 * shell: for request/reply calls to the kernel.
664 664 * iopub: for the kernel to publish results to frontends.
665 665 * hb: for monitoring the kernel's heartbeat.
666 666 * stdin: for frontends to reply to raw_input calls in the kernel.
667 667
668 668 The usage of the channels that this class manages is optional. It is
669 669 entirely possible to connect to the kernels directly using ZeroMQ
670 670 sockets. These channels are useful primarily for talking to a kernel
671 671 whose :class:`KernelManager` is in the same process.
672 672
673 673 This version manages kernels started using Popen.
674 674 """
675 675 # The PyZMQ Context to use for communication with the kernel.
676 676 context = Instance(zmq.Context)
677 677 def _context_default(self):
678 678 return zmq.Context.instance()
679 679
680 680 # The Session to use for communication with the kernel.
681 681 session = Instance(Session)
682 682 def _session_default(self):
683 683 return Session(config=self.config)
684 684
685 685 # The kernel process with which the KernelManager is communicating.
686 686 # generally a Popen instance
687 687 kernel = Any()
688 688
689 689 kernel_cmd = List(Unicode, config=True,
690 690 help="""The Popen Command to launch the kernel.
691 691 Override this if you have a custom
692 692 """
693 693 )
694 694 def _kernel_cmd_changed(self, name, old, new):
695 695 self.ipython_kernel = False
696 696
697 697 ipython_kernel = Bool(True)
698 698
699 699
700 700 # The addresses for the communication channels.
701 701 connection_file = Unicode('')
702 702
703 703 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
704 704
705 705 ip = Unicode(LOCALHOST, config=True,
706 706 help="""Set the kernel\'s IP address [default localhost].
707 707 If the IP address is something other than localhost, then
708 708 Consoles on other machines will be able to connect
709 709 to the Kernel, so be careful!"""
710 710 )
711 711 def _ip_default(self):
712 712 if self.transport == 'ipc':
713 713 if self.connection_file:
714 714 return os.path.splitext(self.connection_file)[0] + '-ipc'
715 715 else:
716 716 return 'kernel-ipc'
717 717 else:
718 718 return LOCALHOST
719 719 def _ip_changed(self, name, old, new):
720 720 if new == '*':
721 721 self.ip = '0.0.0.0'
722 722 shell_port = Integer(0)
723 723 iopub_port = Integer(0)
724 724 stdin_port = Integer(0)
725 725 hb_port = Integer(0)
726 726
727 727 # The classes to use for the various channels.
728 728 shell_channel_class = Type(ShellChannel)
729 729 iopub_channel_class = Type(IOPubChannel)
730 730 stdin_channel_class = Type(StdInChannel)
731 731 hb_channel_class = Type(HBChannel)
732 732
733 733 # Protected traits.
734 734 _launch_args = Any
735 735 _shell_channel = Any
736 736 _iopub_channel = Any
737 737 _stdin_channel = Any
738 738 _hb_channel = Any
739 739 _connection_file_written=Bool(False)
740 740
741 741 def __del__(self):
742 742 self.cleanup_connection_file()
743 743
744 744 #--------------------------------------------------------------------------
745 745 # Channel management methods:
746 746 #--------------------------------------------------------------------------
747 747
748 748 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
749 749 """Starts the channels for this kernel.
750 750
751 751 This will create the channels if they do not exist and then start
752 752 them (their activity runs in a thread). If port numbers of 0 are
753 753 being used (random ports) then you must first call
754 754 :method:`start_kernel`. If the channels have been stopped and you
755 755 call this, :class:`RuntimeError` will be raised.
756 756 """
757 757 if shell:
758 758 self.shell_channel.start()
759 759 if iopub:
760 760 self.iopub_channel.start()
761 761 if stdin:
762 762 self.stdin_channel.start()
763 763 self.shell_channel.allow_stdin = True
764 764 else:
765 765 self.shell_channel.allow_stdin = False
766 766 if hb:
767 767 self.hb_channel.start()
768 768
769 769 def stop_channels(self):
770 770 """Stops all the running channels for this kernel.
771 771
772 772 This stops their event loops and joins their threads.
773 773 """
774 774 if self.shell_channel.is_alive():
775 775 self.shell_channel.stop()
776 776 if self.iopub_channel.is_alive():
777 777 self.iopub_channel.stop()
778 778 if self.stdin_channel.is_alive():
779 779 self.stdin_channel.stop()
780 780 if self.hb_channel.is_alive():
781 781 self.hb_channel.stop()
782 782
783 783 @property
784 784 def channels_running(self):
785 785 """Are any of the channels created and running?"""
786 786 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
787 787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
788 788
789 789 def _make_url(self, port):
790 790 """Make a zmq url with a port.
791 791
792 792 There are two cases that this handles:
793 793
794 794 * tcp: tcp://ip:port
795 795 * ipc: ipc://ip-port
796 796 """
797 797 if self.transport == 'tcp':
798 798 return "tcp://%s:%i" % (self.ip, port)
799 799 else:
800 800 return "%s://%s-%s" % (self.transport, self.ip, port)
801 801
802 802 @property
803 803 def shell_channel(self):
804 804 """Get the shell channel object for this kernel."""
805 805 if self._shell_channel is None:
806 806 self._shell_channel = self.shell_channel_class(
807 807 self.context, self.session, self._make_url(self.shell_port)
808 808 )
809 809 return self._shell_channel
810 810
811 811 @property
812 812 def iopub_channel(self):
813 813 """Get the iopub channel object for this kernel."""
814 814 if self._iopub_channel is None:
815 815 self._iopub_channel = self.iopub_channel_class(
816 816 self.context, self.session, self._make_url(self.iopub_port)
817 817 )
818 818 return self._iopub_channel
819 819
820 820 @property
821 821 def stdin_channel(self):
822 822 """Get the stdin channel object for this kernel."""
823 823 if self._stdin_channel is None:
824 824 self._stdin_channel = self.stdin_channel_class(
825 825 self.context, self.session, self._make_url(self.stdin_port)
826 826 )
827 827 return self._stdin_channel
828 828
829 829 @property
830 830 def hb_channel(self):
831 831 """Get the hb channel object for this kernel."""
832 832 if self._hb_channel is None:
833 833 self._hb_channel = self.hb_channel_class(
834 834 self.context, self.session, self._make_url(self.hb_port)
835 835 )
836 836 return self._hb_channel
837 837
838 838 #--------------------------------------------------------------------------
839 839 # Connection and ipc file management
840 840 #--------------------------------------------------------------------------
841 841
842 842 def cleanup_connection_file(self):
843 843 """Cleanup connection file *if we wrote it*
844 844
845 845 Will not raise if the connection file was already removed somehow.
846 846 """
847 847 if self._connection_file_written:
848 848 # cleanup connection files on full shutdown of kernel we started
849 849 self._connection_file_written = False
850 850 try:
851 851 os.remove(self.connection_file)
852 except (IOError, OSError):
852 except (IOError, OSError, AttributeError):
853 853 pass
854 854
855 855 def cleanup_ipc_files(self):
856 856 """Cleanup ipc files if we wrote them."""
857 857 if self.transport != 'ipc':
858 858 return
859 859 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
860 860 ipcfile = "%s-%i" % (self.ip, port)
861 861 try:
862 862 os.remove(ipcfile)
863 863 except (IOError, OSError):
864 864 pass
865 865
866 866 def load_connection_file(self):
867 867 """Load connection info from JSON dict in self.connection_file."""
868 868 with open(self.connection_file) as f:
869 869 cfg = json.loads(f.read())
870 870
871 871 from pprint import pprint
872 872 pprint(cfg)
873 873 self.transport = cfg.get('transport', 'tcp')
874 874 self.ip = cfg['ip']
875 875 self.shell_port = cfg['shell_port']
876 876 self.stdin_port = cfg['stdin_port']
877 877 self.iopub_port = cfg['iopub_port']
878 878 self.hb_port = cfg['hb_port']
879 879 self.session.key = str_to_bytes(cfg['key'])
880 880
881 881 def write_connection_file(self):
882 882 """Write connection info to JSON dict in self.connection_file."""
883 883 if self._connection_file_written:
884 884 return
885 885 self.connection_file,cfg = write_connection_file(self.connection_file,
886 886 transport=self.transport, ip=self.ip, key=self.session.key,
887 887 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
888 888 shell_port=self.shell_port, hb_port=self.hb_port)
889 889 # write_connection_file also sets default ports:
890 890 self.shell_port = cfg['shell_port']
891 891 self.stdin_port = cfg['stdin_port']
892 892 self.iopub_port = cfg['iopub_port']
893 893 self.hb_port = cfg['hb_port']
894 894
895 895 self._connection_file_written = True
896 896
897 897 #--------------------------------------------------------------------------
898 898 # Kernel management
899 899 #--------------------------------------------------------------------------
900 900
901 901 def format_kernel_cmd(self, **kw):
902 902 """format templated args (e.g. {connection_file})"""
903 903 if self.kernel_cmd:
904 904 cmd = self.kernel_cmd
905 905 else:
906 906 cmd = make_ipkernel_cmd(
907 907 'from IPython.kernel.zmq.kernelapp import main; main()',
908 908 **kw
909 909 )
910 910 ns = dict(connection_file=self.connection_file)
911 911 ns.update(self._launch_args)
912 912 return [ c.format(**ns) for c in cmd ]
913 913
914 914 def _launch_kernel(self, kernel_cmd, **kw):
915 915 """actually launch the kernel
916 916
917 917 override in a subclass to launch kernel subprocesses differently
918 918 """
919 919 return launch_kernel(kernel_cmd, **kw)
920 920
921 921 def start_kernel(self, **kw):
922 922 """Starts a kernel on this host in a separate process.
923 923
924 924 If random ports (port=0) are being used, this method must be called
925 925 before the channels are created.
926 926
927 927 Parameters:
928 928 -----------
929 929 **kw : optional
930 930 keyword arguments that are passed down to build the kernel_cmd
931 931 and launching the kernel (e.g. Popen kwargs).
932 932 """
933 933 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
934 934 raise RuntimeError("Can only launch a kernel on a local interface. "
935 935 "Make sure that the '*_address' attributes are "
936 936 "configured properly. "
937 937 "Currently valid addresses are: %s"%LOCAL_IPS
938 938 )
939 939
940 940 # write connection file / get default ports
941 941 self.write_connection_file()
942 942
943 943 # save kwargs for use in restart
944 944 self._launch_args = kw.copy()
945 945 # build the Popen cmd
946 946 kernel_cmd = self.format_kernel_cmd(**kw)
947 947 # launch the kernel subprocess
948 948 self.kernel = self._launch_kernel(kernel_cmd,
949 949 ipython_kernel=self.ipython_kernel,
950 950 **kw)
951 951
952 952 def shutdown_kernel(self, now=False, restart=False):
953 953 """Attempts to the stop the kernel process cleanly.
954 954
955 955 This attempts to shutdown the kernels cleanly by:
956 956
957 957 1. Sending it a shutdown message over the shell channel.
958 958 2. If that fails, the kernel is shutdown forcibly by sending it
959 959 a signal.
960 960
961 961 Parameters:
962 962 -----------
963 963 now : bool
964 964 Should the kernel be forcible killed *now*. This skips the
965 965 first, nice shutdown attempt.
966 966 restart: bool
967 967 Will this kernel be restarted after it is shutdown. When this
968 968 is True, connection files will not be cleaned up.
969 969 """
970 970 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
971 971 if sys.platform == 'win32':
972 972 self._kill_kernel()
973 973 return
974 974
975 975 # Pause the heart beat channel if it exists.
976 976 if self._hb_channel is not None:
977 977 self._hb_channel.pause()
978 978
979 979 if now:
980 980 if self.has_kernel:
981 981 self._kill_kernel()
982 982 else:
983 983 # Don't send any additional kernel kill messages immediately, to give
984 984 # the kernel a chance to properly execute shutdown actions. Wait for at
985 985 # most 1s, checking every 0.1s.
986 986 self.shell_channel.shutdown(restart=restart)
987 987 for i in range(10):
988 988 if self.is_alive:
989 989 time.sleep(0.1)
990 990 else:
991 991 break
992 992 else:
993 993 # OK, we've waited long enough.
994 994 if self.has_kernel:
995 995 self._kill_kernel()
996 996
997 997 if not restart:
998 998 self.cleanup_connection_file()
999 999 self.cleanup_ipc_files()
1000 1000 else:
1001 1001 self.cleanup_ipc_files()
1002 1002
1003 1003 def restart_kernel(self, now=False, **kw):
1004 1004 """Restarts a kernel with the arguments that were used to launch it.
1005 1005
1006 1006 If the old kernel was launched with random ports, the same ports will be
1007 1007 used for the new kernel. The same connection file is used again.
1008 1008
1009 1009 Parameters
1010 1010 ----------
1011 1011 now : bool, optional
1012 1012 If True, the kernel is forcefully restarted *immediately*, without
1013 1013 having a chance to do any cleanup action. Otherwise the kernel is
1014 1014 given 1s to clean up before a forceful restart is issued.
1015 1015
1016 1016 In all cases the kernel is restarted, the only difference is whether
1017 1017 it is given a chance to perform a clean shutdown or not.
1018 1018
1019 1019 **kw : optional
1020 1020 Any options specified here will overwrite those used to launch the
1021 1021 kernel.
1022 1022 """
1023 1023 if self._launch_args is None:
1024 1024 raise RuntimeError("Cannot restart the kernel. "
1025 1025 "No previous call to 'start_kernel'.")
1026 1026 else:
1027 1027 # Stop currently running kernel.
1028 1028 self.shutdown_kernel(now=now, restart=True)
1029 1029
1030 1030 # Start new kernel.
1031 1031 self._launch_args.update(kw)
1032 1032 self.start_kernel(**self._launch_args)
1033 1033
1034 1034 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
1035 1035 # unless there is some delay here.
1036 1036 if sys.platform == 'win32':
1037 1037 time.sleep(0.2)
1038 1038
1039 1039 @property
1040 1040 def has_kernel(self):
1041 1041 """Has a kernel been started that we are managing."""
1042 1042 return self.kernel is not None
1043 1043
1044 1044 def _kill_kernel(self):
1045 1045 """Kill the running kernel.
1046 1046
1047 1047 This is a private method, callers should use shutdown_kernel(now=True).
1048 1048 """
1049 1049 if self.has_kernel:
1050 1050 # Pause the heart beat channel if it exists.
1051 1051 if self._hb_channel is not None:
1052 1052 self._hb_channel.pause()
1053 1053
1054 1054 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1055 1055 # TerminateProcess() on Win32).
1056 1056 try:
1057 1057 self.kernel.kill()
1058 1058 except OSError as e:
1059 1059 # In Windows, we will get an Access Denied error if the process
1060 1060 # has already terminated. Ignore it.
1061 1061 if sys.platform == 'win32':
1062 1062 if e.winerror != 5:
1063 1063 raise
1064 1064 # On Unix, we may get an ESRCH error if the process has already
1065 1065 # terminated. Ignore it.
1066 1066 else:
1067 1067 from errno import ESRCH
1068 1068 if e.errno != ESRCH:
1069 1069 raise
1070 1070
1071 1071 # Block until the kernel terminates.
1072 1072 self.kernel.wait()
1073 1073 self.kernel = None
1074 1074 else:
1075 1075 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1076 1076
1077 1077 def interrupt_kernel(self):
1078 1078 """Interrupts the kernel by sending it a signal.
1079 1079
1080 1080 Unlike ``signal_kernel``, this operation is well supported on all
1081 1081 platforms.
1082 1082 """
1083 1083 if self.has_kernel:
1084 1084 if sys.platform == 'win32':
1085 1085 from .zmq.parentpoller import ParentPollerWindows as Poller
1086 1086 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1087 1087 else:
1088 1088 self.kernel.send_signal(signal.SIGINT)
1089 1089 else:
1090 1090 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1091 1091
1092 1092 def signal_kernel(self, signum):
1093 1093 """Sends a signal to the kernel.
1094 1094
1095 1095 Note that since only SIGTERM is supported on Windows, this function is
1096 1096 only useful on Unix systems.
1097 1097 """
1098 1098 if self.has_kernel:
1099 1099 self.kernel.send_signal(signum)
1100 1100 else:
1101 1101 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1102 1102
1103 1103 @property
1104 1104 def is_alive(self):
1105 1105 """Is the kernel process still running?"""
1106 1106 if self.has_kernel:
1107 1107 if self.kernel.poll() is None:
1108 1108 return True
1109 1109 else:
1110 1110 return False
1111 1111 elif self._hb_channel is not None:
1112 1112 # We didn't start the kernel with this KernelManager so we
1113 1113 # use the heartbeat.
1114 1114 return self._hb_channel.is_beating()
1115 1115 else:
1116 1116 # no heartbeat and not local, we can't tell if it's running,
1117 1117 # so naively return True
1118 1118 return True
1119 1119
1120 1120
1121 1121 #-----------------------------------------------------------------------------
1122 1122 # ABC Registration
1123 1123 #-----------------------------------------------------------------------------
1124 1124
1125 1125 ShellChannelABC.register(ShellChannel)
1126 1126 IOPubChannelABC.register(IOPubChannel)
1127 1127 HBChannelABC.register(HBChannel)
1128 1128 StdInChannelABC.register(StdInChannel)
1129 1129 KernelManagerABC.register(KernelManager)
1130 1130
@@ -1,80 +1,81 b''
1 1 """Tests for the notebook kernel and session manager."""
2 2
3 3 from subprocess import PIPE
4 4 import time
5 5 from unittest import TestCase
6 6
7 7 from IPython.testing import decorators as dec
8 8
9 9 from IPython.config.loader import Config
10 10 from IPython.utils.localinterfaces import LOCALHOST
11 11 from IPython.kernel.kernelmanager import KernelManager
12 12 from IPython.kernel.multikernelmanager import MultiKernelManager
13 13
14 14 class TestKernelManager(TestCase):
15 15
16 16 def _get_tcp_km(self):
17 17 return MultiKernelManager()
18 18
19 19 def _get_ipc_km(self):
20 20 c = Config()
21 21 c.KernelManager.transport = 'ipc'
22 22 c.KernelManager.ip = 'test'
23 23 km = MultiKernelManager(config=c)
24 24 return km
25 25
26 26 def _run_lifecycle(self, km):
27 27 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
28 28 self.assertTrue(kid in km)
29 29 self.assertTrue(kid in km.list_kernel_ids())
30 30 self.assertEqual(len(km),1)
31 31 km.restart_kernel(kid)
32 32 self.assertTrue(kid in km.list_kernel_ids())
33 33 # We need a delay here to give the restarting kernel a chance to
34 34 # restart. Otherwise, the interrupt will kill it, causing the test
35 35 # suite to hang. The reason it *hangs* is that the shutdown
36 36 # message for the restart sometimes hasn't been sent to the kernel.
37 37 # Because linger is oo on the shell channel, the context can't
38 38 # close until the message is sent to the kernel, which is not dead.
39 39 time.sleep(1.0)
40 40 km.interrupt_kernel(kid)
41 41 k = km.get_kernel(kid)
42 42 self.assertTrue(isinstance(k, KernelManager))
43 43 km.shutdown_kernel(kid)
44 44 self.assertTrue(not kid in km)
45 45
46 46 def _run_cinfo(self, km, transport, ip):
47 47 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
48 48 k = km.get_kernel(kid)
49 49 cinfo = km.get_connection_info(kid)
50 50 self.assertEqual(transport, cinfo['transport'])
51 51 self.assertEqual(ip, cinfo['ip'])
52 52 self.assertTrue('stdin_port' in cinfo)
53 53 self.assertTrue('iopub_port' in cinfo)
54 54 stream = km.create_iopub_stream(kid)
55 55 stream.close()
56 56 self.assertTrue('shell_port' in cinfo)
57 57 stream = km.create_shell_stream(kid)
58 58 stream.close()
59 59 self.assertTrue('hb_port' in cinfo)
60 60 stream = km.create_hb_stream(kid)
61 61 stream.close()
62 62 km.shutdown_kernel(kid)
63 63
64 64 def test_tcp_lifecycle(self):
65 65 km = self._get_tcp_km()
66 66 self._run_lifecycle(km)
67 67
68 @dec.skip_win32
69 68 def test_tcp_cinfo(self):
70 69 km = self._get_tcp_km()
71 70 self._run_cinfo(km, 'tcp', LOCALHOST)
72 71
72 @dec.skip_win32
73 73 def test_ipc_lifecycle(self):
74 74 km = self._get_ipc_km()
75 75 self._run_lifecycle(km)
76 76
77 @dec.skip_win32
77 78 def test_ipc_cinfo(self):
78 79 km = self._get_ipc_km()
79 80 self._run_cinfo(km, 'ipc', 'test')
80 81
General Comments 0
You need to be logged in to leave comments. Login now