##// END OF EJS Templates
vcsserver: added streaming interface for streaming remote attributes
dan -
r768:304a5413 default
parent child Browse files
Show More
@@ -1,624 +1,667 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2019 RhodeCode GmbH
2 # Copyright (C) 2014-2019 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import os
18 import os
19 import sys
19 import sys
20 import base64
20 import base64
21 import locale
21 import locale
22 import logging
22 import logging
23 import uuid
23 import uuid
24 import wsgiref.util
24 import wsgiref.util
25 import traceback
25 import traceback
26 import tempfile
26 import tempfile
27 from itertools import chain
27 from itertools import chain
28 from cStringIO import StringIO
28
29
29 import simplejson as json
30 import simplejson as json
30 import msgpack
31 import msgpack
31 from pyramid.config import Configurator
32 from pyramid.config import Configurator
32 from pyramid.settings import asbool, aslist
33 from pyramid.settings import asbool, aslist
33 from pyramid.wsgi import wsgiapp
34 from pyramid.wsgi import wsgiapp
34 from pyramid.compat import configparser
35 from pyramid.compat import configparser
36 from pyramid.response import Response
35
37
38 from vcsserver.utils import safe_int
36
39
37 log = logging.getLogger(__name__)
40 log = logging.getLogger(__name__)
38
41
39 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
42 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
40 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
43 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
41
44
42 try:
45 try:
43 locale.setlocale(locale.LC_ALL, '')
46 locale.setlocale(locale.LC_ALL, '')
44 except locale.Error as e:
47 except locale.Error as e:
45 log.error(
48 log.error(
46 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
49 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
47 os.environ['LC_ALL'] = 'C'
50 os.environ['LC_ALL'] = 'C'
48
51
49 import vcsserver
52 import vcsserver
50 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
53 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
51 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
54 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
52 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
55 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
53 from vcsserver.echo_stub.echo_app import EchoApp
56 from vcsserver.echo_stub.echo_app import EchoApp
54 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
57 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
55 from vcsserver.lib.exc_tracking import store_exception
58 from vcsserver.lib.exc_tracking import store_exception
56 from vcsserver.server import VcsServer
59 from vcsserver.server import VcsServer
57
60
58 try:
61 try:
59 from vcsserver.git import GitFactory, GitRemote
62 from vcsserver.git import GitFactory, GitRemote
60 except ImportError:
63 except ImportError:
61 GitFactory = None
64 GitFactory = None
62 GitRemote = None
65 GitRemote = None
63
66
64 try:
67 try:
65 from vcsserver.hg import MercurialFactory, HgRemote
68 from vcsserver.hg import MercurialFactory, HgRemote
66 except ImportError:
69 except ImportError:
67 MercurialFactory = None
70 MercurialFactory = None
68 HgRemote = None
71 HgRemote = None
69
72
70 try:
73 try:
71 from vcsserver.svn import SubversionFactory, SvnRemote
74 from vcsserver.svn import SubversionFactory, SvnRemote
72 except ImportError:
75 except ImportError:
73 SubversionFactory = None
76 SubversionFactory = None
74 SvnRemote = None
77 SvnRemote = None
75
78
76
79
77 def _is_request_chunked(environ):
80 def _is_request_chunked(environ):
78 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
81 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
79 return stream
82 return stream
80
83
81
84
82 def _int_setting(settings, name, default):
85 def _int_setting(settings, name, default):
83 settings[name] = int(settings.get(name, default))
86 settings[name] = int(settings.get(name, default))
84 return settings[name]
87 return settings[name]
85
88
86
89
87 def _bool_setting(settings, name, default):
90 def _bool_setting(settings, name, default):
88 input_val = settings.get(name, default)
91 input_val = settings.get(name, default)
89 if isinstance(input_val, unicode):
92 if isinstance(input_val, unicode):
90 input_val = input_val.encode('utf8')
93 input_val = input_val.encode('utf8')
91 settings[name] = asbool(input_val)
94 settings[name] = asbool(input_val)
92 return settings[name]
95 return settings[name]
93
96
94
97
95 def _list_setting(settings, name, default):
98 def _list_setting(settings, name, default):
96 raw_value = settings.get(name, default)
99 raw_value = settings.get(name, default)
97
100
98 # Otherwise we assume it uses pyramids space/newline separation.
101 # Otherwise we assume it uses pyramids space/newline separation.
99 settings[name] = aslist(raw_value)
102 settings[name] = aslist(raw_value)
100 return settings[name]
103 return settings[name]
101
104
102
105
103 def _string_setting(settings, name, default, lower=True, default_when_empty=False):
106 def _string_setting(settings, name, default, lower=True, default_when_empty=False):
104 value = settings.get(name, default)
107 value = settings.get(name, default)
105
108
106 if default_when_empty and not value:
109 if default_when_empty and not value:
107 # use default value when value is empty
110 # use default value when value is empty
108 value = default
111 value = default
109
112
110 if lower:
113 if lower:
111 value = value.lower()
114 value = value.lower()
112 settings[name] = value
115 settings[name] = value
113 return settings[name]
116 return settings[name]
114
117
115
118
116 class VCS(object):
119 class VCS(object):
117 def __init__(self, locale=None, cache_config=None):
120 def __init__(self, locale_conf=None, cache_config=None):
118 self.locale = locale
121 self.locale = locale_conf
119 self.cache_config = cache_config
122 self.cache_config = cache_config
120 self._configure_locale()
123 self._configure_locale()
121
124
122 if GitFactory and GitRemote:
125 if GitFactory and GitRemote:
123 git_factory = GitFactory()
126 git_factory = GitFactory()
124 self._git_remote = GitRemote(git_factory)
127 self._git_remote = GitRemote(git_factory)
125 else:
128 else:
126 log.info("Git client import failed")
129 log.info("Git client import failed")
127
130
128 if MercurialFactory and HgRemote:
131 if MercurialFactory and HgRemote:
129 hg_factory = MercurialFactory()
132 hg_factory = MercurialFactory()
130 self._hg_remote = HgRemote(hg_factory)
133 self._hg_remote = HgRemote(hg_factory)
131 else:
134 else:
132 log.info("Mercurial client import failed")
135 log.info("Mercurial client import failed")
133
136
134 if SubversionFactory and SvnRemote:
137 if SubversionFactory and SvnRemote:
135 svn_factory = SubversionFactory()
138 svn_factory = SubversionFactory()
136
139
137 # hg factory is used for svn url validation
140 # hg factory is used for svn url validation
138 hg_factory = MercurialFactory()
141 hg_factory = MercurialFactory()
139 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
142 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
140 else:
143 else:
141 log.info("Subversion client import failed")
144 log.info("Subversion client import failed")
142
145
143 self._vcsserver = VcsServer()
146 self._vcsserver = VcsServer()
144
147
145 def _configure_locale(self):
148 def _configure_locale(self):
146 if self.locale:
149 if self.locale:
147 log.info('Settings locale: `LC_ALL` to %s', self.locale)
150 log.info('Settings locale: `LC_ALL` to %s', self.locale)
148 else:
151 else:
149 log.info(
152 log.info(
150 'Configuring locale subsystem based on environment variables')
153 'Configuring locale subsystem based on environment variables')
151 try:
154 try:
152 # If self.locale is the empty string, then the locale
155 # If self.locale is the empty string, then the locale
153 # module will use the environment variables. See the
156 # module will use the environment variables. See the
154 # documentation of the package `locale`.
157 # documentation of the package `locale`.
155 locale.setlocale(locale.LC_ALL, self.locale)
158 locale.setlocale(locale.LC_ALL, self.locale)
156
159
157 language_code, encoding = locale.getlocale()
160 language_code, encoding = locale.getlocale()
158 log.info(
161 log.info(
159 'Locale set to language code "%s" with encoding "%s".',
162 'Locale set to language code "%s" with encoding "%s".',
160 language_code, encoding)
163 language_code, encoding)
161 except locale.Error:
164 except locale.Error:
162 log.exception(
165 log.exception(
163 'Cannot set locale, not configuring the locale system')
166 'Cannot set locale, not configuring the locale system')
164
167
165
168
166 class WsgiProxy(object):
169 class WsgiProxy(object):
167 def __init__(self, wsgi):
170 def __init__(self, wsgi):
168 self.wsgi = wsgi
171 self.wsgi = wsgi
169
172
170 def __call__(self, environ, start_response):
173 def __call__(self, environ, start_response):
171 input_data = environ['wsgi.input'].read()
174 input_data = environ['wsgi.input'].read()
172 input_data = msgpack.unpackb(input_data)
175 input_data = msgpack.unpackb(input_data)
173
176
174 error = None
177 error = None
175 try:
178 try:
176 data, status, headers = self.wsgi.handle(
179 data, status, headers = self.wsgi.handle(
177 input_data['environment'], input_data['input_data'],
180 input_data['environment'], input_data['input_data'],
178 *input_data['args'], **input_data['kwargs'])
181 *input_data['args'], **input_data['kwargs'])
179 except Exception as e:
182 except Exception as e:
180 data, status, headers = [], None, None
183 data, status, headers = [], None, None
181 error = {
184 error = {
182 'message': str(e),
185 'message': str(e),
183 '_vcs_kind': getattr(e, '_vcs_kind', None)
186 '_vcs_kind': getattr(e, '_vcs_kind', None)
184 }
187 }
185
188
186 start_response(200, {})
189 start_response(200, {})
187 return self._iterator(error, status, headers, data)
190 return self._iterator(error, status, headers, data)
188
191
189 def _iterator(self, error, status, headers, data):
192 def _iterator(self, error, status, headers, data):
190 initial_data = [
193 initial_data = [
191 error,
194 error,
192 status,
195 status,
193 headers,
196 headers,
194 ]
197 ]
195
198
196 for d in chain(initial_data, data):
199 for d in chain(initial_data, data):
197 yield msgpack.packb(d)
200 yield msgpack.packb(d)
198
201
199
202
200 def not_found(request):
203 def not_found(request):
201 return {'status': '404 NOT FOUND'}
204 return {'status': '404 NOT FOUND'}
202
205
203
206
204 class VCSViewPredicate(object):
207 class VCSViewPredicate(object):
205 def __init__(self, val, config):
208 def __init__(self, val, config):
206 self.remotes = val
209 self.remotes = val
207
210
208 def text(self):
211 def text(self):
209 return 'vcs view method = %s' % (self.remotes.keys(),)
212 return 'vcs view method = %s' % (self.remotes.keys(),)
210
213
211 phash = text
214 phash = text
212
215
213 def __call__(self, context, request):
216 def __call__(self, context, request):
214 """
217 """
215 View predicate that returns true if given backend is supported by
218 View predicate that returns true if given backend is supported by
216 defined remotes.
219 defined remotes.
217 """
220 """
218 backend = request.matchdict.get('backend')
221 backend = request.matchdict.get('backend')
219 return backend in self.remotes
222 return backend in self.remotes
220
223
221
224
222 class HTTPApplication(object):
225 class HTTPApplication(object):
223 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
226 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
224
227
225 remote_wsgi = remote_wsgi
228 remote_wsgi = remote_wsgi
226 _use_echo_app = False
229 _use_echo_app = False
227
230
228 def __init__(self, settings=None, global_config=None):
231 def __init__(self, settings=None, global_config=None):
229 self._sanitize_settings_and_apply_defaults(settings)
232 self._sanitize_settings_and_apply_defaults(settings)
230
233
231 self.config = Configurator(settings=settings)
234 self.config = Configurator(settings=settings)
232 self.global_config = global_config
235 self.global_config = global_config
233 self.config.include('vcsserver.lib.rc_cache')
236 self.config.include('vcsserver.lib.rc_cache')
234
237
235 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
238 settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
236 vcs = VCS(locale=settings_locale, cache_config=settings)
239 vcs = VCS(locale_conf=settings_locale, cache_config=settings)
237 self._remotes = {
240 self._remotes = {
238 'hg': vcs._hg_remote,
241 'hg': vcs._hg_remote,
239 'git': vcs._git_remote,
242 'git': vcs._git_remote,
240 'svn': vcs._svn_remote,
243 'svn': vcs._svn_remote,
241 'server': vcs._vcsserver,
244 'server': vcs._vcsserver,
242 }
245 }
243 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
246 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
244 self._use_echo_app = True
247 self._use_echo_app = True
245 log.warning("Using EchoApp for VCS operations.")
248 log.warning("Using EchoApp for VCS operations.")
246 self.remote_wsgi = remote_wsgi_stub
249 self.remote_wsgi = remote_wsgi_stub
247
250
248 self._configure_settings(global_config, settings)
251 self._configure_settings(global_config, settings)
249 self._configure()
252 self._configure()
250
253
251 def _configure_settings(self, global_config, app_settings):
254 def _configure_settings(self, global_config, app_settings):
252 """
255 """
253 Configure the settings module.
256 Configure the settings module.
254 """
257 """
255 settings_merged = global_config.copy()
258 settings_merged = global_config.copy()
256 settings_merged.update(app_settings)
259 settings_merged.update(app_settings)
257
260
258 git_path = app_settings.get('git_path', None)
261 git_path = app_settings.get('git_path', None)
259 if git_path:
262 if git_path:
260 settings.GIT_EXECUTABLE = git_path
263 settings.GIT_EXECUTABLE = git_path
261 binary_dir = app_settings.get('core.binary_dir', None)
264 binary_dir = app_settings.get('core.binary_dir', None)
262 if binary_dir:
265 if binary_dir:
263 settings.BINARY_DIR = binary_dir
266 settings.BINARY_DIR = binary_dir
264
267
265 # Store the settings to make them available to other modules.
268 # Store the settings to make them available to other modules.
266 vcsserver.PYRAMID_SETTINGS = settings_merged
269 vcsserver.PYRAMID_SETTINGS = settings_merged
267 vcsserver.CONFIG = settings_merged
270 vcsserver.CONFIG = settings_merged
268
271
269 def _sanitize_settings_and_apply_defaults(self, settings):
272 def _sanitize_settings_and_apply_defaults(self, settings):
270 temp_store = tempfile.gettempdir()
273 temp_store = tempfile.gettempdir()
271 default_cache_dir = os.path.join(temp_store, 'rc_cache')
274 default_cache_dir = os.path.join(temp_store, 'rc_cache')
272
275
273 # save default, cache dir, and use it for all backends later.
276 # save default, cache dir, and use it for all backends later.
274 default_cache_dir = _string_setting(
277 default_cache_dir = _string_setting(
275 settings,
278 settings,
276 'cache_dir',
279 'cache_dir',
277 default_cache_dir, lower=False, default_when_empty=True)
280 default_cache_dir, lower=False, default_when_empty=True)
278
281
279 # ensure we have our dir created
282 # ensure we have our dir created
280 if not os.path.isdir(default_cache_dir):
283 if not os.path.isdir(default_cache_dir):
281 os.makedirs(default_cache_dir, mode=0o755)
284 os.makedirs(default_cache_dir, mode=0o755)
282
285
283 # exception store cache
286 # exception store cache
284 _string_setting(
287 _string_setting(
285 settings,
288 settings,
286 'exception_tracker.store_path',
289 'exception_tracker.store_path',
287 temp_store, lower=False, default_when_empty=True)
290 temp_store, lower=False, default_when_empty=True)
288
291
289 # repo_object cache
292 # repo_object cache
290 _string_setting(
293 _string_setting(
291 settings,
294 settings,
292 'rc_cache.repo_object.backend',
295 'rc_cache.repo_object.backend',
293 'dogpile.cache.rc.memory_lru')
296 'dogpile.cache.rc.memory_lru')
294 _int_setting(
297 _int_setting(
295 settings,
298 settings,
296 'rc_cache.repo_object.expiration_time',
299 'rc_cache.repo_object.expiration_time',
297 300)
300 300)
298 _int_setting(
301 _int_setting(
299 settings,
302 settings,
300 'rc_cache.repo_object.max_size',
303 'rc_cache.repo_object.max_size',
301 1024)
304 1024)
302
305
303 def _configure(self):
306 def _configure(self):
304 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
307 self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)
305
308
306 self.config.add_route('service', '/_service')
309 self.config.add_route('service', '/_service')
307 self.config.add_route('status', '/status')
310 self.config.add_route('status', '/status')
308 self.config.add_route('hg_proxy', '/proxy/hg')
311 self.config.add_route('hg_proxy', '/proxy/hg')
309 self.config.add_route('git_proxy', '/proxy/git')
312 self.config.add_route('git_proxy', '/proxy/git')
313
314 # rpc methods
310 self.config.add_route('vcs', '/{backend}')
315 self.config.add_route('vcs', '/{backend}')
316
317 # streaming rpc remote methods
318 self.config.add_route('vcs_stream', '/{backend}/stream')
319
320 # vcs operations clone/push as streaming
311 self.config.add_route('stream_git', '/stream/git/*repo_name')
321 self.config.add_route('stream_git', '/stream/git/*repo_name')
312 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
322 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
313
323
314 self.config.add_view(self.status_view, route_name='status', renderer='json')
324 self.config.add_view(self.status_view, route_name='status', renderer='json')
315 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
325 self.config.add_view(self.service_view, route_name='service', renderer='msgpack')
316
326
317 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
327 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
318 self.config.add_view(self.git_proxy(), route_name='git_proxy')
328 self.config.add_view(self.git_proxy(), route_name='git_proxy')
319 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
329 self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
320 vcs_view=self._remotes)
330 vcs_view=self._remotes)
331 self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
332 vcs_view=self._remotes)
321
333
322 self.config.add_view(self.hg_stream(), route_name='stream_hg')
334 self.config.add_view(self.hg_stream(), route_name='stream_hg')
323 self.config.add_view(self.git_stream(), route_name='stream_git')
335 self.config.add_view(self.git_stream(), route_name='stream_git')
324
336
325 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
337 self.config.add_view_predicate('vcs_view', VCSViewPredicate)
326
338
327 self.config.add_notfound_view(not_found, renderer='json')
339 self.config.add_notfound_view(not_found, renderer='json')
328
340
329 self.config.add_view(self.handle_vcs_exception, context=Exception)
341 self.config.add_view(self.handle_vcs_exception, context=Exception)
330
342
331 self.config.add_tween(
343 self.config.add_tween(
332 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
344 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
333 )
345 )
334 self.config.add_request_method(
346 self.config.add_request_method(
335 'vcsserver.lib.request_counter.get_request_counter',
347 'vcsserver.lib.request_counter.get_request_counter',
336 'request_count')
348 'request_count')
337
349
338 def wsgi_app(self):
350 def wsgi_app(self):
339 return self.config.make_wsgi_app()
351 return self.config.make_wsgi_app()
340
352
341 def vcs_view(self, request):
353 def _vcs_view_params(self, request):
342 remote = self._remotes[request.matchdict['backend']]
354 remote = self._remotes[request.matchdict['backend']]
343 payload = msgpack.unpackb(request.body, use_list=True)
355 payload = msgpack.unpackb(request.body, use_list=True)
344 method = payload.get('method')
356 method = payload.get('method')
345 params = payload.get('params')
357 params = payload['params']
346 wire = params.get('wire')
358 wire = params.get('wire')
347 args = params.get('args')
359 args = params.get('args')
348 kwargs = params.get('kwargs')
360 kwargs = params.get('kwargs')
349 context_uid = None
361 context_uid = None
350
362
351 if wire:
363 if wire:
352 try:
364 try:
353 wire['context'] = context_uid = uuid.UUID(wire['context'])
365 wire['context'] = context_uid = uuid.UUID(wire['context'])
354 except KeyError:
366 except KeyError:
355 pass
367 pass
356 args.insert(0, wire)
368 args.insert(0, wire)
369 repo_state_uid = wire.get('repo_state_uid') if wire else None
357
370
358 # NOTE(marcink): trading complexity for slight performance
371 # NOTE(marcink): trading complexity for slight performance
359 if log.isEnabledFor(logging.DEBUG):
372 if log.isEnabledFor(logging.DEBUG):
360 no_args_methods = [
373 no_args_methods = [
361 'archive_repo'
374 'archive_repo'
362 ]
375 ]
363 if method in no_args_methods:
376 if method in no_args_methods:
364 call_args = ''
377 call_args = ''
365 else:
378 else:
366 call_args = args[1:]
379 call_args = args[1:]
367
380
368 repo_state_uid = wire.get('repo_state_uid') if wire else None
381 log.debug('method requested:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
369 log.debug('method called:%s with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
370 method, call_args, kwargs, context_uid, repo_state_uid)
382 method, call_args, kwargs, context_uid, repo_state_uid)
371
383
384 return payload, remote, method, args, kwargs
385
386 def vcs_view(self, request):
387
388 payload, remote, method, args, kwargs = self._vcs_view_params(request)
389 payload_id = payload.get('id')
390
372 try:
391 try:
373 resp = getattr(remote, method)(*args, **kwargs)
392 resp = getattr(remote, method)(*args, **kwargs)
374 except Exception as e:
393 except Exception as e:
375 exc_info = list(sys.exc_info())
394 exc_info = list(sys.exc_info())
376 exc_type, exc_value, exc_traceback = exc_info
395 exc_type, exc_value, exc_traceback = exc_info
377
396
378 org_exc = getattr(e, '_org_exc', None)
397 org_exc = getattr(e, '_org_exc', None)
379 org_exc_name = None
398 org_exc_name = None
380 org_exc_tb = ''
399 org_exc_tb = ''
381 if org_exc:
400 if org_exc:
382 org_exc_name = org_exc.__class__.__name__
401 org_exc_name = org_exc.__class__.__name__
383 org_exc_tb = getattr(e, '_org_exc_tb', '')
402 org_exc_tb = getattr(e, '_org_exc_tb', '')
384 # replace our "faked" exception with our org
403 # replace our "faked" exception with our org
385 exc_info[0] = org_exc.__class__
404 exc_info[0] = org_exc.__class__
386 exc_info[1] = org_exc
405 exc_info[1] = org_exc
387
406
388 store_exception(id(exc_info), exc_info)
407 store_exception(id(exc_info), exc_info)
389
408
390 tb_info = ''.join(
409 tb_info = ''.join(
391 traceback.format_exception(exc_type, exc_value, exc_traceback))
410 traceback.format_exception(exc_type, exc_value, exc_traceback))
392
411
393 type_ = e.__class__.__name__
412 type_ = e.__class__.__name__
394 if type_ not in self.ALLOWED_EXCEPTIONS:
413 if type_ not in self.ALLOWED_EXCEPTIONS:
395 type_ = None
414 type_ = None
396
415
397 resp = {
416 resp = {
398 'id': payload.get('id'),
417 'id': payload_id,
399 'error': {
418 'error': {
400 'message': e.message,
419 'message': e.message,
401 'traceback': tb_info,
420 'traceback': tb_info,
402 'org_exc': org_exc_name,
421 'org_exc': org_exc_name,
403 'org_exc_tb': org_exc_tb,
422 'org_exc_tb': org_exc_tb,
404 'type': type_
423 'type': type_
405 }
424 }
406 }
425 }
407 try:
426 try:
408 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
427 resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
409 except AttributeError:
428 except AttributeError:
410 pass
429 pass
411 else:
430 else:
412 resp = {
431 resp = {
413 'id': payload.get('id'),
432 'id': payload_id,
414 'result': resp
433 'result': resp
415 }
434 }
416
435
417 return resp
436 return resp
418
437
438 def vcs_stream_view(self, request):
439 payload, remote, method, args, kwargs = self._vcs_view_params(request)
440 # this method has a stream: marker we remove it here
441 method = method.split('stream:')[-1]
442 chunk_size = safe_int(payload.get('chunk_size')) or 4096
443
444 try:
445 resp = getattr(remote, method)(*args, **kwargs)
446 except Exception as e:
447 raise
448
449 def get_chunked_data(method_resp):
450 stream = StringIO(method_resp)
451 while 1:
452 chunk = stream.read(chunk_size)
453 if not chunk:
454 break
455 yield chunk
456
457 response = Response(app_iter=get_chunked_data(resp))
458 response.content_type = 'application/octet-stream'
459
460 return response
461
419 def status_view(self, request):
462 def status_view(self, request):
420 import vcsserver
463 import vcsserver
421 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
464 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__,
422 'pid': os.getpid()}
465 'pid': os.getpid()}
423
466
424 def service_view(self, request):
467 def service_view(self, request):
425 import vcsserver
468 import vcsserver
426
469
427 payload = msgpack.unpackb(request.body, use_list=True)
470 payload = msgpack.unpackb(request.body, use_list=True)
428
471
429 try:
472 try:
430 path = self.global_config['__file__']
473 path = self.global_config['__file__']
431 config = configparser.ConfigParser()
474 config = configparser.ConfigParser()
432 config.read(path)
475 config.read(path)
433 parsed_ini = config
476 parsed_ini = config
434 if parsed_ini.has_section('server:main'):
477 if parsed_ini.has_section('server:main'):
435 parsed_ini = dict(parsed_ini.items('server:main'))
478 parsed_ini = dict(parsed_ini.items('server:main'))
436 except Exception:
479 except Exception:
437 log.exception('Failed to read .ini file for display')
480 log.exception('Failed to read .ini file for display')
438 parsed_ini = {}
481 parsed_ini = {}
439
482
440 resp = {
483 resp = {
441 'id': payload.get('id'),
484 'id': payload.get('id'),
442 'result': dict(
485 'result': dict(
443 version=vcsserver.__version__,
486 version=vcsserver.__version__,
444 config=parsed_ini,
487 config=parsed_ini,
445 payload=payload,
488 payload=payload,
446 )
489 )
447 }
490 }
448 return resp
491 return resp
449
492
450 def _msgpack_renderer_factory(self, info):
493 def _msgpack_renderer_factory(self, info):
451 def _render(value, system):
494 def _render(value, system):
452 request = system.get('request')
495 request = system.get('request')
453 if request is not None:
496 if request is not None:
454 response = request.response
497 response = request.response
455 ct = response.content_type
498 ct = response.content_type
456 if ct == response.default_content_type:
499 if ct == response.default_content_type:
457 response.content_type = 'application/x-msgpack'
500 response.content_type = 'application/x-msgpack'
458 return msgpack.packb(value)
501 return msgpack.packb(value)
459 return _render
502 return _render
460
503
461 def set_env_from_config(self, environ, config):
504 def set_env_from_config(self, environ, config):
462 dict_conf = {}
505 dict_conf = {}
463 try:
506 try:
464 for elem in config:
507 for elem in config:
465 if elem[0] == 'rhodecode':
508 if elem[0] == 'rhodecode':
466 dict_conf = json.loads(elem[2])
509 dict_conf = json.loads(elem[2])
467 break
510 break
468 except Exception:
511 except Exception:
469 log.exception('Failed to fetch SCM CONFIG')
512 log.exception('Failed to fetch SCM CONFIG')
470 return
513 return
471
514
472 username = dict_conf.get('username')
515 username = dict_conf.get('username')
473 if username:
516 if username:
474 environ['REMOTE_USER'] = username
517 environ['REMOTE_USER'] = username
475 # mercurial specific, some extension api rely on this
518 # mercurial specific, some extension api rely on this
476 environ['HGUSER'] = username
519 environ['HGUSER'] = username
477
520
478 ip = dict_conf.get('ip')
521 ip = dict_conf.get('ip')
479 if ip:
522 if ip:
480 environ['REMOTE_HOST'] = ip
523 environ['REMOTE_HOST'] = ip
481
524
482 if _is_request_chunked(environ):
525 if _is_request_chunked(environ):
483 # set the compatibility flag for webob
526 # set the compatibility flag for webob
484 environ['wsgi.input_terminated'] = True
527 environ['wsgi.input_terminated'] = True
485
528
486 def hg_proxy(self):
529 def hg_proxy(self):
487 @wsgiapp
530 @wsgiapp
488 def _hg_proxy(environ, start_response):
531 def _hg_proxy(environ, start_response):
489 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
532 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
490 return app(environ, start_response)
533 return app(environ, start_response)
491 return _hg_proxy
534 return _hg_proxy
492
535
493 def git_proxy(self):
536 def git_proxy(self):
494 @wsgiapp
537 @wsgiapp
495 def _git_proxy(environ, start_response):
538 def _git_proxy(environ, start_response):
496 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
539 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
497 return app(environ, start_response)
540 return app(environ, start_response)
498 return _git_proxy
541 return _git_proxy
499
542
500 def hg_stream(self):
543 def hg_stream(self):
501 if self._use_echo_app:
544 if self._use_echo_app:
502 @wsgiapp
545 @wsgiapp
503 def _hg_stream(environ, start_response):
546 def _hg_stream(environ, start_response):
504 app = EchoApp('fake_path', 'fake_name', None)
547 app = EchoApp('fake_path', 'fake_name', None)
505 return app(environ, start_response)
548 return app(environ, start_response)
506 return _hg_stream
549 return _hg_stream
507 else:
550 else:
508 @wsgiapp
551 @wsgiapp
509 def _hg_stream(environ, start_response):
552 def _hg_stream(environ, start_response):
510 log.debug('http-app: handling hg stream')
553 log.debug('http-app: handling hg stream')
511 repo_path = environ['HTTP_X_RC_REPO_PATH']
554 repo_path = environ['HTTP_X_RC_REPO_PATH']
512 repo_name = environ['HTTP_X_RC_REPO_NAME']
555 repo_name = environ['HTTP_X_RC_REPO_NAME']
513 packed_config = base64.b64decode(
556 packed_config = base64.b64decode(
514 environ['HTTP_X_RC_REPO_CONFIG'])
557 environ['HTTP_X_RC_REPO_CONFIG'])
515 config = msgpack.unpackb(packed_config)
558 config = msgpack.unpackb(packed_config)
516 app = scm_app.create_hg_wsgi_app(
559 app = scm_app.create_hg_wsgi_app(
517 repo_path, repo_name, config)
560 repo_path, repo_name, config)
518
561
519 # Consistent path information for hgweb
562 # Consistent path information for hgweb
520 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
563 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
521 environ['REPO_NAME'] = repo_name
564 environ['REPO_NAME'] = repo_name
522 self.set_env_from_config(environ, config)
565 self.set_env_from_config(environ, config)
523
566
524 log.debug('http-app: starting app handler '
567 log.debug('http-app: starting app handler '
525 'with %s and process request', app)
568 'with %s and process request', app)
526 return app(environ, ResponseFilter(start_response))
569 return app(environ, ResponseFilter(start_response))
527 return _hg_stream
570 return _hg_stream
528
571
529 def git_stream(self):
572 def git_stream(self):
530 if self._use_echo_app:
573 if self._use_echo_app:
531 @wsgiapp
574 @wsgiapp
532 def _git_stream(environ, start_response):
575 def _git_stream(environ, start_response):
533 app = EchoApp('fake_path', 'fake_name', None)
576 app = EchoApp('fake_path', 'fake_name', None)
534 return app(environ, start_response)
577 return app(environ, start_response)
535 return _git_stream
578 return _git_stream
536 else:
579 else:
537 @wsgiapp
580 @wsgiapp
538 def _git_stream(environ, start_response):
581 def _git_stream(environ, start_response):
539 log.debug('http-app: handling git stream')
582 log.debug('http-app: handling git stream')
540 repo_path = environ['HTTP_X_RC_REPO_PATH']
583 repo_path = environ['HTTP_X_RC_REPO_PATH']
541 repo_name = environ['HTTP_X_RC_REPO_NAME']
584 repo_name = environ['HTTP_X_RC_REPO_NAME']
542 packed_config = base64.b64decode(
585 packed_config = base64.b64decode(
543 environ['HTTP_X_RC_REPO_CONFIG'])
586 environ['HTTP_X_RC_REPO_CONFIG'])
544 config = msgpack.unpackb(packed_config)
587 config = msgpack.unpackb(packed_config)
545
588
546 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
589 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
547 self.set_env_from_config(environ, config)
590 self.set_env_from_config(environ, config)
548
591
549 content_type = environ.get('CONTENT_TYPE', '')
592 content_type = environ.get('CONTENT_TYPE', '')
550
593
551 path = environ['PATH_INFO']
594 path = environ['PATH_INFO']
552 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
595 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
553 log.debug(
596 log.debug(
554 'LFS: Detecting if request `%s` is LFS server path based '
597 'LFS: Detecting if request `%s` is LFS server path based '
555 'on content type:`%s`, is_lfs:%s',
598 'on content type:`%s`, is_lfs:%s',
556 path, content_type, is_lfs_request)
599 path, content_type, is_lfs_request)
557
600
558 if not is_lfs_request:
601 if not is_lfs_request:
559 # fallback detection by path
602 # fallback detection by path
560 if GIT_LFS_PROTO_PAT.match(path):
603 if GIT_LFS_PROTO_PAT.match(path):
561 is_lfs_request = True
604 is_lfs_request = True
562 log.debug(
605 log.debug(
563 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
606 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
564 path, is_lfs_request)
607 path, is_lfs_request)
565
608
566 if is_lfs_request:
609 if is_lfs_request:
567 app = scm_app.create_git_lfs_wsgi_app(
610 app = scm_app.create_git_lfs_wsgi_app(
568 repo_path, repo_name, config)
611 repo_path, repo_name, config)
569 else:
612 else:
570 app = scm_app.create_git_wsgi_app(
613 app = scm_app.create_git_wsgi_app(
571 repo_path, repo_name, config)
614 repo_path, repo_name, config)
572
615
573 log.debug('http-app: starting app handler '
616 log.debug('http-app: starting app handler '
574 'with %s and process request', app)
617 'with %s and process request', app)
575
618
576 return app(environ, start_response)
619 return app(environ, start_response)
577
620
578 return _git_stream
621 return _git_stream
579
622
580 def handle_vcs_exception(self, exception, request):
623 def handle_vcs_exception(self, exception, request):
581 _vcs_kind = getattr(exception, '_vcs_kind', '')
624 _vcs_kind = getattr(exception, '_vcs_kind', '')
582 if _vcs_kind == 'repo_locked':
625 if _vcs_kind == 'repo_locked':
583 # Get custom repo-locked status code if present.
626 # Get custom repo-locked status code if present.
584 status_code = request.headers.get('X-RC-Locked-Status-Code')
627 status_code = request.headers.get('X-RC-Locked-Status-Code')
585 return HTTPRepoLocked(
628 return HTTPRepoLocked(
586 title=exception.message, status_code=status_code)
629 title=exception.message, status_code=status_code)
587
630
588 elif _vcs_kind == 'repo_branch_protected':
631 elif _vcs_kind == 'repo_branch_protected':
589 # Get custom repo-branch-protected status code if present.
632 # Get custom repo-branch-protected status code if present.
590 return HTTPRepoBranchProtected(title=exception.message)
633 return HTTPRepoBranchProtected(title=exception.message)
591
634
592 exc_info = request.exc_info
635 exc_info = request.exc_info
593 store_exception(id(exc_info), exc_info)
636 store_exception(id(exc_info), exc_info)
594
637
595 traceback_info = 'unavailable'
638 traceback_info = 'unavailable'
596 if request.exc_info:
639 if request.exc_info:
597 exc_type, exc_value, exc_tb = request.exc_info
640 exc_type, exc_value, exc_tb = request.exc_info
598 traceback_info = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
641 traceback_info = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
599
642
600 log.error(
643 log.error(
601 'error occurred handling this request for path: %s, \n tb: %s',
644 'error occurred handling this request for path: %s, \n tb: %s',
602 request.path, traceback_info)
645 request.path, traceback_info)
603 raise exception
646 raise exception
604
647
605
648
606 class ResponseFilter(object):
649 class ResponseFilter(object):
607
650
608 def __init__(self, start_response):
651 def __init__(self, start_response):
609 self._start_response = start_response
652 self._start_response = start_response
610
653
611 def __call__(self, status, response_headers, exc_info=None):
654 def __call__(self, status, response_headers, exc_info=None):
612 headers = tuple(
655 headers = tuple(
613 (h, v) for h, v in response_headers
656 (h, v) for h, v in response_headers
614 if not wsgiref.util.is_hop_by_hop(h))
657 if not wsgiref.util.is_hop_by_hop(h))
615 return self._start_response(status, headers, exc_info)
658 return self._start_response(status, headers, exc_info)
616
659
617
660
618 def main(global_config, **settings):
661 def main(global_config, **settings):
619 if MercurialFactory:
662 if MercurialFactory:
620 hgpatches.patch_largefiles_capabilities()
663 hgpatches.patch_largefiles_capabilities()
621 hgpatches.patch_subrepo_type_mapping()
664 hgpatches.patch_subrepo_type_mapping()
622
665
623 app = HTTPApplication(settings=settings, global_config=global_config)
666 app = HTTPApplication(settings=settings, global_config=global_config)
624 return app.wsgi_app()
667 return app.wsgi_app()
General Comments 0
You need to be logged in to leave comments. Login now