##// END OF EJS Templates
vcsserver: added streaming interface for streaming remote attributes
dan -
r768:304a5413 default
parent child Browse files
Show More
@@ -25,6 +25,7 b' import wsgiref.util'
25 import traceback
25 import traceback
26 import tempfile
26 import tempfile
27 from itertools import chain
27 from itertools import chain
28 from cStringIO import StringIO
28
29
29 import simplejson as json
30 import simplejson as json
30 import msgpack
31 import msgpack
@@ -32,7 +33,9 b' from pyramid.config import Configurator'
32 from pyramid.settings import asbool, aslist
33 from pyramid.settings import asbool, aslist
33 from pyramid.wsgi import wsgiapp
34 from pyramid.wsgi import wsgiapp
34 from pyramid.compat import configparser
35 from pyramid.compat import configparser
36 from pyramid.response import Response
35
37
38 from vcsserver.utils import safe_int
36
39
37 log = logging.getLogger(__name__)
40 log = logging.getLogger(__name__)
38
41
@@ -114,8 +117,8 b' def _string_setting(settings, name, defa'
114
117
115
118
116 class VCS(object):
119 class VCS(object):
117 def __init__(self, locale=None, cache_config=None):
120 def __init__(self, locale_conf=None, cache_config=None):
118 self.locale = locale
121 self.locale = locale_conf
119 self.cache_config = cache_config
122 self.cache_config = cache_config
120 self._configure_locale()
123 self._configure_locale()
121
124
@@ -233,7 +236,7 b' class HTTPApplication(object):'
233 self.config.include('vcsserver.lib.rc_cache')
236 self.config.include('vcsserver.lib.rc_cache')
234
237
235 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
238 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
236 vcs = VCS(locale=settings_locale, cache_config=settings)
239 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
237 self._remotes = {
240 self._remotes = {
238 'hg': vcs._hg_remote,
241 'hg': vcs._hg_remote,
239 'git': vcs._git_remote,
242 'git': vcs._git_remote,
@@ -307,7 +310,14 b' class HTTPApplication(object):'
307 self.config.add_route('status', '/status')
310 self.config.add_route('status', '/status')
308 self.config.add_route('hg_proxy', '/proxy/hg')
311 self.config.add_route('hg_proxy', '/proxy/hg')
309 self.config.add_route('git_proxy', '/proxy/git')
312 self.config.add_route('git_proxy', '/proxy/git')
313
314 # rpc methods
310 self.config.add_route('vcs', '/{backend}')
315 self.config.add_route('vcs', '/{backend}')
316
317 # streaming rpc remote methods
318 self.config.add_route('vcs_stream', '/{backend}/stream')
319
320 # vcs operations clone/push as streaming
311 self.config.add_route('stream_git', '/stream/git/*repo_name')
321 self.config.add_route('stream_git', '/stream/git/*repo_name')
312 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
322 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
313
323
@@ -318,6 +328,8 b' class HTTPApplication(object):'
318 self.config.add_view(self.git_proxy(), route_name='git_proxy')
328 self.config.add_view(self.git_proxy(), route_name='git_proxy')
319 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
329 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
320 vcs_view=self._remotes)
330 vcs_view=self._remotes)
331 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
332 vcs_view=self._remotes)
321
333
322 self.config.add_view(self.hg_stream(), route_name='stream_hg')
334 self.config.add_view(self.hg_stream(), route_name='stream_hg')
323 self.config.add_view(self.git_stream(), route_name='stream_git')
335 self.config.add_view(self.git_stream(), route_name='stream_git')
@@ -338,11 +350,11 b' class HTTPApplication(object):'
338 def wsgi_app(self):
350 def wsgi_app(self):
339 return self.config.make_wsgi_app()
351 return self.config.make_wsgi_app()
340
352
341 def vcs_view(self, request):
353 def _vcs_view_params(self, request):
342 remote = self._remotes[request.matchdict['backend']]
354 remote = self._remotes[request.matchdict['backend']]
343 payload = msgpack.unpackb(request.body, use_list=True)
355 payload = msgpack.unpackb(request.body, use_list=True)
344 method = payload.get('method')
356 method = payload.get('method')
345 params = payload.get('params')
357 params = payload['params']
346 wire = params.get('wire')
358 wire = params.get('wire')
347 args = params.get('args')
359 args = params.get('args')
348 kwargs = params.get('kwargs')
360 kwargs = params.get('kwargs')
@@ -354,6 +366,7 b' class HTTPApplication(object):'
354 except KeyError:
366 except KeyError:
355 pass
367 pass
356 args.insert(0, wire)
368 args.insert(0, wire)
369 repo_state_uid = wire.get('repo_state_uid') if wire else None
357
370
358 # NOTE(marcink): trading complexity for slight performance
371 # NOTE(marcink): trading complexity for slight performance
359 if log.isEnabledFor(logging.DEBUG):
372 if log.isEnabledFor(logging.DEBUG):
@@ -365,10 +378,16 b' class HTTPApplication(object):'
365 else:
378 else:
366 call_args = args[1:]
379 call_args = args[1:]
367
380
368 repo_state_uid = wire.get('repo_state_uid') if wire else None
381 log.debug('method requested:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
369 log.debug('method called:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
370 method, call_args, kwargs, context_uid, repo_state_uid)
382 method, call_args, kwargs, context_uid, repo_state_uid)
371
383
384 return payload, remote, method, args, kwargs
385
386 def vcs_view(self, request):
387
388 payload, remote, method, args, kwargs = self._vcs_view_params(request)
389 payload_id = payload.get('id')
390
372 try:
391 try:
373 resp = getattr(remote, method)(*args, **kwargs)
392 resp = getattr(remote, method)(*args, **kwargs)
374 except Exception as e:
393 except Exception as e:
@@ -395,7 +414,7 b' class HTTPApplication(object):'
395 type_ = None
414 type_ = None
396
415
397 resp = {
416 resp = {
398 'id': payload.get('id'),
417 'id': payload_id,
399 'error': {
418 'error': {
400 'message': e.message,
419 'message': e.message,
401 'traceback': tb_info,
420 'traceback': tb_info,
@@ -410,12 +429,36 b' class HTTPApplication(object):'
410 pass
429 pass
411 else:
430 else:
412 resp = {
431 resp = {
413 'id': payload.get('id'),
432 'id': payload_id,
414 'result': resp
433 'result': resp
415 }
434 }
416
435
417 return resp
436 return resp
418
437
438 def vcs_stream_view(self, request):
439 payload, remote, method, args, kwargs = self._vcs_view_params(request)
440 # this method has a stream: marker we remove it here
441 method = method.split('stream:')[-1]
442 chunk_size = safe_int(payload.get('chunk_size')) or 4096
443
444 try:
445 resp = getattr(remote, method)(*args, **kwargs)
446 except Exception as e:
447 raise
448
449 def get_chunked_data(method_resp):
450 stream = StringIO(method_resp)
451 while 1:
452 chunk = stream.read(chunk_size)
453 if not chunk:
454 break
455 yield chunk
456
457 response = Response(app_iter=get_chunked_data(resp))
458 response.content_type = 'application/octet-stream'
459
460 return response
461
419 def status_view(self, request):
462 def status_view(self, request):
420 import vcsserver
463 import vcsserver
421 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
464 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
General Comments 0
You need to be logged in to leave comments. Login now