zmqhttp.py
96 lines
| 2.8 KiB
| text/x-python
|
PythonLexer
Brian E. Granger
|
r4609 | """Unfinished code for ZMQ/HTTP bridging. We use WebSockets instead. | ||
Brian Granger
|
r4297 | |||
Brian E. Granger
|
r4609 | Authors: | ||
* Brian Granger | ||||
""" | ||||
Brian Granger
|
r4297 | |||
Brian E. Granger
|
r4609 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2008-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import json | ||||
Brian Granger
|
r4297 | import logging | ||
Brian E. Granger
|
r4609 | from tornado import web | ||
#----------------------------------------------------------------------------- | ||||
# Code | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r4297 | |||
class ZMQHandler(web.RequestHandler): | ||||
def get_stream(self): | ||||
"""Get the ZMQStream for this request.""" | ||||
raise NotImplementedError('Implement get_stream() in a subclass.') | ||||
def _save_method_args(self, *args, **kwargs): | ||||
"""Save the args and kwargs to get/post/put/delete for future use. | ||||
These arguments are not saved in the request or handler objects, but | ||||
are often needed by methods such as get_stream(). | ||||
""" | ||||
self._method_args = args | ||||
self._method_kwargs = kwargs | ||||
def _handle_msgs(self, msg): | ||||
msgs = [msg] | ||||
stream = self.get_stream() | ||||
stream.on_recv(lambda m: msgs.append(json.loads(m))) | ||||
stream.flush() | ||||
stream.stop_on_recv() | ||||
logging.info("Reply: %r" % msgs) | ||||
self.write(json.dumps(msgs)) | ||||
self.finish() | ||||
class ZMQPubHandler(ZMQHandler): | ||||
SUPPORTED_METHODS = ("POST",) | ||||
def post(self, *args, **kwargs): | ||||
self._save_method_args(*args, **kwargs) | ||||
try: | ||||
msg = json.loads(self.request.body) | ||||
except: | ||||
self.send_error(status_code=415) | ||||
else: | ||||
logging.info("Request: %r" % msg) | ||||
self.get_stream().send_json(msg) | ||||
class ZMQSubHandler(ZMQHandler): | ||||
SUPPORTED_METHODS = ("GET",) | ||||
@web.asynchronous | ||||
def get(self, *args, **kwargs): | ||||
self._save_method_args(*args, **kwargs) | ||||
self.get_stream().on_recv(self._handle_msgs) | ||||
class ZMQXReqHandler(ZMQHandler): | ||||
SUPPORTED_METHODS = ("POST",) | ||||
@web.asynchronous | ||||
def post(self, *args, **kwargs): | ||||
self._save_method_args(*args, **kwargs) | ||||
logging.info("request: %r" % self.request) | ||||
try: | ||||
msg = json.loads(self.request.body) | ||||
except: | ||||
self.send_error(status_code=415) | ||||
else: | ||||
logging.info("Reply: %r" % msg) | ||||
stream = self.get_stream() | ||||
stream.send_json(msg) | ||||
stream.on_recv(self._handle_msgs) | ||||