##// END OF EJS Templates
Putting connection file cleanup back in __del__.
Brian E. Granger -
Show More
@@ -1,1078 +1,1081 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45 from IPython.zmq.kernelmanagerabc import (
46 46 ShellChannelABC, IOPubChannelABC,
47 47 HBChannelABC, StdInChannelABC,
48 48 KernelManagerABC
49 49 )
50 50
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Constants and exceptions
54 54 #-----------------------------------------------------------------------------
55 55
56 56 class InvalidPortNumber(Exception):
57 57 pass
58 58
59 59 #-----------------------------------------------------------------------------
60 60 # Utility functions
61 61 #-----------------------------------------------------------------------------
62 62
63 63 # some utilities to validate message structure, these might get moved elsewhere
64 64 # if they prove to have more generic utility
65 65
66 66 def validate_string_list(lst):
67 67 """Validate that the input is a list of strings.
68 68
69 69 Raises ValueError if not."""
70 70 if not isinstance(lst, list):
71 71 raise ValueError('input %r must be a list' % lst)
72 72 for x in lst:
73 73 if not isinstance(x, basestring):
74 74 raise ValueError('element %r in list must be a string' % x)
75 75
76 76
77 77 def validate_string_dict(dct):
78 78 """Validate that the input is a dict with string keys and values.
79 79
80 80 Raises ValueError if not."""
81 81 for k,v in dct.iteritems():
82 82 if not isinstance(k, basestring):
83 83 raise ValueError('key %r in dict must be a string' % k)
84 84 if not isinstance(v, basestring):
85 85 raise ValueError('value %r in dict must be a string' % v)
86 86
87 87
88 88 #-----------------------------------------------------------------------------
89 89 # ZMQ Socket Channel classes
90 90 #-----------------------------------------------------------------------------
91 91
92 92 class ZMQSocketChannel(Thread):
93 93 """The base class for the channels that use ZMQ sockets."""
94 94 context = None
95 95 session = None
96 96 socket = None
97 97 ioloop = None
98 98 stream = None
99 99 _address = None
100 100 _exiting = False
101 101
102 102 def __init__(self, context, session, address):
103 103 """Create a channel.
104 104
105 105 Parameters
106 106 ----------
107 107 context : :class:`zmq.Context`
108 108 The ZMQ context to use.
109 109 session : :class:`session.Session`
110 110 The session to use.
111 111 address : zmq url
112 112 Standard (ip, port) tuple that the kernel is listening on.
113 113 """
114 114 super(ZMQSocketChannel, self).__init__()
115 115 self.daemon = True
116 116
117 117 self.context = context
118 118 self.session = session
119 119 if isinstance(address, tuple):
120 120 if address[1] == 0:
121 121 message = 'The port number for a channel cannot be 0.'
122 122 raise InvalidPortNumber(message)
123 123 address = "tcp://%s:%i" % address
124 124 self._address = address
125 125 atexit.register(self._notice_exit)
126 126
127 127 def _notice_exit(self):
128 128 self._exiting = True
129 129
130 130 def _run_loop(self):
131 131 """Run my loop, ignoring EINTR events in the poller"""
132 132 while True:
133 133 try:
134 134 self.ioloop.start()
135 135 except ZMQError as e:
136 136 if e.errno == errno.EINTR:
137 137 continue
138 138 else:
139 139 raise
140 140 except Exception:
141 141 if self._exiting:
142 142 break
143 143 else:
144 144 raise
145 145 else:
146 146 break
147 147
148 148 def stop(self):
149 149 """Stop the channel's event loop and join its thread.
150 150
151 151 This calls :method:`Thread.join` and returns when the thread
152 152 terminates. :class:`RuntimeError` will be raised if
153 153 :method:`self.start` is called again.
154 154 """
155 155 self.join()
156 156
157 157 @property
158 158 def address(self):
159 159 """Get the channel's address as a zmq url string.
160 160
161 161 These URLS have the form: 'tcp://127.0.0.1:5555'.
162 162 """
163 163 return self._address
164 164
165 165 def _queue_send(self, msg):
166 166 """Queue a message to be sent from the IOLoop's thread.
167 167
168 168 Parameters
169 169 ----------
170 170 msg : message to send
171 171
172 172 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
173 173 thread control of the action.
174 174 """
175 175 def thread_send():
176 176 self.session.send(self.stream, msg)
177 177 self.ioloop.add_callback(thread_send)
178 178
179 179 def _handle_recv(self, msg):
180 180 """Callback for stream.on_recv.
181 181
182 182 Unpacks message, and calls handlers with it.
183 183 """
184 184 ident,smsg = self.session.feed_identities(msg)
185 185 self.call_handlers(self.session.unserialize(smsg))
186 186
187 187
188 188
189 189 class ShellChannel(ZMQSocketChannel):
190 190 """The shell channel for issuing request/replies to the kernel."""
191 191
192 192 command_queue = None
193 193 # flag for whether execute requests should be allowed to call raw_input:
194 194 allow_stdin = True
195 195
196 196 def __init__(self, context, session, address):
197 197 super(ShellChannel, self).__init__(context, session, address)
198 198 self.ioloop = ioloop.IOLoop()
199 199
200 200 def run(self):
201 201 """The thread's main activity. Call start() instead."""
202 202 self.socket = self.context.socket(zmq.DEALER)
203 203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 204 self.socket.connect(self.address)
205 205 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 206 self.stream.on_recv(self._handle_recv)
207 207 self._run_loop()
208 208 try:
209 209 self.socket.close()
210 210 except:
211 211 pass
212 212
213 213 def stop(self):
214 214 """Stop the channel's event loop and join its thread."""
215 215 self.ioloop.stop()
216 216 super(ShellChannel, self).stop()
217 217
218 218 def call_handlers(self, msg):
219 219 """This method is called in the ioloop thread when a message arrives.
220 220
221 221 Subclasses should override this method to handle incoming messages.
222 222 It is important to remember that this method is called in the thread
223 223 so that some logic must be done to ensure that the application leve
224 224 handlers are called in the application thread.
225 225 """
226 226 raise NotImplementedError('call_handlers must be defined in a subclass.')
227 227
228 228 def execute(self, code, silent=False, store_history=True,
229 229 user_variables=None, user_expressions=None, allow_stdin=None):
230 230 """Execute code in the kernel.
231 231
232 232 Parameters
233 233 ----------
234 234 code : str
235 235 A string of Python code.
236 236
237 237 silent : bool, optional (default False)
238 238 If set, the kernel will execute the code as quietly possible, and
239 239 will force store_history to be False.
240 240
241 241 store_history : bool, optional (default True)
242 242 If set, the kernel will store command history. This is forced
243 243 to be False if silent is True.
244 244
245 245 user_variables : list, optional
246 246 A list of variable names to pull from the user's namespace. They
247 247 will come back as a dict with these names as keys and their
248 248 :func:`repr` as values.
249 249
250 250 user_expressions : dict, optional
251 251 A dict mapping names to expressions to be evaluated in the user's
252 252 dict. The expression values are returned as strings formatted using
253 253 :func:`repr`.
254 254
255 255 allow_stdin : bool, optional (default self.allow_stdin)
256 256 Flag for whether the kernel can send stdin requests to frontends.
257 257
258 258 Some frontends (e.g. the Notebook) do not support stdin requests.
259 259 If raw_input is called from code executed from such a frontend, a
260 260 StdinNotImplementedError will be raised.
261 261
262 262 Returns
263 263 -------
264 264 The msg_id of the message sent.
265 265 """
266 266 if user_variables is None:
267 267 user_variables = []
268 268 if user_expressions is None:
269 269 user_expressions = {}
270 270 if allow_stdin is None:
271 271 allow_stdin = self.allow_stdin
272 272
273 273
274 274 # Don't waste network traffic if inputs are invalid
275 275 if not isinstance(code, basestring):
276 276 raise ValueError('code %r must be a string' % code)
277 277 validate_string_list(user_variables)
278 278 validate_string_dict(user_expressions)
279 279
280 280 # Create class for content/msg creation. Related to, but possibly
281 281 # not in Session.
282 282 content = dict(code=code, silent=silent, store_history=store_history,
283 283 user_variables=user_variables,
284 284 user_expressions=user_expressions,
285 285 allow_stdin=allow_stdin,
286 286 )
287 287 msg = self.session.msg('execute_request', content)
288 288 self._queue_send(msg)
289 289 return msg['header']['msg_id']
290 290
291 291 def complete(self, text, line, cursor_pos, block=None):
292 292 """Tab complete text in the kernel's namespace.
293 293
294 294 Parameters
295 295 ----------
296 296 text : str
297 297 The text to complete.
298 298 line : str
299 299 The full line of text that is the surrounding context for the
300 300 text to complete.
301 301 cursor_pos : int
302 302 The position of the cursor in the line where the completion was
303 303 requested.
304 304 block : str, optional
305 305 The full block of code in which the completion is being requested.
306 306
307 307 Returns
308 308 -------
309 309 The msg_id of the message sent.
310 310 """
311 311 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
312 312 msg = self.session.msg('complete_request', content)
313 313 self._queue_send(msg)
314 314 return msg['header']['msg_id']
315 315
316 316 def object_info(self, oname, detail_level=0):
317 317 """Get metadata information about an object in the kernel's namespace.
318 318
319 319 Parameters
320 320 ----------
321 321 oname : str
322 322 A string specifying the object name.
323 323 detail_level : int, optional
324 324 The level of detail for the introspection (0-2)
325 325
326 326 Returns
327 327 -------
328 328 The msg_id of the message sent.
329 329 """
330 330 content = dict(oname=oname, detail_level=detail_level)
331 331 msg = self.session.msg('object_info_request', content)
332 332 self._queue_send(msg)
333 333 return msg['header']['msg_id']
334 334
335 335 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
336 336 """Get entries from the kernel's history list.
337 337
338 338 Parameters
339 339 ----------
340 340 raw : bool
341 341 If True, return the raw input.
342 342 output : bool
343 343 If True, then return the output as well.
344 344 hist_access_type : str
345 345 'range' (fill in session, start and stop params), 'tail' (fill in n)
346 346 or 'search' (fill in pattern param).
347 347
348 348 session : int
349 349 For a range request, the session from which to get lines. Session
350 350 numbers are positive integers; negative ones count back from the
351 351 current session.
352 352 start : int
353 353 The first line number of a history range.
354 354 stop : int
355 355 The final (excluded) line number of a history range.
356 356
357 357 n : int
358 358 The number of lines of history to get for a tail request.
359 359
360 360 pattern : str
361 361 The glob-syntax pattern for a search request.
362 362
363 363 Returns
364 364 -------
365 365 The msg_id of the message sent.
366 366 """
367 367 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
368 368 **kwargs)
369 369 msg = self.session.msg('history_request', content)
370 370 self._queue_send(msg)
371 371 return msg['header']['msg_id']
372 372
373 373 def kernel_info(self):
374 374 """Request kernel info."""
375 375 msg = self.session.msg('kernel_info_request')
376 376 self._queue_send(msg)
377 377 return msg['header']['msg_id']
378 378
379 379 def shutdown(self, restart=False):
380 380 """Request an immediate kernel shutdown.
381 381
382 382 Upon receipt of the (empty) reply, client code can safely assume that
383 383 the kernel has shut down and it's safe to forcefully terminate it if
384 384 it's still alive.
385 385
386 386 The kernel will send the reply via a function registered with Python's
387 387 atexit module, ensuring it's truly done as the kernel is done with all
388 388 normal operation.
389 389 """
390 390 # Send quit message to kernel. Once we implement kernel-side setattr,
391 391 # this should probably be done that way, but for now this will do.
392 392 msg = self.session.msg('shutdown_request', {'restart':restart})
393 393 self._queue_send(msg)
394 394 return msg['header']['msg_id']
395 395
396 396
397 397
398 398 class IOPubChannel(ZMQSocketChannel):
399 399 """The iopub channel which listens for messages that the kernel publishes.
400 400
401 401 This channel is where all output is published to frontends.
402 402 """
403 403
404 404 def __init__(self, context, session, address):
405 405 super(IOPubChannel, self).__init__(context, session, address)
406 406 self.ioloop = ioloop.IOLoop()
407 407
408 408 def run(self):
409 409 """The thread's main activity. Call start() instead."""
410 410 self.socket = self.context.socket(zmq.SUB)
411 411 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 412 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 413 self.socket.connect(self.address)
414 414 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
415 415 self.stream.on_recv(self._handle_recv)
416 416 self._run_loop()
417 417 try:
418 418 self.socket.close()
419 419 except:
420 420 pass
421 421
422 422 def stop(self):
423 423 """Stop the channel's event loop and join its thread."""
424 424 self.ioloop.stop()
425 425 super(IOPubChannel, self).stop()
426 426
427 427 def call_handlers(self, msg):
428 428 """This method is called in the ioloop thread when a message arrives.
429 429
430 430 Subclasses should override this method to handle incoming messages.
431 431 It is important to remember that this method is called in the thread
432 432 so that some logic must be done to ensure that the application leve
433 433 handlers are called in the application thread.
434 434 """
435 435 raise NotImplementedError('call_handlers must be defined in a subclass.')
436 436
437 437 def flush(self, timeout=1.0):
438 438 """Immediately processes all pending messages on the iopub channel.
439 439
440 440 Callers should use this method to ensure that :method:`call_handlers`
441 441 has been called for all messages that have been received on the
442 442 0MQ SUB socket of this channel.
443 443
444 444 This method is thread safe.
445 445
446 446 Parameters
447 447 ----------
448 448 timeout : float, optional
449 449 The maximum amount of time to spend flushing, in seconds. The
450 450 default is one second.
451 451 """
452 452 # We do the IOLoop callback process twice to ensure that the IOLoop
453 453 # gets to perform at least one full poll.
454 454 stop_time = time.time() + timeout
455 455 for i in xrange(2):
456 456 self._flushed = False
457 457 self.ioloop.add_callback(self._flush)
458 458 while not self._flushed and time.time() < stop_time:
459 459 time.sleep(0.01)
460 460
461 461 def _flush(self):
462 462 """Callback for :method:`self.flush`."""
463 463 self.stream.flush()
464 464 self._flushed = True
465 465
466 466
467 467 class StdInChannel(ZMQSocketChannel):
468 468 """The stdin channel to handle raw_input requests that the kernel makes."""
469 469
470 470 msg_queue = None
471 471
472 472 def __init__(self, context, session, address):
473 473 super(StdInChannel, self).__init__(context, session, address)
474 474 self.ioloop = ioloop.IOLoop()
475 475
476 476 def run(self):
477 477 """The thread's main activity. Call start() instead."""
478 478 self.socket = self.context.socket(zmq.DEALER)
479 479 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
480 480 self.socket.connect(self.address)
481 481 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
482 482 self.stream.on_recv(self._handle_recv)
483 483 self._run_loop()
484 484 try:
485 485 self.socket.close()
486 486 except:
487 487 pass
488 488
489 489 def stop(self):
490 490 """Stop the channel's event loop and join its thread."""
491 491 self.ioloop.stop()
492 492 super(StdInChannel, self).stop()
493 493
494 494 def call_handlers(self, msg):
495 495 """This method is called in the ioloop thread when a message arrives.
496 496
497 497 Subclasses should override this method to handle incoming messages.
498 498 It is important to remember that this method is called in the thread
499 499 so that some logic must be done to ensure that the application leve
500 500 handlers are called in the application thread.
501 501 """
502 502 raise NotImplementedError('call_handlers must be defined in a subclass.')
503 503
504 504 def input(self, string):
505 505 """Send a string of raw input to the kernel."""
506 506 content = dict(value=string)
507 507 msg = self.session.msg('input_reply', content)
508 508 self._queue_send(msg)
509 509
510 510
511 511 class HBChannel(ZMQSocketChannel):
512 512 """The heartbeat channel which monitors the kernel heartbeat.
513 513
514 514 Note that the heartbeat channel is paused by default. As long as you start
515 515 this channel, the kernel manager will ensure that it is paused and un-paused
516 516 as appropriate.
517 517 """
518 518
519 519 time_to_dead = 3.0
520 520 socket = None
521 521 poller = None
522 522 _running = None
523 523 _pause = None
524 524 _beating = None
525 525
526 526 def __init__(self, context, session, address):
527 527 super(HBChannel, self).__init__(context, session, address)
528 528 self._running = False
529 529 self._pause =True
530 530 self.poller = zmq.Poller()
531 531
532 532 def _create_socket(self):
533 533 if self.socket is not None:
534 534 # close previous socket, before opening a new one
535 535 self.poller.unregister(self.socket)
536 536 self.socket.close()
537 537 self.socket = self.context.socket(zmq.REQ)
538 538 self.socket.setsockopt(zmq.LINGER, 0)
539 539 self.socket.connect(self.address)
540 540
541 541 self.poller.register(self.socket, zmq.POLLIN)
542 542
543 543 def _poll(self, start_time):
544 544 """poll for heartbeat replies until we reach self.time_to_dead.
545 545
546 546 Ignores interrupts, and returns the result of poll(), which
547 547 will be an empty list if no messages arrived before the timeout,
548 548 or the event tuple if there is a message to receive.
549 549 """
550 550
551 551 until_dead = self.time_to_dead - (time.time() - start_time)
552 552 # ensure poll at least once
553 553 until_dead = max(until_dead, 1e-3)
554 554 events = []
555 555 while True:
556 556 try:
557 557 events = self.poller.poll(1000 * until_dead)
558 558 except ZMQError as e:
559 559 if e.errno == errno.EINTR:
560 560 # ignore interrupts during heartbeat
561 561 # this may never actually happen
562 562 until_dead = self.time_to_dead - (time.time() - start_time)
563 563 until_dead = max(until_dead, 1e-3)
564 564 pass
565 565 else:
566 566 raise
567 567 except Exception:
568 568 if self._exiting:
569 569 break
570 570 else:
571 571 raise
572 572 else:
573 573 break
574 574 return events
575 575
576 576 def run(self):
577 577 """The thread's main activity. Call start() instead."""
578 578 self._create_socket()
579 579 self._running = True
580 580 self._beating = True
581 581
582 582 while self._running:
583 583 if self._pause:
584 584 # just sleep, and skip the rest of the loop
585 585 time.sleep(self.time_to_dead)
586 586 continue
587 587
588 588 since_last_heartbeat = 0.0
589 589 # io.rprint('Ping from HB channel') # dbg
590 590 # no need to catch EFSM here, because the previous event was
591 591 # either a recv or connect, which cannot be followed by EFSM
592 592 self.socket.send(b'ping')
593 593 request_time = time.time()
594 594 ready = self._poll(request_time)
595 595 if ready:
596 596 self._beating = True
597 597 # the poll above guarantees we have something to recv
598 598 self.socket.recv()
599 599 # sleep the remainder of the cycle
600 600 remainder = self.time_to_dead - (time.time() - request_time)
601 601 if remainder > 0:
602 602 time.sleep(remainder)
603 603 continue
604 604 else:
605 605 # nothing was received within the time limit, signal heart failure
606 606 self._beating = False
607 607 since_last_heartbeat = time.time() - request_time
608 608 self.call_handlers(since_last_heartbeat)
609 609 # and close/reopen the socket, because the REQ/REP cycle has been broken
610 610 self._create_socket()
611 611 continue
612 612 try:
613 613 self.socket.close()
614 614 except:
615 615 pass
616 616
617 617 def pause(self):
618 618 """Pause the heartbeat."""
619 619 self._pause = True
620 620
621 621 def unpause(self):
622 622 """Unpause the heartbeat."""
623 623 self._pause = False
624 624
625 625 def is_beating(self):
626 626 """Is the heartbeat running and responsive (and not paused)."""
627 627 if self.is_alive() and not self._pause and self._beating:
628 628 return True
629 629 else:
630 630 return False
631 631
632 632 def stop(self):
633 633 """Stop the channel's event loop and join its thread."""
634 634 self._running = False
635 635 super(HBChannel, self).stop()
636 636
637 637 def call_handlers(self, since_last_heartbeat):
638 638 """This method is called in the ioloop thread when a message arrives.
639 639
640 640 Subclasses should override this method to handle incoming messages.
641 641 It is important to remember that this method is called in the thread
642 642 so that some logic must be done to ensure that the application level
643 643 handlers are called in the application thread.
644 644 """
645 645 raise NotImplementedError('call_handlers must be defined in a subclass.')
646 646
647 647
648 648 #-----------------------------------------------------------------------------
649 649 # Main kernel manager class
650 650 #-----------------------------------------------------------------------------
651 651
652 652 class KernelManager(Configurable):
653 653 """Manages a single kernel on this host along with its channels.
654 654
655 655 There are four channels associated with each kernel:
656 656
657 657 * shell: for request/reply calls to the kernel.
658 658 * iopub: for the kernel to publish results to frontends.
659 659 * hb: for monitoring the kernel's heartbeat.
660 660 * stdin: for frontends to reply to raw_input calls in the kernel.
661 661
662 662 The usage of the channels that this class manages is optional. It is
663 663 entirely possible to connect to the kernels directly using ZeroMQ
664 664 sockets. These channels are useful primarily for talking to a kernel
665 665 whose :class:`KernelManager` is in the same process.
666 666
667 667 This version manages kernels started using Popen.
668 668 """
669 669 # The PyZMQ Context to use for communication with the kernel.
670 670 context = Instance(zmq.Context)
671 671 def _context_default(self):
672 672 return zmq.Context.instance()
673 673
674 674 # The Session to use for communication with the kernel.
675 675 session = Instance(Session)
676 676 def _session_default(self):
677 677 return Session(config=self.config)
678 678
679 679 # The kernel process with which the KernelManager is communicating.
680 680 kernel = Instance(Popen)
681 681
682 682 # The addresses for the communication channels.
683 683 connection_file = Unicode('')
684 684
685 685 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
686 686
687 687 ip = Unicode(LOCALHOST, config=True)
688 688 def _ip_changed(self, name, old, new):
689 689 if new == '*':
690 690 self.ip = '0.0.0.0'
691 691 shell_port = Integer(0)
692 692 iopub_port = Integer(0)
693 693 stdin_port = Integer(0)
694 694 hb_port = Integer(0)
695 695
696 696 # The classes to use for the various channels.
697 697 shell_channel_class = Type(ShellChannel)
698 698 iopub_channel_class = Type(IOPubChannel)
699 699 stdin_channel_class = Type(StdInChannel)
700 700 hb_channel_class = Type(HBChannel)
701 701
702 702 # Protected traits.
703 703 _launch_args = Any
704 704 _shell_channel = Any
705 705 _iopub_channel = Any
706 706 _stdin_channel = Any
707 707 _hb_channel = Any
708 708 _connection_file_written=Bool(False)
709
710 def __del__(self):
711 self.cleanup_connection_file()
709 712
710 713 #--------------------------------------------------------------------------
711 714 # Channel management methods:
712 715 #--------------------------------------------------------------------------
713 716
714 717 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
715 718 """Starts the channels for this kernel.
716 719
717 720 This will create the channels if they do not exist and then start
718 721 them (their activity runs in a thread). If port numbers of 0 are
719 722 being used (random ports) then you must first call
720 723 :method:`start_kernel`. If the channels have been stopped and you
721 724 call this, :class:`RuntimeError` will be raised.
722 725 """
723 726 if shell:
724 727 self.shell_channel.start()
725 728 if iopub:
726 729 self.iopub_channel.start()
727 730 if stdin:
728 731 self.stdin_channel.start()
729 732 self.shell_channel.allow_stdin = True
730 733 else:
731 734 self.shell_channel.allow_stdin = False
732 735 if hb:
733 736 self.hb_channel.start()
734 737
735 738 def stop_channels(self):
736 739 """Stops all the running channels for this kernel.
737 740
738 741 This stops their event loops and joins their threads.
739 742 """
740 743 if self.shell_channel.is_alive():
741 744 self.shell_channel.stop()
742 745 if self.iopub_channel.is_alive():
743 746 self.iopub_channel.stop()
744 747 if self.stdin_channel.is_alive():
745 748 self.stdin_channel.stop()
746 749 if self.hb_channel.is_alive():
747 750 self.hb_channel.stop()
748 751
749 752 @property
750 753 def channels_running(self):
751 754 """Are any of the channels created and running?"""
752 755 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
753 756 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
754 757
755 758 def _make_url(self, port):
756 759 """Make a zmq url with a port.
757 760
758 761 There are two cases that this handles:
759 762
760 763 * tcp: tcp://ip:port
761 764 * ipc: ipc://ip-port
762 765 """
763 766 if self.transport == 'tcp':
764 767 return "tcp://%s:%i" % (self.ip, port)
765 768 else:
766 769 return "%s://%s-%s" % (self.transport, self.ip, port)
767 770
768 771 @property
769 772 def shell_channel(self):
770 773 """Get the shell channel object for this kernel."""
771 774 if self._shell_channel is None:
772 775 self._shell_channel = self.shell_channel_class(
773 776 self.context, self.session, self._make_url(self.shell_port)
774 777 )
775 778 return self._shell_channel
776 779
777 780 @property
778 781 def iopub_channel(self):
779 782 """Get the iopub channel object for this kernel."""
780 783 if self._iopub_channel is None:
781 784 self._iopub_channel = self.iopub_channel_class(
782 785 self.context, self.session, self._make_url(self.iopub_port)
783 786 )
784 787 return self._iopub_channel
785 788
786 789 @property
787 790 def stdin_channel(self):
788 791 """Get the stdin channel object for this kernel."""
789 792 if self._stdin_channel is None:
790 793 self._stdin_channel = self.stdin_channel_class(
791 794 self.context, self.session, self._make_url(self.stdin_port)
792 795 )
793 796 return self._stdin_channel
794 797
795 798 @property
796 799 def hb_channel(self):
797 800 """Get the hb channel object for this kernel."""
798 801 if self._hb_channel is None:
799 802 self._hb_channel = self.hb_channel_class(
800 803 self.context, self.session, self._make_url(self.hb_port)
801 804 )
802 805 return self._hb_channel
803 806
804 807 #--------------------------------------------------------------------------
805 808 # Connection and ipc file management
806 809 #--------------------------------------------------------------------------
807 810
808 811 def cleanup_connection_file(self):
809 812 """Cleanup connection file *if we wrote it*
810 813
811 814 Will not raise if the connection file was already removed somehow.
812 815 """
813 816 if self._connection_file_written:
814 817 # cleanup connection files on full shutdown of kernel we started
815 818 self._connection_file_written = False
816 819 try:
817 820 os.remove(self.connection_file)
818 821 except (IOError, OSError):
819 822 pass
820 823
821 824 def cleanup_ipc_files(self):
822 825 """Cleanup ipc files if we wrote them."""
823 826 if self.transport != 'ipc':
824 827 return
825 828 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
826 829 ipcfile = "%s-%i" % (self.ip, port)
827 830 try:
828 831 os.remove(ipcfile)
829 832 except (IOError, OSError):
830 833 pass
831 834
832 835 def load_connection_file(self):
833 836 """Load connection info from JSON dict in self.connection_file."""
834 837 with open(self.connection_file) as f:
835 838 cfg = json.loads(f.read())
836 839
837 840 from pprint import pprint
838 841 pprint(cfg)
839 842 self.transport = cfg.get('transport', 'tcp')
840 843 self.ip = cfg['ip']
841 844 self.shell_port = cfg['shell_port']
842 845 self.stdin_port = cfg['stdin_port']
843 846 self.iopub_port = cfg['iopub_port']
844 847 self.hb_port = cfg['hb_port']
845 848 self.session.key = str_to_bytes(cfg['key'])
846 849
847 850 def write_connection_file(self):
848 851 """Write connection info to JSON dict in self.connection_file."""
849 852 if self._connection_file_written:
850 853 return
851 854 self.connection_file,cfg = write_connection_file(self.connection_file,
852 855 transport=self.transport, ip=self.ip, key=self.session.key,
853 856 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
854 857 shell_port=self.shell_port, hb_port=self.hb_port)
855 858 # write_connection_file also sets default ports:
856 859 self.shell_port = cfg['shell_port']
857 860 self.stdin_port = cfg['stdin_port']
858 861 self.iopub_port = cfg['iopub_port']
859 862 self.hb_port = cfg['hb_port']
860 863
861 864 self._connection_file_written = True
862 865
863 866 #--------------------------------------------------------------------------
864 867 # Kernel management
865 868 #--------------------------------------------------------------------------
866 869
867 870 def start_kernel(self, **kw):
868 871 """Starts a kernel on this host in a separate process.
869 872
870 873 If random ports (port=0) are being used, this method must be called
871 874 before the channels are created.
872 875
873 876 Parameters:
874 877 -----------
875 878 launcher : callable, optional (default None)
876 879 A custom function for launching the kernel process (generally a
877 880 wrapper around ``entry_point.base_launch_kernel``). In most cases,
878 881 it should not be necessary to use this parameter.
879 882
880 883 **kw : optional
881 884 keyword arguments that are passed down into the launcher
882 885 callable.
883 886 """
884 887 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
885 888 raise RuntimeError("Can only launch a kernel on a local interface. "
886 889 "Make sure that the '*_address' attributes are "
887 890 "configured properly. "
888 891 "Currently valid addresses are: %s"%LOCAL_IPS
889 892 )
890 893
891 894 # write connection file / get default ports
892 895 self.write_connection_file()
893 896
894 897 self._launch_args = kw.copy()
895 898 launch_kernel = kw.pop('launcher', None)
896 899 if launch_kernel is None:
897 900 from ipkernel import launch_kernel
898 901 self.kernel = launch_kernel(fname=self.connection_file, **kw)
899 902
900 903 def shutdown_kernel(self, now=False, restart=False):
901 904 """Attempts to the stop the kernel process cleanly.
902 905
903 906 This attempts to shutdown the kernels cleanly by:
904 907
905 908 1. Sending it a shutdown message over the shell channel.
906 909 2. If that fails, the kernel is shutdown forcibly by sending it
907 910 a signal.
908 911
909 912 Parameters:
910 913 -----------
911 914 now : bool
912 915 Should the kernel be forcible killed *now*. This skips the
913 916 first, nice shutdown attempt.
914 917 restart: bool
915 918 Will this kernel be restarted after it is shutdown. When this
916 919 is True, connection files will not be cleaned up.
917 920 """
918 921 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
919 922 if sys.platform == 'win32':
920 923 self._kill_kernel()
921 924 return
922 925
923 926 # Pause the heart beat channel if it exists.
924 927 if self._hb_channel is not None:
925 928 self._hb_channel.pause()
926 929
927 930 if now:
928 931 if self.has_kernel:
929 932 self._kill_kernel()
930 933 else:
931 934 # Don't send any additional kernel kill messages immediately, to give
932 935 # the kernel a chance to properly execute shutdown actions. Wait for at
933 936 # most 1s, checking every 0.1s.
934 937 self.shell_channel.shutdown(restart=restart)
935 938 for i in range(10):
936 939 if self.is_alive:
937 940 time.sleep(0.1)
938 941 else:
939 942 break
940 943 else:
941 944 # OK, we've waited long enough.
942 945 if self.has_kernel:
943 946 self._kill_kernel()
944 947
945 948 if not restart:
946 949 self.cleanup_connection_file()
947 950 self.cleanup_ipc_files()
948 951 else:
949 952 self.cleanup_ipc_files()
950 953
951 954 def restart_kernel(self, now=False, **kw):
952 955 """Restarts a kernel with the arguments that were used to launch it.
953 956
954 957 If the old kernel was launched with random ports, the same ports will be
955 958 used for the new kernel. The same connection file is used again.
956 959
957 960 Parameters
958 961 ----------
959 962 now : bool, optional
960 963 If True, the kernel is forcefully restarted *immediately*, without
961 964 having a chance to do any cleanup action. Otherwise the kernel is
962 965 given 1s to clean up before a forceful restart is issued.
963 966
964 967 In all cases the kernel is restarted, the only difference is whether
965 968 it is given a chance to perform a clean shutdown or not.
966 969
967 970 **kw : optional
968 971 Any options specified here will overwrite those used to launch the
969 972 kernel.
970 973 """
971 974 if self._launch_args is None:
972 975 raise RuntimeError("Cannot restart the kernel. "
973 976 "No previous call to 'start_kernel'.")
974 977 else:
975 978 # Stop currently running kernel.
976 979 self.shutdown_kernel(now=now, restart=True)
977 980
978 981 # Start new kernel.
979 982 self._launch_args.update(kw)
980 983 self.start_kernel(**self._launch_args)
981 984
982 985 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
983 986 # unless there is some delay here.
984 987 if sys.platform == 'win32':
985 988 time.sleep(0.2)
986 989
987 990 @property
988 991 def has_kernel(self):
989 992 """Has a kernel been started that we are managing."""
990 993 return self.kernel is not None
991 994
992 995 def _kill_kernel(self):
993 996 """Kill the running kernel.
994 997
995 998 This is a private method, callers should use shutdown_kernel(now=True).
996 999 """
997 1000 if self.has_kernel:
998 1001 # Pause the heart beat channel if it exists.
999 1002 if self._hb_channel is not None:
1000 1003 self._hb_channel.pause()
1001 1004
1002 1005 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
1003 1006 # TerminateProcess() on Win32).
1004 1007 try:
1005 1008 self.kernel.kill()
1006 1009 except OSError as e:
1007 1010 # In Windows, we will get an Access Denied error if the process
1008 1011 # has already terminated. Ignore it.
1009 1012 if sys.platform == 'win32':
1010 1013 if e.winerror != 5:
1011 1014 raise
1012 1015 # On Unix, we may get an ESRCH error if the process has already
1013 1016 # terminated. Ignore it.
1014 1017 else:
1015 1018 from errno import ESRCH
1016 1019 if e.errno != ESRCH:
1017 1020 raise
1018 1021
1019 1022 # Block until the kernel terminates.
1020 1023 self.kernel.wait()
1021 1024 self.kernel = None
1022 1025 else:
1023 1026 raise RuntimeError("Cannot kill kernel. No kernel is running!")
1024 1027
1025 1028 def interrupt_kernel(self):
1026 1029 """Interrupts the kernel by sending it a signal.
1027 1030
1028 1031 Unlike ``signal_kernel``, this operation is well supported on all
1029 1032 platforms.
1030 1033 """
1031 1034 if self.has_kernel:
1032 1035 if sys.platform == 'win32':
1033 1036 from parentpoller import ParentPollerWindows as Poller
1034 1037 Poller.send_interrupt(self.kernel.win32_interrupt_event)
1035 1038 else:
1036 1039 self.kernel.send_signal(signal.SIGINT)
1037 1040 else:
1038 1041 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
1039 1042
1040 1043 def signal_kernel(self, signum):
1041 1044 """Sends a signal to the kernel.
1042 1045
1043 1046 Note that since only SIGTERM is supported on Windows, this function is
1044 1047 only useful on Unix systems.
1045 1048 """
1046 1049 if self.has_kernel:
1047 1050 self.kernel.send_signal(signum)
1048 1051 else:
1049 1052 raise RuntimeError("Cannot signal kernel. No kernel is running!")
1050 1053
1051 1054 @property
1052 1055 def is_alive(self):
1053 1056 """Is the kernel process still running?"""
1054 1057 if self.has_kernel:
1055 1058 if self.kernel.poll() is None:
1056 1059 return True
1057 1060 else:
1058 1061 return False
1059 1062 elif self._hb_channel is not None:
1060 1063 # We didn't start the kernel with this KernelManager so we
1061 1064 # use the heartbeat.
1062 1065 return self._hb_channel.is_beating()
1063 1066 else:
1064 1067 # no heartbeat and not local, we can't tell if it's running,
1065 1068 # so naively return True
1066 1069 return True
1067 1070
1068 1071
1069 1072 #-----------------------------------------------------------------------------
1070 1073 # ABC Registration
1071 1074 #-----------------------------------------------------------------------------
1072 1075
1073 1076 ShellChannelABC.register(ShellChannel)
1074 1077 IOPubChannelABC.register(IOPubChannel)
1075 1078 HBChannelABC.register(HBChannel)
1076 1079 StdInChannelABC.register(StdInChannel)
1077 1080 KernelManagerABC.register(KernelManager)
1078 1081
General Comments 0
You need to be logged in to leave comments. Login now