##// END OF EJS Templates
exc-tracker: store API based exceptions and fix prefixes to no use _
marcink -
r3335:3f5d13e1 default
parent child Browse files
Show More
@@ -1,542 +1,548 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import inspect
22 22 import itertools
23 23 import logging
24 import sys
24 25 import types
25 26 import fnmatch
26 27
27 28 import decorator
28 29 import venusian
29 30 from collections import OrderedDict
30 31
31 32 from pyramid.exceptions import ConfigurationError
32 33 from pyramid.renderers import render
33 34 from pyramid.response import Response
34 35 from pyramid.httpexceptions import HTTPNotFound
35 36
36 37 from rhodecode.api.exc import (
37 38 JSONRPCBaseError, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
38 39 from rhodecode.apps._base import TemplateArgs
39 40 from rhodecode.lib.auth import AuthUser
40 41 from rhodecode.lib.base import get_ip_addr, attach_context_attributes
42 from rhodecode.lib.exc_tracking import store_exception
41 43 from rhodecode.lib.ext_json import json
42 44 from rhodecode.lib.utils2 import safe_str
43 45 from rhodecode.lib.plugins.utils import get_plugin_settings
44 46 from rhodecode.model.db import User, UserApiKeys
45 47
46 48 log = logging.getLogger(__name__)
47 49
48 50 DEFAULT_RENDERER = 'jsonrpc_renderer'
49 51 DEFAULT_URL = '/_admin/apiv2'
50 52
51 53
52 54 def find_methods(jsonrpc_methods, pattern):
53 55 matches = OrderedDict()
54 56 if not isinstance(pattern, (list, tuple)):
55 57 pattern = [pattern]
56 58
57 59 for single_pattern in pattern:
58 60 for method_name, method in jsonrpc_methods.items():
59 61 if fnmatch.fnmatch(method_name, single_pattern):
60 62 matches[method_name] = method
61 63 return matches
62 64
63 65
64 66 class ExtJsonRenderer(object):
65 67 """
66 68 Custom renderer that mkaes use of our ext_json lib
67 69
68 70 """
69 71
70 72 def __init__(self, serializer=json.dumps, **kw):
71 73 """ Any keyword arguments will be passed to the ``serializer``
72 74 function."""
73 75 self.serializer = serializer
74 76 self.kw = kw
75 77
76 78 def __call__(self, info):
77 79 """ Returns a plain JSON-encoded string with content-type
78 80 ``application/json``. The content-type may be overridden by
79 81 setting ``request.response.content_type``."""
80 82
81 83 def _render(value, system):
82 84 request = system.get('request')
83 85 if request is not None:
84 86 response = request.response
85 87 ct = response.content_type
86 88 if ct == response.default_content_type:
87 89 response.content_type = 'application/json'
88 90
89 91 return self.serializer(value, **self.kw)
90 92
91 93 return _render
92 94
93 95
94 96 def jsonrpc_response(request, result):
95 97 rpc_id = getattr(request, 'rpc_id', None)
96 98 response = request.response
97 99
98 100 # store content_type before render is called
99 101 ct = response.content_type
100 102
101 103 ret_value = ''
102 104 if rpc_id:
103 105 ret_value = {
104 106 'id': rpc_id,
105 107 'result': result,
106 108 'error': None,
107 109 }
108 110
109 111 # fetch deprecation warnings, and store it inside results
110 112 deprecation = getattr(request, 'rpc_deprecation', None)
111 113 if deprecation:
112 114 ret_value['DEPRECATION_WARNING'] = deprecation
113 115
114 116 raw_body = render(DEFAULT_RENDERER, ret_value, request=request)
115 117 response.body = safe_str(raw_body, response.charset)
116 118
117 119 if ct == response.default_content_type:
118 120 response.content_type = 'application/json'
119 121
120 122 return response
121 123
122 124
123 125 def jsonrpc_error(request, message, retid=None, code=None):
124 126 """
125 127 Generate a Response object with a JSON-RPC error body
126 128
127 129 :param code:
128 130 :param retid:
129 131 :param message:
130 132 """
131 133 err_dict = {'id': retid, 'result': None, 'error': message}
132 134 body = render(DEFAULT_RENDERER, err_dict, request=request).encode('utf-8')
133 135 return Response(
134 136 body=body,
135 137 status=code,
136 138 content_type='application/json'
137 139 )
138 140
139 141
140 142 def exception_view(exc, request):
141 143 rpc_id = getattr(request, 'rpc_id', None)
142 144
143 fault_message = 'undefined error'
144 145 if isinstance(exc, JSONRPCError):
145 146 fault_message = safe_str(exc.message)
146 147 log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, fault_message)
147 148 elif isinstance(exc, JSONRPCValidationError):
148 149 colander_exc = exc.colander_exception
149 150 # TODO(marcink): think maybe of nicer way to serialize errors ?
150 151 fault_message = colander_exc.asdict()
151 152 log.debug('json-rpc colander error rpc_id:%s "%s"', rpc_id, fault_message)
152 153 elif isinstance(exc, JSONRPCForbidden):
153 154 fault_message = 'Access was denied to this resource.'
154 155 log.warning('json-rpc forbidden call rpc_id:%s "%s"', rpc_id, fault_message)
155 156 elif isinstance(exc, HTTPNotFound):
156 157 method = request.rpc_method
157 158 log.debug('json-rpc method `%s` not found in list of '
158 159 'api calls: %s, rpc_id:%s',
159 160 method, request.registry.jsonrpc_methods.keys(), rpc_id)
160 161
161 162 similar = 'none'
162 163 try:
163 164 similar_paterns = ['*{}*'.format(x) for x in method.split('_')]
164 165 similar_found = find_methods(
165 166 request.registry.jsonrpc_methods, similar_paterns)
166 167 similar = ', '.join(similar_found.keys()) or similar
167 168 except Exception:
168 169 # make the whole above block safe
169 170 pass
170 171
171 172 fault_message = "No such method: {}. Similar methods: {}".format(
172 173 method, similar)
174 else:
175 fault_message = 'undefined error'
176 exc_info = exc.exc_info()
177 store_exception(id(exc_info), exc_info, prefix='rhodecode-api')
173 178
174 179 return jsonrpc_error(request, fault_message, rpc_id)
175 180
176 181
177 182 def request_view(request):
178 183 """
179 184 Main request handling method. It handles all logic to call a specific
180 185 exposed method
181 186 """
182 187
183 188 # check if we can find this session using api_key, get_by_auth_token
184 189 # search not expired tokens only
185 190
186 191 try:
187 192 api_user = User.get_by_auth_token(request.rpc_api_key)
188 193
189 194 if api_user is None:
190 195 return jsonrpc_error(
191 196 request, retid=request.rpc_id, message='Invalid API KEY')
192 197
193 198 if not api_user.active:
194 199 return jsonrpc_error(
195 200 request, retid=request.rpc_id,
196 201 message='Request from this user not allowed')
197 202
198 203 # check if we are allowed to use this IP
199 204 auth_u = AuthUser(
200 205 api_user.user_id, request.rpc_api_key, ip_addr=request.rpc_ip_addr)
201 206 if not auth_u.ip_allowed:
202 207 return jsonrpc_error(
203 208 request, retid=request.rpc_id,
204 209 message='Request from IP:%s not allowed' % (
205 210 request.rpc_ip_addr,))
206 211 else:
207 212 log.info('Access for IP:%s allowed', request.rpc_ip_addr)
208 213
209 214 # register our auth-user
210 215 request.rpc_user = auth_u
211 216 request.environ['rc_auth_user_id'] = auth_u.user_id
212 217
213 218 # now check if token is valid for API
214 219 auth_token = request.rpc_api_key
215 220 token_match = api_user.authenticate_by_token(
216 221 auth_token, roles=[UserApiKeys.ROLE_API])
217 222 invalid_token = not token_match
218 223
219 224 log.debug('Checking if API KEY is valid with proper role')
220 225 if invalid_token:
221 226 return jsonrpc_error(
222 227 request, retid=request.rpc_id,
223 228 message='API KEY invalid or, has bad role for an API call')
224 229
225 230 except Exception:
226 231 log.exception('Error on API AUTH')
227 232 return jsonrpc_error(
228 233 request, retid=request.rpc_id, message='Invalid API KEY')
229 234
230 235 method = request.rpc_method
231 236 func = request.registry.jsonrpc_methods[method]
232 237
233 238 # now that we have a method, add request._req_params to
234 239 # self.kargs and dispatch control to WGIController
235 240 argspec = inspect.getargspec(func)
236 241 arglist = argspec[0]
237 242 defaults = map(type, argspec[3] or [])
238 243 default_empty = types.NotImplementedType
239 244
240 245 # kw arguments required by this method
241 246 func_kwargs = dict(itertools.izip_longest(
242 247 reversed(arglist), reversed(defaults), fillvalue=default_empty))
243 248
244 249 # This attribute will need to be first param of a method that uses
245 250 # api_key, which is translated to instance of user at that name
246 251 user_var = 'apiuser'
247 252 request_var = 'request'
248 253
249 254 for arg in [user_var, request_var]:
250 255 if arg not in arglist:
251 256 return jsonrpc_error(
252 257 request,
253 258 retid=request.rpc_id,
254 259 message='This method [%s] does not support '
255 260 'required parameter `%s`' % (func.__name__, arg))
256 261
257 262 # get our arglist and check if we provided them as args
258 263 for arg, default in func_kwargs.items():
259 264 if arg in [user_var, request_var]:
260 265 # user_var and request_var are pre-hardcoded parameters and we
261 266 # don't need to do any translation
262 267 continue
263 268
264 269 # skip the required param check if it's default value is
265 270 # NotImplementedType (default_empty)
266 271 if default == default_empty and arg not in request.rpc_params:
267 272 return jsonrpc_error(
268 273 request,
269 274 retid=request.rpc_id,
270 275 message=('Missing non optional `%s` arg in JSON DATA' % arg)
271 276 )
272 277
273 278 # sanitize extra passed arguments
274 279 for k in request.rpc_params.keys()[:]:
275 280 if k not in func_kwargs:
276 281 del request.rpc_params[k]
277 282
278 283 call_params = request.rpc_params
279 284 call_params.update({
280 285 'request': request,
281 286 'apiuser': auth_u
282 287 })
283 288
284 289 # register some common functions for usage
285 290 attach_context_attributes(
286 291 TemplateArgs(), request, request.rpc_user.user_id)
287 292
288 293 try:
289 294 ret_value = func(**call_params)
290 295 return jsonrpc_response(request, ret_value)
291 296 except JSONRPCBaseError:
292 297 raise
293 298 except Exception:
294 299 log.exception('Unhandled exception occurred on api call: %s', func)
295 return jsonrpc_error(request, retid=request.rpc_id,
296 message='Internal server error')
300 exc_info = sys.exc_info()
301 store_exception(id(exc_info), exc_info, prefix='rhodecode-api')
302 return jsonrpc_error(
303 request, retid=request.rpc_id, message='Internal server error')
297 304
298 305
299 306 def setup_request(request):
300 307 """
301 308 Parse a JSON-RPC request body. It's used inside the predicates method
302 309 to validate and bootstrap requests for usage in rpc calls.
303 310
304 311 We need to raise JSONRPCError here if we want to return some errors back to
305 312 user.
306 313 """
307 314
308 315 log.debug('Executing setup request: %r', request)
309 316 request.rpc_ip_addr = get_ip_addr(request.environ)
310 317 # TODO(marcink): deprecate GET at some point
311 318 if request.method not in ['POST', 'GET']:
312 319 log.debug('unsupported request method "%s"', request.method)
313 320 raise JSONRPCError(
314 321 'unsupported request method "%s". Please use POST' % request.method)
315 322
316 323 if 'CONTENT_LENGTH' not in request.environ:
317 324 log.debug("No Content-Length")
318 325 raise JSONRPCError("Empty body, No Content-Length in request")
319 326
320 327 else:
321 328 length = request.environ['CONTENT_LENGTH']
322 329 log.debug('Content-Length: %s', length)
323 330
324 331 if length == 0:
325 332 log.debug("Content-Length is 0")
326 333 raise JSONRPCError("Content-Length is 0")
327 334
328 335 raw_body = request.body
329 336 try:
330 337 json_body = json.loads(raw_body)
331 338 except ValueError as e:
332 339 # catch JSON errors Here
333 340 raise JSONRPCError("JSON parse error ERR:%s RAW:%r" % (e, raw_body))
334 341
335 342 request.rpc_id = json_body.get('id')
336 343 request.rpc_method = json_body.get('method')
337 344
338 345 # check required base parameters
339 346 try:
340 347 api_key = json_body.get('api_key')
341 348 if not api_key:
342 349 api_key = json_body.get('auth_token')
343 350
344 351 if not api_key:
345 352 raise KeyError('api_key or auth_token')
346 353
347 354 # TODO(marcink): support passing in token in request header
348 355
349 356 request.rpc_api_key = api_key
350 357 request.rpc_id = json_body['id']
351 358 request.rpc_method = json_body['method']
352 359 request.rpc_params = json_body['args'] \
353 360 if isinstance(json_body['args'], dict) else {}
354 361
355 362 log.debug('method: %s, params: %s', request.rpc_method, request.rpc_params)
356 363 except KeyError as e:
357 364 raise JSONRPCError('Incorrect JSON data. Missing %s' % e)
358 365
359 366 log.debug('setup complete, now handling method:%s rpcid:%s',
360 367 request.rpc_method, request.rpc_id, )
361 368
362 369
363 370 class RoutePredicate(object):
364 371 def __init__(self, val, config):
365 372 self.val = val
366 373
367 374 def text(self):
368 375 return 'jsonrpc route = %s' % self.val
369 376
370 377 phash = text
371 378
372 379 def __call__(self, info, request):
373 380 if self.val:
374 381 # potentially setup and bootstrap our call
375 382 setup_request(request)
376 383
377 384 # Always return True so that even if it isn't a valid RPC it
378 385 # will fall through to the underlaying handlers like notfound_view
379 386 return True
380 387
381 388
382 389 class NotFoundPredicate(object):
383 390 def __init__(self, val, config):
384 391 self.val = val
385 392 self.methods = config.registry.jsonrpc_methods
386 393
387 394 def text(self):
388 395 return 'jsonrpc method not found = {}.'.format(self.val)
389 396
390 397 phash = text
391 398
392 399 def __call__(self, info, request):
393 400 return hasattr(request, 'rpc_method')
394 401
395 402
396 403 class MethodPredicate(object):
397 404 def __init__(self, val, config):
398 405 self.method = val
399 406
400 407 def text(self):
401 408 return 'jsonrpc method = %s' % self.method
402 409
403 410 phash = text
404 411
405 412 def __call__(self, context, request):
406 413 # we need to explicitly return False here, so pyramid doesn't try to
407 414 # execute our view directly. We need our main handler to execute things
408 415 return getattr(request, 'rpc_method') == self.method
409 416
410 417
411 418 def add_jsonrpc_method(config, view, **kwargs):
412 419 # pop the method name
413 420 method = kwargs.pop('method', None)
414 421
415 422 if method is None:
416 423 raise ConfigurationError(
417 'Cannot register a JSON-RPC method without specifying the '
418 '"method"')
424 'Cannot register a JSON-RPC method without specifying the "method"')
419 425
420 426 # we define custom predicate, to enable to detect conflicting methods,
421 427 # those predicates are kind of "translation" from the decorator variables
422 428 # to internal predicates names
423 429
424 430 kwargs['jsonrpc_method'] = method
425 431
426 432 # register our view into global view store for validation
427 433 config.registry.jsonrpc_methods[method] = view
428 434
429 435 # we're using our main request_view handler, here, so each method
430 436 # has a unified handler for itself
431 437 config.add_view(request_view, route_name='apiv2', **kwargs)
432 438
433 439
434 440 class jsonrpc_method(object):
435 441 """
436 442 decorator that works similar to @add_view_config decorator,
437 443 but tailored for our JSON RPC
438 444 """
439 445
440 446 venusian = venusian # for testing injection
441 447
442 448 def __init__(self, method=None, **kwargs):
443 449 self.method = method
444 450 self.kwargs = kwargs
445 451
446 452 def __call__(self, wrapped):
447 453 kwargs = self.kwargs.copy()
448 454 kwargs['method'] = self.method or wrapped.__name__
449 455 depth = kwargs.pop('_depth', 0)
450 456
451 457 def callback(context, name, ob):
452 458 config = context.config.with_package(info.module)
453 459 config.add_jsonrpc_method(view=ob, **kwargs)
454 460
455 461 info = venusian.attach(wrapped, callback, category='pyramid',
456 462 depth=depth + 1)
457 463 if info.scope == 'class':
458 464 # ensure that attr is set if decorating a class method
459 465 kwargs.setdefault('attr', wrapped.__name__)
460 466
461 467 kwargs['_info'] = info.codeinfo # fbo action_method
462 468 return wrapped
463 469
464 470
465 471 class jsonrpc_deprecated_method(object):
466 472 """
467 473 Marks method as deprecated, adds log.warning, and inject special key to
468 474 the request variable to mark method as deprecated.
469 475 Also injects special docstring that extract_docs will catch to mark
470 476 method as deprecated.
471 477
472 478 :param use_method: specify which method should be used instead of
473 479 the decorated one
474 480
475 481 Use like::
476 482
477 483 @jsonrpc_method()
478 484 @jsonrpc_deprecated_method(use_method='new_func', deprecated_at_version='3.0.0')
479 485 def old_func(request, apiuser, arg1, arg2):
480 486 ...
481 487 """
482 488
483 489 def __init__(self, use_method, deprecated_at_version):
484 490 self.use_method = use_method
485 491 self.deprecated_at_version = deprecated_at_version
486 492 self.deprecated_msg = ''
487 493
488 494 def __call__(self, func):
489 495 self.deprecated_msg = 'Please use method `{method}` instead.'.format(
490 496 method=self.use_method)
491 497
492 498 docstring = """\n
493 499 .. deprecated:: {version}
494 500
495 501 {deprecation_message}
496 502
497 503 {original_docstring}
498 504 """
499 505 func.__doc__ = docstring.format(
500 506 version=self.deprecated_at_version,
501 507 deprecation_message=self.deprecated_msg,
502 508 original_docstring=func.__doc__)
503 509 return decorator.decorator(self.__wrapper, func)
504 510
505 511 def __wrapper(self, func, *fargs, **fkwargs):
506 512 log.warning('DEPRECATED API CALL on function %s, please '
507 513 'use `%s` instead', func, self.use_method)
508 514 # alter function docstring to mark as deprecated, this is picked up
509 515 # via fabric file that generates API DOC.
510 516 result = func(*fargs, **fkwargs)
511 517
512 518 request = fargs[0]
513 519 request.rpc_deprecation = 'DEPRECATED METHOD ' + self.deprecated_msg
514 520 return result
515 521
516 522
517 523 def includeme(config):
518 524 plugin_module = 'rhodecode.api'
519 525 plugin_settings = get_plugin_settings(
520 526 plugin_module, config.registry.settings)
521 527
522 528 if not hasattr(config.registry, 'jsonrpc_methods'):
523 529 config.registry.jsonrpc_methods = OrderedDict()
524 530
525 531 # match filter by given method only
526 532 config.add_view_predicate('jsonrpc_method', MethodPredicate)
533 config.add_view_predicate('jsonrpc_method_not_found', NotFoundPredicate)
527 534
528 535 config.add_renderer(DEFAULT_RENDERER, ExtJsonRenderer(
529 536 serializer=json.dumps, indent=4))
530 537 config.add_directive('add_jsonrpc_method', add_jsonrpc_method)
531 538
532 539 config.add_route_predicate(
533 540 'jsonrpc_call', RoutePredicate)
534 541
535 542 config.add_route(
536 543 'apiv2', plugin_settings.get('url', DEFAULT_URL), jsonrpc_call=True)
537 544
538 545 config.scan(plugin_module, ignore='rhodecode.api.tests')
539 546 # register some exception handling view
540 547 config.add_view(exception_view, context=JSONRPCBaseError)
541 config.add_view_predicate('jsonrpc_method_not_found', NotFoundPredicate)
542 548 config.add_notfound_view(exception_view, jsonrpc_method_not_found=True)
@@ -1,302 +1,302 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --beat \
25 25 --app rhodecode.lib.celerylib.loader \
26 26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 27 --loglevel DEBUG --ini=._dev/dev.ini
28 28 """
29 29 import os
30 30 import logging
31 31 import importlib
32 32
33 33 from celery import Celery
34 34 from celery import signals
35 35 from celery import Task
36 36 from celery import exceptions # pragma: no cover
37 37 from kombu.serialization import register
38 38 from pyramid.threadlocal import get_current_request
39 39
40 40 import rhodecode
41 41
42 42 from rhodecode.lib.auth import AuthUser
43 43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
44 44 from rhodecode.lib.ext_json import json
45 45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
46 46 from rhodecode.lib.utils2 import str2bool
47 47 from rhodecode.model import meta
48 48
49 49
50 50 register('json_ext', json.dumps, json.loads,
51 51 content_type='application/x-json-ext',
52 52 content_encoding='utf-8')
53 53
54 54 log = logging.getLogger('celery.rhodecode.loader')
55 55
56 56
57 57 def add_preload_arguments(parser):
58 58 parser.add_argument(
59 59 '--ini', default=None,
60 60 help='Path to ini configuration file.'
61 61 )
62 62 parser.add_argument(
63 63 '--ini-var', default=None,
64 64 help='Comma separated list of key=value to pass to ini.'
65 65 )
66 66
67 67
68 68 def get_logger(obj):
69 69 custom_log = logging.getLogger(
70 70 'rhodecode.task.{}'.format(obj.__class__.__name__))
71 71
72 72 if rhodecode.CELERY_ENABLED:
73 73 try:
74 74 custom_log = obj.get_logger()
75 75 except Exception:
76 76 pass
77 77
78 78 return custom_log
79 79
80 80
81 81 imports = ['rhodecode.lib.celerylib.tasks']
82 82
83 83 try:
84 84 # try if we have EE tasks available
85 85 importlib.import_module('rc_ee')
86 86 imports.append('rc_ee.lib.celerylib.tasks')
87 87 except ImportError:
88 88 pass
89 89
90 90
91 91 base_celery_config = {
92 92 'result_backend': 'rpc://',
93 93 'result_expires': 60 * 60 * 24,
94 94 'result_persistent': True,
95 95 'imports': imports,
96 96 'worker_max_tasks_per_child': 100,
97 97 'accept_content': ['json_ext'],
98 98 'task_serializer': 'json_ext',
99 99 'result_serializer': 'json_ext',
100 100 'worker_hijack_root_logger': False,
101 101 'database_table_names': {
102 102 'task': 'beat_taskmeta',
103 103 'group': 'beat_groupmeta',
104 104 }
105 105 }
106 106 # init main celery app
107 107 celery_app = Celery()
108 108 celery_app.user_options['preload'].add(add_preload_arguments)
109 109 ini_file_glob = None
110 110
111 111
112 112 @signals.setup_logging.connect
113 113 def setup_logging_callback(**kwargs):
114 114 setup_logging(ini_file_glob)
115 115
116 116
117 117 @signals.user_preload_options.connect
118 118 def on_preload_parsed(options, **kwargs):
119 119 ini_location = options['ini']
120 120 ini_vars = options['ini_var']
121 121 celery_app.conf['INI_PYRAMID'] = options['ini']
122 122
123 123 if ini_location is None:
124 124 print('You must provide the paste --ini argument')
125 125 exit(-1)
126 126
127 127 options = None
128 128 if ini_vars is not None:
129 129 options = parse_ini_vars(ini_vars)
130 130
131 131 global ini_file_glob
132 132 ini_file_glob = ini_location
133 133
134 134 log.debug('Bootstrapping RhodeCode application...')
135 135 env = bootstrap(ini_location, options=options)
136 136
137 137 setup_celery_app(
138 138 app=env['app'], root=env['root'], request=env['request'],
139 139 registry=env['registry'], closer=env['closer'],
140 140 ini_location=ini_location)
141 141
142 142 # fix the global flag even if it's disabled via .ini file because this
143 143 # is a worker code that doesn't need this to be disabled.
144 144 rhodecode.CELERY_ENABLED = True
145 145
146 146
147 147 @signals.task_success.connect
148 148 def task_success_signal(result, **kwargs):
149 149 meta.Session.commit()
150 150 closer = celery_app.conf['PYRAMID_CLOSER']
151 151 if closer:
152 152 closer()
153 153
154 154
155 155 @signals.task_retry.connect
156 156 def task_retry_signal(
157 157 request, reason, einfo, **kwargs):
158 158 meta.Session.remove()
159 159 closer = celery_app.conf['PYRAMID_CLOSER']
160 160 if closer:
161 161 closer()
162 162
163 163
164 164 @signals.task_failure.connect
165 165 def task_failure_signal(
166 166 task_id, exception, args, kwargs, traceback, einfo, **kargs):
167 167 from rhodecode.lib.exc_tracking import store_exception
168 168
169 169 meta.Session.remove()
170 170
171 171 # simulate sys.exc_info()
172 172 exc_info = (einfo.type, einfo.exception, einfo.tb)
173 store_exception(id(exc_info), exc_info, prefix='celery_rhodecode')
173 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
174 174
175 175 closer = celery_app.conf['PYRAMID_CLOSER']
176 176 if closer:
177 177 closer()
178 178
179 179
180 180 @signals.task_revoked.connect
181 181 def task_revoked_signal(
182 182 request, terminated, signum, expired, **kwargs):
183 183 closer = celery_app.conf['PYRAMID_CLOSER']
184 184 if closer:
185 185 closer()
186 186
187 187
188 188 def setup_celery_app(app, root, request, registry, closer, ini_location):
189 189 ini_dir = os.path.dirname(os.path.abspath(ini_location))
190 190 celery_config = base_celery_config
191 191 celery_config.update({
192 192 # store celerybeat scheduler db where the .ini file is
193 193 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
194 194 })
195 195 ini_settings = get_ini_config(ini_location)
196 196 log.debug('Got custom celery conf: %s', ini_settings)
197 197
198 198 celery_config.update(ini_settings)
199 199 celery_app.config_from_object(celery_config)
200 200
201 201 celery_app.conf.update({'PYRAMID_APP': app})
202 202 celery_app.conf.update({'PYRAMID_ROOT': root})
203 203 celery_app.conf.update({'PYRAMID_REQUEST': request})
204 204 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
205 205 celery_app.conf.update({'PYRAMID_CLOSER': closer})
206 206
207 207
208 208 def configure_celery(config, ini_location):
209 209 """
210 210 Helper that is called from our application creation logic. It gives
211 211 connection info into running webapp and allows execution of tasks from
212 212 RhodeCode itself
213 213 """
214 214 # store some globals into rhodecode
215 215 rhodecode.CELERY_ENABLED = str2bool(
216 216 config.registry.settings.get('use_celery'))
217 217 if rhodecode.CELERY_ENABLED:
218 218 log.info('Configuring celery based on `%s` file', ini_location)
219 219 setup_celery_app(
220 220 app=None, root=None, request=None, registry=config.registry,
221 221 closer=None, ini_location=ini_location)
222 222
223 223
224 224 def maybe_prepare_env(req):
225 225 environ = {}
226 226 try:
227 227 environ.update({
228 228 'PATH_INFO': req.environ['PATH_INFO'],
229 229 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
230 230 'HTTP_HOST':
231 231 req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
232 232 'SERVER_NAME': req.environ['SERVER_NAME'],
233 233 'SERVER_PORT': req.environ['SERVER_PORT'],
234 234 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
235 235 })
236 236 except Exception:
237 237 pass
238 238
239 239 return environ
240 240
241 241
242 242 class RequestContextTask(Task):
243 243 """
244 244 This is a celery task which will create a rhodecode app instance context
245 245 for the task, patch pyramid with the original request
246 246 that created the task and also add the user to the context.
247 247 """
248 248
249 249 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
250 250 link=None, link_error=None, shadow=None, **options):
251 251 """ queue the job to run (we are in web request context here) """
252 252
253 253 req = get_current_request()
254 254
255 255 # web case
256 256 if hasattr(req, 'user'):
257 257 ip_addr = req.user.ip_addr
258 258 user_id = req.user.user_id
259 259
260 260 # api case
261 261 elif hasattr(req, 'rpc_user'):
262 262 ip_addr = req.rpc_user.ip_addr
263 263 user_id = req.rpc_user.user_id
264 264 else:
265 265 raise Exception(
266 266 'Unable to fetch required data from request: {}. \n'
267 267 'This task is required to be executed from context of '
268 268 'request in a webapp'.format(repr(req)))
269 269
270 270 if req:
271 271 # we hook into kwargs since it is the only way to pass our data to
272 272 # the celery worker
273 273 environ = maybe_prepare_env(req)
274 274 options['headers'] = options.get('headers', {})
275 275 options['headers'].update({
276 276 'rhodecode_proxy_data': {
277 277 'environ': environ,
278 278 'auth_user': {
279 279 'ip_addr': ip_addr,
280 280 'user_id': user_id
281 281 },
282 282 }
283 283 })
284 284
285 285 return super(RequestContextTask, self).apply_async(
286 286 args, kwargs, task_id, producer, link, link_error, shadow, **options)
287 287
288 288 def __call__(self, *args, **kwargs):
289 289 """ rebuild the context and then run task on celery worker """
290 290
291 291 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
292 292 if not proxy_data:
293 293 return super(RequestContextTask, self).__call__(*args, **kwargs)
294 294
295 295 log.debug('using celery proxy data to run task: %r', proxy_data)
296 296 # re-inject and register threadlocals for proper routing support
297 297 request = prepare_request(proxy_data['environ'])
298 298 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
299 299 ip_addr=proxy_data['auth_user']['ip_addr'])
300 300
301 301 return super(RequestContextTask, self).__call__(*args, **kwargs)
302 302
General Comments 0
You need to be logged in to leave comments. Login now