##// END OF EJS Templates
ignore EINTR in channel loops...
MinRK -
Show More
@@ -1,945 +1,968 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 import errno
20 21 from Queue import Queue, Empty
21 22 from subprocess import Popen
22 23 import signal
23 24 import sys
24 25 from threading import Thread
25 26 import time
26 27 import logging
27 28
28 29 # System library imports.
29 30 import zmq
30 31 from zmq import POLLIN, POLLOUT, POLLERR
31 32 from zmq.eventloop import ioloop
32 33
33 34 # Local imports.
34 35 from IPython.utils import io
35 36 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 37 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
37 38 from session import Session, Message
38 39
39 40 #-----------------------------------------------------------------------------
40 41 # Constants and exceptions
41 42 #-----------------------------------------------------------------------------
42 43
43 44 class InvalidPortNumber(Exception):
44 45 pass
45 46
46 47 #-----------------------------------------------------------------------------
47 48 # Utility functions
48 49 #-----------------------------------------------------------------------------
49 50
50 51 # some utilities to validate message structure, these might get moved elsewhere
51 52 # if they prove to have more generic utility
52 53
53 54 def validate_string_list(lst):
54 55 """Validate that the input is a list of strings.
55 56
56 57 Raises ValueError if not."""
57 58 if not isinstance(lst, list):
58 59 raise ValueError('input %r must be a list' % lst)
59 60 for x in lst:
60 61 if not isinstance(x, basestring):
61 62 raise ValueError('element %r in list must be a string' % x)
62 63
63 64
64 65 def validate_string_dict(dct):
65 66 """Validate that the input is a dict with string keys and values.
66 67
67 68 Raises ValueError if not."""
68 69 for k,v in dct.iteritems():
69 70 if not isinstance(k, basestring):
70 71 raise ValueError('key %r in dict must be a string' % k)
71 72 if not isinstance(v, basestring):
72 73 raise ValueError('value %r in dict must be a string' % v)
73 74
74 75
75 76 #-----------------------------------------------------------------------------
76 77 # ZMQ Socket Channel classes
77 78 #-----------------------------------------------------------------------------
78 79
79 80 class ZmqSocketChannel(Thread):
80 81 """The base class for the channels that use ZMQ sockets.
81 82 """
82 83 context = None
83 84 session = None
84 85 socket = None
85 86 ioloop = None
86 87 iostate = None
87 88 _address = None
88 89
89 90 def __init__(self, context, session, address):
90 91 """Create a channel
91 92
92 93 Parameters
93 94 ----------
94 95 context : :class:`zmq.Context`
95 96 The ZMQ context to use.
96 97 session : :class:`session.Session`
97 98 The session to use.
98 99 address : tuple
99 100 Standard (ip, port) tuple that the kernel is listening on.
100 101 """
101 102 super(ZmqSocketChannel, self).__init__()
102 103 self.daemon = True
103 104
104 105 self.context = context
105 106 self.session = session
106 107 if address[1] == 0:
107 108 message = 'The port number for a channel cannot be 0.'
108 109 raise InvalidPortNumber(message)
109 110 self._address = address
110 111
112 def _run_loop(self):
113 """Run my loop, ignoring EINTR events in the poller"""
114 while True:
115 try:
116 self.ioloop.start()
117 except zmq.ZMQError as e:
118 if e.errno == errno.EINTR:
119 continue
120 else:
121 raise
122 else:
123 break
124
111 125 def stop(self):
112 126 """Stop the channel's activity.
113 127
114 128 This calls :method:`Thread.join` and returns when the thread
115 129 terminates. :class:`RuntimeError` will be raised if
116 130 :method:`self.start` is called again.
117 131 """
118 132 self.join()
119 133
120 134 @property
121 135 def address(self):
122 136 """Get the channel's address as an (ip, port) tuple.
123 137
124 138 By the default, the address is (localhost, 0), where 0 means a random
125 139 port.
126 140 """
127 141 return self._address
128 142
129 143 def add_io_state(self, state):
130 144 """Add IO state to the eventloop.
131 145
132 146 Parameters
133 147 ----------
134 148 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 149 The IO state flag to set.
136 150
137 151 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 152 """
139 153 def add_io_state_callback():
140 154 if not self.iostate & state:
141 155 self.iostate = self.iostate | state
142 156 self.ioloop.update_handler(self.socket, self.iostate)
143 157 self.ioloop.add_callback(add_io_state_callback)
144 158
145 159 def drop_io_state(self, state):
146 160 """Drop IO state from the eventloop.
147 161
148 162 Parameters
149 163 ----------
150 164 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 165 The IO state flag to set.
152 166
153 167 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 168 """
155 169 def drop_io_state_callback():
156 170 if self.iostate & state:
157 171 self.iostate = self.iostate & (~state)
158 172 self.ioloop.update_handler(self.socket, self.iostate)
159 173 self.ioloop.add_callback(drop_io_state_callback)
160 174
161 175
162 176 class XReqSocketChannel(ZmqSocketChannel):
163 177 """The XREQ channel for issues request/replies to the kernel.
164 178 """
165 179
166 180 command_queue = None
167 181
168 182 def __init__(self, context, session, address):
169 183 super(XReqSocketChannel, self).__init__(context, session, address)
170 184 self.command_queue = Queue()
171 185 self.ioloop = ioloop.IOLoop()
172 186
173 187 def run(self):
174 188 """The thread's main activity. Call start() instead."""
175 189 self.socket = self.context.socket(zmq.XREQ)
176 190 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 191 self.socket.connect('tcp://%s:%i' % self.address)
178 192 self.iostate = POLLERR|POLLIN
179 193 self.ioloop.add_handler(self.socket, self._handle_events,
180 194 self.iostate)
181 self.ioloop.start()
195 self._run_loop()
182 196
183 197 def stop(self):
184 198 self.ioloop.stop()
185 199 super(XReqSocketChannel, self).stop()
186 200
187 201 def call_handlers(self, msg):
188 202 """This method is called in the ioloop thread when a message arrives.
189 203
190 204 Subclasses should override this method to handle incoming messages.
191 205 It is important to remember that this method is called in the thread
192 206 so that some logic must be done to ensure that the application leve
193 207 handlers are called in the application thread.
194 208 """
195 209 raise NotImplementedError('call_handlers must be defined in a subclass.')
196 210
197 211 def execute(self, code, silent=False,
198 212 user_variables=None, user_expressions=None):
199 213 """Execute code in the kernel.
200 214
201 215 Parameters
202 216 ----------
203 217 code : str
204 218 A string of Python code.
205 219
206 220 silent : bool, optional (default False)
207 221 If set, the kernel will execute the code as quietly possible.
208 222
209 223 user_variables : list, optional
210 224 A list of variable names to pull from the user's namespace. They
211 225 will come back as a dict with these names as keys and their
212 226 :func:`repr` as values.
213 227
214 228 user_expressions : dict, optional
215 229 A dict with string keys and to pull from the user's
216 230 namespace. They will come back as a dict with these names as keys
217 231 and their :func:`repr` as values.
218 232
219 233 Returns
220 234 -------
221 235 The msg_id of the message sent.
222 236 """
223 237 if user_variables is None:
224 238 user_variables = []
225 239 if user_expressions is None:
226 240 user_expressions = {}
227 241
228 242 # Don't waste network traffic if inputs are invalid
229 243 if not isinstance(code, basestring):
230 244 raise ValueError('code %r must be a string' % code)
231 245 validate_string_list(user_variables)
232 246 validate_string_dict(user_expressions)
233 247
234 248 # Create class for content/msg creation. Related to, but possibly
235 249 # not in Session.
236 250 content = dict(code=code, silent=silent,
237 251 user_variables=user_variables,
238 252 user_expressions=user_expressions)
239 253 msg = self.session.msg('execute_request', content)
240 254 self._queue_request(msg)
241 255 return msg['header']['msg_id']
242 256
243 257 def complete(self, text, line, cursor_pos, block=None):
244 258 """Tab complete text in the kernel's namespace.
245 259
246 260 Parameters
247 261 ----------
248 262 text : str
249 263 The text to complete.
250 264 line : str
251 265 The full line of text that is the surrounding context for the
252 266 text to complete.
253 267 cursor_pos : int
254 268 The position of the cursor in the line where the completion was
255 269 requested.
256 270 block : str, optional
257 271 The full block of code in which the completion is being requested.
258 272
259 273 Returns
260 274 -------
261 275 The msg_id of the message sent.
262 276 """
263 277 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 278 msg = self.session.msg('complete_request', content)
265 279 self._queue_request(msg)
266 280 return msg['header']['msg_id']
267 281
268 282 def object_info(self, oname):
269 283 """Get metadata information about an object.
270 284
271 285 Parameters
272 286 ----------
273 287 oname : str
274 288 A string specifying the object name.
275 289
276 290 Returns
277 291 -------
278 292 The msg_id of the message sent.
279 293 """
280 294 content = dict(oname=oname)
281 295 msg = self.session.msg('object_info_request', content)
282 296 self._queue_request(msg)
283 297 return msg['header']['msg_id']
284 298
285 299 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
286 300 """Get entries from the history list.
287 301
288 302 Parameters
289 303 ----------
290 304 raw : bool
291 305 If True, return the raw input.
292 306 output : bool
293 307 If True, then return the output as well.
294 308 hist_access_type : str
295 309 'range' (fill in session, start and stop params), 'tail' (fill in n)
296 310 or 'search' (fill in pattern param).
297 311
298 312 session : int
299 313 For a range request, the session from which to get lines. Session
300 314 numbers are positive integers; negative ones count back from the
301 315 current session.
302 316 start : int
303 317 The first line number of a history range.
304 318 stop : int
305 319 The final (excluded) line number of a history range.
306 320
307 321 n : int
308 322 The number of lines of history to get for a tail request.
309 323
310 324 pattern : str
311 325 The glob-syntax pattern for a search request.
312 326
313 327 Returns
314 328 -------
315 329 The msg_id of the message sent.
316 330 """
317 331 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
318 332 **kwargs)
319 333 msg = self.session.msg('history_request', content)
320 334 self._queue_request(msg)
321 335 return msg['header']['msg_id']
322 336
323 337 def shutdown(self, restart=False):
324 338 """Request an immediate kernel shutdown.
325 339
326 340 Upon receipt of the (empty) reply, client code can safely assume that
327 341 the kernel has shut down and it's safe to forcefully terminate it if
328 342 it's still alive.
329 343
330 344 The kernel will send the reply via a function registered with Python's
331 345 atexit module, ensuring it's truly done as the kernel is done with all
332 346 normal operation.
333 347 """
334 348 # Send quit message to kernel. Once we implement kernel-side setattr,
335 349 # this should probably be done that way, but for now this will do.
336 350 msg = self.session.msg('shutdown_request', {'restart':restart})
337 351 self._queue_request(msg)
338 352 return msg['header']['msg_id']
339 353
340 354 def _handle_events(self, socket, events):
341 355 if events & POLLERR:
342 356 self._handle_err()
343 357 if events & POLLOUT:
344 358 self._handle_send()
345 359 if events & POLLIN:
346 360 self._handle_recv()
347 361
348 362 def _handle_recv(self):
349 363 ident,msg = self.session.recv(self.socket, 0)
350 364 self.call_handlers(msg)
351 365
352 366 def _handle_send(self):
353 367 try:
354 368 msg = self.command_queue.get(False)
355 369 except Empty:
356 370 pass
357 371 else:
358 372 self.session.send(self.socket,msg)
359 373 if self.command_queue.empty():
360 374 self.drop_io_state(POLLOUT)
361 375
362 376 def _handle_err(self):
363 377 # We don't want to let this go silently, so eventually we should log.
364 378 raise zmq.ZMQError()
365 379
366 380 def _queue_request(self, msg):
367 381 self.command_queue.put(msg)
368 382 self.add_io_state(POLLOUT)
369 383
370 384
371 385 class SubSocketChannel(ZmqSocketChannel):
372 386 """The SUB channel which listens for messages that the kernel publishes.
373 387 """
374 388
375 389 def __init__(self, context, session, address):
376 390 super(SubSocketChannel, self).__init__(context, session, address)
377 391 self.ioloop = ioloop.IOLoop()
378 392
379 393 def run(self):
380 394 """The thread's main activity. Call start() instead."""
381 395 self.socket = self.context.socket(zmq.SUB)
382 396 self.socket.setsockopt(zmq.SUBSCRIBE,'')
383 397 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
384 398 self.socket.connect('tcp://%s:%i' % self.address)
385 399 self.iostate = POLLIN|POLLERR
386 400 self.ioloop.add_handler(self.socket, self._handle_events,
387 401 self.iostate)
388 self.ioloop.start()
402 self._run_loop()
389 403
390 404 def stop(self):
391 405 self.ioloop.stop()
392 406 super(SubSocketChannel, self).stop()
393 407
394 408 def call_handlers(self, msg):
395 409 """This method is called in the ioloop thread when a message arrives.
396 410
397 411 Subclasses should override this method to handle incoming messages.
398 412 It is important to remember that this method is called in the thread
399 413 so that some logic must be done to ensure that the application leve
400 414 handlers are called in the application thread.
401 415 """
402 416 raise NotImplementedError('call_handlers must be defined in a subclass.')
403 417
404 418 def flush(self, timeout=1.0):
405 419 """Immediately processes all pending messages on the SUB channel.
406 420
407 421 Callers should use this method to ensure that :method:`call_handlers`
408 422 has been called for all messages that have been received on the
409 423 0MQ SUB socket of this channel.
410 424
411 425 This method is thread safe.
412 426
413 427 Parameters
414 428 ----------
415 429 timeout : float, optional
416 430 The maximum amount of time to spend flushing, in seconds. The
417 431 default is one second.
418 432 """
419 433 # We do the IOLoop callback process twice to ensure that the IOLoop
420 434 # gets to perform at least one full poll.
421 435 stop_time = time.time() + timeout
422 436 for i in xrange(2):
423 437 self._flushed = False
424 438 self.ioloop.add_callback(self._flush)
425 439 while not self._flushed and time.time() < stop_time:
426 440 time.sleep(0.01)
427 441
428 442 def _handle_events(self, socket, events):
429 443 # Turn on and off POLLOUT depending on if we have made a request
430 444 if events & POLLERR:
431 445 self._handle_err()
432 446 if events & POLLIN:
433 447 self._handle_recv()
434 448
435 449 def _handle_err(self):
436 450 # We don't want to let this go silently, so eventually we should log.
437 451 raise zmq.ZMQError()
438 452
439 453 def _handle_recv(self):
440 454 # Get all of the messages we can
441 455 while True:
442 456 try:
443 457 ident,msg = self.session.recv(self.socket)
444 458 except zmq.ZMQError:
445 459 # Check the errno?
446 460 # Will this trigger POLLERR?
447 461 break
448 462 else:
449 463 if msg is None:
450 464 break
451 465 self.call_handlers(msg)
452 466
453 467 def _flush(self):
454 468 """Callback for :method:`self.flush`."""
455 469 self._flushed = True
456 470
457 471
458 472 class RepSocketChannel(ZmqSocketChannel):
459 473 """A reply channel to handle raw_input requests that the kernel makes."""
460 474
461 475 msg_queue = None
462 476
463 477 def __init__(self, context, session, address):
464 478 super(RepSocketChannel, self).__init__(context, session, address)
465 479 self.ioloop = ioloop.IOLoop()
466 480 self.msg_queue = Queue()
467 481
468 482 def run(self):
469 483 """The thread's main activity. Call start() instead."""
470 484 self.socket = self.context.socket(zmq.XREQ)
471 485 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
472 486 self.socket.connect('tcp://%s:%i' % self.address)
473 487 self.iostate = POLLERR|POLLIN
474 488 self.ioloop.add_handler(self.socket, self._handle_events,
475 489 self.iostate)
476 self.ioloop.start()
490 self._run_loop()
477 491
478 492 def stop(self):
479 493 self.ioloop.stop()
480 494 super(RepSocketChannel, self).stop()
481 495
482 496 def call_handlers(self, msg):
483 497 """This method is called in the ioloop thread when a message arrives.
484 498
485 499 Subclasses should override this method to handle incoming messages.
486 500 It is important to remember that this method is called in the thread
487 501 so that some logic must be done to ensure that the application leve
488 502 handlers are called in the application thread.
489 503 """
490 504 raise NotImplementedError('call_handlers must be defined in a subclass.')
491 505
492 506 def input(self, string):
493 507 """Send a string of raw input to the kernel."""
494 508 content = dict(value=string)
495 509 msg = self.session.msg('input_reply', content)
496 510 self._queue_reply(msg)
497 511
498 512 def _handle_events(self, socket, events):
499 513 if events & POLLERR:
500 514 self._handle_err()
501 515 if events & POLLOUT:
502 516 self._handle_send()
503 517 if events & POLLIN:
504 518 self._handle_recv()
505 519
506 520 def _handle_recv(self):
507 521 ident,msg = self.session.recv(self.socket, 0)
508 522 self.call_handlers(msg)
509 523
510 524 def _handle_send(self):
511 525 try:
512 526 msg = self.msg_queue.get(False)
513 527 except Empty:
514 528 pass
515 529 else:
516 530 self.session.send(self.socket,msg)
517 531 if self.msg_queue.empty():
518 532 self.drop_io_state(POLLOUT)
519 533
520 534 def _handle_err(self):
521 535 # We don't want to let this go silently, so eventually we should log.
522 536 raise zmq.ZMQError()
523 537
524 538 def _queue_reply(self, msg):
525 539 self.msg_queue.put(msg)
526 540 self.add_io_state(POLLOUT)
527 541
528 542
529 543 class HBSocketChannel(ZmqSocketChannel):
530 544 """The heartbeat channel which monitors the kernel heartbeat.
531 545
532 546 Note that the heartbeat channel is paused by default. As long as you start
533 547 this channel, the kernel manager will ensure that it is paused and un-paused
534 548 as appropriate.
535 549 """
536 550
537 551 time_to_dead = 3.0
538 552 socket = None
539 553 poller = None
540 554 _running = None
541 555 _pause = None
542 556
543 557 def __init__(self, context, session, address):
544 558 super(HBSocketChannel, self).__init__(context, session, address)
545 559 self._running = False
546 560 self._pause = True
547 561
548 562 def _create_socket(self):
549 563 self.socket = self.context.socket(zmq.REQ)
550 564 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
551 565 self.socket.connect('tcp://%s:%i' % self.address)
552 566 self.poller = zmq.Poller()
553 567 self.poller.register(self.socket, zmq.POLLIN)
554 568
555 569 def run(self):
556 570 """The thread's main activity. Call start() instead."""
557 571 self._create_socket()
558 572 self._running = True
559 573 while self._running:
560 574 if self._pause:
561 575 time.sleep(self.time_to_dead)
562 576 else:
563 577 since_last_heartbeat = 0.0
564 578 request_time = time.time()
565 579 try:
566 580 #io.rprint('Ping from HB channel') # dbg
567 581 self.socket.send(b'ping')
568 582 except zmq.ZMQError, e:
569 583 #io.rprint('*** HB Error:', e) # dbg
570 584 if e.errno == zmq.EFSM:
571 585 #io.rprint('sleep...', self.time_to_dead) # dbg
572 586 time.sleep(self.time_to_dead)
573 587 self._create_socket()
574 588 else:
575 589 raise
576 590 else:
577 591 while True:
578 592 try:
579 593 self.socket.recv(zmq.NOBLOCK)
580 594 except zmq.ZMQError, e:
581 595 #io.rprint('*** HB Error 2:', e) # dbg
582 596 if e.errno == zmq.EAGAIN:
583 597 before_poll = time.time()
584 598 until_dead = self.time_to_dead - (before_poll -
585 599 request_time)
586 600
587 601 # When the return value of poll() is an empty
588 602 # list, that is when things have gone wrong
589 603 # (zeromq bug). As long as it is not an empty
590 604 # list, poll is working correctly even if it
591 605 # returns quickly. Note: poll timeout is in
592 606 # milliseconds.
593 607 if until_dead > 0.0:
594 self.poller.poll(1000 * until_dead)
608 while True:
609 try:
610 self.poller.poll(1000 * until_dead)
611 except zmq.ZMQError as e:
612 if e.errno == errno.EINTR:
613 continue
614 else:
615 raise
616 else:
617 break
595 618
596 619 since_last_heartbeat = time.time()-request_time
597 620 if since_last_heartbeat > self.time_to_dead:
598 621 self.call_handlers(since_last_heartbeat)
599 622 break
600 623 else:
601 624 # FIXME: We should probably log this instead.
602 625 raise
603 626 else:
604 627 until_dead = self.time_to_dead - (time.time() -
605 628 request_time)
606 629 if until_dead > 0.0:
607 630 #io.rprint('sleep...', self.time_to_dead) # dbg
608 631 time.sleep(until_dead)
609 632 break
610 633
611 634 def pause(self):
612 635 """Pause the heartbeat."""
613 636 self._pause = True
614 637
615 638 def unpause(self):
616 639 """Unpause the heartbeat."""
617 640 self._pause = False
618 641
619 642 def is_beating(self):
620 643 """Is the heartbeat running and not paused."""
621 644 if self.is_alive() and not self._pause:
622 645 return True
623 646 else:
624 647 return False
625 648
626 649 def stop(self):
627 650 self._running = False
628 651 super(HBSocketChannel, self).stop()
629 652
630 653 def call_handlers(self, since_last_heartbeat):
631 654 """This method is called in the ioloop thread when a message arrives.
632 655
633 656 Subclasses should override this method to handle incoming messages.
634 657 It is important to remember that this method is called in the thread
635 658 so that some logic must be done to ensure that the application leve
636 659 handlers are called in the application thread.
637 660 """
638 661 raise NotImplementedError('call_handlers must be defined in a subclass.')
639 662
640 663
641 664 #-----------------------------------------------------------------------------
642 665 # Main kernel manager class
643 666 #-----------------------------------------------------------------------------
644 667
645 668 class KernelManager(HasTraits):
646 669 """ Manages a kernel for a frontend.
647 670
648 671 The SUB channel is for the frontend to receive messages published by the
649 672 kernel.
650 673
651 674 The REQ channel is for the frontend to make requests of the kernel.
652 675
653 676 The REP channel is for the kernel to request stdin (raw_input) from the
654 677 frontend.
655 678 """
656 679 # The PyZMQ Context to use for communication with the kernel.
657 680 context = Instance(zmq.Context,(),{})
658 681
659 682 # The Session to use for communication with the kernel.
660 683 session = Instance(Session,(),{})
661 684
662 685 # The kernel process with which the KernelManager is communicating.
663 686 kernel = Instance(Popen)
664 687
665 688 # The addresses for the communication channels.
666 689 xreq_address = TCPAddress((LOCALHOST, 0))
667 690 sub_address = TCPAddress((LOCALHOST, 0))
668 691 rep_address = TCPAddress((LOCALHOST, 0))
669 692 hb_address = TCPAddress((LOCALHOST, 0))
670 693
671 694 # The classes to use for the various channels.
672 695 xreq_channel_class = Type(XReqSocketChannel)
673 696 sub_channel_class = Type(SubSocketChannel)
674 697 rep_channel_class = Type(RepSocketChannel)
675 698 hb_channel_class = Type(HBSocketChannel)
676 699
677 700 # Protected traits.
678 701 _launch_args = Any
679 702 _xreq_channel = Any
680 703 _sub_channel = Any
681 704 _rep_channel = Any
682 705 _hb_channel = Any
683 706
684 707 def __init__(self, **kwargs):
685 708 super(KernelManager, self).__init__(**kwargs)
686 709 # Uncomment this to try closing the context.
687 710 # atexit.register(self.context.close)
688 711
689 712 #--------------------------------------------------------------------------
690 713 # Channel management methods:
691 714 #--------------------------------------------------------------------------
692 715
693 716 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
694 717 """Starts the channels for this kernel.
695 718
696 719 This will create the channels if they do not exist and then start
697 720 them. If port numbers of 0 are being used (random ports) then you
698 721 must first call :method:`start_kernel`. If the channels have been
699 722 stopped and you call this, :class:`RuntimeError` will be raised.
700 723 """
701 724 if xreq:
702 725 self.xreq_channel.start()
703 726 if sub:
704 727 self.sub_channel.start()
705 728 if rep:
706 729 self.rep_channel.start()
707 730 if hb:
708 731 self.hb_channel.start()
709 732
710 733 def stop_channels(self):
711 734 """Stops all the running channels for this kernel.
712 735 """
713 736 if self.xreq_channel.is_alive():
714 737 self.xreq_channel.stop()
715 738 if self.sub_channel.is_alive():
716 739 self.sub_channel.stop()
717 740 if self.rep_channel.is_alive():
718 741 self.rep_channel.stop()
719 742 if self.hb_channel.is_alive():
720 743 self.hb_channel.stop()
721 744
722 745 @property
723 746 def channels_running(self):
724 747 """Are any of the channels created and running?"""
725 748 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
726 749 self.rep_channel.is_alive() or self.hb_channel.is_alive())
727 750
728 751 #--------------------------------------------------------------------------
729 752 # Kernel process management methods:
730 753 #--------------------------------------------------------------------------
731 754
732 755 def start_kernel(self, **kw):
733 756 """Starts a kernel process and configures the manager to use it.
734 757
735 758 If random ports (port=0) are being used, this method must be called
736 759 before the channels are created.
737 760
738 761 Parameters:
739 762 -----------
740 763 ipython : bool, optional (default True)
741 764 Whether to use an IPython kernel instead of a plain Python kernel.
742 765
743 766 **kw : optional
744 767 See respective options for IPython and Python kernels.
745 768 """
746 769 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
747 770 self.rep_address, self.hb_address
748 771 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
749 772 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
750 773 raise RuntimeError("Can only launch a kernel on a local interface. "
751 774 "Make sure that the '*_address' attributes are "
752 775 "configured properly. "
753 776 "Currently valid addresses are: %s"%LOCAL_IPS
754 777 )
755 778
756 779 self._launch_args = kw.copy()
757 780 if kw.pop('ipython', True):
758 781 from ipkernel import launch_kernel
759 782 else:
760 783 from pykernel import launch_kernel
761 784 self.kernel, xrep, pub, req, _hb = launch_kernel(
762 785 xrep_port=xreq[1], pub_port=sub[1],
763 786 req_port=rep[1], hb_port=hb[1], **kw)
764 787 self.xreq_address = (xreq[0], xrep)
765 788 self.sub_address = (sub[0], pub)
766 789 self.rep_address = (rep[0], req)
767 790 self.hb_address = (hb[0], _hb)
768 791
769 792 def shutdown_kernel(self, restart=False):
770 793 """ Attempts to the stop the kernel process cleanly. If the kernel
771 794 cannot be stopped, it is killed, if possible.
772 795 """
773 796 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
774 797 if sys.platform == 'win32':
775 798 self.kill_kernel()
776 799 return
777 800
778 801 # Pause the heart beat channel if it exists.
779 802 if self._hb_channel is not None:
780 803 self._hb_channel.pause()
781 804
782 805 # Don't send any additional kernel kill messages immediately, to give
783 806 # the kernel a chance to properly execute shutdown actions. Wait for at
784 807 # most 1s, checking every 0.1s.
785 808 self.xreq_channel.shutdown(restart=restart)
786 809 for i in range(10):
787 810 if self.is_alive:
788 811 time.sleep(0.1)
789 812 else:
790 813 break
791 814 else:
792 815 # OK, we've waited long enough.
793 816 if self.has_kernel:
794 817 self.kill_kernel()
795 818
796 819 def restart_kernel(self, now=False, **kw):
797 820 """Restarts a kernel with the arguments that were used to launch it.
798 821
799 822 If the old kernel was launched with random ports, the same ports will be
800 823 used for the new kernel.
801 824
802 825 Parameters
803 826 ----------
804 827 now : bool, optional
805 828 If True, the kernel is forcefully restarted *immediately*, without
806 829 having a chance to do any cleanup action. Otherwise the kernel is
807 830 given 1s to clean up before a forceful restart is issued.
808 831
809 832 In all cases the kernel is restarted, the only difference is whether
810 833 it is given a chance to perform a clean shutdown or not.
811 834
812 835 **kw : optional
813 836 Any options specified here will replace those used to launch the
814 837 kernel.
815 838 """
816 839 if self._launch_args is None:
817 840 raise RuntimeError("Cannot restart the kernel. "
818 841 "No previous call to 'start_kernel'.")
819 842 else:
820 843 # Stop currently running kernel.
821 844 if self.has_kernel:
822 845 if now:
823 846 self.kill_kernel()
824 847 else:
825 848 self.shutdown_kernel(restart=True)
826 849
827 850 # Start new kernel.
828 851 self._launch_args.update(kw)
829 852 self.start_kernel(**self._launch_args)
830 853
831 854 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
832 855 # unless there is some delay here.
833 856 if sys.platform == 'win32':
834 857 time.sleep(0.2)
835 858
836 859 @property
837 860 def has_kernel(self):
838 861 """Returns whether a kernel process has been specified for the kernel
839 862 manager.
840 863 """
841 864 return self.kernel is not None
842 865
843 866 def kill_kernel(self):
844 867 """ Kill the running kernel. """
845 868 if self.has_kernel:
846 869 # Pause the heart beat channel if it exists.
847 870 if self._hb_channel is not None:
848 871 self._hb_channel.pause()
849 872
850 873 # Attempt to kill the kernel.
851 874 try:
852 875 self.kernel.kill()
853 876 except OSError, e:
854 877 # In Windows, we will get an Access Denied error if the process
855 878 # has already terminated. Ignore it.
856 879 if sys.platform == 'win32':
857 880 if e.winerror != 5:
858 881 raise
859 882 # On Unix, we may get an ESRCH error if the process has already
860 883 # terminated. Ignore it.
861 884 else:
862 885 from errno import ESRCH
863 886 if e.errno != ESRCH:
864 887 raise
865 888 self.kernel = None
866 889 else:
867 890 raise RuntimeError("Cannot kill kernel. No kernel is running!")
868 891
869 892 def interrupt_kernel(self):
870 893 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
871 894 well supported on all platforms.
872 895 """
873 896 if self.has_kernel:
874 897 if sys.platform == 'win32':
875 898 from parentpoller import ParentPollerWindows as Poller
876 899 Poller.send_interrupt(self.kernel.win32_interrupt_event)
877 900 else:
878 901 self.kernel.send_signal(signal.SIGINT)
879 902 else:
880 903 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
881 904
882 905 def signal_kernel(self, signum):
883 906 """ Sends a signal to the kernel. Note that since only SIGTERM is
884 907 supported on Windows, this function is only useful on Unix systems.
885 908 """
886 909 if self.has_kernel:
887 910 self.kernel.send_signal(signum)
888 911 else:
889 912 raise RuntimeError("Cannot signal kernel. No kernel is running!")
890 913
891 914 @property
892 915 def is_alive(self):
893 916 """Is the kernel process still running?"""
894 917 # FIXME: not using a heartbeat means this method is broken for any
895 918 # remote kernel, it's only capable of handling local kernels.
896 919 if self.has_kernel:
897 920 if self.kernel.poll() is None:
898 921 return True
899 922 else:
900 923 return False
901 924 else:
902 925 # We didn't start the kernel with this KernelManager so we don't
903 926 # know if it is running. We should use a heartbeat for this case.
904 927 return True
905 928
906 929 #--------------------------------------------------------------------------
907 930 # Channels used for communication with the kernel:
908 931 #--------------------------------------------------------------------------
909 932
910 933 @property
911 934 def xreq_channel(self):
912 935 """Get the REQ socket channel object to make requests of the kernel."""
913 936 if self._xreq_channel is None:
914 937 self._xreq_channel = self.xreq_channel_class(self.context,
915 938 self.session,
916 939 self.xreq_address)
917 940 return self._xreq_channel
918 941
919 942 @property
920 943 def sub_channel(self):
921 944 """Get the SUB socket channel object."""
922 945 if self._sub_channel is None:
923 946 self._sub_channel = self.sub_channel_class(self.context,
924 947 self.session,
925 948 self.sub_address)
926 949 return self._sub_channel
927 950
928 951 @property
929 952 def rep_channel(self):
930 953 """Get the REP socket channel object to handle stdin (raw_input)."""
931 954 if self._rep_channel is None:
932 955 self._rep_channel = self.rep_channel_class(self.context,
933 956 self.session,
934 957 self.rep_address)
935 958 return self._rep_channel
936 959
937 960 @property
938 961 def hb_channel(self):
939 962 """Get the heartbeat socket channel object to check that the
940 963 kernel is alive."""
941 964 if self._hb_channel is None:
942 965 self._hb_channel = self.hb_channel_class(self.context,
943 966 self.session,
944 967 self.hb_address)
945 968 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now