##// END OF EJS Templates
celery: don't disable celery if 1 task fails. This results in permanent...
marcink -
r2416:14bd0464 default
parent child Browse files
Show More
@@ -1,73 +1,69 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import socket
21 import socket
22 import logging
22 import logging
23
23
24 import rhodecode
24 import rhodecode
25 from zope.cachedescriptors.property import Lazy as LazyProperty
25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 from rhodecode.lib.celerylib.loader import (
26 from rhodecode.lib.celerylib.loader import (
27 celery_app, RequestContextTask, get_logger)
27 celery_app, RequestContextTask, get_logger)
28
28
29 async_task = celery_app.task
29 async_task = celery_app.task
30
30
31
31
32 log = logging.getLogger(__name__)
32 log = logging.getLogger(__name__)
33
33
34
34
35 class ResultWrapper(object):
35 class ResultWrapper(object):
36 def __init__(self, task):
36 def __init__(self, task):
37 self.task = task
37 self.task = task
38
38
39 @LazyProperty
39 @LazyProperty
40 def result(self):
40 def result(self):
41 return self.task
41 return self.task
42
42
43
43
44 def run_task(task, *args, **kwargs):
44 def run_task(task, *args, **kwargs):
45 log.debug('Got task `%s` for execution', task)
45 log.debug('Got task `%s` for execution', task)
46 if rhodecode.CELERY_ENABLED:
46 if rhodecode.CELERY_ENABLED:
47 celery_is_up = False
47 celery_is_up = False
48 try:
48 try:
49 t = task.apply_async(args=args, kwargs=kwargs)
49 t = task.apply_async(args=args, kwargs=kwargs)
50 celery_is_up = True
50 celery_is_up = True
51 log.debug('executing task %s:%s in async mode', t.task_id, task)
51 log.debug('executing task %s:%s in async mode', t.task_id, task)
52 return t
52 return t
53
53
54 except socket.error as e:
54 except socket.error as e:
55 if isinstance(e, IOError) and e.errno == 111:
55 if isinstance(e, IOError) and e.errno == 111:
56 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
56 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
57 else:
57 else:
58 log.exception("Exception while connecting to celeryd.")
58 log.exception("Exception while connecting to celeryd.")
59 except KeyError as e:
59 except KeyError as e:
60 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
60 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
61 except Exception as e:
61 except Exception as e:
62 log.exception(
62 log.exception(
63 "Exception while trying to run task asynchronous. "
63 "Exception while trying to run task asynchronous. "
64 "Fallback to sync execution.")
64 "Fallback to sync execution.")
65
65
66 # keep in mind there maybe a subtle race condition where something
67 # depending on rhodecode.CELERY_ENABLED
68 # will see CELERY_ENABLED as True before this has a chance to set False
69 rhodecode.CELERY_ENABLED = celery_is_up
70 else:
66 else:
71 log.debug('executing task %s:%s in sync mode', 'TASK', task)
67 log.debug('executing task %s:%s in sync mode', 'TASK', task)
72
68
73 return ResultWrapper(task(*args, **kwargs))
69 return ResultWrapper(task(*args, **kwargs))
General Comments 0
You need to be logged in to leave comments. Login now