Show More
blockingkernelmanager.py
154 lines
| 4.9 KiB
| text/x-python
|
PythonLexer
Fernando Perez
|
r2926 | """Implement a fully blocking kernel manager. | ||
Useful for test suites and blocking terminal interfaces. | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
Matthias BUSSONNIER
|
r5390 | # Copyright (C) 2010-2011 The IPython Development Team | ||
Fernando Perez
|
r2926 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING.txt, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
from __future__ import print_function | ||||
# Stdlib | ||||
Brian Granger
|
r2693 | from Queue import Queue, Empty | ||
Thomas Kluyver
|
r5596 | from threading import Event | ||
Brian Granger
|
r2693 | |||
Fernando Perez
|
r2926 | # Our own | ||
from IPython.utils import io | ||||
from IPython.utils.traitlets import Type | ||||
Brian Granger
|
r2693 | |||
MinRK
|
r3974 | from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel, | ||
ShellSocketChannel, StdInSocketChannel) | ||||
Brian Granger
|
r2693 | |||
Fernando Perez
|
r2926 | #----------------------------------------------------------------------------- | ||
# Functions and classes | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2693 | |||
class BlockingSubSocketChannel(SubSocketChannel):
    """Variant of SubSocketChannel that queues messages for synchronous reads.

    Every message passed to :meth:`call_handlers` is stored in an internal
    ``Queue``; callers pull messages out with :meth:`get_msg` or
    :meth:`get_msgs`.
    """

    def __init__(self, context, session, address=None):
        """Initialise the parent channel and create the empty message queue."""
        super(BlockingSubSocketChannel, self).__init__(context, session,
                                                       address)
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Store an incoming message so it can be fetched later."""
        self._in_queue.put(msg)

    def msg_ready(self):
        """Return True when at least one message is waiting in the queue."""
        return self._in_queue.qsize() != 0

    def get_msg(self, block=True, timeout=None):
        """Return the next queued message.

        Parameters
        ----------
        block : bool
            Whether to wait for a message when the queue is empty.
        timeout : float or None
            Seconds to wait when blocking.  ``None`` is replaced by a very
            long finite wait.

        Raises
        ------
        Empty
            If no message is available (immediately when ``block`` is false,
            otherwise once the timeout expires).
        """
        # A blocking get() with timeout=None becomes uninterruptible, so a
        # huge but finite timeout is substituted instead.
        wait = 1e6 if (block and timeout is None) else timeout
        return self._in_queue.get(block, wait)

    def get_msgs(self):
        """Drain the queue and return everything that is currently ready."""
        ready = []
        try:
            # Non-blocking reads until the queue reports that it is empty.
            while True:
                ready.append(self.get_msg(block=False))
        except Empty:
            pass
        return ready
MinRK
|
class BlockingShellSocketChannel(ShellSocketChannel):
    """Variant of ShellSocketChannel that queues messages for synchronous reads.

    Every message passed to :meth:`call_handlers` is stored in an internal
    ``Queue``; callers pull messages out with :meth:`get_msg` or
    :meth:`get_msgs`.
    """

    def __init__(self, context, session, address=None):
        """Initialise the parent channel and create the empty message queue."""
        super(BlockingShellSocketChannel, self).__init__(context, session,
                                                         address)
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Store an incoming message so it can be fetched later."""
        self._in_queue.put(msg)

    def msg_ready(self):
        """Return True when at least one message is waiting in the queue."""
        return self._in_queue.qsize() != 0

    def get_msg(self, block=True, timeout=None):
        """Return the next queued message.

        Parameters
        ----------
        block : bool
            Whether to wait for a message when the queue is empty.
        timeout : float or None
            Seconds to wait when blocking.  ``None`` is replaced by a very
            long finite wait.

        Raises
        ------
        Empty
            If no message is available (immediately when ``block`` is false,
            otherwise once the timeout expires).
        """
        # A blocking get() with timeout=None becomes uninterruptible, so a
        # huge but finite timeout is substituted instead.
        wait = 1e6 if (block and timeout is None) else timeout
        return self._in_queue.get(block, wait)

    def get_msgs(self):
        """Drain the queue and return everything that is currently ready."""
        ready = []
        try:
            # Non-blocking reads until the queue reports that it is empty.
            while True:
                ready.append(self.get_msg(block=False))
        except Empty:
            pass
        return ready
epatters
|
r3825 | |||
Fernando Perez
|
r2926 | |||
MinRK
|
class BlockingStdInSocketChannel(StdInSocketChannel):
    """Variant of StdInSocketChannel that queues messages for synchronous reads.

    Every message passed to :meth:`call_handlers` is stored in an internal
    ``Queue``; callers pull messages out with :meth:`get_msg` or
    :meth:`get_msgs`.
    """

    def __init__(self, context, session, address=None):
        """Initialise the parent channel and create the empty message queue."""
        super(BlockingStdInSocketChannel, self).__init__(context, session,
                                                         address)
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Store an incoming message so it can be fetched later."""
        self._in_queue.put(msg)

    def get_msg(self, block=True, timeout=None):
        """Get a message if there is one that is ready.

        Parameters
        ----------
        block : bool
            Whether to wait for a message when the queue is empty.
        timeout : float or None
            Seconds to wait when blocking.  ``None`` is replaced by a very
            long finite wait.

        Raises
        ------
        Empty
            If no message is available (immediately when ``block`` is false,
            otherwise once the timeout expires).
        """
        if block and timeout is None:
            # never use timeout=None, because get
            # becomes uninterruptible
            timeout = 1e6
        return self._in_queue.get(block, timeout)

    def get_msgs(self):
        """Get all messages that are currently ready."""
        msgs = []
        while True:
            try:
                msgs.append(self.get_msg(block=False))
            except Empty:
                break
        return msgs

    def msg_ready(self):
        """Is there a message that has been received?"""
        return not self._in_queue.empty()
Fernando Perez
|
r2926 | |||
class BlockingHBSocketChannel(HBSocketChannel):
    """Heartbeat channel used by the blocking kernel manager."""

    # This kernel needs quicker monitoring, so the interval is shortened to
    # one second.  Values below 0.5s are unreliable and occasionally produce
    # false reports of missed beats.
    time_to_dead = 1.0

    def call_handlers(self, since_last_heartbeat):
        """Accept a heartbeat report and deliberately do nothing with it.

        NOTE(review): the original docstring read "pause beating on missed
        heartbeat"; no pausing happens in this method, so presumably the
        base class handles that -- confirm against HBSocketChannel.
        """
Fernando Perez
|
r2926 | |||
class BlockingKernelManager(KernelManager):
    """Kernel manager configured with the blocking channel classes.

    Each ``*_channel_class`` trait below is set to the corresponding
    blocking channel implementation defined in this module.
    """

    # The classes to use for the various channels.
    shell_channel_class = Type(BlockingShellSocketChannel)
    sub_channel_class = Type(BlockingSubSocketChannel)
    stdin_channel_class = Type(BlockingStdInSocketChannel)
    hb_channel_class = Type(BlockingHBSocketChannel)