##// END OF EJS Templates
stream: use wsgi.input_terminated because of webob changes of handling chunked encoding
marcink -
r332:02e39d96 stable
parent child Browse files
Show More
@@ -1,466 +1,476 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2017 RodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import base64
19 19 import locale
20 20 import logging
21 21 import uuid
22 22 import wsgiref.util
23 23 import traceback
24 24 from itertools import chain
25 25
26 26 import simplejson as json
27 27 import msgpack
28 28 from beaker.cache import CacheManager
29 29 from beaker.util import parse_cache_config_options
30 30 from pyramid.config import Configurator
31 31 from pyramid.wsgi import wsgiapp
32 32
33 33 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
34 34 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
35 35 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
36 36 from vcsserver.echo_stub.echo_app import EchoApp
37 37 from vcsserver.exceptions import HTTPRepoLocked
38 38 from vcsserver.server import VcsServer
39 39
40 40 try:
41 41 from vcsserver.git import GitFactory, GitRemote
42 42 except ImportError:
43 43 GitFactory = None
44 44 GitRemote = None
45 45
46 46 try:
47 47 from vcsserver.hg import MercurialFactory, HgRemote
48 48 except ImportError:
49 49 MercurialFactory = None
50 50 HgRemote = None
51 51
52 52 try:
53 53 from vcsserver.svn import SubversionFactory, SvnRemote
54 54 except ImportError:
55 55 SubversionFactory = None
56 56 SvnRemote = None
57 57
58 58 log = logging.getLogger(__name__)
59 59
60 60
61 def _is_request_chunked(environ):
62 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
63 return stream
64
65
61 66 class VCS(object):
62 67 def __init__(self, locale=None, cache_config=None):
63 68 self.locale = locale
64 69 self.cache_config = cache_config
65 70 self._configure_locale()
66 71 self._initialize_cache()
67 72
68 73 if GitFactory and GitRemote:
69 74 git_repo_cache = self.cache.get_cache_region(
70 75 'git', region='repo_object')
71 76 git_factory = GitFactory(git_repo_cache)
72 77 self._git_remote = GitRemote(git_factory)
73 78 else:
74 79 log.info("Git client import failed")
75 80
76 81 if MercurialFactory and HgRemote:
77 82 hg_repo_cache = self.cache.get_cache_region(
78 83 'hg', region='repo_object')
79 84 hg_factory = MercurialFactory(hg_repo_cache)
80 85 self._hg_remote = HgRemote(hg_factory)
81 86 else:
82 87 log.info("Mercurial client import failed")
83 88
84 89 if SubversionFactory and SvnRemote:
85 90 svn_repo_cache = self.cache.get_cache_region(
86 91 'svn', region='repo_object')
87 92 svn_factory = SubversionFactory(svn_repo_cache)
88 93 self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
89 94 else:
90 95 log.info("Subversion client import failed")
91 96
92 97 self._vcsserver = VcsServer()
93 98
94 99 def _initialize_cache(self):
95 100 cache_config = parse_cache_config_options(self.cache_config)
96 101 log.info('Initializing beaker cache: %s' % cache_config)
97 102 self.cache = CacheManager(**cache_config)
98 103
99 104 def _configure_locale(self):
100 105 if self.locale:
101 106 log.info('Settings locale: `LC_ALL` to %s' % self.locale)
102 107 else:
103 108 log.info(
104 109 'Configuring locale subsystem based on environment variables')
105 110 try:
106 111 # If self.locale is the empty string, then the locale
107 112 # module will use the environment variables. See the
108 113 # documentation of the package `locale`.
109 114 locale.setlocale(locale.LC_ALL, self.locale)
110 115
111 116 language_code, encoding = locale.getlocale()
112 117 log.info(
113 118 'Locale set to language code "%s" with encoding "%s".',
114 119 language_code, encoding)
115 120 except locale.Error:
116 121 log.exception(
117 122 'Cannot set locale, not configuring the locale system')
118 123
119 124
120 125 class WsgiProxy(object):
121 126 def __init__(self, wsgi):
122 127 self.wsgi = wsgi
123 128
124 129 def __call__(self, environ, start_response):
125 130 input_data = environ['wsgi.input'].read()
126 131 input_data = msgpack.unpackb(input_data)
127 132
128 133 error = None
129 134 try:
130 135 data, status, headers = self.wsgi.handle(
131 136 input_data['environment'], input_data['input_data'],
132 137 *input_data['args'], **input_data['kwargs'])
133 138 except Exception as e:
134 139 data, status, headers = [], None, None
135 140 error = {
136 141 'message': str(e),
137 142 '_vcs_kind': getattr(e, '_vcs_kind', None)
138 143 }
139 144
140 145 start_response(200, {})
141 146 return self._iterator(error, status, headers, data)
142 147
143 148 def _iterator(self, error, status, headers, data):
144 149 initial_data = [
145 150 error,
146 151 status,
147 152 headers,
148 153 ]
149 154
150 155 for d in chain(initial_data, data):
151 156 yield msgpack.packb(d)
152 157
153 158
154 159 class HTTPApplication(object):
155 160 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
156 161
157 162 remote_wsgi = remote_wsgi
158 163 _use_echo_app = False
159 164
160 165 def __init__(self, settings=None, global_config=None):
161 166 self.config = Configurator(settings=settings)
162 167 self.global_config = global_config
163 168
164 169 locale = settings.get('locale', '') or 'en_US.UTF-8'
165 170 vcs = VCS(locale=locale, cache_config=settings)
166 171 self._remotes = {
167 172 'hg': vcs._hg_remote,
168 173 'git': vcs._git_remote,
169 174 'svn': vcs._svn_remote,
170 175 'server': vcs._vcsserver,
171 176 }
172 177 if settings.get('dev.use_echo_app', 'false').lower() == 'true':
173 178 self._use_echo_app = True
174 179 log.warning("Using EchoApp for VCS operations.")
175 180 self.remote_wsgi = remote_wsgi_stub
176 181 self._configure_settings(settings)
177 182 self._configure()
178 183
179 184 def _configure_settings(self, app_settings):
180 185 """
181 186 Configure the settings module.
182 187 """
183 188 git_path = app_settings.get('git_path', None)
184 189 if git_path:
185 190 settings.GIT_EXECUTABLE = git_path
186 191
187 192 def _configure(self):
188 193 self.config.add_renderer(
189 194 name='msgpack',
190 195 factory=self._msgpack_renderer_factory)
191 196
192 197 self.config.add_route('service', '/_service')
193 198 self.config.add_route('status', '/status')
194 199 self.config.add_route('hg_proxy', '/proxy/hg')
195 200 self.config.add_route('git_proxy', '/proxy/git')
196 201 self.config.add_route('vcs', '/{backend}')
197 202 self.config.add_route('stream_git', '/stream/git/*repo_name')
198 203 self.config.add_route('stream_hg', '/stream/hg/*repo_name')
199 204
200 205 self.config.add_view(
201 206 self.status_view, route_name='status', renderer='json')
202 207 self.config.add_view(
203 208 self.service_view, route_name='service', renderer='msgpack')
204 209
205 210 self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
206 211 self.config.add_view(self.git_proxy(), route_name='git_proxy')
207 212 self.config.add_view(
208 213 self.vcs_view, route_name='vcs', renderer='msgpack',
209 214 custom_predicates=[self.is_vcs_view])
210 215
211 216 self.config.add_view(self.hg_stream(), route_name='stream_hg')
212 217 self.config.add_view(self.git_stream(), route_name='stream_git')
213 218
214 219 def notfound(request):
215 220 return {'status': '404 NOT FOUND'}
216 221 self.config.add_notfound_view(notfound, renderer='json')
217 222
218 223 self.config.add_view(self.handle_vcs_exception, context=Exception)
219 224
220 225 self.config.add_tween(
221 226 'vcsserver.tweens.RequestWrapperTween',
222 227 )
223 228
224 229 def wsgi_app(self):
225 230 return self.config.make_wsgi_app()
226 231
227 232 def vcs_view(self, request):
228 233 remote = self._remotes[request.matchdict['backend']]
229 234 payload = msgpack.unpackb(request.body, use_list=True)
230 235 method = payload.get('method')
231 236 params = payload.get('params')
232 237 wire = params.get('wire')
233 238 args = params.get('args')
234 239 kwargs = params.get('kwargs')
235 240 if wire:
236 241 try:
237 242 wire['context'] = uuid.UUID(wire['context'])
238 243 except KeyError:
239 244 pass
240 245 args.insert(0, wire)
241 246
242 247 log.debug('method called:%s with kwargs:%s', method, kwargs)
243 248 try:
244 249 resp = getattr(remote, method)(*args, **kwargs)
245 250 except Exception as e:
246 251 tb_info = traceback.format_exc()
247 252
248 253 type_ = e.__class__.__name__
249 254 if type_ not in self.ALLOWED_EXCEPTIONS:
250 255 type_ = None
251 256
252 257 resp = {
253 258 'id': payload.get('id'),
254 259 'error': {
255 260 'message': e.message,
256 261 'traceback': tb_info,
257 262 'type': type_
258 263 }
259 264 }
260 265 try:
261 266 resp['error']['_vcs_kind'] = e._vcs_kind
262 267 except AttributeError:
263 268 pass
264 269 else:
265 270 resp = {
266 271 'id': payload.get('id'),
267 272 'result': resp
268 273 }
269 274
270 275 return resp
271 276
272 277 def status_view(self, request):
273 278 import vcsserver
274 279 return {'status': 'OK', 'vcsserver_version': vcsserver.__version__}
275 280
276 281 def service_view(self, request):
277 282 import vcsserver
278 283 import ConfigParser as configparser
279 284
280 285 payload = msgpack.unpackb(request.body, use_list=True)
281 286
282 287 try:
283 288 path = self.global_config['__file__']
284 289 config = configparser.ConfigParser()
285 290 config.read(path)
286 291 parsed_ini = config
287 292 if parsed_ini.has_section('server:main'):
288 293 parsed_ini = dict(parsed_ini.items('server:main'))
289 294 except Exception:
290 295 log.exception('Failed to read .ini file for display')
291 296 parsed_ini = {}
292 297
293 298 resp = {
294 299 'id': payload.get('id'),
295 300 'result': dict(
296 301 version=vcsserver.__version__,
297 302 config=parsed_ini,
298 303 payload=payload,
299 304 )
300 305 }
301 306 return resp
302 307
303 308 def _msgpack_renderer_factory(self, info):
304 309 def _render(value, system):
305 310 value = msgpack.packb(value)
306 311 request = system.get('request')
307 312 if request is not None:
308 313 response = request.response
309 314 ct = response.content_type
310 315 if ct == response.default_content_type:
311 316 response.content_type = 'application/x-msgpack'
312 317 return value
313 318 return _render
314 319
315 320 def set_env_from_config(self, environ, config):
316 321 dict_conf = {}
317 322 try:
318 323 for elem in config:
319 324 if elem[0] == 'rhodecode':
320 325 dict_conf = json.loads(elem[2])
321 326 break
322 327 except Exception:
323 328 log.exception('Failed to fetch SCM CONFIG')
324 329 return
325 330
326 331 username = dict_conf.get('username')
327 332 if username:
328 333 environ['REMOTE_USER'] = username
329 334
330 335 ip = dict_conf.get('ip')
331 336 if ip:
332 337 environ['REMOTE_HOST'] = ip
333 338
339 if _is_request_chunked(environ):
340 # set the compatibility flag for webob
341 environ['wsgi.input_terminated'] = True
342
334 343 def hg_proxy(self):
335 344 @wsgiapp
336 345 def _hg_proxy(environ, start_response):
337 346 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
338 347 return app(environ, start_response)
339 348 return _hg_proxy
340 349
341 350 def git_proxy(self):
342 351 @wsgiapp
343 352 def _git_proxy(environ, start_response):
344 353 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
345 354 return app(environ, start_response)
346 355 return _git_proxy
347 356
348 357 def hg_stream(self):
349 358 if self._use_echo_app:
350 359 @wsgiapp
351 360 def _hg_stream(environ, start_response):
352 361 app = EchoApp('fake_path', 'fake_name', None)
353 362 return app(environ, start_response)
354 363 return _hg_stream
355 364 else:
356 365 @wsgiapp
357 366 def _hg_stream(environ, start_response):
358 367 log.debug('http-app: handling hg stream')
359 368 repo_path = environ['HTTP_X_RC_REPO_PATH']
360 369 repo_name = environ['HTTP_X_RC_REPO_NAME']
361 370 packed_config = base64.b64decode(
362 371 environ['HTTP_X_RC_REPO_CONFIG'])
363 372 config = msgpack.unpackb(packed_config)
364 373 app = scm_app.create_hg_wsgi_app(
365 374 repo_path, repo_name, config)
366 375
367 376 # Consistent path information for hgweb
368 377 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
369 378 environ['REPO_NAME'] = repo_name
370 379 self.set_env_from_config(environ, config)
371 380
372 381 log.debug('http-app: starting app handler '
373 382 'with %s and process request', app)
374 383 return app(environ, ResponseFilter(start_response))
375 384 return _hg_stream
376 385
377 386 def git_stream(self):
378 387 if self._use_echo_app:
379 388 @wsgiapp
380 389 def _git_stream(environ, start_response):
381 390 app = EchoApp('fake_path', 'fake_name', None)
382 391 return app(environ, start_response)
383 392 return _git_stream
384 393 else:
385 394 @wsgiapp
386 395 def _git_stream(environ, start_response):
387 396 log.debug('http-app: handling git stream')
388 397 repo_path = environ['HTTP_X_RC_REPO_PATH']
389 398 repo_name = environ['HTTP_X_RC_REPO_NAME']
390 399 packed_config = base64.b64decode(
391 400 environ['HTTP_X_RC_REPO_CONFIG'])
392 401 config = msgpack.unpackb(packed_config)
393 402
394 403 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
395 404 self.set_env_from_config(environ, config)
396 405
397 406 content_type = environ.get('CONTENT_TYPE', '')
398 407
399 408 path = environ['PATH_INFO']
400 409 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
401 410 log.debug(
402 411 'LFS: Detecting if request `%s` is LFS server path based '
403 412 'on content type:`%s`, is_lfs:%s',
404 413 path, content_type, is_lfs_request)
405 414
406 415 if not is_lfs_request:
407 416 # fallback detection by path
408 417 if GIT_LFS_PROTO_PAT.match(path):
409 418 is_lfs_request = True
410 419 log.debug(
411 420 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
412 421 path, is_lfs_request)
413 422
414 423 if is_lfs_request:
415 424 app = scm_app.create_git_lfs_wsgi_app(
416 425 repo_path, repo_name, config)
417 426 else:
418 427 app = scm_app.create_git_wsgi_app(
419 428 repo_path, repo_name, config)
420 429
421 430 log.debug('http-app: starting app handler '
422 431 'with %s and process request', app)
432
423 433 return app(environ, start_response)
424 434
425 435 return _git_stream
426 436
427 437 def is_vcs_view(self, context, request):
428 438 """
429 439 View predicate that returns true if given backend is supported by
430 440 defined remotes.
431 441 """
432 442 backend = request.matchdict.get('backend')
433 443 return backend in self._remotes
434 444
435 445 def handle_vcs_exception(self, exception, request):
436 446 _vcs_kind = getattr(exception, '_vcs_kind', '')
437 447 if _vcs_kind == 'repo_locked':
438 448 # Get custom repo-locked status code if present.
439 449 status_code = request.headers.get('X-RC-Locked-Status-Code')
440 450 return HTTPRepoLocked(
441 451 title=exception.message, status_code=status_code)
442 452
443 453 # Re-raise exception if we can not handle it.
444 454 log.exception(
445 455 'error occurred handling this request for path: %s', request.path)
446 456 raise exception
447 457
448 458
449 459 class ResponseFilter(object):
450 460
451 461 def __init__(self, start_response):
452 462 self._start_response = start_response
453 463
454 464 def __call__(self, status, response_headers, exc_info=None):
455 465 headers = tuple(
456 466 (h, v) for h, v in response_headers
457 467 if not wsgiref.util.is_hop_by_hop(h))
458 468 return self._start_response(status, headers, exc_info)
459 469
460 470
461 471 def main(global_config, **settings):
462 472 if MercurialFactory:
463 473 hgpatches.patch_largefiles_capabilities()
464 474 hgpatches.patch_subrepo_type_mapping()
465 475 app = HTTPApplication(settings=settings, global_config=global_config)
466 476 return app.wsgi_app()
General Comments 0
You need to be logged in to leave comments. Login now