##// END OF EJS Templates
celery: log exception in the event of unknown IOError
dan -
r266:2f064e44 default
parent child Browse files
Show More
@@ -1,141 +1,141 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 celery libs for RhodeCode
22 22 """
23 23
24 24
25 25 import socket
26 26 import logging
27 27
28 28 import rhodecode
29 29
30 30 from os.path import join as jn
31 31 from pylons import config
32 32
33 33 from decorator import decorator
34 34
35 35 from zope.cachedescriptors.property import Lazy as LazyProperty
36 36
37 37 from rhodecode.config import utils
38 38 from rhodecode.lib.utils2 import safe_str, md5_safe, aslist
39 39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 40 from rhodecode.lib.vcs import connect_vcs
41 41 from rhodecode.model import meta
42 42
43 43 log = logging.getLogger(__name__)
44 44
45 45
46 46 class ResultWrapper(object):
47 47 def __init__(self, task):
48 48 self.task = task
49 49
50 50 @LazyProperty
51 51 def result(self):
52 52 return self.task
53 53
54 54
55 55 def run_task(task, *args, **kwargs):
56 56 if rhodecode.CELERY_ENABLED:
57 57 try:
58 58 t = task.apply_async(args=args, kwargs=kwargs)
59 59 log.info('running task %s:%s', t.task_id, task)
60 60 return t
61 61
62 62 except socket.error as e:
63 63 if isinstance(e, IOError) and e.errno == 111:
64 64 log.error('Unable to connect to celeryd. Sync execution')
65 65 rhodecode.CELERY_ENABLED = False
66 66 else:
67 log.error("Exception while connecting to celeryd.")
67 log.exception("Exception while connecting to celeryd.")
68 68 except KeyError as e:
69 69 log.error('Unable to connect to celeryd. Sync execution')
70 70 except Exception as e:
71 71 log.exception(
72 72 "Exception while trying to run task asynchronous. "
73 73 "Fallback to sync execution.")
74 74 else:
75 75 log.debug('executing task %s in sync mode', task)
76 76 return ResultWrapper(task(*args, **kwargs))
77 77
78 78
79 79 def __get_lockkey(func, *fargs, **fkwargs):
80 80 params = list(fargs)
81 81 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
82 82
83 83 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
84 84 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
85 85 return 'task_%s.lock' % (md5_safe(_lock_key),)
86 86
87 87
88 88 def locked_task(func):
89 89 def __wrapper(func, *fargs, **fkwargs):
90 90 lockkey = __get_lockkey(func, *fargs, **fkwargs)
91 91 lockkey_path = config['app_conf']['cache_dir']
92 92
93 93 log.info('running task with lockkey %s' % lockkey)
94 94 try:
95 95 l = DaemonLock(file_=jn(lockkey_path, lockkey))
96 96 ret = func(*fargs, **fkwargs)
97 97 l.release()
98 98 return ret
99 99 except LockHeld:
100 100 log.info('LockHeld')
101 101 return 'Task with key %s already running' % lockkey
102 102
103 103 return decorator(__wrapper, func)
104 104
105 105
106 106 def get_session():
107 107 if rhodecode.CELERY_ENABLED:
108 108 utils.initialize_database(config)
109 109 sa = meta.Session()
110 110 return sa
111 111
112 112
113 113 def dbsession(func):
114 114 def __wrapper(func, *fargs, **fkwargs):
115 115 try:
116 116 ret = func(*fargs, **fkwargs)
117 117 return ret
118 118 finally:
119 119 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
120 120 meta.Session.remove()
121 121
122 122 return decorator(__wrapper, func)
123 123
124 124
125 125 def vcsconnection(func):
126 126 def __wrapper(func, *fargs, **fkwargs):
127 127 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
128 128 backends = config['vcs.backends'] = aslist(
129 129 config.get('vcs.backends', 'hg,git'), sep=',')
130 130 for alias in rhodecode.BACKENDS.keys():
131 131 if alias not in backends:
132 132 del rhodecode.BACKENDS[alias]
133 133 utils.configure_pyro4(config)
134 134 utils.configure_vcs(config)
135 135 connect_vcs(
136 136 config['vcs.server'],
137 137 utils.get_vcs_server_protocol(config))
138 138 ret = func(*fargs, **fkwargs)
139 139 return ret
140 140
141 141 return decorator(__wrapper, func)
General Comments 0
You need to be logged in to leave comments. Login now