##// END OF EJS Templates
metrics: expose more metrics via statsd...
super-admin -
r4801:80a69a8f default
parent child Browse files
Show More
@@ -1,564 +1,568 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import itertools
22 22 import logging
23 23 import sys
24 24 import types
25 25 import fnmatch
26 26
27 27 import decorator
28 28 import venusian
29 29 from collections import OrderedDict
30 30
31 31 from pyramid.exceptions import ConfigurationError
32 32 from pyramid.renderers import render
33 33 from pyramid.response import Response
34 34 from pyramid.httpexceptions import HTTPNotFound
35 35
36 36 from rhodecode.api.exc import (
37 37 JSONRPCBaseError, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
38 38 from rhodecode.apps._base import TemplateArgs
39 39 from rhodecode.lib.auth import AuthUser
40 40 from rhodecode.lib.base import get_ip_addr, attach_context_attributes
41 41 from rhodecode.lib.exc_tracking import store_exception
42 42 from rhodecode.lib.ext_json import json
43 43 from rhodecode.lib.utils2 import safe_str
44 44 from rhodecode.lib.plugins.utils import get_plugin_settings
45 45 from rhodecode.model.db import User, UserApiKeys
46 46
47 47 log = logging.getLogger(__name__)
48 48
49 49 DEFAULT_RENDERER = 'jsonrpc_renderer'
50 50 DEFAULT_URL = '/_admin/apiv2'
51 51
52 52
53 53 def find_methods(jsonrpc_methods, pattern):
54 54 matches = OrderedDict()
55 55 if not isinstance(pattern, (list, tuple)):
56 56 pattern = [pattern]
57 57
58 58 for single_pattern in pattern:
59 59 for method_name, method in jsonrpc_methods.items():
60 60 if fnmatch.fnmatch(method_name, single_pattern):
61 61 matches[method_name] = method
62 62 return matches
63 63
64 64
65 65 class ExtJsonRenderer(object):
66 66 """
67 67 Custom renderer that mkaes use of our ext_json lib
68 68
69 69 """
70 70
71 71 def __init__(self, serializer=json.dumps, **kw):
72 72 """ Any keyword arguments will be passed to the ``serializer``
73 73 function."""
74 74 self.serializer = serializer
75 75 self.kw = kw
76 76
77 77 def __call__(self, info):
78 78 """ Returns a plain JSON-encoded string with content-type
79 79 ``application/json``. The content-type may be overridden by
80 80 setting ``request.response.content_type``."""
81 81
82 82 def _render(value, system):
83 83 request = system.get('request')
84 84 if request is not None:
85 85 response = request.response
86 86 ct = response.content_type
87 87 if ct == response.default_content_type:
88 88 response.content_type = 'application/json'
89 89
90 90 return self.serializer(value, **self.kw)
91 91
92 92 return _render
93 93
94 94
95 95 def jsonrpc_response(request, result):
96 96 rpc_id = getattr(request, 'rpc_id', None)
97 97 response = request.response
98 98
99 99 # store content_type before render is called
100 100 ct = response.content_type
101 101
102 102 ret_value = ''
103 103 if rpc_id:
104 104 ret_value = {
105 105 'id': rpc_id,
106 106 'result': result,
107 107 'error': None,
108 108 }
109 109
110 110 # fetch deprecation warnings, and store it inside results
111 111 deprecation = getattr(request, 'rpc_deprecation', None)
112 112 if deprecation:
113 113 ret_value['DEPRECATION_WARNING'] = deprecation
114 114
115 115 raw_body = render(DEFAULT_RENDERER, ret_value, request=request)
116 116 response.body = safe_str(raw_body, response.charset)
117 117
118 118 if ct == response.default_content_type:
119 119 response.content_type = 'application/json'
120 120
121 121 return response
122 122
123 123
124 124 def jsonrpc_error(request, message, retid=None, code=None, headers=None):
125 125 """
126 126 Generate a Response object with a JSON-RPC error body
127 127
128 128 :param code:
129 129 :param retid:
130 130 :param message:
131 131 """
132 132 err_dict = {'id': retid, 'result': None, 'error': message}
133 133 body = render(DEFAULT_RENDERER, err_dict, request=request).encode('utf-8')
134 134
135 135 return Response(
136 136 body=body,
137 137 status=code,
138 138 content_type='application/json',
139 139 headerlist=headers
140 140 )
141 141
142 142
143 143 def exception_view(exc, request):
144 144 rpc_id = getattr(request, 'rpc_id', None)
145 145
146 146 if isinstance(exc, JSONRPCError):
147 147 fault_message = safe_str(exc.message)
148 148 log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, fault_message)
149 149 elif isinstance(exc, JSONRPCValidationError):
150 150 colander_exc = exc.colander_exception
151 151 # TODO(marcink): think maybe of nicer way to serialize errors ?
152 152 fault_message = colander_exc.asdict()
153 153 log.debug('json-rpc colander error rpc_id:%s "%s"', rpc_id, fault_message)
154 154 elif isinstance(exc, JSONRPCForbidden):
155 155 fault_message = 'Access was denied to this resource.'
156 156 log.warning('json-rpc forbidden call rpc_id:%s "%s"', rpc_id, fault_message)
157 157 elif isinstance(exc, HTTPNotFound):
158 158 method = request.rpc_method
159 159 log.debug('json-rpc method `%s` not found in list of '
160 160 'api calls: %s, rpc_id:%s',
161 161 method, request.registry.jsonrpc_methods.keys(), rpc_id)
162 162
163 163 similar = 'none'
164 164 try:
165 165 similar_paterns = ['*{}*'.format(x) for x in method.split('_')]
166 166 similar_found = find_methods(
167 167 request.registry.jsonrpc_methods, similar_paterns)
168 168 similar = ', '.join(similar_found.keys()) or similar
169 169 except Exception:
170 170 # make the whole above block safe
171 171 pass
172 172
173 173 fault_message = "No such method: {}. Similar methods: {}".format(
174 174 method, similar)
175 175 else:
176 176 fault_message = 'undefined error'
177 177 exc_info = exc.exc_info()
178 178 store_exception(id(exc_info), exc_info, prefix='rhodecode-api')
179 179
180 statsd = request.registry.statsd
181 if statsd:
182 statsd.incr('rhodecode_exception', tags=["api"])
183
180 184 return jsonrpc_error(request, fault_message, rpc_id)
181 185
182 186
183 187 def request_view(request):
184 188 """
185 189 Main request handling method. It handles all logic to call a specific
186 190 exposed method
187 191 """
188 192 # cython compatible inspect
189 193 from rhodecode.config.patches import inspect_getargspec
190 194 inspect = inspect_getargspec()
191 195
192 196 # check if we can find this session using api_key, get_by_auth_token
193 197 # search not expired tokens only
194 198 try:
195 199 api_user = User.get_by_auth_token(request.rpc_api_key)
196 200
197 201 if api_user is None:
198 202 return jsonrpc_error(
199 203 request, retid=request.rpc_id, message='Invalid API KEY')
200 204
201 205 if not api_user.active:
202 206 return jsonrpc_error(
203 207 request, retid=request.rpc_id,
204 208 message='Request from this user not allowed')
205 209
206 210 # check if we are allowed to use this IP
207 211 auth_u = AuthUser(
208 212 api_user.user_id, request.rpc_api_key, ip_addr=request.rpc_ip_addr)
209 213 if not auth_u.ip_allowed:
210 214 return jsonrpc_error(
211 215 request, retid=request.rpc_id,
212 216 message='Request from IP:%s not allowed' % (
213 217 request.rpc_ip_addr,))
214 218 else:
215 219 log.info('Access for IP:%s allowed', request.rpc_ip_addr)
216 220
217 221 # register our auth-user
218 222 request.rpc_user = auth_u
219 223 request.environ['rc_auth_user_id'] = auth_u.user_id
220 224
221 225 # now check if token is valid for API
222 226 auth_token = request.rpc_api_key
223 227 token_match = api_user.authenticate_by_token(
224 228 auth_token, roles=[UserApiKeys.ROLE_API])
225 229 invalid_token = not token_match
226 230
227 231 log.debug('Checking if API KEY is valid with proper role')
228 232 if invalid_token:
229 233 return jsonrpc_error(
230 234 request, retid=request.rpc_id,
231 235 message='API KEY invalid or, has bad role for an API call')
232 236
233 237 except Exception:
234 238 log.exception('Error on API AUTH')
235 239 return jsonrpc_error(
236 240 request, retid=request.rpc_id, message='Invalid API KEY')
237 241
238 242 method = request.rpc_method
239 243 func = request.registry.jsonrpc_methods[method]
240 244
241 245 # now that we have a method, add request._req_params to
242 246 # self.kargs and dispatch control to WGIController
243 247 argspec = inspect.getargspec(func)
244 248 arglist = argspec[0]
245 249 defaults = map(type, argspec[3] or [])
246 250 default_empty = types.NotImplementedType
247 251
248 252 # kw arguments required by this method
249 253 func_kwargs = dict(itertools.izip_longest(
250 254 reversed(arglist), reversed(defaults), fillvalue=default_empty))
251 255
252 256 # This attribute will need to be first param of a method that uses
253 257 # api_key, which is translated to instance of user at that name
254 258 user_var = 'apiuser'
255 259 request_var = 'request'
256 260
257 261 for arg in [user_var, request_var]:
258 262 if arg not in arglist:
259 263 return jsonrpc_error(
260 264 request,
261 265 retid=request.rpc_id,
262 266 message='This method [%s] does not support '
263 267 'required parameter `%s`' % (func.__name__, arg))
264 268
265 269 # get our arglist and check if we provided them as args
266 270 for arg, default in func_kwargs.items():
267 271 if arg in [user_var, request_var]:
268 272 # user_var and request_var are pre-hardcoded parameters and we
269 273 # don't need to do any translation
270 274 continue
271 275
272 276 # skip the required param check if it's default value is
273 277 # NotImplementedType (default_empty)
274 278 if default == default_empty and arg not in request.rpc_params:
275 279 return jsonrpc_error(
276 280 request,
277 281 retid=request.rpc_id,
278 282 message=('Missing non optional `%s` arg in JSON DATA' % arg)
279 283 )
280 284
281 285 # sanitize extra passed arguments
282 286 for k in request.rpc_params.keys()[:]:
283 287 if k not in func_kwargs:
284 288 del request.rpc_params[k]
285 289
286 290 call_params = request.rpc_params
287 291 call_params.update({
288 292 'request': request,
289 293 'apiuser': auth_u
290 294 })
291 295
292 296 # register some common functions for usage
293 297 attach_context_attributes(TemplateArgs(), request, request.rpc_user.user_id)
294 298
295 299 try:
296 300 ret_value = func(**call_params)
297 301 return jsonrpc_response(request, ret_value)
298 302 except JSONRPCBaseError:
299 303 raise
300 304 except Exception:
301 305 log.exception('Unhandled exception occurred on api call: %s', func)
302 306 exc_info = sys.exc_info()
303 307 exc_id, exc_type_name = store_exception(
304 308 id(exc_info), exc_info, prefix='rhodecode-api')
305 309 error_headers = [('RhodeCode-Exception-Id', str(exc_id)),
306 310 ('RhodeCode-Exception-Type', str(exc_type_name))]
307 311 return jsonrpc_error(
308 312 request, retid=request.rpc_id, message='Internal server error',
309 313 headers=error_headers)
310 314
311 315
312 316 def setup_request(request):
313 317 """
314 318 Parse a JSON-RPC request body. It's used inside the predicates method
315 319 to validate and bootstrap requests for usage in rpc calls.
316 320
317 321 We need to raise JSONRPCError here if we want to return some errors back to
318 322 user.
319 323 """
320 324
321 325 log.debug('Executing setup request: %r', request)
322 326 request.rpc_ip_addr = get_ip_addr(request.environ)
323 327 # TODO(marcink): deprecate GET at some point
324 328 if request.method not in ['POST', 'GET']:
325 329 log.debug('unsupported request method "%s"', request.method)
326 330 raise JSONRPCError(
327 331 'unsupported request method "%s". Please use POST' % request.method)
328 332
329 333 if 'CONTENT_LENGTH' not in request.environ:
330 334 log.debug("No Content-Length")
331 335 raise JSONRPCError("Empty body, No Content-Length in request")
332 336
333 337 else:
334 338 length = request.environ['CONTENT_LENGTH']
335 339 log.debug('Content-Length: %s', length)
336 340
337 341 if length == 0:
338 342 log.debug("Content-Length is 0")
339 343 raise JSONRPCError("Content-Length is 0")
340 344
341 345 raw_body = request.body
342 346 log.debug("Loading JSON body now")
343 347 try:
344 348 json_body = json.loads(raw_body)
345 349 except ValueError as e:
346 350 # catch JSON errors Here
347 351 raise JSONRPCError("JSON parse error ERR:%s RAW:%r" % (e, raw_body))
348 352
349 353 request.rpc_id = json_body.get('id')
350 354 request.rpc_method = json_body.get('method')
351 355
352 356 # check required base parameters
353 357 try:
354 358 api_key = json_body.get('api_key')
355 359 if not api_key:
356 360 api_key = json_body.get('auth_token')
357 361
358 362 if not api_key:
359 363 raise KeyError('api_key or auth_token')
360 364
361 365 # TODO(marcink): support passing in token in request header
362 366
363 367 request.rpc_api_key = api_key
364 368 request.rpc_id = json_body['id']
365 369 request.rpc_method = json_body['method']
366 370 request.rpc_params = json_body['args'] \
367 371 if isinstance(json_body['args'], dict) else {}
368 372
369 373 log.debug('method: %s, params: %.10240r', request.rpc_method, request.rpc_params)
370 374 except KeyError as e:
371 375 raise JSONRPCError('Incorrect JSON data. Missing %s' % e)
372 376
373 377 log.debug('setup complete, now handling method:%s rpcid:%s',
374 378 request.rpc_method, request.rpc_id, )
375 379
376 380
377 381 class RoutePredicate(object):
378 382 def __init__(self, val, config):
379 383 self.val = val
380 384
381 385 def text(self):
382 386 return 'jsonrpc route = %s' % self.val
383 387
384 388 phash = text
385 389
386 390 def __call__(self, info, request):
387 391 if self.val:
388 392 # potentially setup and bootstrap our call
389 393 setup_request(request)
390 394
391 395 # Always return True so that even if it isn't a valid RPC it
392 396 # will fall through to the underlaying handlers like notfound_view
393 397 return True
394 398
395 399
396 400 class NotFoundPredicate(object):
397 401 def __init__(self, val, config):
398 402 self.val = val
399 403 self.methods = config.registry.jsonrpc_methods
400 404
401 405 def text(self):
402 406 return 'jsonrpc method not found = {}.'.format(self.val)
403 407
404 408 phash = text
405 409
406 410 def __call__(self, info, request):
407 411 return hasattr(request, 'rpc_method')
408 412
409 413
410 414 class MethodPredicate(object):
411 415 def __init__(self, val, config):
412 416 self.method = val
413 417
414 418 def text(self):
415 419 return 'jsonrpc method = %s' % self.method
416 420
417 421 phash = text
418 422
419 423 def __call__(self, context, request):
420 424 # we need to explicitly return False here, so pyramid doesn't try to
421 425 # execute our view directly. We need our main handler to execute things
422 426 return getattr(request, 'rpc_method') == self.method
423 427
424 428
425 429 def add_jsonrpc_method(config, view, **kwargs):
426 430 # pop the method name
427 431 method = kwargs.pop('method', None)
428 432
429 433 if method is None:
430 434 raise ConfigurationError(
431 435 'Cannot register a JSON-RPC method without specifying the "method"')
432 436
433 437 # we define custom predicate, to enable to detect conflicting methods,
434 438 # those predicates are kind of "translation" from the decorator variables
435 439 # to internal predicates names
436 440
437 441 kwargs['jsonrpc_method'] = method
438 442
439 443 # register our view into global view store for validation
440 444 config.registry.jsonrpc_methods[method] = view
441 445
442 446 # we're using our main request_view handler, here, so each method
443 447 # has a unified handler for itself
444 448 config.add_view(request_view, route_name='apiv2', **kwargs)
445 449
446 450
447 451 class jsonrpc_method(object):
448 452 """
449 453 decorator that works similar to @add_view_config decorator,
450 454 but tailored for our JSON RPC
451 455 """
452 456
453 457 venusian = venusian # for testing injection
454 458
455 459 def __init__(self, method=None, **kwargs):
456 460 self.method = method
457 461 self.kwargs = kwargs
458 462
459 463 def __call__(self, wrapped):
460 464 kwargs = self.kwargs.copy()
461 465 kwargs['method'] = self.method or wrapped.__name__
462 466 depth = kwargs.pop('_depth', 0)
463 467
464 468 def callback(context, name, ob):
465 469 config = context.config.with_package(info.module)
466 470 config.add_jsonrpc_method(view=ob, **kwargs)
467 471
468 472 info = venusian.attach(wrapped, callback, category='pyramid',
469 473 depth=depth + 1)
470 474 if info.scope == 'class':
471 475 # ensure that attr is set if decorating a class method
472 476 kwargs.setdefault('attr', wrapped.__name__)
473 477
474 478 kwargs['_info'] = info.codeinfo # fbo action_method
475 479 return wrapped
476 480
477 481
478 482 class jsonrpc_deprecated_method(object):
479 483 """
480 484 Marks method as deprecated, adds log.warning, and inject special key to
481 485 the request variable to mark method as deprecated.
482 486 Also injects special docstring that extract_docs will catch to mark
483 487 method as deprecated.
484 488
485 489 :param use_method: specify which method should be used instead of
486 490 the decorated one
487 491
488 492 Use like::
489 493
490 494 @jsonrpc_method()
491 495 @jsonrpc_deprecated_method(use_method='new_func', deprecated_at_version='3.0.0')
492 496 def old_func(request, apiuser, arg1, arg2):
493 497 ...
494 498 """
495 499
496 500 def __init__(self, use_method, deprecated_at_version):
497 501 self.use_method = use_method
498 502 self.deprecated_at_version = deprecated_at_version
499 503 self.deprecated_msg = ''
500 504
501 505 def __call__(self, func):
502 506 self.deprecated_msg = 'Please use method `{method}` instead.'.format(
503 507 method=self.use_method)
504 508
505 509 docstring = """\n
506 510 .. deprecated:: {version}
507 511
508 512 {deprecation_message}
509 513
510 514 {original_docstring}
511 515 """
512 516 func.__doc__ = docstring.format(
513 517 version=self.deprecated_at_version,
514 518 deprecation_message=self.deprecated_msg,
515 519 original_docstring=func.__doc__)
516 520 return decorator.decorator(self.__wrapper, func)
517 521
518 522 def __wrapper(self, func, *fargs, **fkwargs):
519 523 log.warning('DEPRECATED API CALL on function %s, please '
520 524 'use `%s` instead', func, self.use_method)
521 525 # alter function docstring to mark as deprecated, this is picked up
522 526 # via fabric file that generates API DOC.
523 527 result = func(*fargs, **fkwargs)
524 528
525 529 request = fargs[0]
526 530 request.rpc_deprecation = 'DEPRECATED METHOD ' + self.deprecated_msg
527 531 return result
528 532
529 533
530 534 def add_api_methods(config):
531 535 from rhodecode.api.views import (
532 536 deprecated_api, gist_api, pull_request_api, repo_api, repo_group_api,
533 537 server_api, search_api, testing_api, user_api, user_group_api)
534 538
535 539 config.scan('rhodecode.api.views')
536 540
537 541
538 542 def includeme(config):
539 543 plugin_module = 'rhodecode.api'
540 544 plugin_settings = get_plugin_settings(
541 545 plugin_module, config.registry.settings)
542 546
543 547 if not hasattr(config.registry, 'jsonrpc_methods'):
544 548 config.registry.jsonrpc_methods = OrderedDict()
545 549
546 550 # match filter by given method only
547 551 config.add_view_predicate('jsonrpc_method', MethodPredicate)
548 552 config.add_view_predicate('jsonrpc_method_not_found', NotFoundPredicate)
549 553
550 554 config.add_renderer(DEFAULT_RENDERER, ExtJsonRenderer(
551 555 serializer=json.dumps, indent=4))
552 556 config.add_directive('add_jsonrpc_method', add_jsonrpc_method)
553 557
554 558 config.add_route_predicate(
555 559 'jsonrpc_call', RoutePredicate)
556 560
557 561 config.add_route(
558 562 'apiv2', plugin_settings.get('url', DEFAULT_URL), jsonrpc_call=True)
559 563
560 564 # register some exception handling view
561 565 config.add_view(exception_view, context=JSONRPCBaseError)
562 566 config.add_notfound_view(exception_view, jsonrpc_method_not_found=True)
563 567
564 568 add_api_methods(config)
@@ -1,306 +1,310 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --beat \
25 25 --app rhodecode.lib.celerylib.loader \
26 26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 27 --loglevel DEBUG --ini=._dev/dev.ini
28 28 """
29 29 import os
30 30 import logging
31 31 import importlib
32 32
33 33 from celery import Celery
34 34 from celery import signals
35 35 from celery import Task
36 36 from celery import exceptions # pragma: no cover
37 37 from kombu.serialization import register
38 38 from pyramid.threadlocal import get_current_request
39 39
40 40 import rhodecode
41 41
42 42 from rhodecode.lib.auth import AuthUser
43 43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars, ping_db
44 44 from rhodecode.lib.ext_json import json
45 45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
46 46 from rhodecode.lib.utils2 import str2bool
47 47 from rhodecode.model import meta
48 48
49 49
50 50 register('json_ext', json.dumps, json.loads,
51 51 content_type='application/x-json-ext',
52 52 content_encoding='utf-8')
53 53
54 54 log = logging.getLogger('celery.rhodecode.loader')
55 55
56 56
57 57 def add_preload_arguments(parser):
58 58 parser.add_argument(
59 59 '--ini', default=None,
60 60 help='Path to ini configuration file.'
61 61 )
62 62 parser.add_argument(
63 63 '--ini-var', default=None,
64 64 help='Comma separated list of key=value to pass to ini.'
65 65 )
66 66
67 67
68 68 def get_logger(obj):
69 69 custom_log = logging.getLogger(
70 70 'rhodecode.task.{}'.format(obj.__class__.__name__))
71 71
72 72 if rhodecode.CELERY_ENABLED:
73 73 try:
74 74 custom_log = obj.get_logger()
75 75 except Exception:
76 76 pass
77 77
78 78 return custom_log
79 79
80 80
81 81 imports = ['rhodecode.lib.celerylib.tasks']
82 82
83 83 try:
84 84 # try if we have EE tasks available
85 85 importlib.import_module('rc_ee')
86 86 imports.append('rc_ee.lib.celerylib.tasks')
87 87 except ImportError:
88 88 pass
89 89
90 90
91 91 base_celery_config = {
92 92 'result_backend': 'rpc://',
93 93 'result_expires': 60 * 60 * 24,
94 94 'result_persistent': True,
95 95 'imports': imports,
96 96 'worker_max_tasks_per_child': 100,
97 97 'accept_content': ['json_ext'],
98 98 'task_serializer': 'json_ext',
99 99 'result_serializer': 'json_ext',
100 100 'worker_hijack_root_logger': False,
101 101 'database_table_names': {
102 102 'task': 'beat_taskmeta',
103 103 'group': 'beat_groupmeta',
104 104 }
105 105 }
106 106 # init main celery app
107 107 celery_app = Celery()
108 108 celery_app.user_options['preload'].add(add_preload_arguments)
109 109 ini_file_glob = None
110 110
111 111
112 112 @signals.setup_logging.connect
113 113 def setup_logging_callback(**kwargs):
114 114 setup_logging(ini_file_glob)
115 115
116 116
117 117 @signals.user_preload_options.connect
118 118 def on_preload_parsed(options, **kwargs):
119 119 ini_location = options['ini']
120 120 ini_vars = options['ini_var']
121 121 celery_app.conf['INI_PYRAMID'] = options['ini']
122 122
123 123 if ini_location is None:
124 124 print('You must provide the paste --ini argument')
125 125 exit(-1)
126 126
127 127 options = None
128 128 if ini_vars is not None:
129 129 options = parse_ini_vars(ini_vars)
130 130
131 131 global ini_file_glob
132 132 ini_file_glob = ini_location
133 133
134 134 log.debug('Bootstrapping RhodeCode application...')
135 135 env = bootstrap(ini_location, options=options)
136 136
137 137 setup_celery_app(
138 138 app=env['app'], root=env['root'], request=env['request'],
139 139 registry=env['registry'], closer=env['closer'],
140 140 ini_location=ini_location)
141 141
142 142 # fix the global flag even if it's disabled via .ini file because this
143 143 # is a worker code that doesn't need this to be disabled.
144 144 rhodecode.CELERY_ENABLED = True
145 145
146 146
147 147 @signals.task_prerun.connect
148 148 def task_prerun_signal(task_id, task, args, **kwargs):
149 149 ping_db()
150 150
151 151
152 152 @signals.task_success.connect
153 153 def task_success_signal(result, **kwargs):
154 154 meta.Session.commit()
155 155 closer = celery_app.conf['PYRAMID_CLOSER']
156 156 if closer:
157 157 closer()
158 158
159 159
160 160 @signals.task_retry.connect
161 161 def task_retry_signal(
162 162 request, reason, einfo, **kwargs):
163 163 meta.Session.remove()
164 164 closer = celery_app.conf['PYRAMID_CLOSER']
165 165 if closer:
166 166 closer()
167 167
168 168
169 169 @signals.task_failure.connect
170 170 def task_failure_signal(
171 171 task_id, exception, args, kwargs, traceback, einfo, **kargs):
172 172 from rhodecode.lib.exc_tracking import store_exception
173 from rhodecode.lib.statsd_client import StatsdClient
173 174
174 175 meta.Session.remove()
175 176
176 177 # simulate sys.exc_info()
177 178 exc_info = (einfo.type, einfo.exception, einfo.tb)
178 179 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
180 statsd = StatsdClient.statsd
181 if statsd:
182 statsd.incr('rhodecode_exception', tags=["celery"])
179 183
180 184 closer = celery_app.conf['PYRAMID_CLOSER']
181 185 if closer:
182 186 closer()
183 187
184 188
185 189 @signals.task_revoked.connect
186 190 def task_revoked_signal(
187 191 request, terminated, signum, expired, **kwargs):
188 192 closer = celery_app.conf['PYRAMID_CLOSER']
189 193 if closer:
190 194 closer()
191 195
192 196
193 197 def setup_celery_app(app, root, request, registry, closer, ini_location):
194 198 ini_dir = os.path.dirname(os.path.abspath(ini_location))
195 199 celery_config = base_celery_config
196 200 celery_config.update({
197 201 # store celerybeat scheduler db where the .ini file is
198 202 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
199 203 })
200 204 ini_settings = get_ini_config(ini_location)
201 205 log.debug('Got custom celery conf: %s', ini_settings)
202 206
203 207 celery_config.update(ini_settings)
204 208 celery_app.config_from_object(celery_config)
205 209
206 210 celery_app.conf.update({'PYRAMID_APP': app})
207 211 celery_app.conf.update({'PYRAMID_ROOT': root})
208 212 celery_app.conf.update({'PYRAMID_REQUEST': request})
209 213 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
210 214 celery_app.conf.update({'PYRAMID_CLOSER': closer})
211 215
212 216
213 217 def configure_celery(config, ini_location):
214 218 """
215 219 Helper that is called from our application creation logic. It gives
216 220 connection info into running webapp and allows execution of tasks from
217 221 RhodeCode itself
218 222 """
219 223 # store some globals into rhodecode
220 224 rhodecode.CELERY_ENABLED = str2bool(
221 225 config.registry.settings.get('use_celery'))
222 226 if rhodecode.CELERY_ENABLED:
223 227 log.info('Configuring celery based on `%s` file', ini_location)
224 228 setup_celery_app(
225 229 app=None, root=None, request=None, registry=config.registry,
226 230 closer=None, ini_location=ini_location)
227 231
228 232
229 233 def maybe_prepare_env(req):
230 234 environ = {}
231 235 try:
232 236 environ.update({
233 237 'PATH_INFO': req.environ['PATH_INFO'],
234 238 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
235 239 'HTTP_HOST':req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
236 240 'SERVER_NAME': req.environ['SERVER_NAME'],
237 241 'SERVER_PORT': req.environ['SERVER_PORT'],
238 242 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
239 243 })
240 244 except Exception:
241 245 pass
242 246
243 247 return environ
244 248
245 249
246 250 class RequestContextTask(Task):
247 251 """
248 252 This is a celery task which will create a rhodecode app instance context
249 253 for the task, patch pyramid with the original request
250 254 that created the task and also add the user to the context.
251 255 """
252 256
253 257 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
254 258 link=None, link_error=None, shadow=None, **options):
255 259 """ queue the job to run (we are in web request context here) """
256 260
257 261 req = get_current_request()
258 262
259 263 # web case
260 264 if hasattr(req, 'user'):
261 265 ip_addr = req.user.ip_addr
262 266 user_id = req.user.user_id
263 267
264 268 # api case
265 269 elif hasattr(req, 'rpc_user'):
266 270 ip_addr = req.rpc_user.ip_addr
267 271 user_id = req.rpc_user.user_id
268 272 else:
269 273 raise Exception(
270 274 'Unable to fetch required data from request: {}. \n'
271 275 'This task is required to be executed from context of '
272 276 'request in a webapp'.format(repr(req)))
273 277
274 278 if req:
275 279 # we hook into kwargs since it is the only way to pass our data to
276 280 # the celery worker
277 281 environ = maybe_prepare_env(req)
278 282 options['headers'] = options.get('headers', {})
279 283 options['headers'].update({
280 284 'rhodecode_proxy_data': {
281 285 'environ': environ,
282 286 'auth_user': {
283 287 'ip_addr': ip_addr,
284 288 'user_id': user_id
285 289 },
286 290 }
287 291 })
288 292
289 293 return super(RequestContextTask, self).apply_async(
290 294 args, kwargs, task_id, producer, link, link_error, shadow, **options)
291 295
292 296 def __call__(self, *args, **kwargs):
293 297 """ rebuild the context and then run task on celery worker """
294 298
295 299 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
296 300 if not proxy_data:
297 301 return super(RequestContextTask, self).__call__(*args, **kwargs)
298 302
299 303 log.debug('using celery proxy data to run task: %r', proxy_data)
300 304 # re-inject and register threadlocals for proper routing support
301 305 request = prepare_request(proxy_data['environ'])
302 306 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
303 307 ip_addr=proxy_data['auth_user']['ip_addr'])
304 308
305 309 return super(RequestContextTask, self).__call__(*args, **kwargs)
306 310
General Comments 0
You need to be logged in to leave comments. Login now