##// END OF EJS Templates
celery: move pyramid app to celery init rather than per task run
dan -
r639:c5c553d9 default
parent child Browse files
Show More
@@ -1,228 +1,224 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 celery libs for RhodeCode
21 celery libs for RhodeCode
22 """
22 """
23
23
24
24
25 import pylons
25 import pylons
26 import socket
26 import socket
27 import logging
27 import logging
28
28
29 import rhodecode
29 import rhodecode
30
30
31 from os.path import join as jn
31 from os.path import join as jn
32 from pylons import config
32 from pylons import config
33 from celery.task import Task
33 from celery.task import Task
34 from pyramid.request import Request
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
36 from pyramid.threadlocal import get_current_request
37
37
38 from decorator import decorator
38 from decorator import decorator
39
39
40 from zope.cachedescriptors.property import Lazy as LazyProperty
40 from zope.cachedescriptors.property import Lazy as LazyProperty
41
41
42 from rhodecode.config import utils
42 from rhodecode.config import utils
43 from rhodecode.lib.utils2 import (
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
45 get_server_url)
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
47 from rhodecode.lib.vcs import connect_vcs
47 from rhodecode.lib.vcs import connect_vcs
48 from rhodecode.model import meta
48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
49 from rhodecode.lib.auth import AuthUser
50
50
51 log = logging.getLogger(__name__)
51 log = logging.getLogger(__name__)
52
52
53
53
54 class ResultWrapper(object):
54 class ResultWrapper(object):
55 def __init__(self, task):
55 def __init__(self, task):
56 self.task = task
56 self.task = task
57
57
58 @LazyProperty
58 @LazyProperty
59 def result(self):
59 def result(self):
60 return self.task
60 return self.task
61
61
62
62
63 class RhodecodeCeleryTask(Task):
63 class RhodecodeCeleryTask(Task):
64 """
64 """
65 This is a celery task which will create a rhodecode app instance context
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
67 that created the task and also add the user to the context.
68
68
69 This class as a whole should be removed once the pylons port is complete
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
71 """
72
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
75 """ queue the job to run (we are in web request context here) """
76
76
77 request = get_current_request()
77 request = get_current_request()
78
78
79 # we hook into kwargs since it is the only way to pass our data to the
79 # we hook into kwargs since it is the only way to pass our data to the
80 # celery worker in celery 2.2
80 # celery worker in celery 2.2
81 kwargs.update({
81 kwargs.update({
82 '_rhodecode_proxy_data': {
82 '_rhodecode_proxy_data': {
83 'environ': {
83 'environ': {
84 'PATH_INFO': request.environ['PATH_INFO'],
84 'PATH_INFO': request.environ['PATH_INFO'],
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
87 request.environ['SERVER_NAME']),
87 request.environ['SERVER_NAME']),
88 'SERVER_NAME': request.environ['SERVER_NAME'],
88 'SERVER_NAME': request.environ['SERVER_NAME'],
89 'SERVER_PORT': request.environ['SERVER_PORT'],
89 'SERVER_PORT': request.environ['SERVER_PORT'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
91 },
91 },
92 'auth_user': {
92 'auth_user': {
93 'ip_addr': request.user.ip_addr,
93 'ip_addr': request.user.ip_addr,
94 'user_id': request.user.user_id
94 'user_id': request.user.user_id
95 },
95 },
96 }
96 }
97 })
97 })
98 return super(RhodecodeCeleryTask, self).apply_async(
98 return super(RhodecodeCeleryTask, self).apply_async(
99 args, kwargs, task_id, producer, link, link_error, **options)
99 args, kwargs, task_id, producer, link, link_error, **options)
100
100
101 def __call__(self, *args, **kwargs):
101 def __call__(self, *args, **kwargs):
102 """ rebuild the context and then run task on celery worker """
102 """ rebuild the context and then run task on celery worker """
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
104
104
105 if not proxy_data:
105 if not proxy_data:
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
107
107
108 log.debug('using celery proxy data to run task: %r', proxy_data)
108 log.debug('using celery proxy data to run task: %r', proxy_data)
109
109
110 from rhodecode.config.routing import make_map
110 from rhodecode.config.routing import make_map
111 from rhodecode.config.middleware import make_pyramid_app
112
113 # TODO: this can be done once per worker versus per task
114 pyramid_app = make_pyramid_app(config, **config['app_conf'])
115
111
116 request = Request.blank('/', environ=proxy_data['environ'])
112 request = Request.blank('/', environ=proxy_data['environ'])
117 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
113 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
118 ip_addr=proxy_data['auth_user']['ip_addr'])
114 ip_addr=proxy_data['auth_user']['ip_addr'])
119
115
120 pyramid_request = prepare(request) # set pyramid threadlocal request
116 pyramid_request = prepare(request) # set pyramid threadlocal request
121
117
122 # pylons routing
118 # pylons routing
123 if not rhodecode.CONFIG.get('routes.map'):
119 if not rhodecode.CONFIG.get('routes.map'):
124 rhodecode.CONFIG['routes.map'] = make_map(config)
120 rhodecode.CONFIG['routes.map'] = make_map(config)
125 pylons.url._push_object(get_routes_generator_for_server_url(
121 pylons.url._push_object(get_routes_generator_for_server_url(
126 get_server_url(request.environ)
122 get_server_url(request.environ)
127 ))
123 ))
128
124
129 try:
125 try:
130 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
126 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
131 finally:
127 finally:
132 pyramid_request['closer']()
128 pyramid_request['closer']()
133 pylons.url._pop_object()
129 pylons.url._pop_object()
134
130
135
131
136 def run_task(task, *args, **kwargs):
132 def run_task(task, *args, **kwargs):
137 if rhodecode.CELERY_ENABLED:
133 if rhodecode.CELERY_ENABLED:
138 celery_is_up = False
134 celery_is_up = False
139 try:
135 try:
140 t = task.apply_async(args=args, kwargs=kwargs)
136 t = task.apply_async(args=args, kwargs=kwargs)
141 log.info('running task %s:%s', t.task_id, task)
137 log.info('running task %s:%s', t.task_id, task)
142 celery_is_up = True
138 celery_is_up = True
143 return t
139 return t
144
140
145 except socket.error as e:
141 except socket.error as e:
146 if isinstance(e, IOError) and e.errno == 111:
142 if isinstance(e, IOError) and e.errno == 111:
147 log.error('Unable to connect to celeryd. Sync execution')
143 log.error('Unable to connect to celeryd. Sync execution')
148 else:
144 else:
149 log.exception("Exception while connecting to celeryd.")
145 log.exception("Exception while connecting to celeryd.")
150 except KeyError as e:
146 except KeyError as e:
151 log.error('Unable to connect to celeryd. Sync execution')
147 log.error('Unable to connect to celeryd. Sync execution')
152 except Exception as e:
148 except Exception as e:
153 log.exception(
149 log.exception(
154 "Exception while trying to run task asynchronous. "
150 "Exception while trying to run task asynchronous. "
155 "Fallback to sync execution.")
151 "Fallback to sync execution.")
156
152
157 # keep in mind there maybe a subtle race condition where something
153 # keep in mind there maybe a subtle race condition where something
158 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
154 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
159 # will see CELERY_ENABLED as True before this has a chance to set False
155 # will see CELERY_ENABLED as True before this has a chance to set False
160 rhodecode.CELERY_ENABLED = celery_is_up
156 rhodecode.CELERY_ENABLED = celery_is_up
161 else:
157 else:
162 log.debug('executing task %s in sync mode', task)
158 log.debug('executing task %s in sync mode', task)
163 return ResultWrapper(task(*args, **kwargs))
159 return ResultWrapper(task(*args, **kwargs))
164
160
165
161
166 def __get_lockkey(func, *fargs, **fkwargs):
162 def __get_lockkey(func, *fargs, **fkwargs):
167 params = list(fargs)
163 params = list(fargs)
168 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
164 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
169
165
170 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
166 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
171 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
167 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
172 return 'task_%s.lock' % (md5_safe(_lock_key),)
168 return 'task_%s.lock' % (md5_safe(_lock_key),)
173
169
174
170
175 def locked_task(func):
171 def locked_task(func):
176 def __wrapper(func, *fargs, **fkwargs):
172 def __wrapper(func, *fargs, **fkwargs):
177 lockkey = __get_lockkey(func, *fargs, **fkwargs)
173 lockkey = __get_lockkey(func, *fargs, **fkwargs)
178 lockkey_path = config['app_conf']['cache_dir']
174 lockkey_path = config['app_conf']['cache_dir']
179
175
180 log.info('running task with lockkey %s' % lockkey)
176 log.info('running task with lockkey %s' % lockkey)
181 try:
177 try:
182 l = DaemonLock(file_=jn(lockkey_path, lockkey))
178 l = DaemonLock(file_=jn(lockkey_path, lockkey))
183 ret = func(*fargs, **fkwargs)
179 ret = func(*fargs, **fkwargs)
184 l.release()
180 l.release()
185 return ret
181 return ret
186 except LockHeld:
182 except LockHeld:
187 log.info('LockHeld')
183 log.info('LockHeld')
188 return 'Task with key %s already running' % lockkey
184 return 'Task with key %s already running' % lockkey
189
185
190 return decorator(__wrapper, func)
186 return decorator(__wrapper, func)
191
187
192
188
193 def get_session():
189 def get_session():
194 if rhodecode.CELERY_ENABLED:
190 if rhodecode.CELERY_ENABLED:
195 utils.initialize_database(config)
191 utils.initialize_database(config)
196 sa = meta.Session()
192 sa = meta.Session()
197 return sa
193 return sa
198
194
199
195
200 def dbsession(func):
196 def dbsession(func):
201 def __wrapper(func, *fargs, **fkwargs):
197 def __wrapper(func, *fargs, **fkwargs):
202 try:
198 try:
203 ret = func(*fargs, **fkwargs)
199 ret = func(*fargs, **fkwargs)
204 return ret
200 return ret
205 finally:
201 finally:
206 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
202 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
207 meta.Session.remove()
203 meta.Session.remove()
208
204
209 return decorator(__wrapper, func)
205 return decorator(__wrapper, func)
210
206
211
207
212 def vcsconnection(func):
208 def vcsconnection(func):
213 def __wrapper(func, *fargs, **fkwargs):
209 def __wrapper(func, *fargs, **fkwargs):
214 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
210 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
215 settings = rhodecode.PYRAMID_SETTINGS
211 settings = rhodecode.PYRAMID_SETTINGS
216 backends = settings['vcs.backends']
212 backends = settings['vcs.backends']
217 for alias in rhodecode.BACKENDS.keys():
213 for alias in rhodecode.BACKENDS.keys():
218 if alias not in backends:
214 if alias not in backends:
219 del rhodecode.BACKENDS[alias]
215 del rhodecode.BACKENDS[alias]
220 utils.configure_pyro4(settings)
216 utils.configure_pyro4(settings)
221 utils.configure_vcs(settings)
217 utils.configure_vcs(settings)
222 connect_vcs(
218 connect_vcs(
223 settings['vcs.server'],
219 settings['vcs.server'],
224 utils.get_vcs_server_protocol(settings))
220 utils.get_vcs_server_protocol(settings))
225 ret = func(*fargs, **fkwargs)
221 ret = func(*fargs, **fkwargs)
226 return ret
222 return ret
227
223
228 return decorator(__wrapper, func)
224 return decorator(__wrapper, func)
@@ -1,88 +1,92 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2012-2016 RhodeCode GmbH
3 # Copyright (C) 2012-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 from celery.loaders.base import BaseLoader
21 from celery.loaders.base import BaseLoader
22 from pylons import config
22 from pylons import config
23
23
24 to_pylons = lambda x: x.replace('_', '.').lower()
24 to_pylons = lambda x: x.replace('_', '.').lower()
25 to_celery = lambda x: x.replace('.', '_').upper()
25 to_celery = lambda x: x.replace('.', '_').upper()
26
26
27 LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split()
27 LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split()
28
28
29
29
30 class PylonsSettingsProxy(object):
30 class PylonsSettingsProxy(object):
31 """Pylons Settings Proxy
31 """Pylons Settings Proxy
32
32
33 Proxies settings from pylons.config
33 Proxies settings from pylons.config
34
34
35 """
35 """
36 def __getattr__(self, key):
36 def __getattr__(self, key):
37 pylons_key = to_pylons(key)
37 pylons_key = to_pylons(key)
38 try:
38 try:
39 value = config[pylons_key]
39 value = config[pylons_key]
40 if key in LIST_PARAMS:return value.split()
40 if key in LIST_PARAMS:return value.split()
41 return self.type_converter(value)
41 return self.type_converter(value)
42 except KeyError:
42 except KeyError:
43 raise AttributeError(pylons_key)
43 raise AttributeError(pylons_key)
44
44
45 def get(self, key):
45 def get(self, key):
46 try:
46 try:
47 return self.__getattr__(key)
47 return self.__getattr__(key)
48 except AttributeError:
48 except AttributeError:
49 return None
49 return None
50
50
51 def __getitem__(self, key):
51 def __getitem__(self, key):
52 try:
52 try:
53 return self.__getattr__(key)
53 return self.__getattr__(key)
54 except AttributeError:
54 except AttributeError:
55 raise KeyError()
55 raise KeyError()
56
56
57 def __setattr__(self, key, value):
57 def __setattr__(self, key, value):
58 pylons_key = to_pylons(key)
58 pylons_key = to_pylons(key)
59 config[pylons_key] = value
59 config[pylons_key] = value
60
60
61 def __setitem__(self, key, value):
61 def __setitem__(self, key, value):
62 self.__setattr__(key, value)
62 self.__setattr__(key, value)
63
63
64 def type_converter(self, value):
64 def type_converter(self, value):
65 #cast to int
65 #cast to int
66 if value.isdigit():
66 if value.isdigit():
67 return int(value)
67 return int(value)
68
68
69 #cast to bool
69 #cast to bool
70 if value.lower() in ['true', 'false']:
70 if value.lower() in ['true', 'false']:
71 return value.lower() == 'true'
71 return value.lower() == 'true'
72 return value
72 return value
73
73
74 class PylonsLoader(BaseLoader):
74 class PylonsLoader(BaseLoader):
75 """Pylons celery loader
75 """Pylons celery loader
76
76
77 Maps the celery config onto pylons.config
77 Maps the celery config onto pylons.config
78
78
79 """
79 """
80 def read_configuration(self):
80 def read_configuration(self):
81 self.configured = True
81 self.configured = True
82 return PylonsSettingsProxy()
82 return PylonsSettingsProxy()
83
83
84 def on_worker_init(self):
84 def on_worker_init(self):
85 """
85 """
86 Import task modules.
86 Import task modules.
87 """
87 """
88 self.import_default_modules()
88 self.import_default_modules()
89 from rhodecode.config.middleware import make_pyramid_app
90
91 # adding to self to keep a reference around
92 self.pyramid_app = make_pyramid_app(config, **config['app_conf'])
General Comments 0
You need to be logged in to leave comments. Login now