zmqhttp.py
79 lines
| 2.0 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r4297 | import json | |
from tornado import web | |||
import logging | |||
class ZMQHandler(web.RequestHandler): | |||
def get_stream(self): | |||
"""Get the ZMQStream for this request.""" | |||
raise NotImplementedError('Implement get_stream() in a subclass.') | |||
def _save_method_args(self, *args, **kwargs): | |||
"""Save the args and kwargs to get/post/put/delete for future use. | |||
These arguments are not saved in the request or handler objects, but | |||
are often needed by methods such as get_stream(). | |||
""" | |||
self._method_args = args | |||
self._method_kwargs = kwargs | |||
def _handle_msgs(self, msg): | |||
msgs = [msg] | |||
stream = self.get_stream() | |||
stream.on_recv(lambda m: msgs.append(json.loads(m))) | |||
stream.flush() | |||
stream.stop_on_recv() | |||
logging.info("Reply: %r" % msgs) | |||
self.write(json.dumps(msgs)) | |||
self.finish() | |||
class ZMQPubHandler(ZMQHandler): | |||
SUPPORTED_METHODS = ("POST",) | |||
def post(self, *args, **kwargs): | |||
self._save_method_args(*args, **kwargs) | |||
try: | |||
msg = json.loads(self.request.body) | |||
except: | |||
self.send_error(status_code=415) | |||
else: | |||
logging.info("Request: %r" % msg) | |||
self.get_stream().send_json(msg) | |||
class ZMQSubHandler(ZMQHandler): | |||
SUPPORTED_METHODS = ("GET",) | |||
@web.asynchronous | |||
def get(self, *args, **kwargs): | |||
self._save_method_args(*args, **kwargs) | |||
self.get_stream().on_recv(self._handle_msgs) | |||
class ZMQXReqHandler(ZMQHandler): | |||
SUPPORTED_METHODS = ("POST",) | |||
@web.asynchronous | |||
def post(self, *args, **kwargs): | |||
self._save_method_args(*args, **kwargs) | |||
logging.info("request: %r" % self.request) | |||
try: | |||
msg = json.loads(self.request.body) | |||
except: | |||
self.send_error(status_code=415) | |||
else: | |||
logging.info("Reply: %r" % msg) | |||
stream = self.get_stream() | |||
stream.send_json(msg) | |||
stream.on_recv(self._handle_msgs) | |||