channels.py
92 lines
| 2.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""Blocking channels

Useful for test suites and blocking terminal interfaces.
"""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
Thomas Kluyver
|
r13354 | try: | ||
from queue import Queue, Empty # Py 3 | ||||
except ImportError: | ||||
from Queue import Queue, Empty # Py 2 | ||||
Brian Granger
|
r9121 | |||
Brian Granger
|
r2693 | |||
Thomas Kluyver
|
class ZMQSocketChannel(object):
    """A ZMQ socket in a simple blocking API.

    Incoming messages are obtained by polling the socket and decoding the
    received frames through the session; outgoing messages are handed to
    ``session.send`` together with the socket.
    """

    # Class-level defaults; ``__init__`` replaces ``socket`` and ``session``
    # on each instance.
    session = None
    socket = None
    # The three attributes below are not read anywhere in this class.
    # NOTE(review): presumably kept so this class exposes the same attributes
    # as other channel implementations -- confirm before removing.
    stream = None
    _exiting = False
    proxy_methods = []

    def __init__(self, socket, session, loop=None):
        """Create a channel.

        Parameters
        ----------
        socket : :class:`zmq.Socket`
            The ZMQ socket to use.
        session : :class:`session.Session`
            The session to use.
        loop
            Unused here, for other implementations
        """
        super(ZMQSocketChannel, self).__init__()

        self.socket = socket
        self.session = session

    def _recv(self, **kwargs):
        """Receive one multipart message and return it deserialized.

        Any keyword arguments are forwarded to ``socket.recv_multipart``.
        """
        frames = self.socket.recv_multipart(**kwargs)
        # feed_identities returns an (identities, message parts) pair; only
        # the message parts are needed here.
        _, smsg = self.session.feed_identities(frames)
        return self.session.deserialize(smsg)

    def get_msg(self, block=True, timeout=None):
        """Get a message if there is one that is ready.

        Parameters
        ----------
        block : bool
            If true, poll the socket with ``timeout``; otherwise poll with a
            zero timeout (``timeout`` is then ignored).
        timeout : float, optional
            Seconds to wait when ``block`` is true.  ``None`` is passed to
            ``socket.poll`` unchanged.

        Returns
        -------
        The message as returned by ``session.deserialize``.

        Raises
        ------
        Empty
            If the poll reports that nothing is ready.
        """
        if block:
            # Callers pass seconds; the socket is polled in milliseconds.
            timeout_ms = None if timeout is None else timeout * 1000
            ready = self.socket.poll(timeout_ms)
        else:
            ready = self.socket.poll(timeout=0)

        if not ready:
            raise Empty
        return self._recv()

    def get_msgs(self):
        """Get all messages that are currently ready.

        Returns a list (possibly empty) built by calling ``get_msg`` without
        blocking until it raises ``Empty``.
        """
        msgs = []
        while True:
            try:
                msgs.append(self.get_msg(block=False))
            except Empty:
                break
        return msgs

    def msg_ready(self):
        """Is there a message that has been received?

        Returns ``False`` once the channel has been closed instead of failing
        on the missing socket.
        """
        if self.socket is None:
            return False
        return bool(self.socket.poll(timeout=0))

    def close(self):
        """Close the socket (with ``linger=0``) and mark the channel closed.

        Calling this on an already closed channel does nothing.
        """
        sock = self.socket
        if sock is None:
            return
        # Drop the reference before closing so the channel reports itself as
        # closed even if the underlying close() fails.
        self.socket = None
        try:
            sock.close(linger=0)
        except Exception:
            # Deliberately best-effort: a failure while closing is ignored.
            pass
    stop = close

    def is_alive(self):
        """Return True while the channel still holds a socket."""
        return self.socket is not None

    def _queue_send(self, msg):
        """Pass a message to the ZMQ socket to send
        """
        self.session.send(self.socket, msg)

    def start(self):
        """Do nothing; there is nothing to start in this implementation."""
        pass