##// END OF EJS Templates
vcsserver: added streaming interface for streaming remote attributes
dan -
r768:304a5413 default
parent child Browse files
Show More
@@ -25,6 +25,7 b' import wsgiref.util'
25 25 import traceback
26 26 import tempfile
27 27 from itertools import chain
28 from cStringIO import StringIO
28 29
29 30 import simplejson as json
30 31 import msgpack
@@ -32,7 +33,9 b' from pyramid.config import Configurator'
32 33 from pyramid.settings import asbool, aslist
33 34 from pyramid.wsgi import wsgiapp
34 35 from pyramid.compat import configparser
36 from pyramid.response import Response
35 37
38 from vcsserver.utils import safe_int
36 39
37 40 log = logging.getLogger(__name__)
38 41
@@ -114,8 +117,8 b' def _string_setting(settings, name, defa'
114 117
115 118
116 119 class VCS(object):
117 def __init__(self, locale=None, cache_config=None):
118 self.locale = locale
120 def __init__(self, locale_conf=None, cache_config=None):
121 self.locale = locale_conf
119 122 self.cache_config = cache_config
120 123 self._configure_locale()
121 124
@@ -233,7 +236,7 b' class HTTPApplication(object):'
233 236 self.config.include('vcsserver.lib.rc_cache')
234 237
235 238 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
236 vcs = VCS(locale=settings_locale, cache_config=settings)
239 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
237 240 self._remotes = {
238 241 'hg': vcs._hg_remote,
239 242 'git': vcs._git_remote,
@@ -307,7 +310,14 b' class HTTPApplication(object):'
307 310 self.config.add_route('status', '/status')
308 311 self.config.add_route('hg_proxy', '/proxy/hg')
309 312 self.config.add_route('git_proxy', '/proxy/git')
313
314 # rpc methods
310 315 self.config.add_route('vcs', '/{backend}')
316
317 # streaming rpc remote methods
318 self.config.add_route('vcs_stream', '/{backend}/stream')
319
320 # vcs operations clone/push as streaming
311 321 self.config.add_route('stream_git', '/stream/git/*repo_name')
312 322 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
313 323
@@ -318,6 +328,8 b' class HTTPApplication(object):'
318 328 self.config.add_view(self.git_proxy(), route_name='git_proxy')
319 329 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
320 330 vcs_view=self._remotes)
331 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
332 vcs_view=self._remotes)
321 333
322 334 self.config.add_view(self.hg_stream(), route_name='stream_hg')
323 335 self.config.add_view(self.git_stream(), route_name='stream_git')
@@ -338,11 +350,11 b' class HTTPApplication(object):'
338 350 def wsgi_app(self):
339 351 return self.config.make_wsgi_app()
340 352
341 def vcs_view(self, request):
353 def _vcs_view_params(self, request):
342 354 remote = self._remotes[request.matchdict['backend']]
343 355 payload = msgpack.unpackb(request.body, use_list=True)
344 356 method = payload.get('method')
345 params = payload.get('params')
357 params = payload['params']
346 358 wire = params.get('wire')
347 359 args = params.get('args')
348 360 kwargs = params.get('kwargs')
@@ -354,6 +366,7 b' class HTTPApplication(object):'
354 366 except KeyError:
355 367 pass
356 368 args.insert(0, wire)
369 repo_state_uid = wire.get('repo_state_uid') if wire else None
357 370
358 371 # NOTE(marcink): trading complexity for slight performance
359 372 if log.isEnabledFor(logging.DEBUG):
@@ -365,10 +378,16 b' class HTTPApplication(object):'
365 378 else:
366 379 call_args = args[1:]
367 380
368 repo_state_uid = wire.get('repo_state_uid') if wire else None
369 log.debug('method called:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
381 log.debug('method requested:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
370 382 method, call_args, kwargs, context_uid, repo_state_uid)
371 383
384 return payload, remote, method, args, kwargs
385
386 def vcs_view(self, request):
387
388 payload, remote, method, args, kwargs = self._vcs_view_params(request)
389 payload_id = payload.get('id')
390
372 391 try:
373 392 resp = getattr(remote, method)(*args, **kwargs)
374 393 except Exception as e:
@@ -395,7 +414,7 b' class HTTPApplication(object):'
395 414 type_ = None
396 415
397 416 resp = {
398 'id': payload.get('id'),
417 'id': payload_id,
399 418 'error': {
400 419 'message': e.message,
401 420 'traceback': tb_info,
@@ -410,12 +429,36 b' class HTTPApplication(object):'
410 429 pass
411 430 else:
412 431 resp = {
413 'id': payload.get('id'),
432 'id': payload_id,
414 433 'result': resp
415 434 }
416 435
417 436 return resp
418 437
438 def vcs_stream_view(self, request):
439 payload, remote, method, args, kwargs = self._vcs_view_params(request)
440 # this method has a stream: marker we remove it here
441 method = method.split('stream:')[-1]
442 chunk_size = safe_int(payload.get('chunk_size')) or 4096
443
444 try:
445 resp = getattr(remote, method)(*args, **kwargs)
446 except Exception as e:
447 raise
448
449 def get_chunked_data(method_resp):
450 stream = StringIO(method_resp)
451 while 1:
452 chunk = stream.read(chunk_size)
453 if not chunk:
454 break
455 yield chunk
456
457 response = Response(app_iter=get_chunked_data(resp))
458 response.content_type = 'application/octet-stream'
459
460 return response
461
419 462 def status_view(self, request):
420 463 import vcsserver
421 464 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
General Comments 0
You need to be logged in to leave comments. Login now