##// END OF EJS Templates
avoid AttributeErrors on zmq.ZMQError at shutdown of qtconsole...
MinRK -
Show More
@@ -1,962 +1,965
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import errno
20 20 import json
21 21 from subprocess import Popen
22 22 import os
23 23 import signal
24 24 import sys
25 25 from threading import Thread
26 26 import time
27 27
28 28 # System library imports.
29 29 import zmq
30 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
31 # during garbage collection of threads at exit:
32 from zmq import ZMQError
30 33 from zmq.eventloop import ioloop, zmqstream
31 34
32 35 # Local imports.
33 36 from IPython.config.loader import Config
34 37 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 38 from IPython.utils.traitlets import (
36 39 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
37 40 )
38 41 from IPython.utils.py3compat import str_to_bytes
39 42 from IPython.zmq.entry_point import write_connection_file
40 43 from session import Session
41 44
42 45 #-----------------------------------------------------------------------------
43 46 # Constants and exceptions
44 47 #-----------------------------------------------------------------------------
45 48
46 49 class InvalidPortNumber(Exception):
47 50 pass
48 51
49 52 #-----------------------------------------------------------------------------
50 53 # Utility functions
51 54 #-----------------------------------------------------------------------------
52 55
53 56 # some utilities to validate message structure, these might get moved elsewhere
54 57 # if they prove to have more generic utility
55 58
56 59 def validate_string_list(lst):
57 60 """Validate that the input is a list of strings.
58 61
59 62 Raises ValueError if not."""
60 63 if not isinstance(lst, list):
61 64 raise ValueError('input %r must be a list' % lst)
62 65 for x in lst:
63 66 if not isinstance(x, basestring):
64 67 raise ValueError('element %r in list must be a string' % x)
65 68
66 69
67 70 def validate_string_dict(dct):
68 71 """Validate that the input is a dict with string keys and values.
69 72
70 73 Raises ValueError if not."""
71 74 for k,v in dct.iteritems():
72 75 if not isinstance(k, basestring):
73 76 raise ValueError('key %r in dict must be a string' % k)
74 77 if not isinstance(v, basestring):
75 78 raise ValueError('value %r in dict must be a string' % v)
76 79
77 80
78 81 #-----------------------------------------------------------------------------
79 82 # ZMQ Socket Channel classes
80 83 #-----------------------------------------------------------------------------
81 84
82 85 class ZMQSocketChannel(Thread):
83 86 """The base class for the channels that use ZMQ sockets.
84 87 """
85 88 context = None
86 89 session = None
87 90 socket = None
88 91 ioloop = None
89 92 stream = None
90 93 _address = None
91 94
92 95 def __init__(self, context, session, address):
93 96 """Create a channel
94 97
95 98 Parameters
96 99 ----------
97 100 context : :class:`zmq.Context`
98 101 The ZMQ context to use.
99 102 session : :class:`session.Session`
100 103 The session to use.
101 104 address : tuple
102 105 Standard (ip, port) tuple that the kernel is listening on.
103 106 """
104 107 super(ZMQSocketChannel, self).__init__()
105 108 self.daemon = True
106 109
107 110 self.context = context
108 111 self.session = session
109 112 if address[1] == 0:
110 113 message = 'The port number for a channel cannot be 0.'
111 114 raise InvalidPortNumber(message)
112 115 self._address = address
113 116
114 117 def _run_loop(self):
115 118 """Run my loop, ignoring EINTR events in the poller"""
116 119 while True:
117 120 try:
118 121 self.ioloop.start()
119 except zmq.ZMQError as e:
122 except ZMQError as e:
120 123 if e.errno == errno.EINTR:
121 124 continue
122 125 else:
123 126 raise
124 127 else:
125 128 break
126 129
127 130 def stop(self):
128 131 """Stop the channel's activity.
129 132
130 133 This calls :method:`Thread.join` and returns when the thread
131 134 terminates. :class:`RuntimeError` will be raised if
132 135 :method:`self.start` is called again.
133 136 """
134 137 self.join()
135 138
136 139 @property
137 140 def address(self):
138 141 """Get the channel's address as an (ip, port) tuple.
139 142
140 143 By the default, the address is (localhost, 0), where 0 means a random
141 144 port.
142 145 """
143 146 return self._address
144 147
145 148 def _queue_send(self, msg):
146 149 """Queue a message to be sent from the IOLoop's thread.
147 150
148 151 Parameters
149 152 ----------
150 153 msg : message to send
151 154
152 155 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
153 156 thread control of the action.
154 157 """
155 158 def thread_send():
156 159 self.session.send(self.stream, msg)
157 160 self.ioloop.add_callback(thread_send)
158 161
159 162 def _handle_recv(self, msg):
160 163 """callback for stream.on_recv
161 164
162 165 unpacks message, and calls handlers with it.
163 166 """
164 167 ident,smsg = self.session.feed_identities(msg)
165 168 self.call_handlers(self.session.unserialize(smsg))
166 169
167 170
168 171
169 172 class ShellSocketChannel(ZMQSocketChannel):
170 173 """The XREQ channel for issues request/replies to the kernel.
171 174 """
172 175
173 176 command_queue = None
174 177 # flag for whether execute requests should be allowed to call raw_input:
175 178 allow_stdin = True
176 179
177 180 def __init__(self, context, session, address):
178 181 super(ShellSocketChannel, self).__init__(context, session, address)
179 182 self.ioloop = ioloop.IOLoop()
180 183
181 184 def run(self):
182 185 """The thread's main activity. Call start() instead."""
183 186 self.socket = self.context.socket(zmq.DEALER)
184 187 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
185 188 self.socket.connect('tcp://%s:%i' % self.address)
186 189 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
187 190 self.stream.on_recv(self._handle_recv)
188 191 self._run_loop()
189 192
190 193 def stop(self):
191 194 self.ioloop.stop()
192 195 super(ShellSocketChannel, self).stop()
193 196
194 197 def call_handlers(self, msg):
195 198 """This method is called in the ioloop thread when a message arrives.
196 199
197 200 Subclasses should override this method to handle incoming messages.
198 201 It is important to remember that this method is called in the thread
199 202 so that some logic must be done to ensure that the application leve
200 203 handlers are called in the application thread.
201 204 """
202 205 raise NotImplementedError('call_handlers must be defined in a subclass.')
203 206
204 207 def execute(self, code, silent=False,
205 208 user_variables=None, user_expressions=None, allow_stdin=None):
206 209 """Execute code in the kernel.
207 210
208 211 Parameters
209 212 ----------
210 213 code : str
211 214 A string of Python code.
212 215
213 216 silent : bool, optional (default False)
214 217 If set, the kernel will execute the code as quietly possible.
215 218
216 219 user_variables : list, optional
217 220 A list of variable names to pull from the user's namespace. They
218 221 will come back as a dict with these names as keys and their
219 222 :func:`repr` as values.
220 223
221 224 user_expressions : dict, optional
222 225 A dict with string keys and to pull from the user's
223 226 namespace. They will come back as a dict with these names as keys
224 227 and their :func:`repr` as values.
225 228
226 229 allow_stdin : bool, optional
227 230 Flag for
228 231 A dict with string keys and to pull from the user's
229 232 namespace. They will come back as a dict with these names as keys
230 233 and their :func:`repr` as values.
231 234
232 235 Returns
233 236 -------
234 237 The msg_id of the message sent.
235 238 """
236 239 if user_variables is None:
237 240 user_variables = []
238 241 if user_expressions is None:
239 242 user_expressions = {}
240 243 if allow_stdin is None:
241 244 allow_stdin = self.allow_stdin
242 245
243 246
244 247 # Don't waste network traffic if inputs are invalid
245 248 if not isinstance(code, basestring):
246 249 raise ValueError('code %r must be a string' % code)
247 250 validate_string_list(user_variables)
248 251 validate_string_dict(user_expressions)
249 252
250 253 # Create class for content/msg creation. Related to, but possibly
251 254 # not in Session.
252 255 content = dict(code=code, silent=silent,
253 256 user_variables=user_variables,
254 257 user_expressions=user_expressions,
255 258 allow_stdin=allow_stdin,
256 259 )
257 260 msg = self.session.msg('execute_request', content)
258 261 self._queue_send(msg)
259 262 return msg['header']['msg_id']
260 263
261 264 def complete(self, text, line, cursor_pos, block=None):
262 265 """Tab complete text in the kernel's namespace.
263 266
264 267 Parameters
265 268 ----------
266 269 text : str
267 270 The text to complete.
268 271 line : str
269 272 The full line of text that is the surrounding context for the
270 273 text to complete.
271 274 cursor_pos : int
272 275 The position of the cursor in the line where the completion was
273 276 requested.
274 277 block : str, optional
275 278 The full block of code in which the completion is being requested.
276 279
277 280 Returns
278 281 -------
279 282 The msg_id of the message sent.
280 283 """
281 284 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
282 285 msg = self.session.msg('complete_request', content)
283 286 self._queue_send(msg)
284 287 return msg['header']['msg_id']
285 288
286 289 def object_info(self, oname):
287 290 """Get metadata information about an object.
288 291
289 292 Parameters
290 293 ----------
291 294 oname : str
292 295 A string specifying the object name.
293 296
294 297 Returns
295 298 -------
296 299 The msg_id of the message sent.
297 300 """
298 301 content = dict(oname=oname)
299 302 msg = self.session.msg('object_info_request', content)
300 303 self._queue_send(msg)
301 304 return msg['header']['msg_id']
302 305
303 306 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
304 307 """Get entries from the history list.
305 308
306 309 Parameters
307 310 ----------
308 311 raw : bool
309 312 If True, return the raw input.
310 313 output : bool
311 314 If True, then return the output as well.
312 315 hist_access_type : str
313 316 'range' (fill in session, start and stop params), 'tail' (fill in n)
314 317 or 'search' (fill in pattern param).
315 318
316 319 session : int
317 320 For a range request, the session from which to get lines. Session
318 321 numbers are positive integers; negative ones count back from the
319 322 current session.
320 323 start : int
321 324 The first line number of a history range.
322 325 stop : int
323 326 The final (excluded) line number of a history range.
324 327
325 328 n : int
326 329 The number of lines of history to get for a tail request.
327 330
328 331 pattern : str
329 332 The glob-syntax pattern for a search request.
330 333
331 334 Returns
332 335 -------
333 336 The msg_id of the message sent.
334 337 """
335 338 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
336 339 **kwargs)
337 340 msg = self.session.msg('history_request', content)
338 341 self._queue_send(msg)
339 342 return msg['header']['msg_id']
340 343
341 344 def shutdown(self, restart=False):
342 345 """Request an immediate kernel shutdown.
343 346
344 347 Upon receipt of the (empty) reply, client code can safely assume that
345 348 the kernel has shut down and it's safe to forcefully terminate it if
346 349 it's still alive.
347 350
348 351 The kernel will send the reply via a function registered with Python's
349 352 atexit module, ensuring it's truly done as the kernel is done with all
350 353 normal operation.
351 354 """
352 355 # Send quit message to kernel. Once we implement kernel-side setattr,
353 356 # this should probably be done that way, but for now this will do.
354 357 msg = self.session.msg('shutdown_request', {'restart':restart})
355 358 self._queue_send(msg)
356 359 return msg['header']['msg_id']
357 360
358 361
359 362
360 363 class SubSocketChannel(ZMQSocketChannel):
361 364 """The SUB channel which listens for messages that the kernel publishes.
362 365 """
363 366
364 367 def __init__(self, context, session, address):
365 368 super(SubSocketChannel, self).__init__(context, session, address)
366 369 self.ioloop = ioloop.IOLoop()
367 370
368 371 def run(self):
369 372 """The thread's main activity. Call start() instead."""
370 373 self.socket = self.context.socket(zmq.SUB)
371 374 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
372 375 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 376 self.socket.connect('tcp://%s:%i' % self.address)
374 377 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
375 378 self.stream.on_recv(self._handle_recv)
376 379 self._run_loop()
377 380
378 381 def stop(self):
379 382 self.ioloop.stop()
380 383 super(SubSocketChannel, self).stop()
381 384
382 385 def call_handlers(self, msg):
383 386 """This method is called in the ioloop thread when a message arrives.
384 387
385 388 Subclasses should override this method to handle incoming messages.
386 389 It is important to remember that this method is called in the thread
387 390 so that some logic must be done to ensure that the application leve
388 391 handlers are called in the application thread.
389 392 """
390 393 raise NotImplementedError('call_handlers must be defined in a subclass.')
391 394
392 395 def flush(self, timeout=1.0):
393 396 """Immediately processes all pending messages on the SUB channel.
394 397
395 398 Callers should use this method to ensure that :method:`call_handlers`
396 399 has been called for all messages that have been received on the
397 400 0MQ SUB socket of this channel.
398 401
399 402 This method is thread safe.
400 403
401 404 Parameters
402 405 ----------
403 406 timeout : float, optional
404 407 The maximum amount of time to spend flushing, in seconds. The
405 408 default is one second.
406 409 """
407 410 # We do the IOLoop callback process twice to ensure that the IOLoop
408 411 # gets to perform at least one full poll.
409 412 stop_time = time.time() + timeout
410 413 for i in xrange(2):
411 414 self._flushed = False
412 415 self.ioloop.add_callback(self._flush)
413 416 while not self._flushed and time.time() < stop_time:
414 417 time.sleep(0.01)
415 418
416 419 def _flush(self):
417 420 """Callback for :method:`self.flush`."""
418 421 self.stream.flush()
419 422 self._flushed = True
420 423
421 424
422 425 class StdInSocketChannel(ZMQSocketChannel):
423 426 """A reply channel to handle raw_input requests that the kernel makes."""
424 427
425 428 msg_queue = None
426 429
427 430 def __init__(self, context, session, address):
428 431 super(StdInSocketChannel, self).__init__(context, session, address)
429 432 self.ioloop = ioloop.IOLoop()
430 433
431 434 def run(self):
432 435 """The thread's main activity. Call start() instead."""
433 436 self.socket = self.context.socket(zmq.DEALER)
434 437 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
435 438 self.socket.connect('tcp://%s:%i' % self.address)
436 439 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
437 440 self.stream.on_recv(self._handle_recv)
438 441 self._run_loop()
439 442
440 443 def stop(self):
441 444 self.ioloop.stop()
442 445 super(StdInSocketChannel, self).stop()
443 446
444 447 def call_handlers(self, msg):
445 448 """This method is called in the ioloop thread when a message arrives.
446 449
447 450 Subclasses should override this method to handle incoming messages.
448 451 It is important to remember that this method is called in the thread
449 452 so that some logic must be done to ensure that the application leve
450 453 handlers are called in the application thread.
451 454 """
452 455 raise NotImplementedError('call_handlers must be defined in a subclass.')
453 456
454 457 def input(self, string):
455 458 """Send a string of raw input to the kernel."""
456 459 content = dict(value=string)
457 460 msg = self.session.msg('input_reply', content)
458 461 self._queue_send(msg)
459 462
460 463
461 464 class HBSocketChannel(ZMQSocketChannel):
462 465 """The heartbeat channel which monitors the kernel heartbeat.
463 466
464 467 Note that the heartbeat channel is paused by default. As long as you start
465 468 this channel, the kernel manager will ensure that it is paused and un-paused
466 469 as appropriate.
467 470 """
468 471
469 472 time_to_dead = 3.0
470 473 socket = None
471 474 poller = None
472 475 _running = None
473 476 _pause = None
474 477 _beating = None
475 478
476 479 def __init__(self, context, session, address):
477 480 super(HBSocketChannel, self).__init__(context, session, address)
478 481 self._running = False
479 482 self._pause =True
480 483 self.poller = zmq.Poller()
481 484
482 485 def _create_socket(self):
483 486 if self.socket is not None:
484 487 # close previous socket, before opening a new one
485 488 self.poller.unregister(self.socket)
486 489 self.socket.close()
487 490 self.socket = self.context.socket(zmq.REQ)
488 491 self.socket.setsockopt(zmq.LINGER, 0)
489 492 self.socket.connect('tcp://%s:%i' % self.address)
490 493
491 494 self.poller.register(self.socket, zmq.POLLIN)
492 495
493 496 def _poll(self, start_time):
494 497 """poll for heartbeat replies until we reach self.time_to_dead
495 498
496 499 Ignores interrupts, and returns the result of poll(), which
497 500 will be an empty list if no messages arrived before the timeout,
498 501 or the event tuple if there is a message to receive.
499 502 """
500 503
501 504 until_dead = self.time_to_dead - (time.time() - start_time)
502 505 # ensure poll at least once
503 506 until_dead = max(until_dead, 1e-3)
504 507 events = []
505 508 while True:
506 509 try:
507 510 events = self.poller.poll(1000 * until_dead)
508 except zmq.ZMQError as e:
511 except ZMQError as e:
509 512 if e.errno == errno.EINTR:
510 513 # ignore interrupts during heartbeat
511 514 # this may never actually happen
512 515 until_dead = self.time_to_dead - (time.time() - start_time)
513 516 until_dead = max(until_dead, 1e-3)
514 517 pass
515 518 else:
516 519 raise
517 520 else:
518 521 break
519 522 return events
520 523
521 524 def run(self):
522 525 """The thread's main activity. Call start() instead."""
523 526 self._create_socket()
524 527 self._running = True
525 528 self._beating = True
526 529
527 530 while self._running:
528 531 if self._pause:
529 532 # just sleep, and skip the rest of the loop
530 533 time.sleep(self.time_to_dead)
531 534 continue
532 535
533 536 since_last_heartbeat = 0.0
534 537 # io.rprint('Ping from HB channel') # dbg
535 538 # no need to catch EFSM here, because the previous event was
536 539 # either a recv or connect, which cannot be followed by EFSM
537 540 self.socket.send(b'ping')
538 541 request_time = time.time()
539 542 ready = self._poll(request_time)
540 543 if ready:
541 544 self._beating = True
542 545 # the poll above guarantees we have something to recv
543 546 self.socket.recv()
544 547 # sleep the remainder of the cycle
545 548 remainder = self.time_to_dead - (time.time() - request_time)
546 549 if remainder > 0:
547 550 time.sleep(remainder)
548 551 continue
549 552 else:
550 553 # nothing was received within the time limit, signal heart failure
551 554 self._beating = False
552 555 since_last_heartbeat = time.time() - request_time
553 556 self.call_handlers(since_last_heartbeat)
554 557 # and close/reopen the socket, because the REQ/REP cycle has been broken
555 558 self._create_socket()
556 559 continue
557 560
558 561 def pause(self):
559 562 """Pause the heartbeat."""
560 563 self._pause = True
561 564
562 565 def unpause(self):
563 566 """Unpause the heartbeat."""
564 567 self._pause = False
565 568
566 569 def is_beating(self):
567 570 """Is the heartbeat running and responsive (and not paused)."""
568 571 if self.is_alive() and not self._pause and self._beating:
569 572 return True
570 573 else:
571 574 return False
572 575
573 576 def stop(self):
574 577 self._running = False
575 578 super(HBSocketChannel, self).stop()
576 579
577 580 def call_handlers(self, since_last_heartbeat):
578 581 """This method is called in the ioloop thread when a message arrives.
579 582
580 583 Subclasses should override this method to handle incoming messages.
581 584 It is important to remember that this method is called in the thread
582 585 so that some logic must be done to ensure that the application level
583 586 handlers are called in the application thread.
584 587 """
585 588 raise NotImplementedError('call_handlers must be defined in a subclass.')
586 589
587 590
588 591 #-----------------------------------------------------------------------------
589 592 # Main kernel manager class
590 593 #-----------------------------------------------------------------------------
591 594
592 595 class KernelManager(HasTraits):
593 596 """ Manages a kernel for a frontend.
594 597
595 598 The SUB channel is for the frontend to receive messages published by the
596 599 kernel.
597 600
598 601 The REQ channel is for the frontend to make requests of the kernel.
599 602
600 603 The REP channel is for the kernel to request stdin (raw_input) from the
601 604 frontend.
602 605 """
603 606 # config object for passing to child configurables
604 607 config = Instance(Config)
605 608
606 609 # The PyZMQ Context to use for communication with the kernel.
607 610 context = Instance(zmq.Context)
608 611 def _context_default(self):
609 612 return zmq.Context.instance()
610 613
611 614 # The Session to use for communication with the kernel.
612 615 session = Instance(Session)
613 616
614 617 # The kernel process with which the KernelManager is communicating.
615 618 kernel = Instance(Popen)
616 619
617 620 # The addresses for the communication channels.
618 621 connection_file = Unicode('')
619 622 ip = Unicode(LOCALHOST)
620 623 def _ip_changed(self, name, old, new):
621 624 if new == '*':
622 625 self.ip = '0.0.0.0'
623 626 shell_port = Integer(0)
624 627 iopub_port = Integer(0)
625 628 stdin_port = Integer(0)
626 629 hb_port = Integer(0)
627 630
628 631 # The classes to use for the various channels.
629 632 shell_channel_class = Type(ShellSocketChannel)
630 633 sub_channel_class = Type(SubSocketChannel)
631 634 stdin_channel_class = Type(StdInSocketChannel)
632 635 hb_channel_class = Type(HBSocketChannel)
633 636
634 637 # Protected traits.
635 638 _launch_args = Any
636 639 _shell_channel = Any
637 640 _sub_channel = Any
638 641 _stdin_channel = Any
639 642 _hb_channel = Any
640 643 _connection_file_written=Bool(False)
641 644
642 645 def __init__(self, **kwargs):
643 646 super(KernelManager, self).__init__(**kwargs)
644 647 if self.session is None:
645 648 self.session = Session(config=self.config)
646 649
647 650 def __del__(self):
648 651 self.cleanup_connection_file()
649 652
650 653
651 654 #--------------------------------------------------------------------------
652 655 # Channel management methods:
653 656 #--------------------------------------------------------------------------
654 657
655 658 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
656 659 """Starts the channels for this kernel.
657 660
658 661 This will create the channels if they do not exist and then start
659 662 them. If port numbers of 0 are being used (random ports) then you
660 663 must first call :method:`start_kernel`. If the channels have been
661 664 stopped and you call this, :class:`RuntimeError` will be raised.
662 665 """
663 666 if shell:
664 667 self.shell_channel.start()
665 668 if sub:
666 669 self.sub_channel.start()
667 670 if stdin:
668 671 self.stdin_channel.start()
669 672 self.shell_channel.allow_stdin = True
670 673 else:
671 674 self.shell_channel.allow_stdin = False
672 675 if hb:
673 676 self.hb_channel.start()
674 677
675 678 def stop_channels(self):
676 679 """Stops all the running channels for this kernel.
677 680 """
678 681 if self.shell_channel.is_alive():
679 682 self.shell_channel.stop()
680 683 if self.sub_channel.is_alive():
681 684 self.sub_channel.stop()
682 685 if self.stdin_channel.is_alive():
683 686 self.stdin_channel.stop()
684 687 if self.hb_channel.is_alive():
685 688 self.hb_channel.stop()
686 689
687 690 @property
688 691 def channels_running(self):
689 692 """Are any of the channels created and running?"""
690 693 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
691 694 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
692 695
693 696 #--------------------------------------------------------------------------
694 697 # Kernel process management methods:
695 698 #--------------------------------------------------------------------------
696 699
697 700 def cleanup_connection_file(self):
698 701 """cleanup connection file *if we wrote it*
699 702
700 703 Will not raise if the connection file was already removed somehow.
701 704 """
702 705 if self._connection_file_written:
703 706 # cleanup connection files on full shutdown of kernel we started
704 707 self._connection_file_written = False
705 708 try:
706 709 os.remove(self.connection_file)
707 710 except OSError:
708 711 pass
709 712
710 713 def load_connection_file(self):
711 714 """load connection info from JSON dict in self.connection_file"""
712 715 with open(self.connection_file) as f:
713 716 cfg = json.loads(f.read())
714 717
715 718 self.ip = cfg['ip']
716 719 self.shell_port = cfg['shell_port']
717 720 self.stdin_port = cfg['stdin_port']
718 721 self.iopub_port = cfg['iopub_port']
719 722 self.hb_port = cfg['hb_port']
720 723 self.session.key = str_to_bytes(cfg['key'])
721 724
722 725 def write_connection_file(self):
723 726 """write connection info to JSON dict in self.connection_file"""
724 727 if self._connection_file_written:
725 728 return
726 729 self.connection_file,cfg = write_connection_file(self.connection_file,
727 730 ip=self.ip, key=self.session.key,
728 731 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
729 732 shell_port=self.shell_port, hb_port=self.hb_port)
730 733 # write_connection_file also sets default ports:
731 734 self.shell_port = cfg['shell_port']
732 735 self.stdin_port = cfg['stdin_port']
733 736 self.iopub_port = cfg['iopub_port']
734 737 self.hb_port = cfg['hb_port']
735 738
736 739 self._connection_file_written = True
737 740
738 741 def start_kernel(self, **kw):
739 742 """Starts a kernel process and configures the manager to use it.
740 743
741 744 If random ports (port=0) are being used, this method must be called
742 745 before the channels are created.
743 746
744 747 Parameters:
745 748 -----------
746 749 ipython : bool, optional (default True)
747 750 Whether to use an IPython kernel instead of a plain Python kernel.
748 751
749 752 launcher : callable, optional (default None)
750 753 A custom function for launching the kernel process (generally a
751 754 wrapper around ``entry_point.base_launch_kernel``). In most cases,
752 755 it should not be necessary to use this parameter.
753 756
754 757 **kw : optional
755 758 See respective options for IPython and Python kernels.
756 759 """
757 760 if self.ip not in LOCAL_IPS:
758 761 raise RuntimeError("Can only launch a kernel on a local interface. "
759 762 "Make sure that the '*_address' attributes are "
760 763 "configured properly. "
761 764 "Currently valid addresses are: %s"%LOCAL_IPS
762 765 )
763 766
764 767 # write connection file / get default ports
765 768 self.write_connection_file()
766 769
767 770 self._launch_args = kw.copy()
768 771 launch_kernel = kw.pop('launcher', None)
769 772 if launch_kernel is None:
770 773 if kw.pop('ipython', True):
771 774 from ipkernel import launch_kernel
772 775 else:
773 776 from pykernel import launch_kernel
774 777 self.kernel = launch_kernel(fname=self.connection_file, **kw)
775 778
776 779 def shutdown_kernel(self, restart=False):
777 780 """ Attempts to the stop the kernel process cleanly. If the kernel
778 781 cannot be stopped, it is killed, if possible.
779 782 """
780 783 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
781 784 if sys.platform == 'win32':
782 785 self.kill_kernel()
783 786 return
784 787
785 788 # Pause the heart beat channel if it exists.
786 789 if self._hb_channel is not None:
787 790 self._hb_channel.pause()
788 791
789 792 # Don't send any additional kernel kill messages immediately, to give
790 793 # the kernel a chance to properly execute shutdown actions. Wait for at
791 794 # most 1s, checking every 0.1s.
792 795 self.shell_channel.shutdown(restart=restart)
793 796 for i in range(10):
794 797 if self.is_alive:
795 798 time.sleep(0.1)
796 799 else:
797 800 break
798 801 else:
799 802 # OK, we've waited long enough.
800 803 if self.has_kernel:
801 804 self.kill_kernel()
802 805
803 806 if not restart and self._connection_file_written:
804 807 # cleanup connection files on full shutdown of kernel we started
805 808 self._connection_file_written = False
806 809 try:
807 810 os.remove(self.connection_file)
808 811 except IOError:
809 812 pass
810 813
811 814 def restart_kernel(self, now=False, **kw):
812 815 """Restarts a kernel with the arguments that were used to launch it.
813 816
814 817 If the old kernel was launched with random ports, the same ports will be
815 818 used for the new kernel.
816 819
817 820 Parameters
818 821 ----------
819 822 now : bool, optional
820 823 If True, the kernel is forcefully restarted *immediately*, without
821 824 having a chance to do any cleanup action. Otherwise the kernel is
822 825 given 1s to clean up before a forceful restart is issued.
823 826
824 827 In all cases the kernel is restarted, the only difference is whether
825 828 it is given a chance to perform a clean shutdown or not.
826 829
827 830 **kw : optional
828 831 Any options specified here will replace those used to launch the
829 832 kernel.
830 833 """
831 834 if self._launch_args is None:
832 835 raise RuntimeError("Cannot restart the kernel. "
833 836 "No previous call to 'start_kernel'.")
834 837 else:
835 838 # Stop currently running kernel.
836 839 if self.has_kernel:
837 840 if now:
838 841 self.kill_kernel()
839 842 else:
840 843 self.shutdown_kernel(restart=True)
841 844
842 845 # Start new kernel.
843 846 self._launch_args.update(kw)
844 847 self.start_kernel(**self._launch_args)
845 848
846 849 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
847 850 # unless there is some delay here.
848 851 if sys.platform == 'win32':
849 852 time.sleep(0.2)
850 853
851 854 @property
852 855 def has_kernel(self):
853 856 """Returns whether a kernel process has been specified for the kernel
854 857 manager.
855 858 """
856 859 return self.kernel is not None
857 860
858 861 def kill_kernel(self):
859 862 """ Kill the running kernel. """
860 863 if self.has_kernel:
861 864 # Pause the heart beat channel if it exists.
862 865 if self._hb_channel is not None:
863 866 self._hb_channel.pause()
864 867
865 868 # Attempt to kill the kernel.
866 869 try:
867 870 self.kernel.kill()
868 871 except OSError, e:
869 872 # In Windows, we will get an Access Denied error if the process
870 873 # has already terminated. Ignore it.
871 874 if sys.platform == 'win32':
872 875 if e.winerror != 5:
873 876 raise
874 877 # On Unix, we may get an ESRCH error if the process has already
875 878 # terminated. Ignore it.
876 879 else:
877 880 from errno import ESRCH
878 881 if e.errno != ESRCH:
879 882 raise
880 883 self.kernel = None
881 884 else:
882 885 raise RuntimeError("Cannot kill kernel. No kernel is running!")
883 886
884 887 def interrupt_kernel(self):
885 888 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
886 889 well supported on all platforms.
887 890 """
888 891 if self.has_kernel:
889 892 if sys.platform == 'win32':
890 893 from parentpoller import ParentPollerWindows as Poller
891 894 Poller.send_interrupt(self.kernel.win32_interrupt_event)
892 895 else:
893 896 self.kernel.send_signal(signal.SIGINT)
894 897 else:
895 898 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
896 899
897 900 def signal_kernel(self, signum):
898 901 """ Sends a signal to the kernel. Note that since only SIGTERM is
899 902 supported on Windows, this function is only useful on Unix systems.
900 903 """
901 904 if self.has_kernel:
902 905 self.kernel.send_signal(signum)
903 906 else:
904 907 raise RuntimeError("Cannot signal kernel. No kernel is running!")
905 908
906 909 @property
907 910 def is_alive(self):
908 911 """Is the kernel process still running?"""
909 912 if self.has_kernel:
910 913 if self.kernel.poll() is None:
911 914 return True
912 915 else:
913 916 return False
914 917 elif self._hb_channel is not None:
915 918 # We didn't start the kernel with this KernelManager so we
916 919 # use the heartbeat.
917 920 return self._hb_channel.is_beating()
918 921 else:
919 922 # no heartbeat and not local, we can't tell if it's running,
920 923 # so naively return True
921 924 return True
922 925
923 926 #--------------------------------------------------------------------------
924 927 # Channels used for communication with the kernel:
925 928 #--------------------------------------------------------------------------
926 929
927 930 @property
928 931 def shell_channel(self):
929 932 """Get the REQ socket channel object to make requests of the kernel."""
930 933 if self._shell_channel is None:
931 934 self._shell_channel = self.shell_channel_class(self.context,
932 935 self.session,
933 936 (self.ip, self.shell_port))
934 937 return self._shell_channel
935 938
936 939 @property
937 940 def sub_channel(self):
938 941 """Get the SUB socket channel object."""
939 942 if self._sub_channel is None:
940 943 self._sub_channel = self.sub_channel_class(self.context,
941 944 self.session,
942 945 (self.ip, self.iopub_port))
943 946 return self._sub_channel
944 947
945 948 @property
946 949 def stdin_channel(self):
947 950 """Get the REP socket channel object to handle stdin (raw_input)."""
948 951 if self._stdin_channel is None:
949 952 self._stdin_channel = self.stdin_channel_class(self.context,
950 953 self.session,
951 954 (self.ip, self.stdin_port))
952 955 return self._stdin_channel
953 956
954 957 @property
955 958 def hb_channel(self):
956 959 """Get the heartbeat socket channel object to check that the
957 960 kernel is alive."""
958 961 if self._hb_channel is None:
959 962 self._hb_channel = self.hb_channel_class(self.context,
960 963 self.session,
961 964 (self.ip, self.hb_port))
962 965 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now