##// END OF EJS Templates
celery: always count tasks even if they fail
super-admin -
r4891:2cd8759f default
parent child Browse files
Show More
@@ -1,96 +1,95 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import socket
21 import socket
22 import logging
22 import logging
23
23
24 import rhodecode
24 import rhodecode
25 from zope.cachedescriptors.property import Lazy as LazyProperty
25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 from rhodecode.lib.celerylib.loader import (
26 from rhodecode.lib.celerylib.loader import (
27 celery_app, RequestContextTask, get_logger)
27 celery_app, RequestContextTask, get_logger)
28 from rhodecode.lib.statsd_client import StatsdClient
28 from rhodecode.lib.statsd_client import StatsdClient
29
29
30 async_task = celery_app.task
30 async_task = celery_app.task
31
31
32
32
33 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
34
34
35
35
36 class ResultWrapper(object):
36 class ResultWrapper(object):
37 def __init__(self, task):
37 def __init__(self, task):
38 self.task = task
38 self.task = task
39
39
40 @LazyProperty
40 @LazyProperty
41 def result(self):
41 def result(self):
42 return self.task
42 return self.task
43
43
44
44
45 def run_task(task, *args, **kwargs):
45 def run_task(task, *args, **kwargs):
46 import celery
46 import celery
47 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
47 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
48 if task is None:
48 if task is None:
49 raise ValueError('Got non-existing task for execution')
49 raise ValueError('Got non-existing task for execution')
50
50
51 exec_mode = 'sync'
51 exec_mode = 'sync'
52 allow_async = True
52 allow_async = True
53
53
54 # if we're already in a celery task, don't allow async execution again
54 # if we're already in a celery task, don't allow async execution again
55 # e.g task within task
55 # e.g task within task
56 in_task = celery.current_task
56 in_task = celery.current_task
57 if in_task:
57 if in_task:
58 log.debug('This task in in context of another task: %s, not allowing another async execution', in_task)
58 log.debug('This task in in context of another task: %s, not allowing another async execution', in_task)
59 allow_async = False
59 allow_async = False
60 if kwargs.pop('allow_subtask', False):
60 if kwargs.pop('allow_subtask', False):
61 log.debug('Forced async by allow_async=True flag')
61 log.debug('Forced async by allow_async=True flag')
62 allow_async = True
62 allow_async = True
63
63
64 t = None
64 t = None
65 if rhodecode.CELERY_ENABLED and allow_async:
65 if rhodecode.CELERY_ENABLED and allow_async:
66
67 statsd = StatsdClient.statsd
68 if statsd:
69 task_repr = getattr(task, 'name', task)
70 statsd.incr('rhodecode_celery_task_total', tags=[
71 'task:{}'.format(task_repr)
72 ])
73
66 try:
74 try:
67 t = task.apply_async(args=args, kwargs=kwargs)
75 t = task.apply_async(args=args, kwargs=kwargs)
68 log.debug('executing task %s:%s in async mode', t.task_id, task)
76 log.debug('executing task %s:%s in async mode', t.task_id, task)
69 exec_mode = 'async'
70 except socket.error as e:
77 except socket.error as e:
71 if isinstance(e, IOError) and e.errno == 111:
78 if isinstance(e, IOError) and e.errno == 111:
72 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
79 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
73 else:
80 else:
74 log.exception("Exception while connecting to celeryd.")
81 log.exception("Exception while connecting to celeryd.")
75 except KeyError as e:
82 except KeyError as e:
76 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
83 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
77 except Exception as e:
84 except Exception as e:
78 log.exception(
85 log.exception(
79 "Exception while trying to run task asynchronous. "
86 "Exception while trying to run task asynchronous. "
80 "Fallback to sync execution.")
87 "Fallback to sync execution.")
81
88
82 else:
89 else:
83 log.debug('executing task %s:%s in sync mode', 'TASK', task)
90 log.debug('executing task %s:%s in sync mode', 'TASK', task)
84
91
85 statsd = StatsdClient.statsd
86 if statsd:
87 task_repr = getattr(task, 'name', task)
88 statsd.incr('rhodecode_celery_task_total', tags=[
89 'task:{}'.format(task_repr),
90 'mode:{}'.format(exec_mode)
91 ])
92
93 # we got async task, return it after statsd call
92 # we got async task, return it after statsd call
94 if t:
93 if t:
95 return t
94 return t
96 return ResultWrapper(task(*args, **kwargs))
95 return ResultWrapper(task(*args, **kwargs))
General Comments 0
You need to be logged in to leave comments. Login now