# HG changeset patch # User Daniel Dourvaris # Date 2019-08-23 17:13:52 # Node ID 304a54136237f3af02267a4965c487f1119d9475 # Parent 406ae170dc86d8668bdb176be5233874da421be8 vcsserver: added streaming interface for streaming remote attributes diff --git a/vcsserver/http_main.py b/vcsserver/http_main.py --- a/vcsserver/http_main.py +++ b/vcsserver/http_main.py @@ -25,6 +25,7 @@ import wsgiref.util import traceback import tempfile from itertools import chain +from cStringIO import StringIO import simplejson as json import msgpack @@ -32,7 +33,9 @@ from pyramid.config import Configurator from pyramid.settings import asbool, aslist from pyramid.wsgi import wsgiapp from pyramid.compat import configparser +from pyramid.response import Response +from vcsserver.utils import safe_int log = logging.getLogger(__name__) @@ -114,8 +117,8 @@ def _string_setting(settings, name, defa class VCS(object): - def __init__(self, locale=None, cache_config=None): - self.locale = locale + def __init__(self, locale_conf=None, cache_config=None): + self.locale = locale_conf self.cache_config = cache_config self._configure_locale() @@ -233,7 +236,7 @@ class HTTPApplication(object): self.config.include('vcsserver.lib.rc_cache') settings_locale = settings.get('locale', '') or 'en_US.UTF-8' - vcs = VCS(locale=settings_locale, cache_config=settings) + vcs = VCS(locale_conf=settings_locale, cache_config=settings) self._remotes = { 'hg': vcs._hg_remote, 'git': vcs._git_remote, @@ -307,7 +310,14 @@ class HTTPApplication(object): self.config.add_route('status', '/status') self.config.add_route('hg_proxy', '/proxy/hg') self.config.add_route('git_proxy', '/proxy/git') + + # rpc methods self.config.add_route('vcs', '/{backend}') + + # streaming rpc remote methods + self.config.add_route('vcs_stream', '/{backend}/stream') + + # vcs operations clone/push as streaming self.config.add_route('stream_git', '/stream/git/*repo_name') self.config.add_route('stream_hg', '/stream/hg/*repo_name') @@ -318,6 +328,8 @@ class HTTPApplication(object): self.config.add_view(self.git_proxy(), route_name='git_proxy') self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack', vcs_view=self._remotes) + self.config.add_view(self.vcs_stream_view, route_name='vcs_stream', + vcs_view=self._remotes) self.config.add_view(self.hg_stream(), route_name='stream_hg') self.config.add_view(self.git_stream(), route_name='stream_git') @@ -338,11 +350,11 @@ class HTTPApplication(object): def wsgi_app(self): return self.config.make_wsgi_app() - def vcs_view(self, request): + def _vcs_view_params(self, request): remote = self._remotes[request.matchdict['backend']] payload = msgpack.unpackb(request.body, use_list=True) method = payload.get('method') - params = payload.get('params') + params = payload['params'] wire = params.get('wire') args = params.get('args') kwargs = params.get('kwargs') @@ -354,6 +366,7 @@ class HTTPApplication(object): except KeyError: pass args.insert(0, wire) + repo_state_uid = wire.get('repo_state_uid') if wire else None # NOTE(marcink): trading complexity for slight performance if log.isEnabledFor(logging.DEBUG): @@ -365,10 +378,16 @@ class HTTPApplication(object): else: call_args = args[1:] - repo_state_uid = wire.get('repo_state_uid') if wire else None - log.debug('method called:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s', + log.debug('method requested:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s', method, call_args, kwargs, context_uid, repo_state_uid) + return payload, remote, method, args, kwargs + + def vcs_view(self, request): + + payload, remote, method, args, kwargs = self._vcs_view_params(request) + payload_id = payload.get('id') + try: resp = getattr(remote, method)(*args, **kwargs) except Exception as e: @@ -395,7 +414,7 @@ class HTTPApplication(object): type_ = None resp = { - 'id': payload.get('id'), + 'id': payload_id, 'error': { 'message': e.message, 'traceback': tb_info, @@ -410,12 +429,36 @@ class HTTPApplication(object): pass else: resp = { - 'id': payload.get('id'), + 'id': payload_id, 'result': resp } return resp + def vcs_stream_view(self, request): + payload, remote, method, args, kwargs = self._vcs_view_params(request) + # this method has a stream: marker we remove it here + method = method.split('stream:')[-1] + chunk_size = safe_int(payload.get('chunk_size')) or 4096 + + try: + resp = getattr(remote, method)(*args, **kwargs) + except Exception as e: + raise + + def get_chunked_data(method_resp): + stream = StringIO(method_resp) + while 1: + chunk = stream.read(chunk_size) + if not chunk: + break + yield chunk + + response = Response(app_iter=get_chunked_data(resp)) + response.content_type = 'application/octet-stream' + + return response + def status_view(self, request): import vcsserver return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,