##// END OF EJS Templates
celerylib: fixed broken tasks for auto-update
super-admin -
r4735:fd8f99cd stable
parent child Browse files
Show More
@@ -1,69 +1,72 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import socket
22 22 import logging
23 23
24 24 import rhodecode
25 25 from zope.cachedescriptors.property import Lazy as LazyProperty
26 26 from rhodecode.lib.celerylib.loader import (
27 27 celery_app, RequestContextTask, get_logger)
28 28
29 29 async_task = celery_app.task
30 30
31 31
32 32 log = logging.getLogger(__name__)
33 33
34 34
35 35 class ResultWrapper(object):
36 36 def __init__(self, task):
37 37 self.task = task
38 38
39 39 @LazyProperty
40 40 def result(self):
41 41 return self.task
42 42
43 43
44 44 def run_task(task, *args, **kwargs):
45 45 log.debug('Got task `%s` for execution', task)
46 if task is None:
47 raise ValueError('Got non-existing task for execution')
48
46 49 if rhodecode.CELERY_ENABLED:
47 50 celery_is_up = False
48 51 try:
49 52 t = task.apply_async(args=args, kwargs=kwargs)
50 53 celery_is_up = True
51 54 log.debug('executing task %s:%s in async mode', t.task_id, task)
52 55 return t
53 56
54 57 except socket.error as e:
55 58 if isinstance(e, IOError) and e.errno == 111:
56 59 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
57 60 else:
58 61 log.exception("Exception while connecting to celeryd.")
59 62 except KeyError as e:
60 63 log.error('Unable to connect to celeryd `%s`. Sync execution', e)
61 64 except Exception as e:
62 65 log.exception(
63 66 "Exception while trying to run task asynchronous. "
64 67 "Fallback to sync execution.")
65 68
66 69 else:
67 70 log.debug('executing task %s:%s in sync mode', 'TASK', task)
68 71
69 72 return ResultWrapper(task(*args, **kwargs))
@@ -1,407 +1,410 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2012-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 RhodeCode task modules, containing all task that suppose to be run
23 23 by celery daemon
24 24 """
25 25
26 26 import os
27 27 import time
28 28
29 29 from pyramid import compat
30 30 from pyramid_mailer.mailer import Mailer
31 31 from pyramid_mailer.message import Message
32 32 from email.utils import formatdate
33 33
34 34 import rhodecode
35 35 from rhodecode.lib import audit_logger
36 36 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask, run_task
37 37 from rhodecode.lib import hooks_base
38 38 from rhodecode.lib.utils2 import safe_int, str2bool, aslist
39 39 from rhodecode.model.db import (
40 40 Session, IntegrityError, true, Repository, RepoGroup, User)
41 41 from rhodecode.model.permission import PermissionModel
42 42
43 43
44 44 @async_task(ignore_result=True, base=RequestContextTask)
45 45 def send_email(recipients, subject, body='', html_body='', email_config=None,
46 46 extra_headers=None):
47 47 """
48 48 Sends an email with defined parameters from the .ini files.
49 49
50 50 :param recipients: list of recipients, it this is empty the defined email
51 51 address from field 'email_to' is used instead
52 52 :param subject: subject of the mail
53 53 :param body: body of the mail
54 54 :param html_body: html version of body
55 55 :param email_config: specify custom configuration for mailer
56 56 :param extra_headers: specify custom headers
57 57 """
58 58 log = get_logger(send_email)
59 59
60 60 email_config = email_config or rhodecode.CONFIG
61 61
62 62 mail_server = email_config.get('smtp_server') or None
63 63 if mail_server is None:
64 64 log.error("SMTP server information missing. Sending email failed. "
65 65 "Make sure that `smtp_server` variable is configured "
66 66 "inside the .ini file")
67 67 return False
68 68
69 69 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
70 70
71 71 if recipients:
72 72 if isinstance(recipients, compat.string_types):
73 73 recipients = recipients.split(',')
74 74 else:
75 75 # if recipients are not defined we send to email_config + all admins
76 76 admins = []
77 77 for u in User.query().filter(User.admin == true()).all():
78 78 if u.email:
79 79 admins.append(u.email)
80 80 recipients = []
81 81 config_email = email_config.get('email_to')
82 82 if config_email:
83 83 recipients += [config_email]
84 84 recipients += admins
85 85
86 86 # translate our LEGACY config into the one that pyramid_mailer supports
87 87 email_conf = dict(
88 88 host=mail_server,
89 89 port=email_config.get('smtp_port', 25),
90 90 username=email_config.get('smtp_username'),
91 91 password=email_config.get('smtp_password'),
92 92
93 93 tls=str2bool(email_config.get('smtp_use_tls')),
94 94 ssl=str2bool(email_config.get('smtp_use_ssl')),
95 95
96 96 # SSL key file
97 97 # keyfile='',
98 98
99 99 # SSL certificate file
100 100 # certfile='',
101 101
102 102 # Location of maildir
103 103 # queue_path='',
104 104
105 105 default_sender=email_config.get('app_email_from', 'RhodeCode-noreply@rhodecode.com'),
106 106
107 107 debug=str2bool(email_config.get('smtp_debug')),
108 108 # /usr/sbin/sendmail Sendmail executable
109 109 # sendmail_app='',
110 110
111 111 # {sendmail_app} -t -i -f {sender} Template for sendmail execution
112 112 # sendmail_template='',
113 113 )
114 114
115 115 if extra_headers is None:
116 116 extra_headers = {}
117 117
118 118 extra_headers.setdefault('Date', formatdate(time.time()))
119 119
120 120 if 'thread_ids' in extra_headers:
121 121 thread_ids = extra_headers.pop('thread_ids')
122 122 extra_headers['References'] = ' '.join('<{}>'.format(t) for t in thread_ids)
123 123
124 124 try:
125 125 mailer = Mailer(**email_conf)
126 126
127 127 message = Message(subject=subject,
128 128 sender=email_conf['default_sender'],
129 129 recipients=recipients,
130 130 body=body, html=html_body,
131 131 extra_headers=extra_headers)
132 132 mailer.send_immediately(message)
133 133
134 134 except Exception:
135 135 log.exception('Mail sending failed')
136 136 return False
137 137 return True
138 138
139 139
140 140 @async_task(ignore_result=True, base=RequestContextTask)
141 141 def create_repo(form_data, cur_user):
142 142 from rhodecode.model.repo import RepoModel
143 143 from rhodecode.model.user import UserModel
144 144 from rhodecode.model.scm import ScmModel
145 145 from rhodecode.model.settings import SettingsModel
146 146
147 147 log = get_logger(create_repo)
148 148
149 149 cur_user = UserModel()._get_user(cur_user)
150 150 owner = cur_user
151 151
152 152 repo_name = form_data['repo_name']
153 153 repo_name_full = form_data['repo_name_full']
154 154 repo_type = form_data['repo_type']
155 155 description = form_data['repo_description']
156 156 private = form_data['repo_private']
157 157 clone_uri = form_data.get('clone_uri')
158 158 repo_group = safe_int(form_data['repo_group'])
159 159 copy_fork_permissions = form_data.get('copy_permissions')
160 160 copy_group_permissions = form_data.get('repo_copy_permissions')
161 161 fork_of = form_data.get('fork_parent_id')
162 162 state = form_data.get('repo_state', Repository.STATE_PENDING)
163 163
164 164 # repo creation defaults, private and repo_type are filled in form
165 165 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
166 166 enable_statistics = form_data.get(
167 167 'enable_statistics', defs.get('repo_enable_statistics'))
168 168 enable_locking = form_data.get(
169 169 'enable_locking', defs.get('repo_enable_locking'))
170 170 enable_downloads = form_data.get(
171 171 'enable_downloads', defs.get('repo_enable_downloads'))
172 172
173 173 # set landing rev based on default branches for SCM
174 174 landing_ref, _label = ScmModel.backend_landing_ref(repo_type)
175 175
176 176 try:
177 177 RepoModel()._create_repo(
178 178 repo_name=repo_name_full,
179 179 repo_type=repo_type,
180 180 description=description,
181 181 owner=owner,
182 182 private=private,
183 183 clone_uri=clone_uri,
184 184 repo_group=repo_group,
185 185 landing_rev=landing_ref,
186 186 fork_of=fork_of,
187 187 copy_fork_permissions=copy_fork_permissions,
188 188 copy_group_permissions=copy_group_permissions,
189 189 enable_statistics=enable_statistics,
190 190 enable_locking=enable_locking,
191 191 enable_downloads=enable_downloads,
192 192 state=state
193 193 )
194 194 Session().commit()
195 195
196 196 # now create this repo on Filesystem
197 197 RepoModel()._create_filesystem_repo(
198 198 repo_name=repo_name,
199 199 repo_type=repo_type,
200 200 repo_group=RepoModel()._get_repo_group(repo_group),
201 201 clone_uri=clone_uri,
202 202 )
203 203 repo = Repository.get_by_repo_name(repo_name_full)
204 204 hooks_base.create_repository(created_by=owner.username, **repo.get_dict())
205 205
206 206 # update repo commit caches initially
207 207 repo.update_commit_cache()
208 208
209 209 # set new created state
210 210 repo.set_state(Repository.STATE_CREATED)
211 211 repo_id = repo.repo_id
212 212 repo_data = repo.get_api_data()
213 213
214 214 audit_logger.store(
215 215 'repo.create', action_data={'data': repo_data},
216 216 user=cur_user,
217 217 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
218 218
219 219 Session().commit()
220 220
221 221 PermissionModel().trigger_permission_flush()
222 222
223 223 except Exception as e:
224 224 log.warning('Exception occurred when creating repository, '
225 225 'doing cleanup...', exc_info=True)
226 226 if isinstance(e, IntegrityError):
227 227 Session().rollback()
228 228
229 229 # rollback things manually !
230 230 repo = Repository.get_by_repo_name(repo_name_full)
231 231 if repo:
232 232 Repository.delete(repo.repo_id)
233 233 Session().commit()
234 234 RepoModel()._delete_filesystem_repo(repo)
235 235 log.info('Cleanup of repo %s finished', repo_name_full)
236 236 raise
237 237
238 238 return True
239 239
240 240
241 241 @async_task(ignore_result=True, base=RequestContextTask)
242 242 def create_repo_fork(form_data, cur_user):
243 243 """
244 244 Creates a fork of repository using internal VCS methods
245 245 """
246 246 from rhodecode.model.repo import RepoModel
247 247 from rhodecode.model.user import UserModel
248 248
249 249 log = get_logger(create_repo_fork)
250 250
251 251 cur_user = UserModel()._get_user(cur_user)
252 252 owner = cur_user
253 253
254 254 repo_name = form_data['repo_name'] # fork in this case
255 255 repo_name_full = form_data['repo_name_full']
256 256 repo_type = form_data['repo_type']
257 257 description = form_data['description']
258 258 private = form_data['private']
259 259 clone_uri = form_data.get('clone_uri')
260 260 repo_group = safe_int(form_data['repo_group'])
261 261 landing_ref = form_data['landing_rev']
262 262 copy_fork_permissions = form_data.get('copy_permissions')
263 263 fork_id = safe_int(form_data.get('fork_parent_id'))
264 264
265 265 try:
266 266 fork_of = RepoModel()._get_repo(fork_id)
267 267 RepoModel()._create_repo(
268 268 repo_name=repo_name_full,
269 269 repo_type=repo_type,
270 270 description=description,
271 271 owner=owner,
272 272 private=private,
273 273 clone_uri=clone_uri,
274 274 repo_group=repo_group,
275 275 landing_rev=landing_ref,
276 276 fork_of=fork_of,
277 277 copy_fork_permissions=copy_fork_permissions
278 278 )
279 279
280 280 Session().commit()
281 281
282 282 base_path = Repository.base_path()
283 283 source_repo_path = os.path.join(base_path, fork_of.repo_name)
284 284
285 285 # now create this repo on Filesystem
286 286 RepoModel()._create_filesystem_repo(
287 287 repo_name=repo_name,
288 288 repo_type=repo_type,
289 289 repo_group=RepoModel()._get_repo_group(repo_group),
290 290 clone_uri=source_repo_path,
291 291 )
292 292 repo = Repository.get_by_repo_name(repo_name_full)
293 293 hooks_base.create_repository(created_by=owner.username, **repo.get_dict())
294 294
295 295 # update repo commit caches initially
296 296 config = repo._config
297 297 config.set('extensions', 'largefiles', '')
298 298 repo.update_commit_cache(config=config)
299 299
300 300 # set new created state
301 301 repo.set_state(Repository.STATE_CREATED)
302 302
303 303 repo_id = repo.repo_id
304 304 repo_data = repo.get_api_data()
305 305 audit_logger.store(
306 306 'repo.fork', action_data={'data': repo_data},
307 307 user=cur_user,
308 308 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
309 309
310 310 Session().commit()
311 311 except Exception as e:
312 312 log.warning('Exception occurred when forking repository, '
313 313 'doing cleanup...', exc_info=True)
314 314 if isinstance(e, IntegrityError):
315 315 Session().rollback()
316 316
317 317 # rollback things manually !
318 318 repo = Repository.get_by_repo_name(repo_name_full)
319 319 if repo:
320 320 Repository.delete(repo.repo_id)
321 321 Session().commit()
322 322 RepoModel()._delete_filesystem_repo(repo)
323 323 log.info('Cleanup of repo %s finished', repo_name_full)
324 324 raise
325 325
326 326 return True
327 327
328 328
329 329 @async_task(ignore_result=True)
330 330 def repo_maintenance(repoid):
331 331 from rhodecode.lib import repo_maintenance as repo_maintenance_lib
332 332 log = get_logger(repo_maintenance)
333 333 repo = Repository.get_by_id_or_repo_name(repoid)
334 334 if repo:
335 335 maintenance = repo_maintenance_lib.RepoMaintenance()
336 336 tasks = maintenance.get_tasks_for_repo(repo)
337 337 log.debug('Executing %s tasks on repo `%s`', tasks, repoid)
338 338 executed_types = maintenance.execute(repo)
339 339 log.debug('Got execution results %s', executed_types)
340 340 else:
341 341 log.debug('Repo `%s` not found or without a clone_url', repoid)
342 342
343 343
344 344 @async_task(ignore_result=True)
345 345 def check_for_update(send_email_notification=True, email_recipients=None):
346 346 from rhodecode.model.update import UpdateModel
347 347 from rhodecode.model.notification import EmailNotificationModel
348 348
349 349 log = get_logger(check_for_update)
350 350 update_url = UpdateModel().get_update_url()
351 351 cur_ver = rhodecode.__version__
352 352
353 353 try:
354 354 data = UpdateModel().get_update_data(update_url)
355 355
356 356 current_ver = UpdateModel().get_stored_version(fallback=cur_ver)
357 357 latest_ver = data['versions'][0]['version']
358 358 UpdateModel().store_version(latest_ver)
359 359
360 360 if send_email_notification:
361 361 log.debug('Send email notification is enabled. '
362 362 'Current RhodeCode version: %s, latest known: %s', current_ver, latest_ver)
363 363 if UpdateModel().is_outdated(current_ver, latest_ver):
364 364
365 365 email_kwargs = {
366 366 'current_ver': current_ver,
367 367 'latest_ver': latest_ver,
368 368 }
369 369
370 370 (subject, email_body, email_body_plaintext) = EmailNotificationModel().render_email(
371 371 EmailNotificationModel.TYPE_UPDATE_AVAILABLE, **email_kwargs)
372 372
373 373 email_recipients = aslist(email_recipients, sep=',') or \
374 374 [user.email for user in User.get_all_super_admins()]
375 375 run_task(send_email, email_recipients, subject,
376 376 email_body_plaintext, email_body)
377 377
378 378 except Exception:
379 379 pass
380 380
381 381
382 382 @async_task(ignore_result=False)
383 383 def beat_check(*args, **kwargs):
384 384 log = get_logger(beat_check)
385 385 log.info('%r: Got args: %r and kwargs %r', beat_check, args, kwargs)
386 386 return time.time()
387 387
388 388
389 @async_task(ignore_result=True)
390 def sync_last_update(*args, **kwargs):
391
389 def sync_last_update_for_objects(*args, **kwargs):
392 390 skip_repos = kwargs.get('skip_repos')
393 391 if not skip_repos:
394 392 repos = Repository.query() \
395 393 .order_by(Repository.group_id.asc())
396 394
397 395 for repo in repos:
398 396 repo.update_commit_cache()
399 397
400 398 skip_groups = kwargs.get('skip_groups')
401 399 if not skip_groups:
402 400 repo_groups = RepoGroup.query() \
403 401 .filter(RepoGroup.group_parent_id == None)
404 402
405 403 for root_gr in repo_groups:
406 404 for repo_gr in reversed(root_gr.recursive_groups()):
407 405 repo_gr.update_commit_cache()
406
407
408 @async_task(ignore_result=True)
409 def sync_last_update(*args, **kwargs):
410 sync_last_update_for_objects(*args, **kwargs)
General Comments 0
You need to be logged in to leave comments. Login now