##// END OF EJS Templates
Removing connection/ipc file cleanup from KernelManager.__del__.
Brian E. Granger -
Show More
@@ -1,1082 +1,1078
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45 from IPython.zmq.kernelmanagerabc import (
46 46 ShellChannelABC, IOPubChannelABC,
47 47 HBChannelABC, StdInChannelABC,
48 48 KernelManagerABC
49 49 )
50 50
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Constants and exceptions
54 54 #-----------------------------------------------------------------------------
55 55
56 56 class InvalidPortNumber(Exception):
57 57 pass
58 58
59 59 #-----------------------------------------------------------------------------
60 60 # Utility functions
61 61 #-----------------------------------------------------------------------------
62 62
63 63 # some utilities to validate message structure, these might get moved elsewhere
64 64 # if they prove to have more generic utility
65 65
66 66 def validate_string_list(lst):
67 67 """Validate that the input is a list of strings.
68 68
69 69 Raises ValueError if not."""
70 70 if not isinstance(lst, list):
71 71 raise ValueError('input %r must be a list' % lst)
72 72 for x in lst:
73 73 if not isinstance(x, basestring):
74 74 raise ValueError('element %r in list must be a string' % x)
75 75
76 76
77 77 def validate_string_dict(dct):
78 78 """Validate that the input is a dict with string keys and values.
79 79
80 80 Raises ValueError if not."""
81 81 for k,v in dct.iteritems():
82 82 if not isinstance(k, basestring):
83 83 raise ValueError('key %r in dict must be a string' % k)
84 84 if not isinstance(v, basestring):
85 85 raise ValueError('value %r in dict must be a string' % v)
86 86
87 87
88 88 #-----------------------------------------------------------------------------
89 89 # ZMQ Socket Channel classes
90 90 #-----------------------------------------------------------------------------
91 91
92 92 class ZMQSocketChannel(Thread):
93 93 """The base class for the channels that use ZMQ sockets."""
94 94 context = None
95 95 session = None
96 96 socket = None
97 97 ioloop = None
98 98 stream = None
99 99 _address = None
100 100 _exiting = False
101 101
102 102 def __init__(self, context, session, address):
103 103 """Create a channel.
104 104
105 105 Parameters
106 106 ----------
107 107 context : :class:`zmq.Context`
108 108 The ZMQ context to use.
109 109 session : :class:`session.Session`
110 110 The session to use.
111 111 address : zmq url
112 112 Standard (ip, port) tuple that the kernel is listening on.
113 113 """
114 114 super(ZMQSocketChannel, self).__init__()
115 115 self.daemon = True
116 116
117 117 self.context = context
118 118 self.session = session
119 119 if isinstance(address, tuple):
120 120 if address[1] == 0:
121 121 message = 'The port number for a channel cannot be 0.'
122 122 raise InvalidPortNumber(message)
123 123 address = "tcp://%s:%i" % address
124 124 self._address = address
125 125 atexit.register(self._notice_exit)
126 126
127 127 def _notice_exit(self):
128 128 self._exiting = True
129 129
130 130 def _run_loop(self):
131 131 """Run my loop, ignoring EINTR events in the poller"""
132 132 while True:
133 133 try:
134 134 self.ioloop.start()
135 135 except ZMQError as e:
136 136 if e.errno == errno.EINTR:
137 137 continue
138 138 else:
139 139 raise
140 140 except Exception:
141 141 if self._exiting:
142 142 break
143 143 else:
144 144 raise
145 145 else:
146 146 break
147 147
148 148 def stop(self):
149 149 """Stop the channel's event loop and join its thread.
150 150
151 151 This calls :method:`Thread.join` and returns when the thread
152 152 terminates. :class:`RuntimeError` will be raised if
153 153 :method:`self.start` is called again.
154 154 """
155 155 self.join()
156 156
157 157 @property
158 158 def address(self):
159 159 """Get the channel's address as a zmq url string.
160 160
161 161 These URLS have the form: 'tcp://127.0.0.1:5555'.
162 162 """
163 163 return self._address
164 164
165 165 def _queue_send(self, msg):
166 166 """Queue a message to be sent from the IOLoop's thread.
167 167
168 168 Parameters
169 169 ----------
170 170 msg : message to send
171 171
172 172 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
173 173 thread control of the action.
174 174 """
175 175 def thread_send():
176 176 self.session.send(self.stream, msg)
177 177 self.ioloop.add_callback(thread_send)
178 178
179 179 def _handle_recv(self, msg):
180 180 """Callback for stream.on_recv.
181 181
182 182 Unpacks message, and calls handlers with it.
183 183 """
184 184 ident,smsg = self.session.feed_identities(msg)
185 185 self.call_handlers(self.session.unserialize(smsg))
186 186
187 187
188 188
189 189 class ShellChannel(ZMQSocketChannel):
190 190 """The shell channel for issuing request/replies to the kernel."""
191 191
192 192 command_queue = None
193 193 # flag for whether execute requests should be allowed to call raw_input:
194 194 allow_stdin = True
195 195
196 196 def __init__(self, context, session, address):
197 197 super(ShellChannel, self).__init__(context, session, address)
198 198 self.ioloop = ioloop.IOLoop()
199 199
200 200 def run(self):
201 201 """The thread's main activity. Call start() instead."""
202 202 self.socket = self.context.socket(zmq.DEALER)
203 203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 204 self.socket.connect(self.address)
205 205 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 206 self.stream.on_recv(self._handle_recv)
207 207 self._run_loop()
208 208 try:
209 209 self.socket.close()
210 210 except:
211 211 pass
212 212
213 213 def stop(self):
214 214 """Stop the channel's event loop and join its thread."""
215 215 self.ioloop.stop()
216 216 super(ShellChannel, self).stop()
217 217
218 218 def call_handlers(self, msg):
219 219 """This method is called in the ioloop thread when a message arrives.
220 220
221 221 Subclasses should override this method to handle incoming messages.
222 222 It is important to remember that this method is called in the thread
223 223 so that some logic must be done to ensure that the application leve
224 224 handlers are called in the application thread.
225 225 """
226 226 raise NotImplementedError('call_handlers must be defined in a subclass.')
227 227
228 228 def execute(self, code, silent=False, store_history=True,
229 229 user_variables=None, user_expressions=None, allow_stdin=None):
230 230 """Execute code in the kernel.
231 231
232 232 Parameters
233 233 ----------
234 234 code : str
235 235 A string of Python code.
236 236
237 237 silent : bool, optional (default False)
238 238 If set, the kernel will execute the code as quietly possible, and
239 239 will force store_history to be False.
240 240
241 241 store_history : bool, optional (default True)
242 242 If set, the kernel will store command history. This is forced
243 243 to be False if silent is True.
244 244
245 245 user_variables : list, optional
246 246 A list of variable names to pull from the user's namespace. They
247 247 will come back as a dict with these names as keys and their
248 248 :func:`repr` as values.
249 249
250 250 user_expressions : dict, optional
251 251 A dict mapping names to expressions to be evaluated in the user's
252 252 dict. The expression values are returned as strings formatted using
253 253 :func:`repr`.
254 254
255 255 allow_stdin : bool, optional (default self.allow_stdin)
256 256 Flag for whether the kernel can send stdin requests to frontends.
257 257
258 258 Some frontends (e.g. the Notebook) do not support stdin requests.
259 259 If raw_input is called from code executed from such a frontend, a
260 260 StdinNotImplementedError will be raised.
261 261
262 262 Returns
263 263 -------
264 264 The msg_id of the message sent.
265 265 """
266 266 if user_variables is None:
267 267 user_variables = []
268 268 if user_expressions is None:
269 269 user_expressions = {}
270 270 if allow_stdin is None:
271 271 allow_stdin = self.allow_stdin
272 272
273 273
274 274 # Don't waste network traffic if inputs are invalid
275 275 if not isinstance(code, basestring):
276 276 raise ValueError('code %r must be a string' % code)
277 277 validate_string_list(user_variables)
278 278 validate_string_dict(user_expressions)
279 279
280 280 # Create class for content/msg creation. Related to, but possibly
281 281 # not in Session.
282 282 content = dict(code=code, silent=silent, store_history=store_history,
283 283 user_variables=user_variables,
284 284 user_expressions=user_expressions,
285 285 allow_stdin=allow_stdin,
286 286 )
287 287 msg = self.session.msg('execute_request', content)
288 288 self._queue_send(msg)
289 289 return msg['header']['msg_id']
290 290
291 291 def complete(self, text, line, cursor_pos, block=None):
292 292 """Tab complete text in the kernel's namespace.
293 293
294 294 Parameters
295 295 ----------
296 296 text : str
297 297 The text to complete.
298 298 line : str
299 299 The full line of text that is the surrounding context for the
300 300 text to complete.
301 301 cursor_pos : int
302 302 The position of the cursor in the line where the completion was
303 303 requested.
304 304 block : str, optional
305 305 The full block of code in which the completion is being requested.
306 306
307 307 Returns
308 308 -------
309 309 The msg_id of the message sent.
310 310 """
311 311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
312 312 msg = self.session.msg('complete_request', content)
313 313 self._queue_send(msg)
314 314 return msg['header']['msg_id']
315 315
316 316 def object_info(self, oname, detail_level=0):
317 317 """Get metadata information about an object in the kernel's namespace.
318 318
319 319 Parameters
320 320 ----------
321 321 oname : str
322 322 A string specifying the object name.
323 323 detail_level : int, optional
324 324 The level of detail for the introspection (0-2)
325 325
326 326 Returns
327 327 -------
328 328 The msg_id of the message sent.
329 329 """
330 330 content = dict(oname=oname, detail_level=detail_level)
331 331 msg = self.session.msg('object_info_request', content)
332 332 self._queue_send(msg)
333 333 return msg['header']['msg_id']
334 334
335 335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
336 336 """Get entries from the kernel's history list.
337 337
338 338 Parameters
339 339 ----------
340 340 raw : bool
341 341 If True, return the raw input.
342 342 output : bool
343 343 If True, then return the output as well.
344 344 hist_access_type : str
345 345 'range' (fill in session, start and stop params), 'tail' (fill in n)
346 346 or 'search' (fill in pattern param).
347 347
348 348 session : int
349 349 For a range request, the session from which to get lines. Session
350 350 numbers are positive integers; negative ones count back from the
351 351 current session.
352 352 start : int
353 353 The first line number of a history range.
354 354 stop : int
355 355 The final (excluded) line number of a history range.
356 356
357 357 n : int
358 358 The number of lines of history to get for a tail request.
359 359
360 360 pattern : str
361 361 The glob-syntax pattern for a search request.
362 362
363 363 Returns
364 364 -------
365 365 The msg_id of the message sent.
366 366 """
367 367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
368 368 **kwargs)
369 369 msg = self.session.msg('history_request', content)
370 370 self._queue_send(msg)
371 371 return msg['header']['msg_id']
372 372
373 373 def kernel_info(self):
374 374 """Request kernel info."""
375 375 msg = self.session.msg('kernel_info_request')
376 376 self._queue_send(msg)
377 377 return msg['header']['msg_id']
378 378
379 379 def shutdown(self, restart=False):
380 380 """Request an immediate kernel shutdown.
381 381
382 382 Upon receipt of the (empty) reply, client code can safely assume that
383 383 the kernel has shut down and it's safe to forcefully terminate it if
384 384 it's still alive.
385 385
386 386 The kernel will send the reply via a function registered with Python's
387 387 atexit module, ensuring it's truly done as the kernel is done with all
388 388 normal operation.
389 389 """
390 390 # Send quit message to kernel. Once we implement kernel-side setattr,
391 391 # this should probably be done that way, but for now this will do.
392 392 msg = self.session.msg('shutdown_request', {'restart':restart})
393 393 self._queue_send(msg)
394 394 return msg['header']['msg_id']
395 395
396 396
397 397
398 398 class IOPubChannel(ZMQSocketChannel):
399 399 """The iopub channel which listens for messages that the kernel publishes.
400 400
401 401 This channel is where all output is published to frontends.
402 402 """
403 403
404 404 def __init__(self, context, session, address):
405 405 super(IOPubChannel, self).__init__(context, session, address)
406 406 self.ioloop = ioloop.IOLoop()
407 407
408 408 def run(self):
409 409 """The thread's main activity. Call start() instead."""
410 410 self.socket = self.context.socket(zmq.SUB)
411 411 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 412 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 413 self.socket.connect(self.address)
414 414 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
415 415 self.stream.on_recv(self._handle_recv)
416 416 self._run_loop()
417 417 try:
418 418 self.socket.close()
419 419 except:
420 420 pass
421 421
422 422 def stop(self):
423 423 """Stop the channel's event loop and join its thread."""
424 424 self.ioloop.stop()
425 425 super(IOPubChannel, self).stop()
426 426
427 427 def call_handlers(self, msg):
428 428 """This method is called in the ioloop thread when a message arrives.
429 429
430 430 Subclasses should override this method to handle incoming messages.
431 431 It is important to remember that this method is called in the thread
432 432 so that some logic must be done to ensure that the application leve
433 433 handlers are called in the application thread.
434 434 """
435 435 raise NotImplementedError('call_handlers must be defined in a subclass.')
436 436
437 437 def flush(self, timeout=1.0):
438 438 """Immediately processes all pending messages on the iopub channel.
439 439
440 440 Callers should use this method to ensure that :method:`call_handlers`
441 441 has been called for all messages that have been received on the
442 442 0MQ SUB socket of this channel.
443 443
444 444 This method is thread safe.
445 445
446 446 Parameters
447 447 ----------
448 448 timeout : float, optional
449 449 The maximum amount of time to spend flushing, in seconds. The
450 450 default is one second.
451 451 """
452 452 # We do the IOLoop callback process twice to ensure that the IOLoop
453 453 # gets to perform at least one full poll.
454 454 stop_time = time.time() + timeout
455 455 for i in xrange(2):
456 456 self._flushed = False
457 457 self.ioloop.add_callback(self._flush)
458 458 while not self._flushed and time.time() < stop_time:
459 459 time.sleep(0.01)
460 460
461 461 def _flush(self):
462 462 """Callback for :method:`self.flush`."""
463 463 self.stream.flush()
464 464 self._flushed = True
465 465
466 466
467 467 class StdInChannel(ZMQSocketChannel):
468 468 """The stdin channel to handle raw_input requests that the kernel makes."""
469 469
470 470 msg_queue = None
471 471
472 472 def __init__(self, context, session, address):
473 473 super(StdInChannel, self).__init__(context, session, address)
474 474 self.ioloop = ioloop.IOLoop()
475 475
476 476 def run(self):
477 477 """The thread's main activity. Call start() instead."""
478 478 self.socket = self.context.socket(zmq.DEALER)
479 479 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
480 480 self.socket.connect(self.address)
481 481 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
482 482 self.stream.on_recv(self._handle_recv)
483 483 self._run_loop()
484 484 try:
485 485 self.socket.close()
486 486 except:
487 487 pass
488 488
489 489 def stop(self):
490 490 """Stop the channel's event loop and join its thread."""
491 491 self.ioloop.stop()
492 492 super(StdInChannel, self).stop()
493 493
494 494 def call_handlers(self, msg):
495 495 """This method is called in the ioloop thread when a message arrives.
496 496
497 497 Subclasses should override this method to handle incoming messages.
498 498 It is important to remember that this method is called in the thread
499 499 so that some logic must be done to ensure that the application leve
500 500 handlers are called in the application thread.
501 501 """
502 502 raise NotImplementedError('call_handlers must be defined in a subclass.')
503 503
504 504 def input(self, string):
505 505 """Send a string of raw input to the kernel."""
506 506 content = dict(value=string)
507 507 msg = self.session.msg('input_reply', content)
508 508 self._queue_send(msg)
509 509
510 510
511 511 class HBChannel(ZMQSocketChannel):
512 512 """The heartbeat channel which monitors the kernel heartbeat.
513 513
514 514 Note that the heartbeat channel is paused by default. As long as you start
515 515 this channel, the kernel manager will ensure that it is paused and un-paused
516 516 as appropriate.
517 517 """
518 518
519 519 time_to_dead = 3.0
520 520 socket = None
521 521 poller = None
522 522 _running = None
523 523 _pause = None
524 524 _beating = None
525 525
526 526 def __init__(self, context, session, address):
527 527 super(HBChannel, self).__init__(context, session, address)
528 528 self._running = False
529 529 self._pause =True
530 530 self.poller = zmq.Poller()
531 531
532 532 def _create_socket(self):
533 533 if self.socket is not None:
534 534 # close previous socket, before opening a new one
535 535 self.poller.unregister(self.socket)
536 536 self.socket.close()
537 537 self.socket = self.context.socket(zmq.REQ)
538 538 self.socket.setsockopt(zmq.LINGER, 0)
539 539 self.socket.connect(self.address)
540 540
541 541 self.poller.register(self.socket, zmq.POLLIN)
542 542
543 543 def _poll(self, start_time):
544 544 """poll for heartbeat replies until we reach self.time_to_dead.
545 545
546 546 Ignores interrupts, and returns the result of poll(), which
547 547 will be an empty list if no messages arrived before the timeout,
548 548 or the event tuple if there is a message to receive.
549 549 """
550 550
551 551 until_dead = self.time_to_dead - (time.time() - start_time)
552 552 # ensure poll at least once
553 553 until_dead = max(until_dead, 1e-3)
554 554 events = []
555 555 while True:
556 556 try:
557 557 events = self.poller.poll(1000 * until_dead)
558 558 except ZMQError as e:
559 559 if e.errno == errno.EINTR:
560 560 # ignore interrupts during heartbeat
561 561 # this may never actually happen
562 562 until_dead = self.time_to_dead - (time.time() - start_time)
563 563 until_dead = max(until_dead, 1e-3)
564 564 pass
565 565 else:
566 566 raise
567 567 except Exception:
568 568 if self._exiting:
569 569 break
570 570 else:
571 571 raise
572 572 else:
573 573 break
574 574 return events
575 575
576 576 def run(self):
577 577 """The thread's main activity. Call start() instead."""
578 578 self._create_socket()
579 579 self._running = True
580 580 self._beating = True
581 581
582 582 while self._running:
583 583 if self._pause:
584 584 # just sleep, and skip the rest of the loop
585 585 time.sleep(self.time_to_dead)
586 586 continue
587 587
588 588 since_last_heartbeat = 0.0
589 589 # io.rprint('Ping from HB channel') # dbg
590 590 # no need to catch EFSM here, because the previous event was
591 591 # either a recv or connect, which cannot be followed by EFSM
592 592 self.socket.send(b'ping')
593 593 request_time = time.time()
594 594 ready = self._poll(request_time)
595 595 if ready:
596 596 self._beating = True
597 597 # the poll above guarantees we have something to recv
598 598 self.socket.recv()
599 599 # sleep the remainder of the cycle
600 600 remainder = self.time_to_dead - (time.time() - request_time)
601 601 if remainder > 0:
602 602 time.sleep(remainder)
603 603 continue
604 604 else:
605 605 # nothing was received within the time limit, signal heart failure
606 606 self._beating = False
607 607 since_last_heartbeat = time.time() - request_time
608 608 self.call_handlers(since_last_heartbeat)
609 609 # and close/reopen the socket, because the REQ/REP cycle has been broken
610 610 self._create_socket()
611 611 continue
612 612 try:
613 613 self.socket.close()
614 614 except:
615 615 pass
616 616
617 617 def pause(self):
618 618 """Pause the heartbeat."""
619 619 self._pause = True
620 620
621 621 def unpause(self):
622 622 """Unpause the heartbeat."""
623 623 self._pause = False
624 624
625 625 def is_beating(self):
626 626 """Is the heartbeat running and responsive (and not paused)."""
627 627 if self.is_alive() and not self._pause and self._beating:
628 628 return True
629 629 else:
630 630 return False
631 631
632 632 def stop(self):
633 633 """Stop the channel's event loop and join its thread."""
634 634 self._running = False
635 635 super(HBChannel, self).stop()
636 636
637 637 def call_handlers(self, since_last_heartbeat):
638 638 """This method is called in the ioloop thread when a message arrives.
639 639
640 640 Subclasses should override this method to handle incoming messages.
641 641 It is important to remember that this method is called in the thread
642 642 so that some logic must be done to ensure that the application level
643 643 handlers are called in the application thread.
644 644 """
645 645 raise NotImplementedError('call_handlers must be defined in a subclass.')
646 646
647 647
648 648 #-----------------------------------------------------------------------------
649 649 # Main kernel manager class
650 650 #-----------------------------------------------------------------------------
651 651
652 652 class KernelManager(Configurable):
653 653 """Manages a single kernel on this host along with its channels.
654 654
655 655 There are four channels associated with each kernel:
656 656
657 657 * shell: for request/reply calls to the kernel.
658 658 * iopub: for the kernel to publish results to frontends.
659 659 * hb: for monitoring the kernel's heartbeat.
660 660 * stdin: for frontends to reply to raw_input calls in the kernel.
661 661
662 662 The usage of the channels that this class manages is optional. It is
663 663 entirely possible to connect to the kernels directly using ZeroMQ
664 664 sockets. These channels are useful primarily for talking to a kernel
665 665 whose :class:`KernelManager` is in the same process.
666 666
667 667 This version manages kernels started using Popen.
668 668 """
669 669 # The PyZMQ Context to use for communication with the kernel.
670 670 context = Instance(zmq.Context)
671 671 def _context_default(self):
672 672 return zmq.Context.instance()
673 673
674 674 # The Session to use for communication with the kernel.
675 675 session = Instance(Session)
676 676 def _session_default(self):
677 677 return Session(config=self.config)
678 678
679 679 # The kernel process with which the KernelManager is communicating.
680 680 kernel = Instance(Popen)
681 681
682 682 # The addresses for the communication channels.
683 683 connection_file = Unicode('')
684 684
685 685 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
686 686
687 687 ip = Unicode(LOCALHOST, config=True)
688 688 def _ip_changed(self, name, old, new):
689 689 if new == '*':
690 690 self.ip = '0.0.0.0'
691 691 shell_port = Integer(0)
692 692 iopub_port = Integer(0)
693 693 stdin_port = Integer(0)
694 694 hb_port = Integer(0)
695 695
696 696 # The classes to use for the various channels.
697 697 shell_channel_class = Type(ShellChannel)
698 698 iopub_channel_class = Type(IOPubChannel)
699 699 stdin_channel_class = Type(StdInChannel)
700 700 hb_channel_class = Type(HBChannel)
701 701
702 702 # Protected traits.
703 703 _launch_args = Any
704 704 _shell_channel = Any
705 705 _iopub_channel = Any
706 706 _stdin_channel = Any
707 707 _hb_channel = Any
708 708 _connection_file_written=Bool(False)
709 709
710 def __del__(self):
711 self.cleanup_connection_file()
712 self.cleanup_ipc_files()
713
714 710 #--------------------------------------------------------------------------
715 711 # Channel management methods:
716 712 #--------------------------------------------------------------------------
717 713
718 714 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
719 715 """Starts the channels for this kernel.
720 716
721 717 This will create the channels if they do not exist and then start
722 718 them (their activity runs in a thread). If port numbers of 0 are
723 719 being used (random ports) then you must first call
724 720 :method:`start_kernel`. If the channels have been stopped and you
725 721 call this, :class:`RuntimeError` will be raised.
726 722 """
727 723 if shell:
728 724 self.shell_channel.start()
729 725 if iopub:
730 726 self.iopub_channel.start()
731 727 if stdin:
732 728 self.stdin_channel.start()
733 729 self.shell_channel.allow_stdin = True
734 730 else:
735 731 self.shell_channel.allow_stdin = False
736 732 if hb:
737 733 self.hb_channel.start()
738 734
739 735 def stop_channels(self):
740 736 """Stops all the running channels for this kernel.
741 737
742 738 This stops their event loops and joins their threads.
743 739 """
744 740 if self.shell_channel.is_alive():
745 741 self.shell_channel.stop()
746 742 if self.iopub_channel.is_alive():
747 743 self.iopub_channel.stop()
748 744 if self.stdin_channel.is_alive():
749 745 self.stdin_channel.stop()
750 746 if self.hb_channel.is_alive():
751 747 self.hb_channel.stop()
752 748
753 749 @property
754 750 def channels_running(self):
755 751 """Are any of the channels created and running?"""
756 752 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
757 753 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
758 754
759 755 def _make_url(self, port):
760 756 """Make a zmq url with a port.
761 757
762 758 There are two cases that this handles:
763 759
764 760 * tcp: tcp://ip:port
765 761 * ipc: ipc://ip-port
766 762 """
767 763 if self.transport == 'tcp':
768 764 return "tcp://%s:%i" % (self.ip, port)
769 765 else:
770 766 return "%s://%s-%s" % (self.transport, self.ip, port)
771 767
772 768 @property
773 769 def shell_channel(self):
774 770 """Get the shell channel object for this kernel."""
775 771 if self._shell_channel is None:
776 772 self._shell_channel = self.shell_channel_class(
777 773 self.context, self.session, self._make_url(self.shell_port)
778 774 )
779 775 return self._shell_channel
780 776
781 777 @property
782 778 def iopub_channel(self):
783 779 """Get the iopub channel object for this kernel."""
784 780 if self._iopub_channel is None:
785 781 self._iopub_channel = self.iopub_channel_class(
786 782 self.context, self.session, self._make_url(self.iopub_port)
787 783 )
788 784 return self._iopub_channel
789 785
790 786 @property
791 787 def stdin_channel(self):
792 788 """Get the stdin channel object for this kernel."""
793 789 if self._stdin_channel is None:
794 790 self._stdin_channel = self.stdin_channel_class(
795 791 self.context, self.session, self._make_url(self.stdin_port)
796 792 )
797 793 return self._stdin_channel
798 794
799 795 @property
800 796 def hb_channel(self):
801 797 """Get the hb channel object for this kernel."""
802 798 if self._hb_channel is None:
803 799 self._hb_channel = self.hb_channel_class(
804 800 self.context, self.session, self._make_url(self.hb_port)
805 801 )
806 802 return self._hb_channel
807 803
808 804 #--------------------------------------------------------------------------
809 805 # Connection and ipc file management
810 806 #--------------------------------------------------------------------------
811 807
812 808 def cleanup_connection_file(self):
813 809 """Cleanup connection file *if we wrote it*
814 810
815 811 Will not raise if the connection file was already removed somehow.
816 812 """
817 813 if self._connection_file_written:
818 814 # cleanup connection files on full shutdown of kernel we started
819 815 self._connection_file_written = False
820 816 try:
821 817 os.remove(self.connection_file)
822 818 except (IOError, OSError):
823 819 pass
824 820
825 821 def cleanup_ipc_files(self):
826 822 """Cleanup ipc files if we wrote them."""
827 823 if self.transport != 'ipc':
828 824 return
829 825 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
830 826 ipcfile = "%s-%i" % (self.ip, port)
831 827 try:
832 828 os.remove(ipcfile)
833 829 except (IOError, OSError):
834 830 pass
835 831
836 832 def load_connection_file(self):
837 833 """Load connection info from JSON dict in self.connection_file."""
838 834 with open(self.connection_file) as f:
839 835 cfg = json.loads(f.read())
840 836
841 837 from pprint import pprint
842 838 pprint(cfg)
843 839 self.transport = cfg.get('transport', 'tcp')
844 840 self.ip = cfg['ip']
845 841 self.shell_port = cfg['shell_port']
846 842 self.stdin_port = cfg['stdin_port']
847 843 self.iopub_port = cfg['iopub_port']
848 844 self.hb_port = cfg['hb_port']
849 845 self.session.key = str_to_bytes(cfg['key'])
850 846
851 847 def write_connection_file(self):
852 848 """Write connection info to JSON dict in self.connection_file."""
853 849 if self._connection_file_written:
854 850 return
855 851 self.connection_file,cfg = write_connection_file(self.connection_file,
856 852 transport=self.transport, ip=self.ip, key=self.session.key,
857 853 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
858 854 shell_port=self.shell_port, hb_port=self.hb_port)
859 855 # write_connection_file also sets default ports:
860 856 self.shell_port = cfg['shell_port']
861 857 self.stdin_port = cfg['stdin_port']
862 858 self.iopub_port = cfg['iopub_port']
863 859 self.hb_port = cfg['hb_port']
864 860
865 861 self._connection_file_written = True
866 862
867 863 #--------------------------------------------------------------------------
868 864 # Kernel management
869 865 #--------------------------------------------------------------------------
870 866
871 867 def start_kernel(self, **kw):
872 868 """Starts a kernel on this host in a separate process.
873 869
874 870 If random ports (port=0) are being used, this method must be called
875 871 before the channels are created.
876 872
877 873 Parameters:
878 874 -----------
879 875 launcher : callable, optional (default None)
880 876 A custom function for launching the kernel process (generally a
881 877 wrapper around ``entry_point.base_launch_kernel``). In most cases,
882 878 it should not be necessary to use this parameter.
883 879
884 880 **kw : optional
885 881 keyword arguments that are passed down into the launcher
886 882 callable.
887 883 """
888 884 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
889 885 raise RuntimeError("Can only launch a kernel on a local interface. "
890 886 "Make sure that the '*_address' attributes are "
891 887 "configured properly. "
892 888 "Currently valid addresses are: %s"%LOCAL_IPS
893 889 )
894 890
895 891 # write connection file / get default ports
896 892 self.write_connection_file()
897 893
898 894 self._launch_args = kw.copy()
899 895 launch_kernel = kw.pop('launcher', None)
900 896 if launch_kernel is None:
901 897 from ipkernel import launch_kernel
902 898 self.kernel = launch_kernel(fname=self.connection_file, **kw)
903 899
904 900 def shutdown_kernel(self, now=False, restart=False):
905 901 """Attempts to the stop the kernel process cleanly.
906 902
907 903 This attempts to shutdown the kernels cleanly by:
908 904
909 905 1. Sending it a shutdown message over the shell channel.
910 906 2. If that fails, the kernel is shutdown forcibly by sending it
911 907 a signal.
912 908
913 909 Parameters:
914 910 -----------
915 911 now : bool
916 912 Should the kernel be forcible killed *now*. This skips the
917 913 first, nice shutdown attempt.
918 914 restart: bool
919 915 Will this kernel be restarted after it is shutdown. When this
920 916 is True, connection files will not be cleaned up.
921 917 """
922 918 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
923 919 if sys.platform == 'win32':
924 920 self._kill_kernel()
925 921 return
926 922
927 923 # Pause the heart beat channel if it exists.
928 924 if self._hb_channel is not None:
929 925 self._hb_channel.pause()
930 926
931 927 if now:
932 928 if self.has_kernel:
933 929 self._kill_kernel()
934 930 else:
935 931 # Don't send any additional kernel kill messages immediately, to give
936 932 # the kernel a chance to properly execute shutdown actions. Wait for at
937 933 # most 1s, checking every 0.1s.
938 934 self.shell_channel.shutdown(restart=restart)
939 935 for i in range(10):
940 936 if self.is_alive:
941 937 time.sleep(0.1)
942 938 else:
943 939 break
944 940 else:
945 941 # OK, we've waited long enough.
946 942 if self.has_kernel:
947 943 self._kill_kernel()
948 944
949 945 if not restart:
950 946 self.cleanup_connection_file()
951 947 self.cleanup_ipc_files()
952 948 else:
953 949 self.cleanup_ipc_files()
954 950
955 951 def restart_kernel(self, now=False, **kw):
956 952 """Restarts a kernel with the arguments that were used to launch it.
957 953
958 954 If the old kernel was launched with random ports, the same ports will be
959 955 used for the new kernel. The same connection file is used again.
960 956
961 957 Parameters
962 958 ----------
963 959 now : bool, optional
964 960 If True, the kernel is forcefully restarted *immediately*, without
965 961 having a chance to do any cleanup action. Otherwise the kernel is
966 962 given 1s to clean up before a forceful restart is issued.
967 963
968 964 In all cases the kernel is restarted, the only difference is whether
969 965 it is given a chance to perform a clean shutdown or not.
970 966
971 967 **kw : optional
972 968 Any options specified here will overwrite those used to launch the
973 969 kernel.
974 970 """
975 971 if self._launch_args is None:
976 972 raise RuntimeError("Cannot restart the kernel. "
977 973 "No previous call to 'start_kernel'.")
978 974 else:
979 975 # Stop currently running kernel.
980 976 self.shutdown_kernel(now=now, restart=True)
981 977
982 978 # Start new kernel.
983 979 self._launch_args.update(kw)
984 980 self.start_kernel(**self._launch_args)
985 981
986 982 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
987 983 # unless there is some delay here.
988 984 if sys.platform == 'win32':
989 985 time.sleep(0.2)
990 986
991 987 @property
992 988 def has_kernel(self):
993 989 """Has a kernel been started that we are managing."""
994 990 return self.kernel is not None
995 991
996 992 def _kill_kernel(self):
997 993 """Kill the running kernel.
998 994
999 995 This is a private method, callers should use shutdown_kernel(now=True).
1000 996 """
1001 997 if self.has_kernel:
1002 998 # Pause the heart beat channel if it exists.
1003 999 if self._hb_channel is not None:
1004 1000 self._hb_channel.pause()
1005 1001
1006 1002 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1007 1003 # TerminateProcess() on Win32).
1008 1004 try:
1009 1005 self.kernel.kill()
1010 1006 except OSError as e:
1011 1007 # In Windows, we will get an Access Denied error if the process
1012 1008 # has already terminated. Ignore it.
1013 1009 if sys.platform == 'win32':
1014 1010 if e.winerror != 5:
1015 1011 raise
1016 1012 # On Unix, we may get an ESRCH error if the process has already
1017 1013 # terminated. Ignore it.
1018 1014 else:
1019 1015 from errno import ESRCH
1020 1016 if e.errno != ESRCH:
1021 1017 raise
1022 1018
1023 1019 # Block until the kernel terminates.
1024 1020 self.kernel.wait()
1025 1021 self.kernel = None
1026 1022 else:
1027 1023 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1028 1024
1029 1025 def interrupt_kernel(self):
1030 1026 """Interrupts the kernel by sending it a signal.
1031 1027
1032 1028 Unlike ``signal_kernel``, this operation is well supported on all
1033 1029 platforms.
1034 1030 """
1035 1031 if self.has_kernel:
1036 1032 if sys.platform == 'win32':
1037 1033 from parentpoller import ParentPollerWindows as Poller
1038 1034 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1039 1035 else:
1040 1036 self.kernel.send_signal(signal.SIGINT)
1041 1037 else:
1042 1038 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1043 1039
1044 1040 def signal_kernel(self, signum):
1045 1041 """Sends a signal to the kernel.
1046 1042
1047 1043 Note that since only SIGTERM is supported on Windows, this function is
1048 1044 only useful on Unix systems.
1049 1045 """
1050 1046 if self.has_kernel:
1051 1047 self.kernel.send_signal(signum)
1052 1048 else:
1053 1049 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1054 1050
1055 1051 @property
1056 1052 def is_alive(self):
1057 1053 """Is the kernel process still running?"""
1058 1054 if self.has_kernel:
1059 1055 if self.kernel.poll() is None:
1060 1056 return True
1061 1057 else:
1062 1058 return False
1063 1059 elif self._hb_channel is not None:
1064 1060 # We didn't start the kernel with this KernelManager so we
1065 1061 # use the heartbeat.
1066 1062 return self._hb_channel.is_beating()
1067 1063 else:
1068 1064 # no heartbeat and not local, we can't tell if it's running,
1069 1065 # so naively return True
1070 1066 return True
1071 1067
1072 1068
1073 1069 #-----------------------------------------------------------------------------
1074 1070 # ABC Registration
1075 1071 #-----------------------------------------------------------------------------
1076 1072
1077 1073 ShellChannelABC.register(ShellChannel)
1078 1074 IOPubChannelABC.register(IOPubChannel)
1079 1075 HBChannelABC.register(HBChannel)
1080 1076 StdInChannelABC.register(StdInChannel)
1081 1077 KernelManagerABC.register(KernelManager)
1082 1078
General Comments 0
You need to be logged in to leave comments. Login now