##// END OF EJS Templates
celery: always count tasks even if they fail
super-admin -
r4891:2cd8759f default
parent child Browse files
Show More
@@ -1,96 +1,95 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import socket
22 22 import logging
23 23
24 24 import rhodecode
25 25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 26 from rhodecode.lib.celerylib.loader import (
27 27 celery_app, RequestContextTask, get_logger)
28 28 from rhodecode.lib.statsd_client import StatsdClient
29 29
30 30 async_task = celery_app.task
31 31
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class ResultWrapper(object):
37 37 def __init__(self, task):
38 38 self.task = task
39 39
40 40 @LazyProperty
41 41 def result(self):
42 42 return self.task
43 43
44 44
45 45 def run_task(task, *args, **kwargs):
46 46 import celery
47 47 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
48 48 if task is None:
49 49 raise ValueError('Got non-existing task for execution')
50 50
51 51 exec_mode = 'sync'
52 52 allow_async = True
53 53
54 54 # if we're already in a celery task, don't allow async execution again
55 55 # e.g task within task
56 56 in_task = celery.current_task
57 57 if in_task:
58 58 log.debug('This task in in context of another task: %s, not allowing another async execution', in_task)
59 59 allow_async = False
60 60 if kwargs.pop('allow_subtask', False):
61 61 log.debug('Forced async by allow_async=True flag')
62 62 allow_async = True
63 63
64 64 t = None
65 65 if rhodecode.CELERY_ENABLED and allow_async:
66
67 statsd = StatsdClient.statsd
68 if statsd:
69 task_repr = getattr(task, 'name', task)
70 statsd.incr('rhodecode_celery_task_total', tags=[
71 'task:{}'.format(task_repr)
72 ])
73
66 74 try:
67 75 t = task.apply_async(args=args, kwargs=kwargs)
68 76 log.debug('executing task %s:%s in async mode', t.task_id, task)
69 exec_mode = 'async'
70 77 except socket.error as e:
71 78 if isinstance(e, IOError) and e.errno == 111:
72 79 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
73 80 else:
74 81 log.exception("Exception while connecting to celeryd.")
75 82 except KeyError as e:
76 83 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
77 84 except Exception as e:
78 85 log.exception(
79 86 "Exception while trying to run task asynchronous. "
80 87 "Fallback to sync execution.")
81 88
82 89 else:
83 90 log.debug('executing task %s:%s in sync mode', 'TASK', task)
84 91
85 statsd = StatsdClient.statsd
86 if statsd:
87 task_repr = getattr(task, 'name', task)
88 statsd.incr('rhodecode_celery_task_total', tags=[
89 'task:{}'.format(task_repr),
90 'mode:{}'.format(exec_mode)
91 ])
92
93 92 # we got async task, return it after statsd call
94 93 if t:
95 94 return t
96 95 return ResultWrapper(task(*args, **kwargs))
General Comments 0
You need to be logged in to leave comments. Login now