##// END OF EJS Templates
celery: dont proxy request if one not available (eg. repo push) and...
dan -
r653:a68b8381 default
parent child Browse files
Show More
@@ -1,224 +1,225 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 celery libs for RhodeCode
21 celery libs for RhodeCode
22 """
22 """
23
23
24
24
25 import pylons
25 import pylons
26 import socket
26 import socket
27 import logging
27 import logging
28
28
29 import rhodecode
29 import rhodecode
30
30
31 from os.path import join as jn
31 from os.path import join as jn
32 from pylons import config
32 from pylons import config
33 from celery.task import Task
33 from celery.task import Task
34 from pyramid.request import Request
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
36 from pyramid.threadlocal import get_current_request
37
37
38 from decorator import decorator
38 from decorator import decorator
39
39
40 from zope.cachedescriptors.property import Lazy as LazyProperty
40 from zope.cachedescriptors.property import Lazy as LazyProperty
41
41
42 from rhodecode.config import utils
42 from rhodecode.config import utils
43 from rhodecode.lib.utils2 import (
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
45 get_server_url)
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
47 from rhodecode.lib.vcs import connect_vcs
47 from rhodecode.lib.vcs import connect_vcs
48 from rhodecode.model import meta
48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
49 from rhodecode.lib.auth import AuthUser
50
50
51 log = logging.getLogger(__name__)
51 log = logging.getLogger(__name__)
52
52
53
53
54 class ResultWrapper(object):
54 class ResultWrapper(object):
55 def __init__(self, task):
55 def __init__(self, task):
56 self.task = task
56 self.task = task
57
57
58 @LazyProperty
58 @LazyProperty
59 def result(self):
59 def result(self):
60 return self.task
60 return self.task
61
61
62
62
63 class RhodecodeCeleryTask(Task):
63 class RhodecodeCeleryTask(Task):
64 """
64 """
65 This is a celery task which will create a rhodecode app instance context
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
67 that created the task and also add the user to the context.
68
68
69 This class as a whole should be removed once the pylons port is complete
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
71 """
72
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
75 """ queue the job to run (we are in web request context here) """
76
76
77 request = get_current_request()
77 request = get_current_request()
78
78
79 # we hook into kwargs since it is the only way to pass our data to the
79 if request:
80 # celery worker in celery 2.2
80 # we hook into kwargs since it is the only way to pass our data to
81 kwargs.update({
81 # the celery worker in celery 2.2
82 '_rhodecode_proxy_data': {
82 kwargs.update({
83 'environ': {
83 '_rhodecode_proxy_data': {
84 'PATH_INFO': request.environ['PATH_INFO'],
84 'environ': {
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
85 'PATH_INFO': request.environ['PATH_INFO'],
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
86 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
87 request.environ['SERVER_NAME']),
87 'HTTP_HOST': request.environ.get('HTTP_HOST',
88 'SERVER_NAME': request.environ['SERVER_NAME'],
88 request.environ['SERVER_NAME']),
89 'SERVER_PORT': request.environ['SERVER_PORT'],
89 'SERVER_NAME': request.environ['SERVER_NAME'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
90 'SERVER_PORT': request.environ['SERVER_PORT'],
91 },
91 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
92 'auth_user': {
92 },
93 'ip_addr': request.user.ip_addr,
93 'auth_user': {
94 'user_id': request.user.user_id
94 'ip_addr': request.user.ip_addr,
95 },
95 'user_id': request.user.user_id
96 }
96 },
97 })
97 }
98 })
98 return super(RhodecodeCeleryTask, self).apply_async(
99 return super(RhodecodeCeleryTask, self).apply_async(
99 args, kwargs, task_id, producer, link, link_error, **options)
100 args, kwargs, task_id, producer, link, link_error, **options)
100
101
101 def __call__(self, *args, **kwargs):
102 def __call__(self, *args, **kwargs):
102 """ rebuild the context and then run task on celery worker """
103 """ rebuild the context and then run task on celery worker """
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
104 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
104
105
105 if not proxy_data:
106 if not proxy_data:
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
107 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
107
108
108 log.debug('using celery proxy data to run task: %r', proxy_data)
109 log.debug('using celery proxy data to run task: %r', proxy_data)
109
110
110 from rhodecode.config.routing import make_map
111 from rhodecode.config.routing import make_map
111
112
112 request = Request.blank('/', environ=proxy_data['environ'])
113 request = Request.blank('/', environ=proxy_data['environ'])
113 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
114 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
114 ip_addr=proxy_data['auth_user']['ip_addr'])
115 ip_addr=proxy_data['auth_user']['ip_addr'])
115
116
116 pyramid_request = prepare(request) # set pyramid threadlocal request
117 pyramid_request = prepare(request) # set pyramid threadlocal request
117
118
118 # pylons routing
119 # pylons routing
119 if not rhodecode.CONFIG.get('routes.map'):
120 if not rhodecode.CONFIG.get('routes.map'):
120 rhodecode.CONFIG['routes.map'] = make_map(config)
121 rhodecode.CONFIG['routes.map'] = make_map(config)
121 pylons.url._push_object(get_routes_generator_for_server_url(
122 pylons.url._push_object(get_routes_generator_for_server_url(
122 get_server_url(request.environ)
123 get_server_url(request.environ)
123 ))
124 ))
124
125
125 try:
126 try:
126 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
127 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
127 finally:
128 finally:
128 pyramid_request['closer']()
129 pyramid_request['closer']()
129 pylons.url._pop_object()
130 pylons.url._pop_object()
130
131
131
132
132 def run_task(task, *args, **kwargs):
133 def run_task(task, *args, **kwargs):
133 if rhodecode.CELERY_ENABLED:
134 if rhodecode.CELERY_ENABLED:
134 celery_is_up = False
135 celery_is_up = False
135 try:
136 try:
136 t = task.apply_async(args=args, kwargs=kwargs)
137 t = task.apply_async(args=args, kwargs=kwargs)
137 log.info('running task %s:%s', t.task_id, task)
138 log.info('running task %s:%s', t.task_id, task)
138 celery_is_up = True
139 celery_is_up = True
139 return t
140 return t
140
141
141 except socket.error as e:
142 except socket.error as e:
142 if isinstance(e, IOError) and e.errno == 111:
143 if isinstance(e, IOError) and e.errno == 111:
143 log.error('Unable to connect to celeryd. Sync execution')
144 log.error('Unable to connect to celeryd. Sync execution')
144 else:
145 else:
145 log.exception("Exception while connecting to celeryd.")
146 log.exception("Exception while connecting to celeryd.")
146 except KeyError as e:
147 except KeyError as e:
147 log.error('Unable to connect to celeryd. Sync execution')
148 log.error('Unable to connect to celeryd. Sync execution')
148 except Exception as e:
149 except Exception as e:
149 log.exception(
150 log.exception(
150 "Exception while trying to run task asynchronous. "
151 "Exception while trying to run task asynchronous. "
151 "Fallback to sync execution.")
152 "Fallback to sync execution.")
152
153
153 # keep in mind there maybe a subtle race condition where something
154 # keep in mind there maybe a subtle race condition where something
154 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
155 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
155 # will see CELERY_ENABLED as True before this has a chance to set False
156 # will see CELERY_ENABLED as True before this has a chance to set False
156 rhodecode.CELERY_ENABLED = celery_is_up
157 rhodecode.CELERY_ENABLED = celery_is_up
157 else:
158 else:
158 log.debug('executing task %s in sync mode', task)
159 log.debug('executing task %s in sync mode', task)
159 return ResultWrapper(task(*args, **kwargs))
160 return ResultWrapper(task(*args, **kwargs))
160
161
161
162
162 def __get_lockkey(func, *fargs, **fkwargs):
163 def __get_lockkey(func, *fargs, **fkwargs):
163 params = list(fargs)
164 params = list(fargs)
164 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
165 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
165
166
166 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
167 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
167 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
168 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
168 return 'task_%s.lock' % (md5_safe(_lock_key),)
169 return 'task_%s.lock' % (md5_safe(_lock_key),)
169
170
170
171
171 def locked_task(func):
172 def locked_task(func):
172 def __wrapper(func, *fargs, **fkwargs):
173 def __wrapper(func, *fargs, **fkwargs):
173 lockkey = __get_lockkey(func, *fargs, **fkwargs)
174 lockkey = __get_lockkey(func, *fargs, **fkwargs)
174 lockkey_path = config['app_conf']['cache_dir']
175 lockkey_path = config['app_conf']['cache_dir']
175
176
176 log.info('running task with lockkey %s' % lockkey)
177 log.info('running task with lockkey %s' % lockkey)
177 try:
178 try:
178 l = DaemonLock(file_=jn(lockkey_path, lockkey))
179 l = DaemonLock(file_=jn(lockkey_path, lockkey))
179 ret = func(*fargs, **fkwargs)
180 ret = func(*fargs, **fkwargs)
180 l.release()
181 l.release()
181 return ret
182 return ret
182 except LockHeld:
183 except LockHeld:
183 log.info('LockHeld')
184 log.info('LockHeld')
184 return 'Task with key %s already running' % lockkey
185 return 'Task with key %s already running' % lockkey
185
186
186 return decorator(__wrapper, func)
187 return decorator(__wrapper, func)
187
188
188
189
189 def get_session():
190 def get_session():
190 if rhodecode.CELERY_ENABLED:
191 if rhodecode.CELERY_ENABLED:
191 utils.initialize_database(config)
192 utils.initialize_database(config)
192 sa = meta.Session()
193 sa = meta.Session()
193 return sa
194 return sa
194
195
195
196
196 def dbsession(func):
197 def dbsession(func):
197 def __wrapper(func, *fargs, **fkwargs):
198 def __wrapper(func, *fargs, **fkwargs):
198 try:
199 try:
199 ret = func(*fargs, **fkwargs)
200 ret = func(*fargs, **fkwargs)
200 return ret
201 return ret
201 finally:
202 finally:
202 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
203 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
203 meta.Session.remove()
204 meta.Session.remove()
204
205
205 return decorator(__wrapper, func)
206 return decorator(__wrapper, func)
206
207
207
208
208 def vcsconnection(func):
209 def vcsconnection(func):
209 def __wrapper(func, *fargs, **fkwargs):
210 def __wrapper(func, *fargs, **fkwargs):
210 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
211 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
211 settings = rhodecode.PYRAMID_SETTINGS
212 settings = rhodecode.PYRAMID_SETTINGS
212 backends = settings['vcs.backends']
213 backends = settings['vcs.backends']
213 for alias in rhodecode.BACKENDS.keys():
214 for alias in rhodecode.BACKENDS.keys():
214 if alias not in backends:
215 if alias not in backends:
215 del rhodecode.BACKENDS[alias]
216 del rhodecode.BACKENDS[alias]
216 utils.configure_pyro4(settings)
217 utils.configure_pyro4(settings)
217 utils.configure_vcs(settings)
218 utils.configure_vcs(settings)
218 connect_vcs(
219 connect_vcs(
219 settings['vcs.server'],
220 settings['vcs.server'],
220 utils.get_vcs_server_protocol(settings))
221 utils.get_vcs_server_protocol(settings))
221 ret = func(*fargs, **fkwargs)
222 ret = func(*fargs, **fkwargs)
222 return ret
223 return ret
223
224
224 return decorator(__wrapper, func)
225 return decorator(__wrapper, func)
@@ -1,92 +1,93 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2012-2016 RhodeCode GmbH
3 # Copyright (C) 2012-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import rhodecode
22 from pylons import config
23
21 from celery.loaders.base import BaseLoader
24 from celery.loaders.base import BaseLoader
22 from pylons import config
23
25
24 to_pylons = lambda x: x.replace('_', '.').lower()
26 to_pylons = lambda x: x.replace('_', '.').lower()
25 to_celery = lambda x: x.replace('.', '_').upper()
27 to_celery = lambda x: x.replace('.', '_').upper()
26
28
27 LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split()
29 LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split()
28
30
29
30 class PylonsSettingsProxy(object):
31 class PylonsSettingsProxy(object):
31 """Pylons Settings Proxy
32 """Pylons Settings Proxy
32
33
33 Proxies settings from pylons.config
34 Proxies settings from pylons.config
34
35
35 """
36 """
36 def __getattr__(self, key):
37 def __getattr__(self, key):
37 pylons_key = to_pylons(key)
38 pylons_key = to_pylons(key)
38 try:
39 try:
39 value = config[pylons_key]
40 value = rhodecode.PYRAMID_SETTINGS[pylons_key]
40 if key in LIST_PARAMS:return value.split()
41 if key in LIST_PARAMS:return value.split()
41 return self.type_converter(value)
42 return self.type_converter(value)
42 except KeyError:
43 except KeyError:
43 raise AttributeError(pylons_key)
44 raise AttributeError(pylons_key)
44
45
45 def get(self, key):
46 def get(self, key):
46 try:
47 try:
47 return self.__getattr__(key)
48 return self.__getattr__(key)
48 except AttributeError:
49 except AttributeError:
49 return None
50 return None
50
51
51 def __getitem__(self, key):
52 def __getitem__(self, key):
52 try:
53 try:
53 return self.__getattr__(key)
54 return self.__getattr__(key)
54 except AttributeError:
55 except AttributeError:
55 raise KeyError()
56 raise KeyError()
56
57
57 def __setattr__(self, key, value):
58 def __setattr__(self, key, value):
58 pylons_key = to_pylons(key)
59 pylons_key = to_pylons(key)
59 config[pylons_key] = value
60 rhodecode.PYRAMID_SETTINGS[pylons_key] = value
60
61
61 def __setitem__(self, key, value):
62 def __setitem__(self, key, value):
62 self.__setattr__(key, value)
63 self.__setattr__(key, value)
63
64
64 def type_converter(self, value):
65 def type_converter(self, value):
65 #cast to int
66 #cast to int
66 if value.isdigit():
67 if value.isdigit():
67 return int(value)
68 return int(value)
68
69
69 #cast to bool
70 #cast to bool
70 if value.lower() in ['true', 'false']:
71 if value.lower() in ['true', 'false']:
71 return value.lower() == 'true'
72 return value.lower() == 'true'
72 return value
73 return value
73
74
74 class PylonsLoader(BaseLoader):
75 class PylonsLoader(BaseLoader):
75 """Pylons celery loader
76 """Pylons celery loader
76
77
77 Maps the celery config onto pylons.config
78 Maps the celery config onto pylons.config
78
79
79 """
80 """
80 def read_configuration(self):
81 def read_configuration(self):
81 self.configured = True
82 self.configured = True
82 return PylonsSettingsProxy()
83 return PylonsSettingsProxy()
83
84
84 def on_worker_init(self):
85 def on_worker_init(self):
85 """
86 """
86 Import task modules.
87 Import task modules.
87 """
88 """
88 self.import_default_modules()
89 self.import_default_modules()
89 from rhodecode.config.middleware import make_pyramid_app
90 from rhodecode.config.middleware import make_pyramid_app
90
91
91 # adding to self to keep a reference around
92 # adding to self to keep a reference around
92 self.pyramid_app = make_pyramid_app(config, **config['app_conf'])
93 self.pyramid_app = make_pyramid_app(config, **config['app_conf'])
General Comments 0
You need to be logged in to leave comments. Login now