##// END OF EJS Templates
config: Use prepared settings from app init to setup the vcs connection.
Martin Bornhold -
r627:84e47054 default
parent child Browse files
Show More
@@ -1,228 +1,228 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 celery libs for RhodeCode
21 celery libs for RhodeCode
22 """
22 """
23
23
24
24
25 import pylons
25 import pylons
26 import socket
26 import socket
27 import logging
27 import logging
28
28
29 import rhodecode
29 import rhodecode
30
30
31 from os.path import join as jn
31 from os.path import join as jn
32 from pylons import config
32 from pylons import config
33 from celery.task import Task
33 from celery.task import Task
34 from pyramid.request import Request
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
36 from pyramid.threadlocal import get_current_request
37
37
38 from decorator import decorator
38 from decorator import decorator
39
39
40 from zope.cachedescriptors.property import Lazy as LazyProperty
40 from zope.cachedescriptors.property import Lazy as LazyProperty
41
41
42 from rhodecode.config import utils
42 from rhodecode.config import utils
43 from rhodecode.lib.utils2 import (
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
45 get_server_url)
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
47 from rhodecode.lib.vcs import connect_vcs
47 from rhodecode.lib.vcs import connect_vcs
48 from rhodecode.model import meta
48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
49 from rhodecode.lib.auth import AuthUser
50
50
51 log = logging.getLogger(__name__)
51 log = logging.getLogger(__name__)
52
52
53
53
54 class ResultWrapper(object):
54 class ResultWrapper(object):
55 def __init__(self, task):
55 def __init__(self, task):
56 self.task = task
56 self.task = task
57
57
58 @LazyProperty
58 @LazyProperty
59 def result(self):
59 def result(self):
60 return self.task
60 return self.task
61
61
62
62
63 class RhodecodeCeleryTask(Task):
63 class RhodecodeCeleryTask(Task):
64 """
64 """
65 This is a celery task which will create a rhodecode app instance context
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
67 that created the task and also add the user to the context.
68
68
69 This class as a whole should be removed once the pylons port is complete
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
71 """
72
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
75 """ queue the job to run (we are in web request context here) """
76
76
77 request = get_current_request()
77 request = get_current_request()
78
78
79 # we hook into kwargs since it is the only way to pass our data to the
79 # we hook into kwargs since it is the only way to pass our data to the
80 # celery worker in celery 2.2
80 # celery worker in celery 2.2
81 kwargs.update({
81 kwargs.update({
82 '_rhodecode_proxy_data': {
82 '_rhodecode_proxy_data': {
83 'environ': {
83 'environ': {
84 'PATH_INFO': request.environ['PATH_INFO'],
84 'PATH_INFO': request.environ['PATH_INFO'],
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
87 request.environ['SERVER_NAME']),
87 request.environ['SERVER_NAME']),
88 'SERVER_NAME': request.environ['SERVER_NAME'],
88 'SERVER_NAME': request.environ['SERVER_NAME'],
89 'SERVER_PORT': request.environ['SERVER_PORT'],
89 'SERVER_PORT': request.environ['SERVER_PORT'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
91 },
91 },
92 'auth_user': {
92 'auth_user': {
93 'ip_addr': request.user.ip_addr,
93 'ip_addr': request.user.ip_addr,
94 'user_id': request.user.user_id
94 'user_id': request.user.user_id
95 },
95 },
96 }
96 }
97 })
97 })
98 return super(RhodecodeCeleryTask, self).apply_async(
98 return super(RhodecodeCeleryTask, self).apply_async(
99 args, kwargs, task_id, producer, link, link_error, **options)
99 args, kwargs, task_id, producer, link, link_error, **options)
100
100
101 def __call__(self, *args, **kwargs):
101 def __call__(self, *args, **kwargs):
102 """ rebuild the context and then run task on celery worker """
102 """ rebuild the context and then run task on celery worker """
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
104
104
105 if not proxy_data:
105 if not proxy_data:
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
107
107
108 log.debug('using celery proxy data to run task: %r', proxy_data)
108 log.debug('using celery proxy data to run task: %r', proxy_data)
109
109
110 from rhodecode.config.routing import make_map
110 from rhodecode.config.routing import make_map
111 from rhodecode.config.middleware import make_pyramid_app
111 from rhodecode.config.middleware import make_pyramid_app
112
112
113 # TODO: this can be done once per worker versus per task
113 # TODO: this can be done once per worker versus per task
114 pyramid_app = make_pyramid_app(config, **config['app_conf'])
114 pyramid_app = make_pyramid_app(config, **config['app_conf'])
115
115
116 request = Request.blank('/', environ=proxy_data['environ'])
116 request = Request.blank('/', environ=proxy_data['environ'])
117 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
117 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
118 ip_addr=proxy_data['auth_user']['ip_addr'])
118 ip_addr=proxy_data['auth_user']['ip_addr'])
119
119
120 pyramid_request = prepare(request) # set pyramid threadlocal request
120 pyramid_request = prepare(request) # set pyramid threadlocal request
121
121
122 # pylons routing
122 # pylons routing
123 if not rhodecode.CONFIG.get('routes.map'):
123 if not rhodecode.CONFIG.get('routes.map'):
124 rhodecode.CONFIG['routes.map'] = make_map(config)
124 rhodecode.CONFIG['routes.map'] = make_map(config)
125 pylons.url._push_object(get_routes_generator_for_server_url(
125 pylons.url._push_object(get_routes_generator_for_server_url(
126 get_server_url(request.environ)
126 get_server_url(request.environ)
127 ))
127 ))
128
128
129 try:
129 try:
130 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
130 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
131 finally:
131 finally:
132 pyramid_request['closer']()
132 pyramid_request['closer']()
133 pylons.url._pop_object()
133 pylons.url._pop_object()
134
134
135
135
136 def run_task(task, *args, **kwargs):
136 def run_task(task, *args, **kwargs):
137 if rhodecode.CELERY_ENABLED:
137 if rhodecode.CELERY_ENABLED:
138 celery_is_up = False
138 celery_is_up = False
139 try:
139 try:
140 t = task.apply_async(args=args, kwargs=kwargs)
140 t = task.apply_async(args=args, kwargs=kwargs)
141 log.info('running task %s:%s', t.task_id, task)
141 log.info('running task %s:%s', t.task_id, task)
142 celery_is_up = True
142 celery_is_up = True
143 return t
143 return t
144
144
145 except socket.error as e:
145 except socket.error as e:
146 if isinstance(e, IOError) and e.errno == 111:
146 if isinstance(e, IOError) and e.errno == 111:
147 log.error('Unable to connect to celeryd. Sync execution')
147 log.error('Unable to connect to celeryd. Sync execution')
148 else:
148 else:
149 log.exception("Exception while connecting to celeryd.")
149 log.exception("Exception while connecting to celeryd.")
150 except KeyError as e:
150 except KeyError as e:
151 log.error('Unable to connect to celeryd. Sync execution')
151 log.error('Unable to connect to celeryd. Sync execution')
152 except Exception as e:
152 except Exception as e:
153 log.exception(
153 log.exception(
154 "Exception while trying to run task asynchronous. "
154 "Exception while trying to run task asynchronous. "
155 "Fallback to sync execution.")
155 "Fallback to sync execution.")
156
156
157 # keep in mind there maybe a subtle race condition where something
157 # keep in mind there maybe a subtle race condition where something
158 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
158 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
159 # will see CELERY_ENABLED as True before this has a chance to set False
159 # will see CELERY_ENABLED as True before this has a chance to set False
160 rhodecode.CELERY_ENABLED = celery_is_up
160 rhodecode.CELERY_ENABLED = celery_is_up
161 else:
161 else:
162 log.debug('executing task %s in sync mode', task)
162 log.debug('executing task %s in sync mode', task)
163 return ResultWrapper(task(*args, **kwargs))
163 return ResultWrapper(task(*args, **kwargs))
164
164
165
165
166 def __get_lockkey(func, *fargs, **fkwargs):
166 def __get_lockkey(func, *fargs, **fkwargs):
167 params = list(fargs)
167 params = list(fargs)
168 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
168 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
169
169
170 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
170 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
171 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
171 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
172 return 'task_%s.lock' % (md5_safe(_lock_key),)
172 return 'task_%s.lock' % (md5_safe(_lock_key),)
173
173
174
174
175 def locked_task(func):
175 def locked_task(func):
176 def __wrapper(func, *fargs, **fkwargs):
176 def __wrapper(func, *fargs, **fkwargs):
177 lockkey = __get_lockkey(func, *fargs, **fkwargs)
177 lockkey = __get_lockkey(func, *fargs, **fkwargs)
178 lockkey_path = config['app_conf']['cache_dir']
178 lockkey_path = config['app_conf']['cache_dir']
179
179
180 log.info('running task with lockkey %s' % lockkey)
180 log.info('running task with lockkey %s' % lockkey)
181 try:
181 try:
182 l = DaemonLock(file_=jn(lockkey_path, lockkey))
182 l = DaemonLock(file_=jn(lockkey_path, lockkey))
183 ret = func(*fargs, **fkwargs)
183 ret = func(*fargs, **fkwargs)
184 l.release()
184 l.release()
185 return ret
185 return ret
186 except LockHeld:
186 except LockHeld:
187 log.info('LockHeld')
187 log.info('LockHeld')
188 return 'Task with key %s already running' % lockkey
188 return 'Task with key %s already running' % lockkey
189
189
190 return decorator(__wrapper, func)
190 return decorator(__wrapper, func)
191
191
192
192
193 def get_session():
193 def get_session():
194 if rhodecode.CELERY_ENABLED:
194 if rhodecode.CELERY_ENABLED:
195 utils.initialize_database(config)
195 utils.initialize_database(config)
196 sa = meta.Session()
196 sa = meta.Session()
197 return sa
197 return sa
198
198
199
199
200 def dbsession(func):
200 def dbsession(func):
201 def __wrapper(func, *fargs, **fkwargs):
201 def __wrapper(func, *fargs, **fkwargs):
202 try:
202 try:
203 ret = func(*fargs, **fkwargs)
203 ret = func(*fargs, **fkwargs)
204 return ret
204 return ret
205 finally:
205 finally:
206 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
206 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
207 meta.Session.remove()
207 meta.Session.remove()
208
208
209 return decorator(__wrapper, func)
209 return decorator(__wrapper, func)
210
210
211
211
212 def vcsconnection(func):
212 def vcsconnection(func):
213 def __wrapper(func, *fargs, **fkwargs):
213 def __wrapper(func, *fargs, **fkwargs):
214 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
214 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
215 backends = config['vcs.backends'] = aslist(
215 settings = rhodecode.PYRAMID_SETTINGS
216 config.get('vcs.backends', 'hg,git'), sep=',')
216 backends = settings['vcs.backends']
217 for alias in rhodecode.BACKENDS.keys():
217 for alias in rhodecode.BACKENDS.keys():
218 if alias not in backends:
218 if alias not in backends:
219 del rhodecode.BACKENDS[alias]
219 del rhodecode.BACKENDS[alias]
220 utils.configure_pyro4(config)
220 utils.configure_pyro4(settings)
221 utils.configure_vcs(config)
221 utils.configure_vcs(settings)
222 connect_vcs(
222 connect_vcs(
223 config['vcs.server'],
223 settings['vcs.server'],
224 utils.get_vcs_server_protocol(config))
224 utils.get_vcs_server_protocol(settings))
225 ret = func(*fargs, **fkwargs)
225 ret = func(*fargs, **fkwargs)
226 return ret
226 return ret
227
227
228 return decorator(__wrapper, func)
228 return decorator(__wrapper, func)
General Comments 0
You need to be logged in to leave comments. Login now