##// END OF EJS Templates
zmq kernels now started via newapp
zmq kernels now started via newapp

File last commit:

r3825:db78130e
r3970:c5a35798
Show More
blockingkernelmanager.py
121 lines | 3.7 KiB | text/x-python | PythonLexer
/ IPython / zmq / blockingkernelmanager.py
Fernando Perez
Rework messaging to better conform to our spec....
"""Implement a fully blocking kernel manager.

Useful for test suites and blocking terminal interfaces.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING.txt, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function
# Stdlib
Brian Granger
Added initial draft of blockingkernelmanager.py.
r2693 from Queue import Queue, Empty
Fernando Perez
Rework messaging to better conform to our spec....
r2926 # Our own
from IPython.utils import io
from IPython.utils.traitlets import Type
Brian Granger
Added initial draft of blockingkernelmanager.py.
r2693
Fernando Perez
Rework messaging to better conform to our spec....
r2926 from .kernelmanager import (KernelManager, SubSocketChannel,
XReqSocketChannel, RepSocketChannel, HBSocketChannel)
Brian Granger
Added initial draft of blockingkernelmanager.py.
r2693
Fernando Perez
Rework messaging to better conform to our spec....
r2926 #-----------------------------------------------------------------------------
# Functions and classes
#-----------------------------------------------------------------------------
Brian Granger
Added initial draft of blockingkernelmanager.py.
r2693
class BlockingSubSocketChannel(SubSocketChannel):
    """A ``SubSocketChannel`` that queues received messages for later retrieval.

    Every message handed to :meth:`call_handlers` is stored in an internal
    ``Queue``. Callers pull messages out with :meth:`get_msg` (optionally
    blocking) or drain everything currently buffered with :meth:`get_msgs`.
    """

    def __init__(self, context, session, address=None):
        """Initialise the parent channel and create the empty message queue.

        ``context``, ``session`` and ``address`` are forwarded unchanged to
        ``SubSocketChannel.__init__``.
        """
        super(BlockingSubSocketChannel, self).__init__(context, session,
                                                       address)
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Buffer ``msg`` on the internal queue instead of dispatching it."""
        #io.rprint('[[Sub]]', msg) # dbg
        self._in_queue.put(msg)

    def msg_ready(self):
        """Is there a message that has been received?"""
        return not self._in_queue.empty()

    def get_msg(self, block=True, timeout=None):
        """Get a message if there is one that is ready.

        ``block`` and ``timeout`` are passed straight to ``Queue.get``, so
        ``Empty`` is raised when no message is available (immediately if
        ``block`` is false, otherwise once ``timeout`` expires).
        """
        return self._in_queue.get(block, timeout)

    def get_msgs(self):
        """Get all messages that are currently ready.

        Returns a (possibly empty) list; never blocks.
        """
        msgs = []
        while True:
            try:
                msgs.append(self.get_msg(block=False))
            except Empty:
                # Queue drained: nothing more is ready right now.
                break
        return msgs
class BlockingXReqSocketChannel(XReqSocketChannel):
    """An ``XReqSocketChannel`` that queues received messages for later retrieval.

    Every message handed to :meth:`call_handlers` is stored in an internal
    ``Queue``. Callers pull messages out with :meth:`get_msg` (optionally
    blocking) or drain everything currently buffered with :meth:`get_msgs`.
    """

    def __init__(self, context, session, address=None):
        """Initialise the parent channel and create the empty message queue.

        ``context``, ``session`` and ``address`` are forwarded unchanged to
        ``XReqSocketChannel.__init__``.
        """
        super(BlockingXReqSocketChannel, self).__init__(context, session,
                                                        address)
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Buffer ``msg`` on the internal queue instead of dispatching it."""
        #io.rprint('[[XReq]]', msg) # dbg
        self._in_queue.put(msg)

    def msg_ready(self):
        """Is there a message that has been received?"""
        return not self._in_queue.empty()

    def get_msg(self, block=True, timeout=None):
        """Get a message if there is one that is ready.

        ``block`` and ``timeout`` are passed straight to ``Queue.get``, so
        ``Empty`` is raised when no message is available (immediately if
        ``block`` is false, otherwise once ``timeout`` expires).
        """
        return self._in_queue.get(block, timeout)

    def get_msgs(self):
        """Get all messages that are currently ready.

        Returns a (possibly empty) list; never blocks.
        """
        msgs = []
        while True:
            try:
                msgs.append(self.get_msg(block=False))
            except Empty:
                # Queue drained: nothing more is ready right now.
                break
        return msgs
epatters
Add missing msg queue + comment out debug printing in BlockingKernelManager.
r3825
Fernando Perez
Rework messaging to better conform to our spec....
r2926
class BlockingRepSocketChannel(RepSocketChannel):
    """A ``RepSocketChannel`` whose message handler does nothing."""

    def call_handlers(self, msg):
        """Discard ``msg``; this channel takes no action on received messages."""
        #io.rprint('[[Rep]]', msg) # dbg
        pass
Fernando Perez
Rework messaging to better conform to our spec....
r2926
class BlockingHBSocketChannel(HBSocketChannel):
    """An ``HBSocketChannel`` with a short ``time_to_dead`` and a no-op handler."""

    # This kernel needs rapid monitoring capabilities
    # NOTE(review): unit is presumably seconds -- confirm against
    # HBSocketChannel in kernelmanager.py.
    time_to_dead = 0.2

    def call_handlers(self, since_last_heartbeat):
        """Ignore the heartbeat notification; nothing is recorded or dispatched."""
        #io.rprint('[[Heart]]', since_last_heartbeat) # dbg
        pass
Fernando Perez
Rework messaging to better conform to our spec....
r2926
class BlockingKernelManager(KernelManager):
    """A ``KernelManager`` wired to the blocking channel classes of this module."""

    # The classes to use for the various channels.
    xreq_channel_class = Type(BlockingXReqSocketChannel)
    sub_channel_class = Type(BlockingSubSocketChannel)
    rep_channel_class = Type(BlockingRepSocketChannel)
    hb_channel_class = Type(BlockingHBSocketChannel)