##// END OF EJS Templates
celery: fixed error when celery was disabled
super-admin -
r4817:753f56fc default
parent child Browse files
Show More
@@ -1,84 +1,84 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import socket
21 import socket
22 import logging
22 import logging
23
23
24 import rhodecode
24 import rhodecode
25 from zope.cachedescriptors.property import Lazy as LazyProperty
25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 from rhodecode.lib.celerylib.loader import (
26 from rhodecode.lib.celerylib.loader import (
27 celery_app, RequestContextTask, get_logger)
27 celery_app, RequestContextTask, get_logger)
28 from rhodecode.lib.statsd_client import StatsdClient
28 from rhodecode.lib.statsd_client import StatsdClient
29
29
30 async_task = celery_app.task
30 async_task = celery_app.task
31
31
32
32
33 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
34
34
35
35
36 class ResultWrapper(object):
36 class ResultWrapper(object):
37 def __init__(self, task):
37 def __init__(self, task):
38 self.task = task
38 self.task = task
39
39
40 @LazyProperty
40 @LazyProperty
41 def result(self):
41 def result(self):
42 return self.task
42 return self.task
43
43
44
44
45 def run_task(task, *args, **kwargs):
45 def run_task(task, *args, **kwargs):
46 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
46 log.debug('Got task `%s` for execution, celery mode enabled:%s', task, rhodecode.CELERY_ENABLED)
47 if task is None:
47 if task is None:
48 raise ValueError('Got non-existing task for execution')
48 raise ValueError('Got non-existing task for execution')
49
49
50 exec_mode = 'sync'
50 exec_mode = 'sync'
51
51
52 t = None
52 if rhodecode.CELERY_ENABLED:
53 if rhodecode.CELERY_ENABLED:
53 t = None
54 try:
54 try:
55 t = task.apply_async(args=args, kwargs=kwargs)
55 t = task.apply_async(args=args, kwargs=kwargs)
56 log.debug('executing task %s:%s in async mode', t.task_id, task)
56 log.debug('executing task %s:%s in async mode', t.task_id, task)
57 exec_mode = 'async'
57 exec_mode = 'async'
58 except socket.error as e:
58 except socket.error as e:
59 if isinstance(e, IOError) and e.errno == 111:
59 if isinstance(e, IOError) and e.errno == 111:
60 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
60 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
61 else:
61 else:
62 log.exception("Exception while connecting to celeryd.")
62 log.exception("Exception while connecting to celeryd.")
63 except KeyError as e:
63 except KeyError as e:
64 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
64 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
65 except Exception as e:
65 except Exception as e:
66 log.exception(
66 log.exception(
67 "Exception while trying to run task asynchronous. "
67 "Exception while trying to run task asynchronous. "
68 "Fallback to sync execution.")
68 "Fallback to sync execution.")
69
69
70 else:
70 else:
71 log.debug('executing task %s:%s in sync mode', 'TASK', task)
71 log.debug('executing task %s:%s in sync mode', 'TASK', task)
72
72
73 statsd = StatsdClient.statsd
73 statsd = StatsdClient.statsd
74 if statsd:
74 if statsd:
75 task_repr = getattr(task, 'name', task)
75 task_repr = getattr(task, 'name', task)
76 statsd.incr('rhodecode_celery_task_total', tags=[
76 statsd.incr('rhodecode_celery_task_total', tags=[
77 'task:{}'.format(task_repr),
77 'task:{}'.format(task_repr),
78 'mode:{}'.format(exec_mode)
78 'mode:{}'.format(exec_mode)
79 ])
79 ])
80
80
81 # we got async task, return it after statsd call
81 # we got async task, return it after statsd call
82 if t:
82 if t:
83 return t
83 return t
84 return ResultWrapper(task(*args, **kwargs))
84 return ResultWrapper(task(*args, **kwargs))
General Comments 0
You need to be logged in to leave comments. Login now