blockingkernelmanager.py
121 lines
| 3.7 KiB
| text/x-python
|
PythonLexer
Fernando Perez
|
r2926 | """Implement a fully blocking kernel manager. | ||
Useful for test suites and blocking terminal interfaces. | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING.txt, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
from __future__ import print_function | ||||
# Stdlib | ||||
Brian Granger
|
r2693 | from Queue import Queue, Empty | ||
Fernando Perez
|
r2926 | # Our own | ||
from IPython.utils import io | ||||
from IPython.utils.traitlets import Type | ||||
Brian Granger
|
r2693 | |||
Fernando Perez
|
r2926 | from .kernelmanager import (KernelManager, SubSocketChannel, | ||
XReqSocketChannel, RepSocketChannel, HBSocketChannel) | ||||
Brian Granger
|
r2693 | |||
Fernando Perez
|
r2926 | #----------------------------------------------------------------------------- | ||
# Functions and classes | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2693 | |||
class BlockingSubSocketChannel(SubSocketChannel): | ||||
def __init__(self, context, session, address=None): | ||||
epatters
|
r3825 | super(BlockingSubSocketChannel, self).__init__(context, session, | ||
address) | ||||
Brian Granger
|
r2693 | self._in_queue = Queue() | ||
def call_handlers(self, msg): | ||||
epatters
|
r3825 | #io.rprint('[[Sub]]', msg) # dbg | ||
Brian Granger
|
r2693 | self._in_queue.put(msg) | ||
def msg_ready(self): | ||||
"""Is there a message that has been received?""" | ||||
if self._in_queue.qsize() == 0: | ||||
return False | ||||
else: | ||||
return True | ||||
def get_msg(self, block=True, timeout=None): | ||||
"""Get a message if there is one that is ready.""" | ||||
David
|
r3233 | return self._in_queue.get(block, timeout) | ||
Fernando Perez
|
r2926 | |||
def get_msgs(self): | ||||
"""Get all messages that are currently ready.""" | ||||
msgs = [] | ||||
while True: | ||||
try: | ||||
msgs.append(self.get_msg(block=False)) | ||||
except Empty: | ||||
break | ||||
return msgs | ||||
class BlockingXReqSocketChannel(XReqSocketChannel): | ||||
def __init__(self, context, session, address=None): | ||||
epatters
|
r3825 | super(BlockingXReqSocketChannel, self).__init__(context, session, | ||
address) | ||||
Fernando Perez
|
r2926 | self._in_queue = Queue() | ||
def call_handlers(self, msg): | ||||
epatters
|
r3825 | #io.rprint('[[XReq]]', msg) # dbg | ||
self._in_queue.put(msg) | ||||
Fernando Perez
|
r2926 | |||
def msg_ready(self): | ||||
"""Is there a message that has been received?""" | ||||
if self._in_queue.qsize() == 0: | ||||
return False | ||||
Brian Granger
|
r2693 | else: | ||
Fernando Perez
|
r2926 | return True | ||
def get_msg(self, block=True, timeout=None): | ||||
"""Get a message if there is one that is ready.""" | ||||
David
|
r3233 | return self._in_queue.get(block, timeout) | ||
Brian Granger
|
r2693 | |||
def get_msgs(self): | ||||
"""Get all messages that are currently ready.""" | ||||
msgs = [] | ||||
while True: | ||||
try: | ||||
Fernando Perez
|
r2926 | msgs.append(self.get_msg(block=False)) | ||
except Empty: | ||||
Brian Granger
|
r2693 | break | ||
Fernando Perez
|
r2926 | return msgs | ||
epatters
|
r3825 | |||
Fernando Perez
|
r2926 | |||
class BlockingRepSocketChannel(RepSocketChannel): | ||||
epatters
|
r3825 | |||
Fernando Perez
|
r2926 | def call_handlers(self, msg): | ||
epatters
|
r3825 | #io.rprint('[[Rep]]', msg) # dbg | ||
pass | ||||
Fernando Perez
|
r2926 | |||
class BlockingHBSocketChannel(HBSocketChannel): | ||||
epatters
|
r3825 | |||
Fernando Perez
|
r2926 | # This kernel needs rapid monitoring capabilities | ||
time_to_dead = 0.2 | ||||
def call_handlers(self, since_last_heartbeat): | ||||
epatters
|
r3825 | #io.rprint('[[Heart]]', since_last_heartbeat) # dbg | ||
pass | ||||
Fernando Perez
|
r2926 | |||
class BlockingKernelManager(KernelManager): | ||||
# The classes to use for the various channels. | ||||
xreq_channel_class = Type(BlockingXReqSocketChannel) | ||||
sub_channel_class = Type(BlockingSubSocketChannel) | ||||
rep_channel_class = Type(BlockingRepSocketChannel) | ||||
hb_channel_class = Type(BlockingHBSocketChannel) | ||||