##// END OF EJS Templates
celery: handle pyramid/pylons context better when running async tasks.
marcink -
r1340:b1a9c9bb default
parent child Browse files
Show More
@@ -1,507 +1,507 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import inspect
22 22 import itertools
23 23 import logging
24 24 import types
25 25
26 26 import decorator
27 27 import venusian
28 28 from collections import OrderedDict
29 29
30 30 from pyramid.exceptions import ConfigurationError
31 31 from pyramid.renderers import render
32 32 from pyramid.response import Response
33 33 from pyramid.httpexceptions import HTTPNotFound
34 34
35 35 from rhodecode.api.exc import (
36 36 JSONRPCBaseError, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
37 37 from rhodecode.lib.auth import AuthUser
38 38 from rhodecode.lib.base import get_ip_addr
39 39 from rhodecode.lib.ext_json import json
40 40 from rhodecode.lib.utils2 import safe_str
41 41 from rhodecode.lib.plugins.utils import get_plugin_settings
42 42 from rhodecode.model.db import User, UserApiKeys
43 43
44 44 log = logging.getLogger(__name__)
45 45
46 46 DEFAULT_RENDERER = 'jsonrpc_renderer'
47 47 DEFAULT_URL = '/_admin/apiv2'
48 48
49 49
50 50 class ExtJsonRenderer(object):
51 51 """
52 52 Custom renderer that mkaes use of our ext_json lib
53 53
54 54 """
55 55
56 56 def __init__(self, serializer=json.dumps, **kw):
57 57 """ Any keyword arguments will be passed to the ``serializer``
58 58 function."""
59 59 self.serializer = serializer
60 60 self.kw = kw
61 61
62 62 def __call__(self, info):
63 63 """ Returns a plain JSON-encoded string with content-type
64 64 ``application/json``. The content-type may be overridden by
65 65 setting ``request.response.content_type``."""
66 66
67 67 def _render(value, system):
68 68 request = system.get('request')
69 69 if request is not None:
70 70 response = request.response
71 71 ct = response.content_type
72 72 if ct == response.default_content_type:
73 73 response.content_type = 'application/json'
74 74
75 75 return self.serializer(value, **self.kw)
76 76
77 77 return _render
78 78
79 79
80 80 def jsonrpc_response(request, result):
81 81 rpc_id = getattr(request, 'rpc_id', None)
82 82 response = request.response
83 83
84 84 # store content_type before render is called
85 85 ct = response.content_type
86 86
87 87 ret_value = ''
88 88 if rpc_id:
89 89 ret_value = {
90 90 'id': rpc_id,
91 91 'result': result,
92 92 'error': None,
93 93 }
94 94
95 95 # fetch deprecation warnings, and store it inside results
96 96 deprecation = getattr(request, 'rpc_deprecation', None)
97 97 if deprecation:
98 98 ret_value['DEPRECATION_WARNING'] = deprecation
99 99
100 100 raw_body = render(DEFAULT_RENDERER, ret_value, request=request)
101 101 response.body = safe_str(raw_body, response.charset)
102 102
103 103 if ct == response.default_content_type:
104 104 response.content_type = 'application/json'
105 105
106 106 return response
107 107
108 108
109 109 def jsonrpc_error(request, message, retid=None, code=None):
110 110 """
111 111 Generate a Response object with a JSON-RPC error body
112 112
113 113 :param code:
114 114 :param retid:
115 115 :param message:
116 116 """
117 117 err_dict = {'id': retid, 'result': None, 'error': message}
118 118 body = render(DEFAULT_RENDERER, err_dict, request=request).encode('utf-8')
119 119 return Response(
120 120 body=body,
121 121 status=code,
122 122 content_type='application/json'
123 123 )
124 124
125 125
126 126 def exception_view(exc, request):
127 127 rpc_id = getattr(request, 'rpc_id', None)
128 128
129 129 fault_message = 'undefined error'
130 130 if isinstance(exc, JSONRPCError):
131 131 fault_message = exc.message
132 132 log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, fault_message)
133 133 elif isinstance(exc, JSONRPCValidationError):
134 134 colander_exc = exc.colander_exception
135 135 # TODO(marcink): think maybe of nicer way to serialize errors ?
136 136 fault_message = colander_exc.asdict()
137 137 log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, fault_message)
138 138 elif isinstance(exc, JSONRPCForbidden):
139 139 fault_message = 'Access was denied to this resource.'
140 140 log.warning('json-rpc forbidden call rpc_id:%s "%s"', rpc_id, fault_message)
141 141 elif isinstance(exc, HTTPNotFound):
142 142 method = request.rpc_method
143 143 log.debug('json-rpc method `%s` not found in list of '
144 144 'api calls: %s, rpc_id:%s',
145 145 method, request.registry.jsonrpc_methods.keys(), rpc_id)
146 146 fault_message = "No such method: {}".format(method)
147 147
148 148 return jsonrpc_error(request, fault_message, rpc_id)
149 149
150 150
151 151 def request_view(request):
152 152 """
153 153 Main request handling method. It handles all logic to call a specific
154 154 exposed method
155 155 """
156 156
157 157 # check if we can find this session using api_key, get_by_auth_token
158 158 # search not expired tokens only
159 159
160 160 try:
161 161 u = User.get_by_auth_token(request.rpc_api_key)
162 162
163 163 if u is None:
164 164 return jsonrpc_error(
165 165 request, retid=request.rpc_id, message='Invalid API KEY')
166 166
167 167 if not u.active:
168 168 return jsonrpc_error(
169 169 request, retid=request.rpc_id,
170 170 message='Request from this user not allowed')
171 171
172 172 # check if we are allowed to use this IP
173 173 auth_u = AuthUser(
174 174 u.user_id, request.rpc_api_key, ip_addr=request.rpc_ip_addr)
175 175 if not auth_u.ip_allowed:
176 176 return jsonrpc_error(
177 177 request, retid=request.rpc_id,
178 178 message='Request from IP:%s not allowed' % (
179 179 request.rpc_ip_addr,))
180 180 else:
181 181 log.info('Access for IP:%s allowed' % (request.rpc_ip_addr,))
182 182
183 183 # now check if token is valid for API
184 184 role = UserApiKeys.ROLE_API
185 185 extra_auth_tokens = [
186 186 x.api_key for x in User.extra_valid_auth_tokens(u, role=role)]
187 187 active_tokens = [u.api_key] + extra_auth_tokens
188 188
189 189 log.debug('Checking if API key has proper role')
190 190 if request.rpc_api_key not in active_tokens:
191 191 return jsonrpc_error(
192 192 request, retid=request.rpc_id,
193 193 message='API KEY has bad role for an API call')
194 194
195 195 except Exception as e:
196 196 log.exception('Error on API AUTH')
197 197 return jsonrpc_error(
198 198 request, retid=request.rpc_id, message='Invalid API KEY')
199 199
200 200 method = request.rpc_method
201 201 func = request.registry.jsonrpc_methods[method]
202 202
203 203 # now that we have a method, add request._req_params to
204 204 # self.kargs and dispatch control to WGIController
205 205 argspec = inspect.getargspec(func)
206 206 arglist = argspec[0]
207 207 defaults = map(type, argspec[3] or [])
208 208 default_empty = types.NotImplementedType
209 209
210 210 # kw arguments required by this method
211 211 func_kwargs = dict(itertools.izip_longest(
212 212 reversed(arglist), reversed(defaults), fillvalue=default_empty))
213 213
214 214 # This attribute will need to be first param of a method that uses
215 215 # api_key, which is translated to instance of user at that name
216 216 user_var = 'apiuser'
217 217 request_var = 'request'
218 218
219 219 for arg in [user_var, request_var]:
220 220 if arg not in arglist:
221 221 return jsonrpc_error(
222 222 request,
223 223 retid=request.rpc_id,
224 224 message='This method [%s] does not support '
225 225 'required parameter `%s`' % (func.__name__, arg))
226 226
227 227 # get our arglist and check if we provided them as args
228 228 for arg, default in func_kwargs.items():
229 229 if arg in [user_var, request_var]:
230 230 # user_var and request_var are pre-hardcoded parameters and we
231 231 # don't need to do any translation
232 232 continue
233 233
234 234 # skip the required param check if it's default value is
235 235 # NotImplementedType (default_empty)
236 236 if default == default_empty and arg not in request.rpc_params:
237 237 return jsonrpc_error(
238 238 request,
239 239 retid=request.rpc_id,
240 240 message=('Missing non optional `%s` arg in JSON DATA' % arg)
241 241 )
242 242
243 243 # sanitize extra passed arguments
244 244 for k in request.rpc_params.keys()[:]:
245 245 if k not in func_kwargs:
246 246 del request.rpc_params[k]
247 247
248 248 call_params = request.rpc_params
249 249 call_params.update({
250 250 'request': request,
251 251 'apiuser': auth_u
252 252 })
253 253 try:
254 254 ret_value = func(**call_params)
255 255 return jsonrpc_response(request, ret_value)
256 256 except JSONRPCBaseError:
257 257 raise
258 258 except Exception:
259 log.exception('Unhandled exception occured on api call: %s', func)
259 log.exception('Unhandled exception occurred on api call: %s', func)
260 260 return jsonrpc_error(request, retid=request.rpc_id,
261 261 message='Internal server error')
262 262
263 263
264 264 def setup_request(request):
265 265 """
266 266 Parse a JSON-RPC request body. It's used inside the predicates method
267 267 to validate and bootstrap requests for usage in rpc calls.
268 268
269 269 We need to raise JSONRPCError here if we want to return some errors back to
270 270 user.
271 271 """
272 272
273 273 log.debug('Executing setup request: %r', request)
274 274 request.rpc_ip_addr = get_ip_addr(request.environ)
275 275 # TODO(marcink): deprecate GET at some point
276 276 if request.method not in ['POST', 'GET']:
277 277 log.debug('unsupported request method "%s"', request.method)
278 278 raise JSONRPCError(
279 279 'unsupported request method "%s". Please use POST' % request.method)
280 280
281 281 if 'CONTENT_LENGTH' not in request.environ:
282 282 log.debug("No Content-Length")
283 283 raise JSONRPCError("Empty body, No Content-Length in request")
284 284
285 285 else:
286 286 length = request.environ['CONTENT_LENGTH']
287 287 log.debug('Content-Length: %s', length)
288 288
289 289 if length == 0:
290 290 log.debug("Content-Length is 0")
291 291 raise JSONRPCError("Content-Length is 0")
292 292
293 293 raw_body = request.body
294 294 try:
295 295 json_body = json.loads(raw_body)
296 296 except ValueError as e:
297 297 # catch JSON errors Here
298 298 raise JSONRPCError("JSON parse error ERR:%s RAW:%r" % (e, raw_body))
299 299
300 300 request.rpc_id = json_body.get('id')
301 301 request.rpc_method = json_body.get('method')
302 302
303 303 # check required base parameters
304 304 try:
305 305 api_key = json_body.get('api_key')
306 306 if not api_key:
307 307 api_key = json_body.get('auth_token')
308 308
309 309 if not api_key:
310 310 raise KeyError('api_key or auth_token')
311 311
312 312 # TODO(marcink): support passing in token in request header
313 313
314 314 request.rpc_api_key = api_key
315 315 request.rpc_id = json_body['id']
316 316 request.rpc_method = json_body['method']
317 317 request.rpc_params = json_body['args'] \
318 318 if isinstance(json_body['args'], dict) else {}
319 319
320 320 log.debug(
321 321 'method: %s, params: %s' % (request.rpc_method, request.rpc_params))
322 322 except KeyError as e:
323 323 raise JSONRPCError('Incorrect JSON data. Missing %s' % e)
324 324
325 325 log.debug('setup complete, now handling method:%s rpcid:%s',
326 326 request.rpc_method, request.rpc_id, )
327 327
328 328
329 329 class RoutePredicate(object):
330 330 def __init__(self, val, config):
331 331 self.val = val
332 332
333 333 def text(self):
334 334 return 'jsonrpc route = %s' % self.val
335 335
336 336 phash = text
337 337
338 338 def __call__(self, info, request):
339 339 if self.val:
340 340 # potentially setup and bootstrap our call
341 341 setup_request(request)
342 342
343 343 # Always return True so that even if it isn't a valid RPC it
344 344 # will fall through to the underlaying handlers like notfound_view
345 345 return True
346 346
347 347
348 348 class NotFoundPredicate(object):
349 349 def __init__(self, val, config):
350 350 self.val = val
351 351
352 352 def text(self):
353 353 return 'jsonrpc method not found = %s' % self.val
354 354
355 355 phash = text
356 356
357 357 def __call__(self, info, request):
358 358 return hasattr(request, 'rpc_method')
359 359
360 360
361 361 class MethodPredicate(object):
362 362 def __init__(self, val, config):
363 363 self.method = val
364 364
365 365 def text(self):
366 366 return 'jsonrpc method = %s' % self.method
367 367
368 368 phash = text
369 369
370 370 def __call__(self, context, request):
371 371 # we need to explicitly return False here, so pyramid doesn't try to
372 372 # execute our view directly. We need our main handler to execute things
373 373 return getattr(request, 'rpc_method') == self.method
374 374
375 375
376 376 def add_jsonrpc_method(config, view, **kwargs):
377 377 # pop the method name
378 378 method = kwargs.pop('method', None)
379 379
380 380 if method is None:
381 381 raise ConfigurationError(
382 382 'Cannot register a JSON-RPC method without specifying the '
383 383 '"method"')
384 384
385 385 # we define custom predicate, to enable to detect conflicting methods,
386 386 # those predicates are kind of "translation" from the decorator variables
387 387 # to internal predicates names
388 388
389 389 kwargs['jsonrpc_method'] = method
390 390
391 391 # register our view into global view store for validation
392 392 config.registry.jsonrpc_methods[method] = view
393 393
394 394 # we're using our main request_view handler, here, so each method
395 395 # has a unified handler for itself
396 396 config.add_view(request_view, route_name='apiv2', **kwargs)
397 397
398 398
399 399 class jsonrpc_method(object):
400 400 """
401 401 decorator that works similar to @add_view_config decorator,
402 402 but tailored for our JSON RPC
403 403 """
404 404
405 405 venusian = venusian # for testing injection
406 406
407 407 def __init__(self, method=None, **kwargs):
408 408 self.method = method
409 409 self.kwargs = kwargs
410 410
411 411 def __call__(self, wrapped):
412 412 kwargs = self.kwargs.copy()
413 413 kwargs['method'] = self.method or wrapped.__name__
414 414 depth = kwargs.pop('_depth', 0)
415 415
416 416 def callback(context, name, ob):
417 417 config = context.config.with_package(info.module)
418 418 config.add_jsonrpc_method(view=ob, **kwargs)
419 419
420 420 info = venusian.attach(wrapped, callback, category='pyramid',
421 421 depth=depth + 1)
422 422 if info.scope == 'class':
423 423 # ensure that attr is set if decorating a class method
424 424 kwargs.setdefault('attr', wrapped.__name__)
425 425
426 426 kwargs['_info'] = info.codeinfo # fbo action_method
427 427 return wrapped
428 428
429 429
430 430 class jsonrpc_deprecated_method(object):
431 431 """
432 432 Marks method as deprecated, adds log.warning, and inject special key to
433 433 the request variable to mark method as deprecated.
434 434 Also injects special docstring that extract_docs will catch to mark
435 435 method as deprecated.
436 436
437 437 :param use_method: specify which method should be used instead of
438 438 the decorated one
439 439
440 440 Use like::
441 441
442 442 @jsonrpc_method()
443 443 @jsonrpc_deprecated_method(use_method='new_func', deprecated_at_version='3.0.0')
444 444 def old_func(request, apiuser, arg1, arg2):
445 445 ...
446 446 """
447 447
448 448 def __init__(self, use_method, deprecated_at_version):
449 449 self.use_method = use_method
450 450 self.deprecated_at_version = deprecated_at_version
451 451 self.deprecated_msg = ''
452 452
453 453 def __call__(self, func):
454 454 self.deprecated_msg = 'Please use method `{method}` instead.'.format(
455 455 method=self.use_method)
456 456
457 457 docstring = """\n
458 458 .. deprecated:: {version}
459 459
460 460 {deprecation_message}
461 461
462 462 {original_docstring}
463 463 """
464 464 func.__doc__ = docstring.format(
465 465 version=self.deprecated_at_version,
466 466 deprecation_message=self.deprecated_msg,
467 467 original_docstring=func.__doc__)
468 468 return decorator.decorator(self.__wrapper, func)
469 469
470 470 def __wrapper(self, func, *fargs, **fkwargs):
471 471 log.warning('DEPRECATED API CALL on function %s, please '
472 472 'use `%s` instead', func, self.use_method)
473 473 # alter function docstring to mark as deprecated, this is picked up
474 474 # via fabric file that generates API DOC.
475 475 result = func(*fargs, **fkwargs)
476 476
477 477 request = fargs[0]
478 478 request.rpc_deprecation = 'DEPRECATED METHOD ' + self.deprecated_msg
479 479 return result
480 480
481 481
482 482 def includeme(config):
483 483 plugin_module = 'rhodecode.api'
484 484 plugin_settings = get_plugin_settings(
485 485 plugin_module, config.registry.settings)
486 486
487 487 if not hasattr(config.registry, 'jsonrpc_methods'):
488 488 config.registry.jsonrpc_methods = OrderedDict()
489 489
490 490 # match filter by given method only
491 491 config.add_view_predicate('jsonrpc_method', MethodPredicate)
492 492
493 493 config.add_renderer(DEFAULT_RENDERER, ExtJsonRenderer(
494 494 serializer=json.dumps, indent=4))
495 495 config.add_directive('add_jsonrpc_method', add_jsonrpc_method)
496 496
497 497 config.add_route_predicate(
498 498 'jsonrpc_call', RoutePredicate)
499 499
500 500 config.add_route(
501 501 'apiv2', plugin_settings.get('url', DEFAULT_URL), jsonrpc_call=True)
502 502
503 503 config.scan(plugin_module, ignore='rhodecode.api.tests')
504 504 # register some exception handling view
505 505 config.add_view(exception_view, context=JSONRPCBaseError)
506 506 config.add_view_predicate('jsonrpc_method_not_found', NotFoundPredicate)
507 507 config.add_notfound_view(exception_view, jsonrpc_method_not_found=True)
@@ -1,225 +1,237 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 celery libs for RhodeCode
22 22 """
23 23
24 24
25 25 import pylons
26 26 import socket
27 27 import logging
28 28
29 29 import rhodecode
30 30
31 31 from os.path import join as jn
32 32 from pylons import config
33 33 from celery.task import Task
34 34 from pyramid.request import Request
35 35 from pyramid.scripting import prepare
36 36 from pyramid.threadlocal import get_current_request
37 37
38 38 from decorator import decorator
39 39
40 40 from zope.cachedescriptors.property import Lazy as LazyProperty
41 41
42 42 from rhodecode.config import utils
43 43 from rhodecode.lib.utils2 import (
44 44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 45 get_server_url)
46 46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
47 47 from rhodecode.lib.vcs import connect_vcs
48 48 from rhodecode.model import meta
49 49 from rhodecode.lib.auth import AuthUser
50 50
51 51 log = logging.getLogger(__name__)
52 52
53 53
54 54 class ResultWrapper(object):
55 55 def __init__(self, task):
56 56 self.task = task
57 57
58 58 @LazyProperty
59 59 def result(self):
60 60 return self.task
61 61
62 62
63 63 class RhodecodeCeleryTask(Task):
64 64 """
65 65 This is a celery task which will create a rhodecode app instance context
66 66 for the task, patch pyramid + pylons threadlocals with the original request
67 67 that created the task and also add the user to the context.
68 68
69 69 This class as a whole should be removed once the pylons port is complete
70 70 and a pyramid only solution for celery is implemented as per issue #4139
71 71 """
72 72
73 73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 74 link=None, link_error=None, **options):
75 75 """ queue the job to run (we are in web request context here) """
76 76
77 77 request = get_current_request()
78 78
79 if hasattr(request, 'user'):
80 ip_addr = request.user.ip_addr
81 user_id = request.user.user_id
82 elif hasattr(request, 'rpc_params'):
83 # TODO(marcink) remove when migration is finished
84 # api specific call on Pyramid.
85 ip_addr = request.rpc_params['apiuser'].ip_addr
86 user_id = request.rpc_params['apiuser'].user_id
87 else:
88 raise Exception('Unable to fetch data from request: {}'.format(
89 request))
90
79 91 if request:
80 92 # we hook into kwargs since it is the only way to pass our data to
81 93 # the celery worker in celery 2.2
82 94 kwargs.update({
83 95 '_rhodecode_proxy_data': {
84 96 'environ': {
85 97 'PATH_INFO': request.environ['PATH_INFO'],
86 98 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
87 99 'HTTP_HOST': request.environ.get('HTTP_HOST',
88 100 request.environ['SERVER_NAME']),
89 101 'SERVER_NAME': request.environ['SERVER_NAME'],
90 102 'SERVER_PORT': request.environ['SERVER_PORT'],
91 103 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
92 104 },
93 105 'auth_user': {
94 'ip_addr': request.user.ip_addr,
95 'user_id': request.user.user_id
106 'ip_addr': ip_addr,
107 'user_id': user_id
96 108 },
97 109 }
98 110 })
99 111 return super(RhodecodeCeleryTask, self).apply_async(
100 112 args, kwargs, task_id, producer, link, link_error, **options)
101 113
102 114 def __call__(self, *args, **kwargs):
103 115 """ rebuild the context and then run task on celery worker """
104 116 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
105 117
106 118 if not proxy_data:
107 119 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
108 120
109 121 log.debug('using celery proxy data to run task: %r', proxy_data)
110 122
111 123 from rhodecode.config.routing import make_map
112 124
113 125 request = Request.blank('/', environ=proxy_data['environ'])
114 126 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
115 127 ip_addr=proxy_data['auth_user']['ip_addr'])
116 128
117 129 pyramid_request = prepare(request) # set pyramid threadlocal request
118 130
119 131 # pylons routing
120 132 if not rhodecode.CONFIG.get('routes.map'):
121 133 rhodecode.CONFIG['routes.map'] = make_map(config)
122 134 pylons.url._push_object(get_routes_generator_for_server_url(
123 135 get_server_url(request.environ)
124 136 ))
125 137
126 138 try:
127 139 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
128 140 finally:
129 141 pyramid_request['closer']()
130 142 pylons.url._pop_object()
131 143
132 144
133 145 def run_task(task, *args, **kwargs):
134 146 if rhodecode.CELERY_ENABLED:
135 147 celery_is_up = False
136 148 try:
137 149 t = task.apply_async(args=args, kwargs=kwargs)
138 150 log.info('running task %s:%s', t.task_id, task)
139 151 celery_is_up = True
140 152 return t
141 153
142 154 except socket.error as e:
143 155 if isinstance(e, IOError) and e.errno == 111:
144 156 log.error('Unable to connect to celeryd. Sync execution')
145 157 else:
146 158 log.exception("Exception while connecting to celeryd.")
147 159 except KeyError as e:
148 160 log.error('Unable to connect to celeryd. Sync execution')
149 161 except Exception as e:
150 162 log.exception(
151 163 "Exception while trying to run task asynchronous. "
152 164 "Fallback to sync execution.")
153 165
154 166 # keep in mind there maybe a subtle race condition where something
155 167 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
156 168 # will see CELERY_ENABLED as True before this has a chance to set False
157 169 rhodecode.CELERY_ENABLED = celery_is_up
158 170 else:
159 171 log.debug('executing task %s in sync mode', task)
160 172 return ResultWrapper(task(*args, **kwargs))
161 173
162 174
163 175 def __get_lockkey(func, *fargs, **fkwargs):
164 176 params = list(fargs)
165 177 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
166 178
167 179 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
168 180 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
169 181 return 'task_%s.lock' % (md5_safe(_lock_key),)
170 182
171 183
172 184 def locked_task(func):
173 185 def __wrapper(func, *fargs, **fkwargs):
174 186 lockkey = __get_lockkey(func, *fargs, **fkwargs)
175 187 lockkey_path = config['app_conf']['cache_dir']
176 188
177 189 log.info('running task with lockkey %s' % lockkey)
178 190 try:
179 191 l = DaemonLock(file_=jn(lockkey_path, lockkey))
180 192 ret = func(*fargs, **fkwargs)
181 193 l.release()
182 194 return ret
183 195 except LockHeld:
184 196 log.info('LockHeld')
185 197 return 'Task with key %s already running' % lockkey
186 198
187 199 return decorator(__wrapper, func)
188 200
189 201
190 202 def get_session():
191 203 if rhodecode.CELERY_ENABLED:
192 204 utils.initialize_database(config)
193 205 sa = meta.Session()
194 206 return sa
195 207
196 208
197 209 def dbsession(func):
198 210 def __wrapper(func, *fargs, **fkwargs):
199 211 try:
200 212 ret = func(*fargs, **fkwargs)
201 213 return ret
202 214 finally:
203 215 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
204 216 meta.Session.remove()
205 217
206 218 return decorator(__wrapper, func)
207 219
208 220
209 221 def vcsconnection(func):
210 222 def __wrapper(func, *fargs, **fkwargs):
211 223 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
212 224 settings = rhodecode.PYRAMID_SETTINGS
213 225 backends = settings['vcs.backends']
214 226 for alias in rhodecode.BACKENDS.keys():
215 227 if alias not in backends:
216 228 del rhodecode.BACKENDS[alias]
217 229 utils.configure_pyro4(settings)
218 230 utils.configure_vcs(settings)
219 231 connect_vcs(
220 232 settings['vcs.server'],
221 233 utils.get_vcs_server_protocol(settings))
222 234 ret = func(*fargs, **fkwargs)
223 235 return ret
224 236
225 237 return decorator(__wrapper, func)
General Comments 0
You need to be logged in to leave comments. Login now