##// END OF EJS Templates
add KernelManager.load_connection_file method...
MinRK -
Show More
@@ -1,1028 +1,1043 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import errno
20 20 from Queue import Queue, Empty
21 21 from subprocess import Popen
22 22 import os
23 23 import signal
24 24 import sys
25 25 from threading import Thread
26 26 import time
27 27
28 28 # System library imports.
29 29 import zmq
30 30 from zmq import POLLIN, POLLOUT, POLLERR
31 31 from zmq.eventloop import ioloop
32 from zmq.utils import jsonapi as json
32 33
33 34 # Local imports.
34 35 from IPython.config.loader import Config
35 36 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 37 from IPython.utils.traitlets import (
37 38 HasTraits, Any, Instance, Type, Unicode, Int, Bool
38 39 )
40 from IPython.utils.py3compat import str_to_bytes
39 41 from IPython.zmq.entry_point import write_connection_file
40 42 from session import Session
41 43
42 44 #-----------------------------------------------------------------------------
43 45 # Constants and exceptions
44 46 #-----------------------------------------------------------------------------
45 47
46 48 class InvalidPortNumber(Exception):
47 49 pass
48 50
49 51 #-----------------------------------------------------------------------------
50 52 # Utility functions
51 53 #-----------------------------------------------------------------------------
52 54
53 55 # some utilities to validate message structure, these might get moved elsewhere
54 56 # if they prove to have more generic utility
55 57
56 58 def validate_string_list(lst):
57 59 """Validate that the input is a list of strings.
58 60
59 61 Raises ValueError if not."""
60 62 if not isinstance(lst, list):
61 63 raise ValueError('input %r must be a list' % lst)
62 64 for x in lst:
63 65 if not isinstance(x, basestring):
64 66 raise ValueError('element %r in list must be a string' % x)
65 67
66 68
67 69 def validate_string_dict(dct):
68 70 """Validate that the input is a dict with string keys and values.
69 71
70 72 Raises ValueError if not."""
71 73 for k,v in dct.iteritems():
72 74 if not isinstance(k, basestring):
73 75 raise ValueError('key %r in dict must be a string' % k)
74 76 if not isinstance(v, basestring):
75 77 raise ValueError('value %r in dict must be a string' % v)
76 78
77 79
78 80 #-----------------------------------------------------------------------------
79 81 # ZMQ Socket Channel classes
80 82 #-----------------------------------------------------------------------------
81 83
82 84 class ZMQSocketChannel(Thread):
83 85 """The base class for the channels that use ZMQ sockets.
84 86 """
85 87 context = None
86 88 session = None
87 89 socket = None
88 90 ioloop = None
89 91 iostate = None
90 92 _address = None
91 93
92 94 def __init__(self, context, session, address):
93 95 """Create a channel
94 96
95 97 Parameters
96 98 ----------
97 99 context : :class:`zmq.Context`
98 100 The ZMQ context to use.
99 101 session : :class:`session.Session`
100 102 The session to use.
101 103 address : tuple
102 104 Standard (ip, port) tuple that the kernel is listening on.
103 105 """
104 106 super(ZMQSocketChannel, self).__init__()
105 107 self.daemon = True
106 108
107 109 self.context = context
108 110 self.session = session
109 111 if address[1] == 0:
110 112 message = 'The port number for a channel cannot be 0.'
111 113 raise InvalidPortNumber(message)
112 114 self._address = address
113 115
114 116 def _run_loop(self):
115 117 """Run my loop, ignoring EINTR events in the poller"""
116 118 while True:
117 119 try:
118 120 self.ioloop.start()
119 121 except zmq.ZMQError as e:
120 122 if e.errno == errno.EINTR:
121 123 continue
122 124 else:
123 125 raise
124 126 else:
125 127 break
126 128
127 129 def stop(self):
128 130 """Stop the channel's activity.
129 131
130 132 This calls :method:`Thread.join` and returns when the thread
131 133 terminates. :class:`RuntimeError` will be raised if
132 134 :method:`self.start` is called again.
133 135 """
134 136 self.join()
135 137
136 138 @property
137 139 def address(self):
138 140 """Get the channel's address as an (ip, port) tuple.
139 141
140 142 By the default, the address is (localhost, 0), where 0 means a random
141 143 port.
142 144 """
143 145 return self._address
144 146
145 147 def add_io_state(self, state):
146 148 """Add IO state to the eventloop.
147 149
148 150 Parameters
149 151 ----------
150 152 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 153 The IO state flag to set.
152 154
153 155 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 156 """
155 157 def add_io_state_callback():
156 158 if not self.iostate & state:
157 159 self.iostate = self.iostate | state
158 160 self.ioloop.update_handler(self.socket, self.iostate)
159 161 self.ioloop.add_callback(add_io_state_callback)
160 162
161 163 def drop_io_state(self, state):
162 164 """Drop IO state from the eventloop.
163 165
164 166 Parameters
165 167 ----------
166 168 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
167 169 The IO state flag to set.
168 170
169 171 This is thread safe as it uses the thread safe IOLoop.add_callback.
170 172 """
171 173 def drop_io_state_callback():
172 174 if self.iostate & state:
173 175 self.iostate = self.iostate & (~state)
174 176 self.ioloop.update_handler(self.socket, self.iostate)
175 177 self.ioloop.add_callback(drop_io_state_callback)
176 178
177 179
178 180 class ShellSocketChannel(ZMQSocketChannel):
179 181 """The XREQ channel for issues request/replies to the kernel.
180 182 """
181 183
182 184 command_queue = None
183 185 # flag for whether execute requests should be allowed to call raw_input:
184 186 allow_stdin = True
185 187
186 188 def __init__(self, context, session, address):
187 189 super(ShellSocketChannel, self).__init__(context, session, address)
188 190 self.command_queue = Queue()
189 191 self.ioloop = ioloop.IOLoop()
190 192
191 193 def run(self):
192 194 """The thread's main activity. Call start() instead."""
193 195 self.socket = self.context.socket(zmq.DEALER)
194 196 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
195 197 self.socket.connect('tcp://%s:%i' % self.address)
196 198 self.iostate = POLLERR|POLLIN
197 199 self.ioloop.add_handler(self.socket, self._handle_events,
198 200 self.iostate)
199 201 self._run_loop()
200 202
201 203 def stop(self):
202 204 self.ioloop.stop()
203 205 super(ShellSocketChannel, self).stop()
204 206
205 207 def call_handlers(self, msg):
206 208 """This method is called in the ioloop thread when a message arrives.
207 209
208 210 Subclasses should override this method to handle incoming messages.
209 211 It is important to remember that this method is called in the thread
210 212 so that some logic must be done to ensure that the application leve
211 213 handlers are called in the application thread.
212 214 """
213 215 raise NotImplementedError('call_handlers must be defined in a subclass.')
214 216
215 217 def execute(self, code, silent=False,
216 218 user_variables=None, user_expressions=None, allow_stdin=None):
217 219 """Execute code in the kernel.
218 220
219 221 Parameters
220 222 ----------
221 223 code : str
222 224 A string of Python code.
223 225
224 226 silent : bool, optional (default False)
225 227 If set, the kernel will execute the code as quietly possible.
226 228
227 229 user_variables : list, optional
228 230 A list of variable names to pull from the user's namespace. They
229 231 will come back as a dict with these names as keys and their
230 232 :func:`repr` as values.
231 233
232 234 user_expressions : dict, optional
233 235 A dict with string keys and to pull from the user's
234 236 namespace. They will come back as a dict with these names as keys
235 237 and their :func:`repr` as values.
236 238
237 239 allow_stdin : bool, optional
238 240 Flag for
239 241 A dict with string keys and to pull from the user's
240 242 namespace. They will come back as a dict with these names as keys
241 243 and their :func:`repr` as values.
242 244
243 245 Returns
244 246 -------
245 247 The msg_id of the message sent.
246 248 """
247 249 if user_variables is None:
248 250 user_variables = []
249 251 if user_expressions is None:
250 252 user_expressions = {}
251 253 if allow_stdin is None:
252 254 allow_stdin = self.allow_stdin
253 255
254 256
255 257 # Don't waste network traffic if inputs are invalid
256 258 if not isinstance(code, basestring):
257 259 raise ValueError('code %r must be a string' % code)
258 260 validate_string_list(user_variables)
259 261 validate_string_dict(user_expressions)
260 262
261 263 # Create class for content/msg creation. Related to, but possibly
262 264 # not in Session.
263 265 content = dict(code=code, silent=silent,
264 266 user_variables=user_variables,
265 267 user_expressions=user_expressions,
266 268 allow_stdin=allow_stdin,
267 269 )
268 270 msg = self.session.msg('execute_request', content)
269 271 self._queue_request(msg)
270 272 return msg['header']['msg_id']
271 273
272 274 def complete(self, text, line, cursor_pos, block=None):
273 275 """Tab complete text in the kernel's namespace.
274 276
275 277 Parameters
276 278 ----------
277 279 text : str
278 280 The text to complete.
279 281 line : str
280 282 The full line of text that is the surrounding context for the
281 283 text to complete.
282 284 cursor_pos : int
283 285 The position of the cursor in the line where the completion was
284 286 requested.
285 287 block : str, optional
286 288 The full block of code in which the completion is being requested.
287 289
288 290 Returns
289 291 -------
290 292 The msg_id of the message sent.
291 293 """
292 294 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
293 295 msg = self.session.msg('complete_request', content)
294 296 self._queue_request(msg)
295 297 return msg['header']['msg_id']
296 298
297 299 def object_info(self, oname):
298 300 """Get metadata information about an object.
299 301
300 302 Parameters
301 303 ----------
302 304 oname : str
303 305 A string specifying the object name.
304 306
305 307 Returns
306 308 -------
307 309 The msg_id of the message sent.
308 310 """
309 311 content = dict(oname=oname)
310 312 msg = self.session.msg('object_info_request', content)
311 313 self._queue_request(msg)
312 314 return msg['header']['msg_id']
313 315
314 316 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
315 317 """Get entries from the history list.
316 318
317 319 Parameters
318 320 ----------
319 321 raw : bool
320 322 If True, return the raw input.
321 323 output : bool
322 324 If True, then return the output as well.
323 325 hist_access_type : str
324 326 'range' (fill in session, start and stop params), 'tail' (fill in n)
325 327 or 'search' (fill in pattern param).
326 328
327 329 session : int
328 330 For a range request, the session from which to get lines. Session
329 331 numbers are positive integers; negative ones count back from the
330 332 current session.
331 333 start : int
332 334 The first line number of a history range.
333 335 stop : int
334 336 The final (excluded) line number of a history range.
335 337
336 338 n : int
337 339 The number of lines of history to get for a tail request.
338 340
339 341 pattern : str
340 342 The glob-syntax pattern for a search request.
341 343
342 344 Returns
343 345 -------
344 346 The msg_id of the message sent.
345 347 """
346 348 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
347 349 **kwargs)
348 350 msg = self.session.msg('history_request', content)
349 351 self._queue_request(msg)
350 352 return msg['header']['msg_id']
351 353
352 354 def shutdown(self, restart=False):
353 355 """Request an immediate kernel shutdown.
354 356
355 357 Upon receipt of the (empty) reply, client code can safely assume that
356 358 the kernel has shut down and it's safe to forcefully terminate it if
357 359 it's still alive.
358 360
359 361 The kernel will send the reply via a function registered with Python's
360 362 atexit module, ensuring it's truly done as the kernel is done with all
361 363 normal operation.
362 364 """
363 365 # Send quit message to kernel. Once we implement kernel-side setattr,
364 366 # this should probably be done that way, but for now this will do.
365 367 msg = self.session.msg('shutdown_request', {'restart':restart})
366 368 self._queue_request(msg)
367 369 return msg['header']['msg_id']
368 370
369 371 def _handle_events(self, socket, events):
370 372 if events & POLLERR:
371 373 self._handle_err()
372 374 if events & POLLOUT:
373 375 self._handle_send()
374 376 if events & POLLIN:
375 377 self._handle_recv()
376 378
377 379 def _handle_recv(self):
378 380 ident,msg = self.session.recv(self.socket, 0)
379 381 self.call_handlers(msg)
380 382
381 383 def _handle_send(self):
382 384 try:
383 385 msg = self.command_queue.get(False)
384 386 except Empty:
385 387 pass
386 388 else:
387 389 self.session.send(self.socket,msg)
388 390 if self.command_queue.empty():
389 391 self.drop_io_state(POLLOUT)
390 392
391 393 def _handle_err(self):
392 394 # We don't want to let this go silently, so eventually we should log.
393 395 raise zmq.ZMQError()
394 396
395 397 def _queue_request(self, msg):
396 398 self.command_queue.put(msg)
397 399 self.add_io_state(POLLOUT)
398 400
399 401
400 402 class SubSocketChannel(ZMQSocketChannel):
401 403 """The SUB channel which listens for messages that the kernel publishes.
402 404 """
403 405
404 406 def __init__(self, context, session, address):
405 407 super(SubSocketChannel, self).__init__(context, session, address)
406 408 self.ioloop = ioloop.IOLoop()
407 409
408 410 def run(self):
409 411 """The thread's main activity. Call start() instead."""
410 412 self.socket = self.context.socket(zmq.SUB)
411 413 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 414 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 415 self.socket.connect('tcp://%s:%i' % self.address)
414 416 self.iostate = POLLIN|POLLERR
415 417 self.ioloop.add_handler(self.socket, self._handle_events,
416 418 self.iostate)
417 419 self._run_loop()
418 420
419 421 def stop(self):
420 422 self.ioloop.stop()
421 423 super(SubSocketChannel, self).stop()
422 424
423 425 def call_handlers(self, msg):
424 426 """This method is called in the ioloop thread when a message arrives.
425 427
426 428 Subclasses should override this method to handle incoming messages.
427 429 It is important to remember that this method is called in the thread
428 430 so that some logic must be done to ensure that the application leve
429 431 handlers are called in the application thread.
430 432 """
431 433 raise NotImplementedError('call_handlers must be defined in a subclass.')
432 434
433 435 def flush(self, timeout=1.0):
434 436 """Immediately processes all pending messages on the SUB channel.
435 437
436 438 Callers should use this method to ensure that :method:`call_handlers`
437 439 has been called for all messages that have been received on the
438 440 0MQ SUB socket of this channel.
439 441
440 442 This method is thread safe.
441 443
442 444 Parameters
443 445 ----------
444 446 timeout : float, optional
445 447 The maximum amount of time to spend flushing, in seconds. The
446 448 default is one second.
447 449 """
448 450 # We do the IOLoop callback process twice to ensure that the IOLoop
449 451 # gets to perform at least one full poll.
450 452 stop_time = time.time() + timeout
451 453 for i in xrange(2):
452 454 self._flushed = False
453 455 self.ioloop.add_callback(self._flush)
454 456 while not self._flushed and time.time() < stop_time:
455 457 time.sleep(0.01)
456 458
457 459 def _handle_events(self, socket, events):
458 460 # Turn on and off POLLOUT depending on if we have made a request
459 461 if events & POLLERR:
460 462 self._handle_err()
461 463 if events & POLLIN:
462 464 self._handle_recv()
463 465
464 466 def _handle_err(self):
465 467 # We don't want to let this go silently, so eventually we should log.
466 468 raise zmq.ZMQError()
467 469
468 470 def _handle_recv(self):
469 471 # Get all of the messages we can
470 472 while True:
471 473 try:
472 474 ident,msg = self.session.recv(self.socket)
473 475 except zmq.ZMQError:
474 476 # Check the errno?
475 477 # Will this trigger POLLERR?
476 478 break
477 479 else:
478 480 if msg is None:
479 481 break
480 482 self.call_handlers(msg)
481 483
482 484 def _flush(self):
483 485 """Callback for :method:`self.flush`."""
484 486 self._flushed = True
485 487
486 488
487 489 class StdInSocketChannel(ZMQSocketChannel):
488 490 """A reply channel to handle raw_input requests that the kernel makes."""
489 491
490 492 msg_queue = None
491 493
492 494 def __init__(self, context, session, address):
493 495 super(StdInSocketChannel, self).__init__(context, session, address)
494 496 self.ioloop = ioloop.IOLoop()
495 497 self.msg_queue = Queue()
496 498
497 499 def run(self):
498 500 """The thread's main activity. Call start() instead."""
499 501 self.socket = self.context.socket(zmq.DEALER)
500 502 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
501 503 self.socket.connect('tcp://%s:%i' % self.address)
502 504 self.iostate = POLLERR|POLLIN
503 505 self.ioloop.add_handler(self.socket, self._handle_events,
504 506 self.iostate)
505 507 self._run_loop()
506 508
507 509 def stop(self):
508 510 self.ioloop.stop()
509 511 super(StdInSocketChannel, self).stop()
510 512
511 513 def call_handlers(self, msg):
512 514 """This method is called in the ioloop thread when a message arrives.
513 515
514 516 Subclasses should override this method to handle incoming messages.
515 517 It is important to remember that this method is called in the thread
516 518 so that some logic must be done to ensure that the application leve
517 519 handlers are called in the application thread.
518 520 """
519 521 raise NotImplementedError('call_handlers must be defined in a subclass.')
520 522
521 523 def input(self, string):
522 524 """Send a string of raw input to the kernel."""
523 525 content = dict(value=string)
524 526 msg = self.session.msg('input_reply', content)
525 527 self._queue_reply(msg)
526 528
527 529 def _handle_events(self, socket, events):
528 530 if events & POLLERR:
529 531 self._handle_err()
530 532 if events & POLLOUT:
531 533 self._handle_send()
532 534 if events & POLLIN:
533 535 self._handle_recv()
534 536
535 537 def _handle_recv(self):
536 538 ident,msg = self.session.recv(self.socket, 0)
537 539 self.call_handlers(msg)
538 540
539 541 def _handle_send(self):
540 542 try:
541 543 msg = self.msg_queue.get(False)
542 544 except Empty:
543 545 pass
544 546 else:
545 547 self.session.send(self.socket,msg)
546 548 if self.msg_queue.empty():
547 549 self.drop_io_state(POLLOUT)
548 550
549 551 def _handle_err(self):
550 552 # We don't want to let this go silently, so eventually we should log.
551 553 raise zmq.ZMQError()
552 554
553 555 def _queue_reply(self, msg):
554 556 self.msg_queue.put(msg)
555 557 self.add_io_state(POLLOUT)
556 558
557 559
558 560 class HBSocketChannel(ZMQSocketChannel):
559 561 """The heartbeat channel which monitors the kernel heartbeat.
560 562
561 563 Note that the heartbeat channel is paused by default. As long as you start
562 564 this channel, the kernel manager will ensure that it is paused and un-paused
563 565 as appropriate.
564 566 """
565 567
566 568 time_to_dead = 3.0
567 569 socket = None
568 570 poller = None
569 571 _running = None
570 572 _pause = None
571 573
572 574 def __init__(self, context, session, address):
573 575 super(HBSocketChannel, self).__init__(context, session, address)
574 576 self._running = False
575 577 self._pause = True
576 578
577 579 def _create_socket(self):
578 580 self.socket = self.context.socket(zmq.REQ)
579 581 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
580 582 self.socket.connect('tcp://%s:%i' % self.address)
581 583 self.poller = zmq.Poller()
582 584 self.poller.register(self.socket, zmq.POLLIN)
583 585
584 586 def run(self):
585 587 """The thread's main activity. Call start() instead."""
586 588 self._create_socket()
587 589 self._running = True
588 590 while self._running:
589 591 if self._pause:
590 592 time.sleep(self.time_to_dead)
591 593 else:
592 594 since_last_heartbeat = 0.0
593 595 request_time = time.time()
594 596 try:
595 597 #io.rprint('Ping from HB channel') # dbg
596 598 self.socket.send(b'ping')
597 599 except zmq.ZMQError, e:
598 600 #io.rprint('*** HB Error:', e) # dbg
599 601 if e.errno == zmq.EFSM:
600 602 #io.rprint('sleep...', self.time_to_dead) # dbg
601 603 time.sleep(self.time_to_dead)
602 604 self._create_socket()
603 605 else:
604 606 raise
605 607 else:
606 608 while True:
607 609 try:
608 610 self.socket.recv(zmq.NOBLOCK)
609 611 except zmq.ZMQError, e:
610 612 #io.rprint('*** HB Error 2:', e) # dbg
611 613 if e.errno == zmq.EAGAIN:
612 614 before_poll = time.time()
613 615 until_dead = self.time_to_dead - (before_poll -
614 616 request_time)
615 617
616 618 # When the return value of poll() is an empty
617 619 # list, that is when things have gone wrong
618 620 # (zeromq bug). As long as it is not an empty
619 621 # list, poll is working correctly even if it
620 622 # returns quickly. Note: poll timeout is in
621 623 # milliseconds.
622 624 if until_dead > 0.0:
623 625 while True:
624 626 try:
625 627 self.poller.poll(1000 * until_dead)
626 628 except zmq.ZMQError as e:
627 629 if e.errno == errno.EINTR:
628 630 continue
629 631 else:
630 632 raise
631 633 else:
632 634 break
633 635
634 636 since_last_heartbeat = time.time()-request_time
635 637 if since_last_heartbeat > self.time_to_dead:
636 638 self.call_handlers(since_last_heartbeat)
637 639 break
638 640 else:
639 641 # FIXME: We should probably log this instead.
640 642 raise
641 643 else:
642 644 until_dead = self.time_to_dead - (time.time() -
643 645 request_time)
644 646 if until_dead > 0.0:
645 647 #io.rprint('sleep...', self.time_to_dead) # dbg
646 648 time.sleep(until_dead)
647 649 break
648 650
649 651 def pause(self):
650 652 """Pause the heartbeat."""
651 653 self._pause = True
652 654
653 655 def unpause(self):
654 656 """Unpause the heartbeat."""
655 657 self._pause = False
656 658
657 659 def is_beating(self):
658 660 """Is the heartbeat running and not paused."""
659 661 if self.is_alive() and not self._pause:
660 662 return True
661 663 else:
662 664 return False
663 665
664 666 def stop(self):
665 667 self._running = False
666 668 super(HBSocketChannel, self).stop()
667 669
668 670 def call_handlers(self, since_last_heartbeat):
669 671 """This method is called in the ioloop thread when a message arrives.
670 672
671 673 Subclasses should override this method to handle incoming messages.
672 674 It is important to remember that this method is called in the thread
673 675 so that some logic must be done to ensure that the application leve
674 676 handlers are called in the application thread.
675 677 """
676 678 raise NotImplementedError('call_handlers must be defined in a subclass.')
677 679
678 680
679 681 #-----------------------------------------------------------------------------
680 682 # Main kernel manager class
681 683 #-----------------------------------------------------------------------------
682 684
683 685 class KernelManager(HasTraits):
684 686 """ Manages a kernel for a frontend.
685 687
686 688 The SUB channel is for the frontend to receive messages published by the
687 689 kernel.
688 690
689 691 The REQ channel is for the frontend to make requests of the kernel.
690 692
691 693 The REP channel is for the kernel to request stdin (raw_input) from the
692 694 frontend.
693 695 """
694 696 # config object for passing to child configurables
695 697 config = Instance(Config)
696 698
697 699 # The PyZMQ Context to use for communication with the kernel.
698 700 context = Instance(zmq.Context)
699 701 def _context_default(self):
700 702 return zmq.Context.instance()
701 703
702 704 # The Session to use for communication with the kernel.
703 705 session = Instance(Session)
704 706
705 707 # The kernel process with which the KernelManager is communicating.
706 708 kernel = Instance(Popen)
707 709
708 710 # The addresses for the communication channels.
709 711 connection_file = Unicode('')
710 712 ip = Unicode(LOCALHOST)
711 713 shell_port = Int(0)
712 714 iopub_port = Int(0)
713 715 stdin_port = Int(0)
714 716 hb_port = Int(0)
715 717
716 718 # The classes to use for the various channels.
717 719 shell_channel_class = Type(ShellSocketChannel)
718 720 sub_channel_class = Type(SubSocketChannel)
719 721 stdin_channel_class = Type(StdInSocketChannel)
720 722 hb_channel_class = Type(HBSocketChannel)
721 723
722 724 # Protected traits.
723 725 _launch_args = Any
724 726 _shell_channel = Any
725 727 _sub_channel = Any
726 728 _stdin_channel = Any
727 729 _hb_channel = Any
728 730 _connection_file_written=Bool(False)
729 731
730 732 def __init__(self, **kwargs):
731 733 super(KernelManager, self).__init__(**kwargs)
732 734 if self.session is None:
733 735 self.session = Session(config=self.config)
734 736
735 737 def __del__(self):
736 738 if self._connection_file_written:
737 739 # cleanup connection files on full shutdown of kernel we started
738 740 self._connection_file_written = False
739 741 try:
740 742 os.remove(self.connection_file)
741 743 except IOError:
742 744 pass
743 745
744 746
745 747 #--------------------------------------------------------------------------
746 748 # Channel management methods:
747 749 #--------------------------------------------------------------------------
748 750
749 751 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
750 752 """Starts the channels for this kernel.
751 753
752 754 This will create the channels if they do not exist and then start
753 755 them. If port numbers of 0 are being used (random ports) then you
754 756 must first call :method:`start_kernel`. If the channels have been
755 757 stopped and you call this, :class:`RuntimeError` will be raised.
756 758 """
757 759 if shell:
758 760 self.shell_channel.start()
759 761 if sub:
760 762 self.sub_channel.start()
761 763 if stdin:
762 764 self.stdin_channel.start()
763 765 self.shell_channel.allow_stdin = True
764 766 else:
765 767 self.shell_channel.allow_stdin = False
766 768 if hb:
767 769 self.hb_channel.start()
768 770
769 771 def stop_channels(self):
770 772 """Stops all the running channels for this kernel.
771 773 """
772 774 if self.shell_channel.is_alive():
773 775 self.shell_channel.stop()
774 776 if self.sub_channel.is_alive():
775 777 self.sub_channel.stop()
776 778 if self.stdin_channel.is_alive():
777 779 self.stdin_channel.stop()
778 780 if self.hb_channel.is_alive():
779 781 self.hb_channel.stop()
780 782
781 783 @property
782 784 def channels_running(self):
783 785 """Are any of the channels created and running?"""
784 786 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
785 787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
786 788
787 789 #--------------------------------------------------------------------------
788 790 # Kernel process management methods:
789 791 #--------------------------------------------------------------------------
790 792
793 def load_connection_file(self):
794 """load connection info from JSON dict in self.connection_file"""
795 with open(self.connection_file) as f:
796 cfg = json.loads(f.read())
797
798 self.ip = cfg['ip']
799 self.shell_port = cfg['shell_port']
800 self.stdin_port = cfg['stdin_port']
801 self.iopub_port = cfg['iopub_port']
802 self.hb_port = cfg['hb_port']
803 self.session.key = str_to_bytes(cfg['key'])
804
791 805 def write_connection_file(self):
806 """write connection info to JSON dict in self.connection_file"""
792 807 if self._connection_file_written:
793 808 return
794 809 self.connection_file,cfg = write_connection_file(self.connection_file,
795 810 ip=self.ip, key=self.session.key,
796 811 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
797 812 shell_port=self.shell_port, hb_port=self.hb_port)
798 813 # write_connection_file also sets default ports:
799 814 self.shell_port = cfg['shell_port']
800 815 self.stdin_port = cfg['stdin_port']
801 816 self.iopub_port = cfg['iopub_port']
802 817 self.hb_port = cfg['hb_port']
803 818
804 819 self._connection_file_written = True
805 820
806 821 def start_kernel(self, **kw):
807 822 """Starts a kernel process and configures the manager to use it.
808 823
809 824 If random ports (port=0) are being used, this method must be called
810 825 before the channels are created.
811 826
812 827 Parameters:
813 828 -----------
814 829 ipython : bool, optional (default True)
815 830 Whether to use an IPython kernel instead of a plain Python kernel.
816 831
817 832 launcher : callable, optional (default None)
818 833 A custom function for launching the kernel process (generally a
819 834 wrapper around ``entry_point.base_launch_kernel``). In most cases,
820 835 it should not be necessary to use this parameter.
821 836
822 837 **kw : optional
823 838 See respective options for IPython and Python kernels.
824 839 """
825 840 if self.ip not in LOCAL_IPS:
826 841 raise RuntimeError("Can only launch a kernel on a local interface. "
827 842 "Make sure that the '*_address' attributes are "
828 843 "configured properly. "
829 844 "Currently valid addresses are: %s"%LOCAL_IPS
830 845 )
831 846
832 847 # write connection file / get default ports
833 848 self.write_connection_file()
834 849
835 850 self._launch_args = kw.copy()
836 851 launch_kernel = kw.pop('launcher', None)
837 852 if launch_kernel is None:
838 853 if kw.pop('ipython', True):
839 854 from ipkernel import launch_kernel
840 855 else:
841 856 from pykernel import launch_kernel
842 857 self.kernel = launch_kernel(fname=self.connection_file, **kw)
843 858
844 859 def shutdown_kernel(self, restart=False):
845 860 """ Attempts to the stop the kernel process cleanly. If the kernel
846 861 cannot be stopped, it is killed, if possible.
847 862 """
848 863 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
849 864 if sys.platform == 'win32':
850 865 self.kill_kernel()
851 866 return
852 867
853 868 # Pause the heart beat channel if it exists.
854 869 if self._hb_channel is not None:
855 870 self._hb_channel.pause()
856 871
857 872 # Don't send any additional kernel kill messages immediately, to give
858 873 # the kernel a chance to properly execute shutdown actions. Wait for at
859 874 # most 1s, checking every 0.1s.
860 875 self.shell_channel.shutdown(restart=restart)
861 876 for i in range(10):
862 877 if self.is_alive:
863 878 time.sleep(0.1)
864 879 else:
865 880 break
866 881 else:
867 882 # OK, we've waited long enough.
868 883 if self.has_kernel:
869 884 self.kill_kernel()
870 885
871 886 if not restart and self._connection_file_written:
872 887 # cleanup connection files on full shutdown of kernel we started
873 888 self._connection_file_written = False
874 889 try:
875 890 os.remove(self.connection_file)
876 891 except IOError:
877 892 pass
878 893
879 894 def restart_kernel(self, now=False, **kw):
880 895 """Restarts a kernel with the arguments that were used to launch it.
881 896
882 897 If the old kernel was launched with random ports, the same ports will be
883 898 used for the new kernel.
884 899
885 900 Parameters
886 901 ----------
887 902 now : bool, optional
888 903 If True, the kernel is forcefully restarted *immediately*, without
889 904 having a chance to do any cleanup action. Otherwise the kernel is
890 905 given 1s to clean up before a forceful restart is issued.
891 906
892 907 In all cases the kernel is restarted, the only difference is whether
893 908 it is given a chance to perform a clean shutdown or not.
894 909
895 910 **kw : optional
896 911 Any options specified here will replace those used to launch the
897 912 kernel.
898 913 """
899 914 if self._launch_args is None:
900 915 raise RuntimeError("Cannot restart the kernel. "
901 916 "No previous call to 'start_kernel'.")
902 917 else:
903 918 # Stop currently running kernel.
904 919 if self.has_kernel:
905 920 if now:
906 921 self.kill_kernel()
907 922 else:
908 923 self.shutdown_kernel(restart=True)
909 924
910 925 # Start new kernel.
911 926 self._launch_args.update(kw)
912 927 self.start_kernel(**self._launch_args)
913 928
914 929 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
915 930 # unless there is some delay here.
916 931 if sys.platform == 'win32':
917 932 time.sleep(0.2)
918 933
919 934 @property
920 935 def has_kernel(self):
921 936 """Returns whether a kernel process has been specified for the kernel
922 937 manager.
923 938 """
924 939 return self.kernel is not None
925 940
926 941 def kill_kernel(self):
927 942 """ Kill the running kernel. """
928 943 if self.has_kernel:
929 944 # Pause the heart beat channel if it exists.
930 945 if self._hb_channel is not None:
931 946 self._hb_channel.pause()
932 947
933 948 # Attempt to kill the kernel.
934 949 try:
935 950 self.kernel.kill()
936 951 except OSError, e:
937 952 # In Windows, we will get an Access Denied error if the process
938 953 # has already terminated. Ignore it.
939 954 if sys.platform == 'win32':
940 955 if e.winerror != 5:
941 956 raise
942 957 # On Unix, we may get an ESRCH error if the process has already
943 958 # terminated. Ignore it.
944 959 else:
945 960 from errno import ESRCH
946 961 if e.errno != ESRCH:
947 962 raise
948 963 self.kernel = None
949 964 else:
950 965 raise RuntimeError("Cannot kill kernel. No kernel is running!")
951 966
952 967 def interrupt_kernel(self):
953 968 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
954 969 well supported on all platforms.
955 970 """
956 971 if self.has_kernel:
957 972 if sys.platform == 'win32':
958 973 from parentpoller import ParentPollerWindows as Poller
959 974 Poller.send_interrupt(self.kernel.win32_interrupt_event)
960 975 else:
961 976 self.kernel.send_signal(signal.SIGINT)
962 977 else:
963 978 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
964 979
965 980 def signal_kernel(self, signum):
966 981 """ Sends a signal to the kernel. Note that since only SIGTERM is
967 982 supported on Windows, this function is only useful on Unix systems.
968 983 """
969 984 if self.has_kernel:
970 985 self.kernel.send_signal(signum)
971 986 else:
972 987 raise RuntimeError("Cannot signal kernel. No kernel is running!")
973 988
974 989 @property
975 990 def is_alive(self):
976 991 """Is the kernel process still running?"""
977 992 # FIXME: not using a heartbeat means this method is broken for any
978 993 # remote kernel, it's only capable of handling local kernels.
979 994 if self.has_kernel:
980 995 if self.kernel.poll() is None:
981 996 return True
982 997 else:
983 998 return False
984 999 else:
985 1000 # We didn't start the kernel with this KernelManager so we don't
986 1001 # know if it is running. We should use a heartbeat for this case.
987 1002 return True
988 1003
989 1004 #--------------------------------------------------------------------------
990 1005 # Channels used for communication with the kernel:
991 1006 #--------------------------------------------------------------------------
992 1007
993 1008 @property
994 1009 def shell_channel(self):
995 1010 """Get the REQ socket channel object to make requests of the kernel."""
996 1011 if self._shell_channel is None:
997 1012 self._shell_channel = self.shell_channel_class(self.context,
998 1013 self.session,
999 1014 (self.ip, self.shell_port))
1000 1015 return self._shell_channel
1001 1016
1002 1017 @property
1003 1018 def sub_channel(self):
1004 1019 """Get the SUB socket channel object."""
1005 1020 if self._sub_channel is None:
1006 1021 self._sub_channel = self.sub_channel_class(self.context,
1007 1022 self.session,
1008 1023 (self.ip, self.iopub_port))
1009 1024 return self._sub_channel
1010 1025
1011 1026 @property
1012 1027 def stdin_channel(self):
1013 1028 """Get the REP socket channel object to handle stdin (raw_input)."""
1014 1029 if self._stdin_channel is None:
1015 1030 self._stdin_channel = self.stdin_channel_class(self.context,
1016 1031 self.session,
1017 1032 (self.ip, self.stdin_port))
1018 1033 return self._stdin_channel
1019 1034
1020 1035 @property
1021 1036 def hb_channel(self):
1022 1037 """Get the heartbeat socket channel object to check that the
1023 1038 kernel is alive."""
1024 1039 if self._hb_channel is None:
1025 1040 self._hb_channel = self.hb_channel_class(self.context,
1026 1041 self.session,
1027 1042 (self.ip, self.hb_port))
1028 1043 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now