##// END OF EJS Templates
caches: use individual namespaces per user to prevent beaker caching problems....
caches: use individual namespaces per user to prevent beaker caching problems. - Especially for MySQL, when a large amount of data is kept in the caches there could be critical errors while storing the cache, which in turn prevented users from authenticating. This was caused by the fact that we used a single namespace for ALL users, meaning it grew as the number of users grew until it reached the MySQL single-column size limit. This changes the behaviour: we now use a namespace per user, so each user-id has its own cache namespace, which limits the maximum column data to a single user's cache. That is a limit we should never reach.

File last commit:

r2487:fcee5614 default
r2572:5b07455a default
Show More
tasks.py
301 lines | 10.6 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
release: update copyright year to 2018
r2487 # Copyright (C) 2012-2018 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
RhodeCode task modules, containing all tasks that are supposed to be run
by the celery daemon
"""
import os
scheduler: added DB models and db parsers for the RhodeCode scheduler....
r2406 import time
project: added all source files and assets
r1
dan
celery: fixed bug where celery was not running
r265 import rhodecode
audit-logs: moved async tasks from old deprecated action_logger.
r1803 from rhodecode.lib import audit_logger
celery: celery 4.X support. Fixes #4169...
r2359 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
project: added all source files and assets
r1 from rhodecode.lib.hooks_base import log_create_repository
from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
from rhodecode.lib.utils2 import safe_int, str2bool
celery: tasks, improve fork/repo create to survive a database errors in the cleanup code.
r2419 from rhodecode.model.db import Session, IntegrityError, Repository, User
project: added all source files and assets
r1
celery: celery 4.X support. Fixes #4169...
@async_task(ignore_result=True, base=RequestContextTask)
def send_email(recipients, subject, body='', html_body='', email_config=None):
    """
    Sends an email with defined parameters from the .ini files.

    :param recipients: list of recipients, if this is empty the address
        defined in the `email_to` field plus the emails of all admin users
        are used instead
    :param subject: subject of the mail, it gets prefixed with the
        configured `email_prefix`
    :param body: body of the mail
    :param html_body: html version of body
    :param email_config: optional dict-like object holding the mail
        settings, `rhodecode.CONFIG` is used when it is not given
    :returns: False when no `smtp_server` is configured or when sending
        raised an exception, True otherwise
    """
    log = get_logger(send_email)

    email_config = email_config or rhodecode.CONFIG
    subject = "%s %s" % (email_config.get('email_prefix', ''), subject)

    if not recipients:
        # if recipients are not defined we send to email_config + all admins
        # NOTE: `== True` builds the SQL expression, it must not be
        # replaced with `is True`
        admins = [
            u.email for u in User.query().filter(User.admin == True).all()]
        # `email_to` may be missing from the config and an admin account may
        # have no email set; skip such empty entries instead of handing
        # None over to the mailer
        candidates = [email_config.get('email_to')] + admins
        recipients = [address for address in candidates if address]

    mail_server = email_config.get('smtp_server') or None
    if mail_server is None:
        log.error("SMTP server information missing. Sending email failed. "
                  "Make sure that `smtp_server` variable is configured "
                  "inside the .ini file")
        return False

    mail_from = email_config.get('app_email_from', 'RhodeCode')
    user = email_config.get('smtp_username')
    passwd = email_config.get('smtp_password')
    mail_port = email_config.get('smtp_port')
    tls = str2bool(email_config.get('smtp_use_tls'))
    ssl = str2bool(email_config.get('smtp_use_ssl'))
    debug = str2bool(email_config.get('debug'))
    smtp_auth = email_config.get('smtp_auth')

    try:
        m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
                       mail_port, ssl, tls, debug=debug)
        m.send(recipients, subject, body, html_body)
    except Exception:
        # sending is best-effort: report the failure to the caller via the
        # return value instead of failing the whole task
        log.exception('Mail sending failed')
        return False
    return True
celery: celery 4.X support. Fixes #4169...
@async_task(ignore_result=True, base=RequestContextTask)
def create_repo(form_data, cur_user):
    """
    Creates a repository database entry and the matching repository on the
    filesystem, then marks it as created and stores an audit log entry.

    :param form_data: dict-like object with the repository form values; the
        keys read with ``[]`` below are required, the ones read with
        ``.get()`` are optional
    :param cur_user: reference to the acting user, resolved through
        ``UserModel()._get_user``; the resolved user becomes the owner
    :returns: True on success; on any error the partially created repository
        is cleaned up and the exception is re-raised
    """
    from rhodecode.model.repo import RepoModel
    from rhodecode.model.user import UserModel
    from rhodecode.model.settings import SettingsModel

    log = get_logger(create_repo)

    cur_user = UserModel()._get_user(cur_user)
    owner = cur_user

    short_name = form_data['repo_name']
    full_name = form_data['repo_name_full']
    vcs_type = form_data['repo_type']
    description = form_data['repo_description']
    is_private = form_data['repo_private']
    clone_uri = form_data.get('clone_uri')
    group_id = safe_int(form_data['repo_group'])
    landing_rev = form_data['repo_landing_rev']
    copy_fork_permissions = form_data.get('copy_permissions')
    copy_group_permissions = form_data.get('repo_copy_permissions')
    fork_of = form_data.get('fork_parent_id')
    state = form_data.get('repo_state', Repository.STATE_PENDING)

    # repo creation defaults, private and repo_type are filled in form;
    # every flag missing from the form falls back to its `repo_` default
    defaults = SettingsModel().get_default_repo_settings(strip_prefix=True)
    feature_flags = {}
    for flag in ('enable_statistics', 'enable_locking', 'enable_downloads'):
        feature_flags[flag] = form_data.get(flag, defaults.get('repo_' + flag))

    try:
        RepoModel()._create_repo(
            repo_name=full_name,
            repo_type=vcs_type,
            description=description,
            owner=owner,
            private=is_private,
            clone_uri=clone_uri,
            repo_group=group_id,
            landing_rev=landing_rev,
            fork_of=fork_of,
            copy_fork_permissions=copy_fork_permissions,
            copy_group_permissions=copy_group_permissions,
            state=state,
            **feature_flags
        )
        Session().commit()

        # now create this repo on Filesystem
        RepoModel()._create_filesystem_repo(
            repo_name=short_name,
            repo_type=vcs_type,
            repo_group=RepoModel()._get_repo_group(group_id),
            clone_uri=clone_uri,
        )
        db_repo = Repository.get_by_repo_name(full_name)
        log_create_repository(created_by=owner.username, **db_repo.get_dict())

        # update repo commit caches initially
        db_repo.update_commit_cache()

        # set new created state
        db_repo.set_state(Repository.STATE_CREATED)

        audit_logger.store(
            'repo.create',
            action_data={'data': db_repo.get_api_data()},
            user=cur_user,
            repo=audit_logger.RepoWrap(
                repo_name=short_name, repo_id=db_repo.repo_id))

        Session().commit()
    except Exception as e:
        log.warning('Exception occurred when creating repository, '
                    'doing cleanup...', exc_info=True)
        # the session has to be rolled back first, so the cleanup queries
        # below can run after a failed flush
        if isinstance(e, IntegrityError):
            Session().rollback()

        # rollback things manually !
        leftover = Repository.get_by_repo_name(full_name)
        if leftover:
            Repository.delete(leftover.repo_id)
            Session().commit()
            RepoModel()._delete_filesystem_repo(leftover)
        log.info('Cleanup of repo %s finished', full_name)
        raise

    return True
celery: celery 4.X support. Fixes #4169...
@async_task(ignore_result=True, base=RequestContextTask)
def create_repo_fork(form_data, cur_user):
    """
    Creates a fork of repository using internal VCS methods

    The fork is registered in the database first, then created on the
    filesystem with the parent repository path used as the clone source.

    :param form_data: dict-like object with the fork form values; the keys
        read with ``[]`` below are required, the ones read with ``.get()``
        are optional
    :param cur_user: reference to the acting user, resolved through
        ``UserModel()._get_user``; the resolved user becomes the owner
    :returns: True on success; on any error the partially created fork is
        cleaned up and the exception is re-raised
    """
    from rhodecode.model.repo import RepoModel
    from rhodecode.model.user import UserModel

    log = get_logger(create_repo_fork)

    cur_user = UserModel()._get_user(cur_user)
    owner = cur_user

    fork_name = form_data['repo_name']  # fork in this case
    fork_name_full = form_data['repo_name_full']
    vcs_type = form_data['repo_type']
    description = form_data['description']
    is_private = form_data['private']
    clone_uri = form_data.get('clone_uri')
    group_id = safe_int(form_data['repo_group'])
    landing_rev = form_data['landing_rev']
    copy_fork_permissions = form_data.get('copy_permissions')
    parent_id = safe_int(form_data.get('fork_parent_id'))

    try:
        parent_repo = RepoModel()._get_repo(parent_id)

        RepoModel()._create_repo(
            repo_name=fork_name_full,
            repo_type=vcs_type,
            description=description,
            owner=owner,
            private=is_private,
            clone_uri=clone_uri,
            repo_group=group_id,
            landing_rev=landing_rev,
            fork_of=parent_repo,
            copy_fork_permissions=copy_fork_permissions
        )
        Session().commit()

        # the fork is cloned from the on-disk location of its parent
        source_repo_path = os.path.join(
            Repository.base_path(), parent_repo.repo_name)

        # now create this repo on Filesystem
        RepoModel()._create_filesystem_repo(
            repo_name=fork_name,
            repo_type=vcs_type,
            repo_group=RepoModel()._get_repo_group(group_id),
            clone_uri=source_repo_path,
        )
        db_repo = Repository.get_by_repo_name(fork_name_full)
        log_create_repository(created_by=owner.username, **db_repo.get_dict())

        # update repo commit caches initially, using a config with the
        # `largefiles` extension entry set to an empty value
        cache_config = db_repo._config
        cache_config.set('extensions', 'largefiles', '')
        db_repo.update_commit_cache(config=cache_config)

        # set new created state
        db_repo.set_state(Repository.STATE_CREATED)

        audit_logger.store(
            'repo.fork',
            action_data={'data': db_repo.get_api_data()},
            user=cur_user,
            repo=audit_logger.RepoWrap(
                repo_name=fork_name, repo_id=db_repo.repo_id))

        Session().commit()
    except Exception as e:
        log.warning('Exception occurred when forking repository, '
                    'doing cleanup...', exc_info=True)
        # the session has to be rolled back first, so the cleanup queries
        # below can run after a failed flush
        if isinstance(e, IntegrityError):
            Session().rollback()

        # rollback things manually !
        leftover = Repository.get_by_repo_name(fork_name_full)
        if leftover:
            Repository.delete(leftover.repo_id)
            Session().commit()
            RepoModel()._delete_filesystem_repo(leftover)
        log.info('Cleanup of repo %s finished', fork_name_full)
        raise

    return True
celery: celery 4.X support. Fixes #4169...
r2359
@async_task(ignore_result=True)
def repo_maintenance(repoid):
    """
    Executes the maintenance tasks for a single repository.

    :param repoid: id or name of the repository, looked up with
        ``Repository.get_by_id_or_repo_name``; nothing is executed when the
        lookup returns no repository
    """
    from rhodecode.lib import repo_maintenance as repo_maintenance_lib

    log = get_logger(repo_maintenance)

    db_repo = Repository.get_by_id_or_repo_name(repoid)
    if not db_repo:
        log.debug('Repo `%s` not found or without a clone_url', repoid)
        return

    runner = repo_maintenance_lib.RepoMaintenance()
    # the task list is fetched only to be reported in the debug log
    planned = runner.get_tasks_for_repo(db_repo)
    log.debug('Executing %s tasks on repo `%s`', planned, repoid)

    results = runner.execute(db_repo)
    log.debug('Got execution results %s', results)
@async_task(ignore_result=True)
def check_for_update():
    """
    Fetches the update information from the configured update url and
    stores the first listed version via ``UpdateModel().store_version``.

    The check is best-effort: any failure while fetching or storing the
    data is logged and does not fail the task.
    """
    from rhodecode.model.update import UpdateModel

    log = get_logger(check_for_update)
    update_url = UpdateModel().get_update_url()

    try:
        data = UpdateModel().get_update_data(update_url)
        # the first entry of `versions` is taken as the latest one
        latest = data['versions'][0]
        UpdateModel().store_version(latest['version'])
    except Exception:
        # keep the task best-effort, but leave a trace instead of silently
        # swallowing the failure
        log.exception('Failed to check for updates using `%s`', update_url)
scheduler: added DB models and db parsers for the RhodeCode scheduler....
@async_task(ignore_result=False)
def beat_check(*args, **kwargs):
    """
    Logs the arguments it was called with and returns the current time.

    :returns: current timestamp as returned by ``time.time()``
    """
    get_logger(beat_check).info(
        'Got args: %r and kwargs %r', args, kwargs)
    return time.time()