##// END OF EJS Templates
Handle Unix ESRCH errors gracefully in kill_kernel.
epatters -
Show More
@@ -1,920 +1,927 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 from Queue import Queue, Empty
21 21 from subprocess import Popen
22 22 import signal
23 23 import sys
24 24 from threading import Thread
25 25 import time
26 26 import logging
27 27
28 28 # System library imports.
29 29 import zmq
30 30 from zmq import POLLIN, POLLOUT, POLLERR
31 31 from zmq.eventloop import ioloop
32 32
33 33 # Local imports.
34 34 from IPython.utils import io
35 35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
37 37 from session import Session, Message
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Constants and exceptions
41 41 #-----------------------------------------------------------------------------
42 42
43 43 class InvalidPortNumber(Exception):
44 44 pass
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Utility functions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 # some utilities to validate message structure, these might get moved elsewhere
51 51 # if they prove to have more generic utility
52 52
53 53 def validate_string_list(lst):
54 54 """Validate that the input is a list of strings.
55 55
56 56 Raises ValueError if not."""
57 57 if not isinstance(lst, list):
58 58 raise ValueError('input %r must be a list' % lst)
59 59 for x in lst:
60 60 if not isinstance(x, basestring):
61 61 raise ValueError('element %r in list must be a string' % x)
62 62
63 63
64 64 def validate_string_dict(dct):
65 65 """Validate that the input is a dict with string keys and values.
66 66
67 67 Raises ValueError if not."""
68 68 for k,v in dct.iteritems():
69 69 if not isinstance(k, basestring):
70 70 raise ValueError('key %r in dict must be a string' % k)
71 71 if not isinstance(v, basestring):
72 72 raise ValueError('value %r in dict must be a string' % v)
73 73
74 74
75 75 #-----------------------------------------------------------------------------
76 76 # ZMQ Socket Channel classes
77 77 #-----------------------------------------------------------------------------
78 78
79 79 class ZmqSocketChannel(Thread):
80 80 """The base class for the channels that use ZMQ sockets.
81 81 """
82 82 context = None
83 83 session = None
84 84 socket = None
85 85 ioloop = None
86 86 iostate = None
87 87 _address = None
88 88
89 89 def __init__(self, context, session, address):
90 90 """Create a channel
91 91
92 92 Parameters
93 93 ----------
94 94 context : :class:`zmq.Context`
95 95 The ZMQ context to use.
96 96 session : :class:`session.Session`
97 97 The session to use.
98 98 address : tuple
99 99 Standard (ip, port) tuple that the kernel is listening on.
100 100 """
101 101 super(ZmqSocketChannel, self).__init__()
102 102 self.daemon = True
103 103
104 104 self.context = context
105 105 self.session = session
106 106 if address[1] == 0:
107 107 message = 'The port number for a channel cannot be 0.'
108 108 raise InvalidPortNumber(message)
109 109 self._address = address
110 110
111 111 def stop(self):
112 112 """Stop the channel's activity.
113 113
114 114 This calls :method:`Thread.join` and returns when the thread
115 115 terminates. :class:`RuntimeError` will be raised if
116 116 :method:`self.start` is called again.
117 117 """
118 118 self.join()
119 119
120 120 @property
121 121 def address(self):
122 122 """Get the channel's address as an (ip, port) tuple.
123 123
124 124 By the default, the address is (localhost, 0), where 0 means a random
125 125 port.
126 126 """
127 127 return self._address
128 128
129 129 def add_io_state(self, state):
130 130 """Add IO state to the eventloop.
131 131
132 132 Parameters
133 133 ----------
134 134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 135 The IO state flag to set.
136 136
137 137 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 138 """
139 139 def add_io_state_callback():
140 140 if not self.iostate & state:
141 141 self.iostate = self.iostate | state
142 142 self.ioloop.update_handler(self.socket, self.iostate)
143 143 self.ioloop.add_callback(add_io_state_callback)
144 144
145 145 def drop_io_state(self, state):
146 146 """Drop IO state from the eventloop.
147 147
148 148 Parameters
149 149 ----------
150 150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 151 The IO state flag to set.
152 152
153 153 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 154 """
155 155 def drop_io_state_callback():
156 156 if self.iostate & state:
157 157 self.iostate = self.iostate & (~state)
158 158 self.ioloop.update_handler(self.socket, self.iostate)
159 159 self.ioloop.add_callback(drop_io_state_callback)
160 160
161 161
162 162 class XReqSocketChannel(ZmqSocketChannel):
163 163 """The XREQ channel for issues request/replies to the kernel.
164 164 """
165 165
166 166 command_queue = None
167 167
168 168 def __init__(self, context, session, address):
169 169 super(XReqSocketChannel, self).__init__(context, session, address)
170 170 self.command_queue = Queue()
171 171 self.ioloop = ioloop.IOLoop()
172 172
173 173 def run(self):
174 174 """The thread's main activity. Call start() instead."""
175 175 self.socket = self.context.socket(zmq.XREQ)
176 176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 177 self.socket.connect('tcp://%s:%i' % self.address)
178 178 self.iostate = POLLERR|POLLIN
179 179 self.ioloop.add_handler(self.socket, self._handle_events,
180 180 self.iostate)
181 181 self.ioloop.start()
182 182
183 183 def stop(self):
184 184 self.ioloop.stop()
185 185 super(XReqSocketChannel, self).stop()
186 186
187 187 def call_handlers(self, msg):
188 188 """This method is called in the ioloop thread when a message arrives.
189 189
190 190 Subclasses should override this method to handle incoming messages.
191 191 It is important to remember that this method is called in the thread
192 192 so that some logic must be done to ensure that the application leve
193 193 handlers are called in the application thread.
194 194 """
195 195 raise NotImplementedError('call_handlers must be defined in a subclass.')
196 196
197 197 def execute(self, code, silent=False,
198 198 user_variables=None, user_expressions=None):
199 199 """Execute code in the kernel.
200 200
201 201 Parameters
202 202 ----------
203 203 code : str
204 204 A string of Python code.
205 205
206 206 silent : bool, optional (default False)
207 207 If set, the kernel will execute the code as quietly possible.
208 208
209 209 user_variables : list, optional
210 210 A list of variable names to pull from the user's namespace. They
211 211 will come back as a dict with these names as keys and their
212 212 :func:`repr` as values.
213 213
214 214 user_expressions : dict, optional
215 215 A dict with string keys and to pull from the user's
216 216 namespace. They will come back as a dict with these names as keys
217 217 and their :func:`repr` as values.
218 218
219 219 Returns
220 220 -------
221 221 The msg_id of the message sent.
222 222 """
223 223 if user_variables is None:
224 224 user_variables = []
225 225 if user_expressions is None:
226 226 user_expressions = {}
227 227
228 228 # Don't waste network traffic if inputs are invalid
229 229 if not isinstance(code, basestring):
230 230 raise ValueError('code %r must be a string' % code)
231 231 validate_string_list(user_variables)
232 232 validate_string_dict(user_expressions)
233 233
234 234 # Create class for content/msg creation. Related to, but possibly
235 235 # not in Session.
236 236 content = dict(code=code, silent=silent,
237 237 user_variables=user_variables,
238 238 user_expressions=user_expressions)
239 239 msg = self.session.msg('execute_request', content)
240 240 self._queue_request(msg)
241 241 return msg['header']['msg_id']
242 242
243 243 def complete(self, text, line, cursor_pos, block=None):
244 244 """Tab complete text in the kernel's namespace.
245 245
246 246 Parameters
247 247 ----------
248 248 text : str
249 249 The text to complete.
250 250 line : str
251 251 The full line of text that is the surrounding context for the
252 252 text to complete.
253 253 cursor_pos : int
254 254 The position of the cursor in the line where the completion was
255 255 requested.
256 256 block : str, optional
257 257 The full block of code in which the completion is being requested.
258 258
259 259 Returns
260 260 -------
261 261 The msg_id of the message sent.
262 262 """
263 263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 264 msg = self.session.msg('complete_request', content)
265 265 self._queue_request(msg)
266 266 return msg['header']['msg_id']
267 267
268 268 def object_info(self, oname):
269 269 """Get metadata information about an object.
270 270
271 271 Parameters
272 272 ----------
273 273 oname : str
274 274 A string specifying the object name.
275 275
276 276 Returns
277 277 -------
278 278 The msg_id of the message sent.
279 279 """
280 280 content = dict(oname=oname)
281 281 msg = self.session.msg('object_info_request', content)
282 282 self._queue_request(msg)
283 283 return msg['header']['msg_id']
284 284
285 285 def history_tail(self, n=10, raw=True, output=False):
286 286 """Get the history list.
287 287
288 288 Parameters
289 289 ----------
290 290 n : int
291 291 The number of lines of history to get.
292 292 raw : bool
293 293 If True, return the raw input.
294 294 output : bool
295 295 If True, then return the output as well.
296 296
297 297 Returns
298 298 -------
299 299 The msg_id of the message sent.
300 300 """
301 301 content = dict(n=n, raw=raw, output=output)
302 302 msg = self.session.msg('history_tail_request', content)
303 303 self._queue_request(msg)
304 304 return msg['header']['msg_id']
305 305
306 306 def shutdown(self, restart=False):
307 307 """Request an immediate kernel shutdown.
308 308
309 309 Upon receipt of the (empty) reply, client code can safely assume that
310 310 the kernel has shut down and it's safe to forcefully terminate it if
311 311 it's still alive.
312 312
313 313 The kernel will send the reply via a function registered with Python's
314 314 atexit module, ensuring it's truly done as the kernel is done with all
315 315 normal operation.
316 316 """
317 317 # Send quit message to kernel. Once we implement kernel-side setattr,
318 318 # this should probably be done that way, but for now this will do.
319 319 msg = self.session.msg('shutdown_request', {'restart':restart})
320 320 self._queue_request(msg)
321 321 return msg['header']['msg_id']
322 322
323 323 def _handle_events(self, socket, events):
324 324 if events & POLLERR:
325 325 self._handle_err()
326 326 if events & POLLOUT:
327 327 self._handle_send()
328 328 if events & POLLIN:
329 329 self._handle_recv()
330 330
331 331 def _handle_recv(self):
332 332 ident,msg = self.session.recv(self.socket, 0)
333 333 self.call_handlers(msg)
334 334
335 335 def _handle_send(self):
336 336 try:
337 337 msg = self.command_queue.get(False)
338 338 except Empty:
339 339 pass
340 340 else:
341 341 self.session.send(self.socket,msg)
342 342 if self.command_queue.empty():
343 343 self.drop_io_state(POLLOUT)
344 344
345 345 def _handle_err(self):
346 346 # We don't want to let this go silently, so eventually we should log.
347 347 raise zmq.ZMQError()
348 348
349 349 def _queue_request(self, msg):
350 350 self.command_queue.put(msg)
351 351 self.add_io_state(POLLOUT)
352 352
353 353
354 354 class SubSocketChannel(ZmqSocketChannel):
355 355 """The SUB channel which listens for messages that the kernel publishes.
356 356 """
357 357
358 358 def __init__(self, context, session, address):
359 359 super(SubSocketChannel, self).__init__(context, session, address)
360 360 self.ioloop = ioloop.IOLoop()
361 361
362 362 def run(self):
363 363 """The thread's main activity. Call start() instead."""
364 364 self.socket = self.context.socket(zmq.SUB)
365 365 self.socket.setsockopt(zmq.SUBSCRIBE,'')
366 366 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
367 367 self.socket.connect('tcp://%s:%i' % self.address)
368 368 self.iostate = POLLIN|POLLERR
369 369 self.ioloop.add_handler(self.socket, self._handle_events,
370 370 self.iostate)
371 371 self.ioloop.start()
372 372
373 373 def stop(self):
374 374 self.ioloop.stop()
375 375 super(SubSocketChannel, self).stop()
376 376
377 377 def call_handlers(self, msg):
378 378 """This method is called in the ioloop thread when a message arrives.
379 379
380 380 Subclasses should override this method to handle incoming messages.
381 381 It is important to remember that this method is called in the thread
382 382 so that some logic must be done to ensure that the application leve
383 383 handlers are called in the application thread.
384 384 """
385 385 raise NotImplementedError('call_handlers must be defined in a subclass.')
386 386
387 387 def flush(self, timeout=1.0):
388 388 """Immediately processes all pending messages on the SUB channel.
389 389
390 390 Callers should use this method to ensure that :method:`call_handlers`
391 391 has been called for all messages that have been received on the
392 392 0MQ SUB socket of this channel.
393 393
394 394 This method is thread safe.
395 395
396 396 Parameters
397 397 ----------
398 398 timeout : float, optional
399 399 The maximum amount of time to spend flushing, in seconds. The
400 400 default is one second.
401 401 """
402 402 # We do the IOLoop callback process twice to ensure that the IOLoop
403 403 # gets to perform at least one full poll.
404 404 stop_time = time.time() + timeout
405 405 for i in xrange(2):
406 406 self._flushed = False
407 407 self.ioloop.add_callback(self._flush)
408 408 while not self._flushed and time.time() < stop_time:
409 409 time.sleep(0.01)
410 410
411 411 def _handle_events(self, socket, events):
412 412 # Turn on and off POLLOUT depending on if we have made a request
413 413 if events & POLLERR:
414 414 self._handle_err()
415 415 if events & POLLIN:
416 416 self._handle_recv()
417 417
418 418 def _handle_err(self):
419 419 # We don't want to let this go silently, so eventually we should log.
420 420 raise zmq.ZMQError()
421 421
422 422 def _handle_recv(self):
423 423 # Get all of the messages we can
424 424 while True:
425 425 try:
426 426 ident,msg = self.session.recv(self.socket)
427 427 except zmq.ZMQError:
428 428 # Check the errno?
429 429 # Will this trigger POLLERR?
430 430 break
431 431 else:
432 432 if msg is None:
433 433 break
434 434 self.call_handlers(msg)
435 435
436 436 def _flush(self):
437 437 """Callback for :method:`self.flush`."""
438 438 self._flushed = True
439 439
440 440
441 441 class RepSocketChannel(ZmqSocketChannel):
442 442 """A reply channel to handle raw_input requests that the kernel makes."""
443 443
444 444 msg_queue = None
445 445
446 446 def __init__(self, context, session, address):
447 447 super(RepSocketChannel, self).__init__(context, session, address)
448 448 self.ioloop = ioloop.IOLoop()
449 449 self.msg_queue = Queue()
450 450
451 451 def run(self):
452 452 """The thread's main activity. Call start() instead."""
453 453 self.socket = self.context.socket(zmq.XREQ)
454 454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
455 455 self.socket.connect('tcp://%s:%i' % self.address)
456 456 self.iostate = POLLERR|POLLIN
457 457 self.ioloop.add_handler(self.socket, self._handle_events,
458 458 self.iostate)
459 459 self.ioloop.start()
460 460
461 461 def stop(self):
462 462 self.ioloop.stop()
463 463 super(RepSocketChannel, self).stop()
464 464
465 465 def call_handlers(self, msg):
466 466 """This method is called in the ioloop thread when a message arrives.
467 467
468 468 Subclasses should override this method to handle incoming messages.
469 469 It is important to remember that this method is called in the thread
470 470 so that some logic must be done to ensure that the application leve
471 471 handlers are called in the application thread.
472 472 """
473 473 raise NotImplementedError('call_handlers must be defined in a subclass.')
474 474
475 475 def input(self, string):
476 476 """Send a string of raw input to the kernel."""
477 477 content = dict(value=string)
478 478 msg = self.session.msg('input_reply', content)
479 479 self._queue_reply(msg)
480 480
481 481 def _handle_events(self, socket, events):
482 482 if events & POLLERR:
483 483 self._handle_err()
484 484 if events & POLLOUT:
485 485 self._handle_send()
486 486 if events & POLLIN:
487 487 self._handle_recv()
488 488
489 489 def _handle_recv(self):
490 490 ident,msg = self.session.recv(self.socket, 0)
491 491 self.call_handlers(msg)
492 492
493 493 def _handle_send(self):
494 494 try:
495 495 msg = self.msg_queue.get(False)
496 496 except Empty:
497 497 pass
498 498 else:
499 499 self.session.send(self.socket,msg)
500 500 if self.msg_queue.empty():
501 501 self.drop_io_state(POLLOUT)
502 502
503 503 def _handle_err(self):
504 504 # We don't want to let this go silently, so eventually we should log.
505 505 raise zmq.ZMQError()
506 506
507 507 def _queue_reply(self, msg):
508 508 self.msg_queue.put(msg)
509 509 self.add_io_state(POLLOUT)
510 510
511 511
512 512 class HBSocketChannel(ZmqSocketChannel):
513 513 """The heartbeat channel which monitors the kernel heartbeat.
514 514
515 515 Note that the heartbeat channel is paused by default. As long as you start
516 516 this channel, the kernel manager will ensure that it is paused and un-paused
517 517 as appropriate.
518 518 """
519 519
520 520 time_to_dead = 3.0
521 521 socket = None
522 522 poller = None
523 523 _running = None
524 524 _pause = None
525 525
526 526 def __init__(self, context, session, address):
527 527 super(HBSocketChannel, self).__init__(context, session, address)
528 528 self._running = False
529 529 self._pause = True
530 530
531 531 def _create_socket(self):
532 532 self.socket = self.context.socket(zmq.REQ)
533 533 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
534 534 self.socket.connect('tcp://%s:%i' % self.address)
535 535 self.poller = zmq.Poller()
536 536 self.poller.register(self.socket, zmq.POLLIN)
537 537
538 538 def run(self):
539 539 """The thread's main activity. Call start() instead."""
540 540 self._create_socket()
541 541 self._running = True
542 542 while self._running:
543 543 if self._pause:
544 544 time.sleep(self.time_to_dead)
545 545 else:
546 546 since_last_heartbeat = 0.0
547 547 request_time = time.time()
548 548 try:
549 549 #io.rprint('Ping from HB channel') # dbg
550 550 self.socket.send(b'ping')
551 551 except zmq.ZMQError, e:
552 552 #io.rprint('*** HB Error:', e) # dbg
553 553 if e.errno == zmq.EFSM:
554 554 #io.rprint('sleep...', self.time_to_dead) # dbg
555 555 time.sleep(self.time_to_dead)
556 556 self._create_socket()
557 557 else:
558 558 raise
559 559 else:
560 560 while True:
561 561 try:
562 562 self.socket.recv(zmq.NOBLOCK)
563 563 except zmq.ZMQError, e:
564 564 #io.rprint('*** HB Error 2:', e) # dbg
565 565 if e.errno == zmq.EAGAIN:
566 566 before_poll = time.time()
567 567 until_dead = self.time_to_dead - (before_poll -
568 568 request_time)
569 569
570 570 # When the return value of poll() is an empty
571 571 # list, that is when things have gone wrong
572 572 # (zeromq bug). As long as it is not an empty
573 573 # list, poll is working correctly even if it
574 574 # returns quickly. Note: poll timeout is in
575 575 # milliseconds.
576 576 self.poller.poll(1000*until_dead)
577 577
578 578 since_last_heartbeat = time.time()-request_time
579 579 if since_last_heartbeat > self.time_to_dead:
580 580 self.call_handlers(since_last_heartbeat)
581 581 break
582 582 else:
583 583 # FIXME: We should probably log this instead.
584 584 raise
585 585 else:
586 586 until_dead = self.time_to_dead - (time.time() -
587 587 request_time)
588 588 if until_dead > 0.0:
589 589 #io.rprint('sleep...', self.time_to_dead) # dbg
590 590 time.sleep(until_dead)
591 591 break
592 592
593 593 def pause(self):
594 594 """Pause the heartbeat."""
595 595 self._pause = True
596 596
597 597 def unpause(self):
598 598 """Unpause the heartbeat."""
599 599 self._pause = False
600 600
601 601 def is_beating(self):
602 602 """Is the heartbeat running and not paused."""
603 603 if self.is_alive() and not self._pause:
604 604 return True
605 605 else:
606 606 return False
607 607
608 608 def stop(self):
609 609 self._running = False
610 610 super(HBSocketChannel, self).stop()
611 611
612 612 def call_handlers(self, since_last_heartbeat):
613 613 """This method is called in the ioloop thread when a message arrives.
614 614
615 615 Subclasses should override this method to handle incoming messages.
616 616 It is important to remember that this method is called in the thread
617 617 so that some logic must be done to ensure that the application leve
618 618 handlers are called in the application thread.
619 619 """
620 620 raise NotImplementedError('call_handlers must be defined in a subclass.')
621 621
622 622
623 623 #-----------------------------------------------------------------------------
624 624 # Main kernel manager class
625 625 #-----------------------------------------------------------------------------
626 626
627 627 class KernelManager(HasTraits):
628 628 """ Manages a kernel for a frontend.
629 629
630 630 The SUB channel is for the frontend to receive messages published by the
631 631 kernel.
632 632
633 633 The REQ channel is for the frontend to make requests of the kernel.
634 634
635 635 The REP channel is for the kernel to request stdin (raw_input) from the
636 636 frontend.
637 637 """
638 638 # The PyZMQ Context to use for communication with the kernel.
639 639 context = Instance(zmq.Context,(),{})
640 640
641 641 # The Session to use for communication with the kernel.
642 642 session = Instance(Session,(),{})
643 643
644 644 # The kernel process with which the KernelManager is communicating.
645 645 kernel = Instance(Popen)
646 646
647 647 # The addresses for the communication channels.
648 648 xreq_address = TCPAddress((LOCALHOST, 0))
649 649 sub_address = TCPAddress((LOCALHOST, 0))
650 650 rep_address = TCPAddress((LOCALHOST, 0))
651 651 hb_address = TCPAddress((LOCALHOST, 0))
652 652
653 653 # The classes to use for the various channels.
654 654 xreq_channel_class = Type(XReqSocketChannel)
655 655 sub_channel_class = Type(SubSocketChannel)
656 656 rep_channel_class = Type(RepSocketChannel)
657 657 hb_channel_class = Type(HBSocketChannel)
658 658
659 659 # Protected traits.
660 660 _launch_args = Any
661 661 _xreq_channel = Any
662 662 _sub_channel = Any
663 663 _rep_channel = Any
664 664 _hb_channel = Any
665 665
666 666 def __init__(self, **kwargs):
667 667 super(KernelManager, self).__init__(**kwargs)
668 668 # Uncomment this to try closing the context.
669 669 # atexit.register(self.context.close)
670 670
671 671 #--------------------------------------------------------------------------
672 672 # Channel management methods:
673 673 #--------------------------------------------------------------------------
674 674
675 675 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
676 676 """Starts the channels for this kernel.
677 677
678 678 This will create the channels if they do not exist and then start
679 679 them. If port numbers of 0 are being used (random ports) then you
680 680 must first call :method:`start_kernel`. If the channels have been
681 681 stopped and you call this, :class:`RuntimeError` will be raised.
682 682 """
683 683 if xreq:
684 684 self.xreq_channel.start()
685 685 if sub:
686 686 self.sub_channel.start()
687 687 if rep:
688 688 self.rep_channel.start()
689 689 if hb:
690 690 self.hb_channel.start()
691 691
692 692 def stop_channels(self):
693 693 """Stops all the running channels for this kernel.
694 694 """
695 695 if self.xreq_channel.is_alive():
696 696 self.xreq_channel.stop()
697 697 if self.sub_channel.is_alive():
698 698 self.sub_channel.stop()
699 699 if self.rep_channel.is_alive():
700 700 self.rep_channel.stop()
701 701 if self.hb_channel.is_alive():
702 702 self.hb_channel.stop()
703 703
704 704 @property
705 705 def channels_running(self):
706 706 """Are any of the channels created and running?"""
707 707 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
708 708 self.rep_channel.is_alive() or self.hb_channel.is_alive())
709 709
710 710 #--------------------------------------------------------------------------
711 711 # Kernel process management methods:
712 712 #--------------------------------------------------------------------------
713 713
714 714 def start_kernel(self, **kw):
715 715 """Starts a kernel process and configures the manager to use it.
716 716
717 717 If random ports (port=0) are being used, this method must be called
718 718 before the channels are created.
719 719
720 720 Parameters:
721 721 -----------
722 722 ipython : bool, optional (default True)
723 723 Whether to use an IPython kernel instead of a plain Python kernel.
724 724
725 725 **kw : optional
726 726 See respective options for IPython and Python kernels.
727 727 """
728 728 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
729 729 self.rep_address, self.hb_address
730 730 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
731 731 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
732 732 raise RuntimeError("Can only launch a kernel on a local interface. "
733 733 "Make sure that the '*_address' attributes are "
734 734 "configured properly. "
735 735 "Currently valid addresses are: %s"%LOCAL_IPS
736 736 )
737 737
738 738 self._launch_args = kw.copy()
739 739 if kw.pop('ipython', True):
740 740 from ipkernel import launch_kernel
741 741 else:
742 742 from pykernel import launch_kernel
743 743 self.kernel, xrep, pub, req, _hb = launch_kernel(
744 744 xrep_port=xreq[1], pub_port=sub[1],
745 745 req_port=rep[1], hb_port=hb[1], **kw)
746 746 self.xreq_address = (xreq[0], xrep)
747 747 self.sub_address = (sub[0], pub)
748 748 self.rep_address = (rep[0], req)
749 749 self.hb_address = (hb[0], _hb)
750 750
751 751 def shutdown_kernel(self, restart=False):
752 752 """ Attempts to the stop the kernel process cleanly. If the kernel
753 753 cannot be stopped, it is killed, if possible.
754 754 """
755 755 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
756 756 if sys.platform == 'win32':
757 757 self.kill_kernel()
758 758 return
759 759
760 760 # Pause the heart beat channel if it exists.
761 761 if self._hb_channel is not None:
762 762 self._hb_channel.pause()
763 763
764 764 # Don't send any additional kernel kill messages immediately, to give
765 765 # the kernel a chance to properly execute shutdown actions. Wait for at
766 766 # most 1s, checking every 0.1s.
767 767 self.xreq_channel.shutdown(restart=restart)
768 768 for i in range(10):
769 769 if self.is_alive:
770 770 time.sleep(0.1)
771 771 else:
772 772 break
773 773 else:
774 774 # OK, we've waited long enough.
775 775 if self.has_kernel:
776 776 self.kill_kernel()
777 777
778 778 def restart_kernel(self, now=False, **kw):
779 779 """Restarts a kernel with the arguments that were used to launch it.
780 780
781 781 If the old kernel was launched with random ports, the same ports will be
782 782 used for the new kernel.
783 783
784 784 Parameters
785 785 ----------
786 786 now : bool, optional
787 787 If True, the kernel is forcefully restarted *immediately*, without
788 788 having a chance to do any cleanup action. Otherwise the kernel is
789 789 given 1s to clean up before a forceful restart is issued.
790 790
791 791 In all cases the kernel is restarted, the only difference is whether
792 792 it is given a chance to perform a clean shutdown or not.
793 793
794 794 **kw : optional
795 795 Any options specified here will replace those used to launch the
796 796 kernel.
797 797 """
798 798 if self._launch_args is None:
799 799 raise RuntimeError("Cannot restart the kernel. "
800 800 "No previous call to 'start_kernel'.")
801 801 else:
802 802 # Stop currently running kernel.
803 803 if self.has_kernel:
804 804 if now:
805 805 self.kill_kernel()
806 806 else:
807 807 self.shutdown_kernel(restart=True)
808 808
809 809 # Start new kernel.
810 810 self._launch_args.update(kw)
811 811 self.start_kernel(**self._launch_args)
812 812
813 813 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
814 814 # unless there is some delay here.
815 815 if sys.platform == 'win32':
816 816 time.sleep(0.2)
817 817
818 818 @property
819 819 def has_kernel(self):
820 820 """Returns whether a kernel process has been specified for the kernel
821 821 manager.
822 822 """
823 823 return self.kernel is not None
824 824
825 825 def kill_kernel(self):
826 826 """ Kill the running kernel. """
827 827 if self.has_kernel:
828 828 # Pause the heart beat channel if it exists.
829 829 if self._hb_channel is not None:
830 830 self._hb_channel.pause()
831 831
832 832 # Attempt to kill the kernel.
833 833 try:
834 834 self.kernel.kill()
835 835 except OSError, e:
836 836 # In Windows, we will get an Access Denied error if the process
837 837 # has already terminated. Ignore it.
838 if not (sys.platform == 'win32' and e.winerror == 5):
839 raise
838 if sys.platform == 'win32':
839 if e.winerror != 5:
840 raise
841 # On Unix, we may get an ESRCH error if the process has already
842 # terminated. Ignore it.
843 else:
844 from errno import ESRCH
845 if e.errno != ESRCH:
846 raise
840 847 self.kernel = None
841 848 else:
842 849 raise RuntimeError("Cannot kill kernel. No kernel is running!")
843 850
844 851 def interrupt_kernel(self):
845 852 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
846 853 well supported on all platforms.
847 854 """
848 855 if self.has_kernel:
849 856 if sys.platform == 'win32':
850 857 from parentpoller import ParentPollerWindows as Poller
851 858 Poller.send_interrupt(self.kernel.win32_interrupt_event)
852 859 else:
853 860 self.kernel.send_signal(signal.SIGINT)
854 861 else:
855 862 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
856 863
857 864 def signal_kernel(self, signum):
858 865 """ Sends a signal to the kernel. Note that since only SIGTERM is
859 866 supported on Windows, this function is only useful on Unix systems.
860 867 """
861 868 if self.has_kernel:
862 869 self.kernel.send_signal(signum)
863 870 else:
864 871 raise RuntimeError("Cannot signal kernel. No kernel is running!")
865 872
866 873 @property
867 874 def is_alive(self):
868 875 """Is the kernel process still running?"""
869 876 # FIXME: not using a heartbeat means this method is broken for any
870 877 # remote kernel, it's only capable of handling local kernels.
871 878 if self.has_kernel:
872 879 if self.kernel.poll() is None:
873 880 return True
874 881 else:
875 882 return False
876 883 else:
877 884 # We didn't start the kernel with this KernelManager so we don't
878 885 # know if it is running. We should use a heartbeat for this case.
879 886 return True
880 887
881 888 #--------------------------------------------------------------------------
882 889 # Channels used for communication with the kernel:
883 890 #--------------------------------------------------------------------------
884 891
885 892 @property
886 893 def xreq_channel(self):
887 894 """Get the REQ socket channel object to make requests of the kernel."""
888 895 if self._xreq_channel is None:
889 896 self._xreq_channel = self.xreq_channel_class(self.context,
890 897 self.session,
891 898 self.xreq_address)
892 899 return self._xreq_channel
893 900
894 901 @property
895 902 def sub_channel(self):
896 903 """Get the SUB socket channel object."""
897 904 if self._sub_channel is None:
898 905 self._sub_channel = self.sub_channel_class(self.context,
899 906 self.session,
900 907 self.sub_address)
901 908 return self._sub_channel
902 909
903 910 @property
904 911 def rep_channel(self):
905 912 """Get the REP socket channel object to handle stdin (raw_input)."""
906 913 if self._rep_channel is None:
907 914 self._rep_channel = self.rep_channel_class(self.context,
908 915 self.session,
909 916 self.rep_address)
910 917 return self._rep_channel
911 918
912 919 @property
913 920 def hb_channel(self):
914 921 """Get the heartbeat socket channel object to check that the
915 922 kernel is alive."""
916 923 if self._hb_channel is None:
917 924 self._hb_channel = self.hb_channel_class(self.context,
918 925 self.session,
919 926 self.hb_address)
920 927 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now