##// END OF EJS Templates
celery: create custom celery task class which loads context/threadlocals...
dan -
r576:ffa50597 default
parent child Browse files
Show More
@@ -1,147 +1,228 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 celery libs for RhodeCode
22 22 """
23 23
24 24
25 import pylons
25 26 import socket
26 27 import logging
27 28
28 29 import rhodecode
29 30
30 31 from os.path import join as jn
31 32 from pylons import config
33 from celery.task import Task
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
32 37
33 38 from decorator import decorator
34 39
35 40 from zope.cachedescriptors.property import Lazy as LazyProperty
36 41
37 42 from rhodecode.config import utils
38 from rhodecode.lib.utils2 import safe_str, md5_safe, aslist
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
39 46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 47 from rhodecode.lib.vcs import connect_vcs
41 48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
42 50
43 51 log = logging.getLogger(__name__)
44 52
45 53
46 54 class ResultWrapper(object):
47 55 def __init__(self, task):
48 56 self.task = task
49 57
50 58 @LazyProperty
51 59 def result(self):
52 60 return self.task
53 61
54 62
63 class RhodecodeCeleryTask(Task):
64 """
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
68
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
76
77 request = get_current_request()
78
79 # we hook into kwargs since it is the only way to pass our data to the
80 # celery worker in celery 2.2
81 kwargs.update({
82 '_rhodecode_proxy_data': {
83 'environ': {
84 'PATH_INFO': request.environ['PATH_INFO'],
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
87 request.environ['SERVER_NAME']),
88 'SERVER_NAME': request.environ['SERVER_NAME'],
89 'SERVER_PORT': request.environ['SERVER_PORT'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
91 },
92 'auth_user': {
93 'ip_addr': request.user.ip_addr,
94 'user_id': request.user.user_id
95 },
96 }
97 })
98 return super(RhodecodeCeleryTask, self).apply_async(
99 args, kwargs, task_id, producer, link, link_error, **options)
100
101 def __call__(self, *args, **kwargs):
102 """ rebuild the context and then run task on celery worker """
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
104
105 if not proxy_data:
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
107
108 log.debug('using celery proxy data to run task: %r', proxy_data)
109
110 from rhodecode.config.routing import make_map
111 from rhodecode.config.middleware import make_pyramid_app
112
113 # TODO: this can be done once per worker versus per task
114 pyramid_app = make_pyramid_app(config, **config['app_conf'])
115
116 request = Request.blank('/', environ=proxy_data['environ'])
117 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
118 ip_addr=proxy_data['auth_user']['ip_addr'])
119
120 pyramid_request = prepare(request) # set pyramid threadlocal request
121
122 # pylons routing
123 if not rhodecode.CONFIG.get('routes.map'):
124 rhodecode.CONFIG['routes.map'] = make_map(config)
125 pylons.url._push_object(get_routes_generator_for_server_url(
126 get_server_url(request.environ)
127 ))
128
129 try:
130 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
131 finally:
132 pyramid_request['closer']()
133 pylons.url._pop_object()
134
135
55 136 def run_task(task, *args, **kwargs):
56 137 if rhodecode.CELERY_ENABLED:
57 138 celery_is_up = False
58 139 try:
59 140 t = task.apply_async(args=args, kwargs=kwargs)
60 141 log.info('running task %s:%s', t.task_id, task)
61 142 celery_is_up = True
62 143 return t
63 144
64 145 except socket.error as e:
65 146 if isinstance(e, IOError) and e.errno == 111:
66 147 log.error('Unable to connect to celeryd. Sync execution')
67 148 else:
68 149 log.exception("Exception while connecting to celeryd.")
69 150 except KeyError as e:
70 151 log.error('Unable to connect to celeryd. Sync execution')
71 152 except Exception as e:
72 153 log.exception(
73 154 "Exception while trying to run task asynchronous. "
74 155 "Fallback to sync execution.")
75 156
76 157 # keep in mind there maybe a subtle race condition where something
77 158 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
78 159 # will see CELERY_ENABLED as True before this has a chance to set False
79 160 rhodecode.CELERY_ENABLED = celery_is_up
80 161 else:
81 162 log.debug('executing task %s in sync mode', task)
82 163 return ResultWrapper(task(*args, **kwargs))
83 164
84 165
85 166 def __get_lockkey(func, *fargs, **fkwargs):
86 167 params = list(fargs)
87 168 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
88 169
89 170 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
90 171 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
91 172 return 'task_%s.lock' % (md5_safe(_lock_key),)
92 173
93 174
94 175 def locked_task(func):
95 176 def __wrapper(func, *fargs, **fkwargs):
96 177 lockkey = __get_lockkey(func, *fargs, **fkwargs)
97 178 lockkey_path = config['app_conf']['cache_dir']
98 179
99 180 log.info('running task with lockkey %s' % lockkey)
100 181 try:
101 182 l = DaemonLock(file_=jn(lockkey_path, lockkey))
102 183 ret = func(*fargs, **fkwargs)
103 184 l.release()
104 185 return ret
105 186 except LockHeld:
106 187 log.info('LockHeld')
107 188 return 'Task with key %s already running' % lockkey
108 189
109 190 return decorator(__wrapper, func)
110 191
111 192
112 193 def get_session():
113 194 if rhodecode.CELERY_ENABLED:
114 195 utils.initialize_database(config)
115 196 sa = meta.Session()
116 197 return sa
117 198
118 199
119 200 def dbsession(func):
120 201 def __wrapper(func, *fargs, **fkwargs):
121 202 try:
122 203 ret = func(*fargs, **fkwargs)
123 204 return ret
124 205 finally:
125 206 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
126 207 meta.Session.remove()
127 208
128 209 return decorator(__wrapper, func)
129 210
130 211
131 212 def vcsconnection(func):
132 213 def __wrapper(func, *fargs, **fkwargs):
133 214 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
134 215 backends = config['vcs.backends'] = aslist(
135 216 config.get('vcs.backends', 'hg,git'), sep=',')
136 217 for alias in rhodecode.BACKENDS.keys():
137 218 if alias not in backends:
138 219 del rhodecode.BACKENDS[alias]
139 220 utils.configure_pyro4(config)
140 221 utils.configure_vcs(config)
141 222 connect_vcs(
142 223 config['vcs.server'],
143 224 utils.get_vcs_server_protocol(config))
144 225 ret = func(*fargs, **fkwargs)
145 226 return ret
146 227
147 228 return decorator(__wrapper, func)
@@ -1,284 +1,284 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2012-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 RhodeCode task modules, containing all task that suppose to be run
23 23 by celery daemon
24 24 """
25 25
26 26
27 27 import os
28 28 import logging
29 29
30 30 from celery.task import task
31 31 from pylons import config
32 32
33 33 import rhodecode
34 34 from rhodecode.lib.celerylib import (
35 35 run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
36 get_session, vcsconnection)
36 get_session, vcsconnection, RhodecodeCeleryTask)
37 37 from rhodecode.lib.hooks_base import log_create_repository
38 38 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
39 39 from rhodecode.lib.utils import add_cache, action_logger
40 40 from rhodecode.lib.utils2 import safe_int, str2bool
41 41 from rhodecode.model.db import Repository, User
42 42
43 43
44 44 add_cache(config) # pragma: no cover
45 45
46 46
47 47 def get_logger(cls):
48 48 if rhodecode.CELERY_ENABLED:
49 49 try:
50 50 log = cls.get_logger()
51 51 except Exception:
52 52 log = logging.getLogger(__name__)
53 53 else:
54 54 log = logging.getLogger(__name__)
55 55
56 56 return log
57 57
58 58
59 @task(ignore_result=True)
59 @task(ignore_result=True, base=RhodecodeCeleryTask)
60 60 @dbsession
61 61 def send_email(recipients, subject, body='', html_body='', email_config=None):
62 62 """
63 63 Sends an email with defined parameters from the .ini files.
64 64
65 65 :param recipients: list of recipients, it this is empty the defined email
66 66 address from field 'email_to' is used instead
67 67 :param subject: subject of the mail
68 68 :param body: body of the mail
69 69 :param html_body: html version of body
70 70 """
71 71 log = get_logger(send_email)
72 72
73 73 email_config = email_config or config
74 74 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
75 75 if not recipients:
76 76 # if recipients are not defined we send to email_config + all admins
77 77 admins = [
78 78 u.email for u in User.query().filter(User.admin == True).all()]
79 79 recipients = [email_config.get('email_to')] + admins
80 80
81 81 mail_server = email_config.get('smtp_server') or None
82 82 if mail_server is None:
83 83 log.error("SMTP server information missing. Sending email failed. "
84 84 "Make sure that `smtp_server` variable is configured "
85 85 "inside the .ini file")
86 86 return False
87 87
88 88 mail_from = email_config.get('app_email_from', 'RhodeCode')
89 89 user = email_config.get('smtp_username')
90 90 passwd = email_config.get('smtp_password')
91 91 mail_port = email_config.get('smtp_port')
92 92 tls = str2bool(email_config.get('smtp_use_tls'))
93 93 ssl = str2bool(email_config.get('smtp_use_ssl'))
94 94 debug = str2bool(email_config.get('debug'))
95 95 smtp_auth = email_config.get('smtp_auth')
96 96
97 97 try:
98 98 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
99 99 mail_port, ssl, tls, debug=debug)
100 100 m.send(recipients, subject, body, html_body)
101 101 except Exception:
102 102 log.exception('Mail sending failed')
103 103 return False
104 104 return True
105 105
106 106
107 @task(ignore_result=False)
107 @task(ignore_result=True, base=RhodecodeCeleryTask)
108 108 @dbsession
109 109 @vcsconnection
110 110 def create_repo(form_data, cur_user):
111 111 from rhodecode.model.repo import RepoModel
112 112 from rhodecode.model.user import UserModel
113 113 from rhodecode.model.settings import SettingsModel
114 114
115 115 log = get_logger(create_repo)
116 116 DBS = get_session()
117 117
118 118 cur_user = UserModel(DBS)._get_user(cur_user)
119 119 owner = cur_user
120 120
121 121 repo_name = form_data['repo_name']
122 122 repo_name_full = form_data['repo_name_full']
123 123 repo_type = form_data['repo_type']
124 124 description = form_data['repo_description']
125 125 private = form_data['repo_private']
126 126 clone_uri = form_data.get('clone_uri')
127 127 repo_group = safe_int(form_data['repo_group'])
128 128 landing_rev = form_data['repo_landing_rev']
129 129 copy_fork_permissions = form_data.get('copy_permissions')
130 130 copy_group_permissions = form_data.get('repo_copy_permissions')
131 131 fork_of = form_data.get('fork_parent_id')
132 132 state = form_data.get('repo_state', Repository.STATE_PENDING)
133 133
134 134 # repo creation defaults, private and repo_type are filled in form
135 135 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
136 136 enable_statistics = form_data.get(
137 137 'enable_statistics', defs.get('repo_enable_statistics'))
138 138 enable_locking = form_data.get(
139 139 'enable_locking', defs.get('repo_enable_locking'))
140 140 enable_downloads = form_data.get(
141 141 'enable_downloads', defs.get('repo_enable_downloads'))
142 142
143 143 try:
144 144 RepoModel(DBS)._create_repo(
145 145 repo_name=repo_name_full,
146 146 repo_type=repo_type,
147 147 description=description,
148 148 owner=owner,
149 149 private=private,
150 150 clone_uri=clone_uri,
151 151 repo_group=repo_group,
152 152 landing_rev=landing_rev,
153 153 fork_of=fork_of,
154 154 copy_fork_permissions=copy_fork_permissions,
155 155 copy_group_permissions=copy_group_permissions,
156 156 enable_statistics=enable_statistics,
157 157 enable_locking=enable_locking,
158 158 enable_downloads=enable_downloads,
159 159 state=state
160 160 )
161 161
162 162 action_logger(cur_user, 'user_created_repo',
163 163 repo_name_full, '', DBS)
164 164 DBS.commit()
165 165
166 166 # now create this repo on Filesystem
167 167 RepoModel(DBS)._create_filesystem_repo(
168 168 repo_name=repo_name,
169 169 repo_type=repo_type,
170 170 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
171 171 clone_uri=clone_uri,
172 172 )
173 173 repo = Repository.get_by_repo_name(repo_name_full)
174 174 log_create_repository(created_by=owner.username, **repo.get_dict())
175 175
176 176 # update repo commit caches initially
177 177 repo.update_commit_cache()
178 178
179 179 # set new created state
180 180 repo.set_state(Repository.STATE_CREATED)
181 181 DBS.commit()
182 182 except Exception as e:
183 183 log.warning('Exception %s occurred when creating repository, '
184 184 'doing cleanup...', e)
185 185 # rollback things manually !
186 186 repo = Repository.get_by_repo_name(repo_name_full)
187 187 if repo:
188 188 Repository.delete(repo.repo_id)
189 189 DBS.commit()
190 190 RepoModel(DBS)._delete_filesystem_repo(repo)
191 191 raise
192 192
193 193 # it's an odd fix to make celery fail task when exception occurs
194 194 def on_failure(self, *args, **kwargs):
195 195 pass
196 196
197 197 return True
198 198
199 199
200 @task(ignore_result=False)
200 @task(ignore_result=True, base=RhodecodeCeleryTask)
201 201 @dbsession
202 202 @vcsconnection
203 203 def create_repo_fork(form_data, cur_user):
204 204 """
205 205 Creates a fork of repository using internal VCS methods
206 206
207 207 :param form_data:
208 208 :param cur_user:
209 209 """
210 210 from rhodecode.model.repo import RepoModel
211 211 from rhodecode.model.user import UserModel
212 212
213 213 log = get_logger(create_repo_fork)
214 214 DBS = get_session()
215 215
216 216 cur_user = UserModel(DBS)._get_user(cur_user)
217 217 owner = cur_user
218 218
219 219 repo_name = form_data['repo_name'] # fork in this case
220 220 repo_name_full = form_data['repo_name_full']
221 221 repo_type = form_data['repo_type']
222 222 description = form_data['description']
223 223 private = form_data['private']
224 224 clone_uri = form_data.get('clone_uri')
225 225 repo_group = safe_int(form_data['repo_group'])
226 226 landing_rev = form_data['landing_rev']
227 227 copy_fork_permissions = form_data.get('copy_permissions')
228 228 fork_id = safe_int(form_data.get('fork_parent_id'))
229 229
230 230 try:
231 231 fork_of = RepoModel(DBS)._get_repo(fork_id)
232 232 RepoModel(DBS)._create_repo(
233 233 repo_name=repo_name_full,
234 234 repo_type=repo_type,
235 235 description=description,
236 236 owner=owner,
237 237 private=private,
238 238 clone_uri=clone_uri,
239 239 repo_group=repo_group,
240 240 landing_rev=landing_rev,
241 241 fork_of=fork_of,
242 242 copy_fork_permissions=copy_fork_permissions
243 243 )
244 244 action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
245 245 fork_of.repo_name, '', DBS)
246 246 DBS.commit()
247 247
248 248 base_path = Repository.base_path()
249 249 source_repo_path = os.path.join(base_path, fork_of.repo_name)
250 250
251 251 # now create this repo on Filesystem
252 252 RepoModel(DBS)._create_filesystem_repo(
253 253 repo_name=repo_name,
254 254 repo_type=repo_type,
255 255 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
256 256 clone_uri=source_repo_path,
257 257 )
258 258 repo = Repository.get_by_repo_name(repo_name_full)
259 259 log_create_repository(created_by=owner.username, **repo.get_dict())
260 260
261 261 # update repo commit caches initially
262 262 config = repo._config
263 263 config.set('extensions', 'largefiles', '')
264 264 repo.update_commit_cache(config=config)
265 265
266 266 # set new created state
267 267 repo.set_state(Repository.STATE_CREATED)
268 268 DBS.commit()
269 269 except Exception as e:
270 270 log.warning('Exception %s occurred when forking repository, '
271 271 'doing cleanup...', e)
272 272 # rollback things manually !
273 273 repo = Repository.get_by_repo_name(repo_name_full)
274 274 if repo:
275 275 Repository.delete(repo.repo_id)
276 276 DBS.commit()
277 277 RepoModel(DBS)._delete_filesystem_repo(repo)
278 278 raise
279 279
280 280 # it's an odd fix to make celery fail task when exception occurs
281 281 def on_failure(self, *args, **kwargs):
282 282 pass
283 283
284 284 return True
General Comments 0
You need to be logged in to leave comments. Login now