tasks.py
412 lines
| 14.3 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2012-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
RhodeCode task modules, containing all task that suppose to be run | ||||
by celery daemon | ||||
""" | ||||
import os | ||||
r2406 | import time | |||
r1 | ||||
r3113 | from pyramid_mailer.mailer import Mailer | |||
from pyramid_mailer.message import Message | ||||
r4447 | from email.utils import formatdate | |||
r3113 | ||||
r265 | import rhodecode | |||
r1803 | from rhodecode.lib import audit_logger | |||
r4634 | from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask, run_task | |||
r4445 | from rhodecode.lib import hooks_base | |||
r4634 | from rhodecode.lib.utils2 import safe_int, str2bool, aslist | |||
r4803 | from rhodecode.lib.statsd_client import StatsdClient | |||
r4160 | from rhodecode.model.db import ( | |||
r5180 | true, null, Session, IntegrityError, Repository, RepoGroup, User) | |||
r4697 | from rhodecode.model.permission import PermissionModel | |||
r1 | ||||
r2359 | @async_task(ignore_result=True, base=RequestContextTask) | |||
r4447 | def send_email(recipients, subject, body='', html_body='', email_config=None, | |||
extra_headers=None): | ||||
r1 | """ | |||
Sends an email with defined parameters from the .ini files. | ||||
:param recipients: list of recipients, it this is empty the defined email | ||||
address from field 'email_to' is used instead | ||||
:param subject: subject of the mail | ||||
:param body: body of the mail | ||||
:param html_body: html version of body | ||||
r4275 | :param email_config: specify custom configuration for mailer | |||
r4447 | :param extra_headers: specify custom headers | |||
r1 | """ | |||
log = get_logger(send_email) | ||||
r640 | email_config = email_config or rhodecode.CONFIG | |||
r3077 | ||||
mail_server = email_config.get('smtp_server') or None | ||||
if mail_server is None: | ||||
log.error("SMTP server information missing. Sending email failed. " | ||||
"Make sure that `smtp_server` variable is configured " | ||||
"inside the .ini file") | ||||
return False | ||||
r1 | subject = "%s %s" % (email_config.get('email_prefix', ''), subject) | |||
r3113 | ||||
if recipients: | ||||
r4908 | if isinstance(recipients, str): | |||
r3113 | recipients = recipients.split(',') | |||
else: | ||||
r1 | # if recipients are not defined we send to email_config + all admins | |||
r2896 | admins = [] | |||
for u in User.query().filter(User.admin == true()).all(): | ||||
if u.email: | ||||
admins.append(u.email) | ||||
recipients = [] | ||||
config_email = email_config.get('email_to') | ||||
if config_email: | ||||
recipients += [config_email] | ||||
recipients += admins | ||||
r1 | ||||
r3113 | # translate our LEGACY config into the one that pyramid_mailer supports | |||
email_conf = dict( | ||||
host=mail_server, | ||||
r3197 | port=email_config.get('smtp_port', 25), | |||
r3113 | username=email_config.get('smtp_username'), | |||
password=email_config.get('smtp_password'), | ||||
tls=str2bool(email_config.get('smtp_use_tls')), | ||||
ssl=str2bool(email_config.get('smtp_use_ssl')), | ||||
# SSL key file | ||||
# keyfile='', | ||||
# SSL certificate file | ||||
# certfile='', | ||||
# Location of maildir | ||||
# queue_path='', | ||||
r4709 | default_sender=email_config.get('app_email_from', 'RhodeCode-noreply@rhodecode.com'), | |||
r3113 | ||||
debug=str2bool(email_config.get('smtp_debug')), | ||||
# /usr/sbin/sendmail Sendmail executable | ||||
# sendmail_app='', | ||||
# {sendmail_app} -t -i -f {sender} Template for sendmail execution | ||||
# sendmail_template='', | ||||
) | ||||
r1 | ||||
r4447 | if extra_headers is None: | |||
extra_headers = {} | ||||
extra_headers.setdefault('Date', formatdate(time.time())) | ||||
if 'thread_ids' in extra_headers: | ||||
thread_ids = extra_headers.pop('thread_ids') | ||||
extra_headers['References'] = ' '.join('<{}>'.format(t) for t in thread_ids) | ||||
r1 | try: | |||
r3113 | mailer = Mailer(**email_conf) | |||
message = Message(subject=subject, | ||||
sender=email_conf['default_sender'], | ||||
recipients=recipients, | ||||
r4447 | body=body, html=html_body, | |||
extra_headers=extra_headers) | ||||
r3113 | mailer.send_immediately(message) | |||
r4803 | statsd = StatsdClient.statsd | |||
if statsd: | ||||
statsd.incr('rhodecode_email_sent_total') | ||||
r3113 | ||||
r1 | except Exception: | |||
log.exception('Mail sending failed') | ||||
return False | ||||
return True | ||||
r2359 | @async_task(ignore_result=True, base=RequestContextTask) | |||
r1 | def create_repo(form_data, cur_user): | |||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.user import UserModel | ||||
r3881 | from rhodecode.model.scm import ScmModel | |||
r1 | from rhodecode.model.settings import SettingsModel | |||
log = get_logger(create_repo) | ||||
r2359 | cur_user = UserModel()._get_user(cur_user) | |||
r1 | owner = cur_user | |||
repo_name = form_data['repo_name'] | ||||
repo_name_full = form_data['repo_name_full'] | ||||
repo_type = form_data['repo_type'] | ||||
description = form_data['repo_description'] | ||||
private = form_data['repo_private'] | ||||
clone_uri = form_data.get('clone_uri') | ||||
repo_group = safe_int(form_data['repo_group']) | ||||
copy_fork_permissions = form_data.get('copy_permissions') | ||||
copy_group_permissions = form_data.get('repo_copy_permissions') | ||||
fork_of = form_data.get('fork_parent_id') | ||||
state = form_data.get('repo_state', Repository.STATE_PENDING) | ||||
# repo creation defaults, private and repo_type are filled in form | ||||
defs = SettingsModel().get_default_repo_settings(strip_prefix=True) | ||||
enable_statistics = form_data.get( | ||||
'enable_statistics', defs.get('repo_enable_statistics')) | ||||
enable_locking = form_data.get( | ||||
'enable_locking', defs.get('repo_enable_locking')) | ||||
enable_downloads = form_data.get( | ||||
'enable_downloads', defs.get('repo_enable_downloads')) | ||||
r3881 | # set landing rev based on default branches for SCM | |||
landing_ref, _label = ScmModel.backend_landing_ref(repo_type) | ||||
r1 | try: | |||
r3078 | RepoModel()._create_repo( | |||
r1 | repo_name=repo_name_full, | |||
repo_type=repo_type, | ||||
description=description, | ||||
owner=owner, | ||||
private=private, | ||||
clone_uri=clone_uri, | ||||
repo_group=repo_group, | ||||
r3881 | landing_rev=landing_ref, | |||
r1 | fork_of=fork_of, | |||
copy_fork_permissions=copy_fork_permissions, | ||||
copy_group_permissions=copy_group_permissions, | ||||
enable_statistics=enable_statistics, | ||||
enable_locking=enable_locking, | ||||
enable_downloads=enable_downloads, | ||||
state=state | ||||
) | ||||
r2359 | Session().commit() | |||
r1 | ||||
# now create this repo on Filesystem | ||||
r2359 | RepoModel()._create_filesystem_repo( | |||
r1 | repo_name=repo_name, | |||
repo_type=repo_type, | ||||
r2359 | repo_group=RepoModel()._get_repo_group(repo_group), | |||
r1 | clone_uri=clone_uri, | |||
) | ||||
repo = Repository.get_by_repo_name(repo_name_full) | ||||
r4445 | hooks_base.create_repository(created_by=owner.username, **repo.get_dict()) | |||
r1 | ||||
# update repo commit caches initially | ||||
repo.update_commit_cache() | ||||
# set new created state | ||||
repo.set_state(Repository.STATE_CREATED) | ||||
r1803 | repo_id = repo.repo_id | |||
repo_data = repo.get_api_data() | ||||
r1829 | audit_logger.store( | |||
'repo.create', action_data={'data': repo_data}, | ||||
r1803 | user=cur_user, | |||
repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) | ||||
r2359 | Session().commit() | |||
r4697 | ||||
PermissionModel().trigger_permission_flush() | ||||
r2419 | except Exception as e: | |||
r1187 | log.warning('Exception occurred when creating repository, ' | |||
'doing cleanup...', exc_info=True) | ||||
r2419 | if isinstance(e, IntegrityError): | |||
Session().rollback() | ||||
r1 | # rollback things manually ! | |||
repo = Repository.get_by_repo_name(repo_name_full) | ||||
if repo: | ||||
Repository.delete(repo.repo_id) | ||||
r2359 | Session().commit() | |||
RepoModel()._delete_filesystem_repo(repo) | ||||
r2419 | log.info('Cleanup of repo %s finished', repo_name_full) | |||
r1 | raise | |||
return True | ||||
r2359 | @async_task(ignore_result=True, base=RequestContextTask) | |||
r1 | def create_repo_fork(form_data, cur_user): | |||
""" | ||||
Creates a fork of repository using internal VCS methods | ||||
""" | ||||
from rhodecode.model.repo import RepoModel | ||||
from rhodecode.model.user import UserModel | ||||
log = get_logger(create_repo_fork) | ||||
r2359 | cur_user = UserModel()._get_user(cur_user) | |||
r1 | owner = cur_user | |||
repo_name = form_data['repo_name'] # fork in this case | ||||
repo_name_full = form_data['repo_name_full'] | ||||
repo_type = form_data['repo_type'] | ||||
description = form_data['description'] | ||||
private = form_data['private'] | ||||
clone_uri = form_data.get('clone_uri') | ||||
repo_group = safe_int(form_data['repo_group']) | ||||
r3969 | landing_ref = form_data['landing_rev'] | |||
r1 | copy_fork_permissions = form_data.get('copy_permissions') | |||
fork_id = safe_int(form_data.get('fork_parent_id')) | ||||
try: | ||||
r2359 | fork_of = RepoModel()._get_repo(fork_id) | |||
RepoModel()._create_repo( | ||||
r1 | repo_name=repo_name_full, | |||
repo_type=repo_type, | ||||
description=description, | ||||
owner=owner, | ||||
private=private, | ||||
clone_uri=clone_uri, | ||||
repo_group=repo_group, | ||||
r3969 | landing_rev=landing_ref, | |||
r1 | fork_of=fork_of, | |||
copy_fork_permissions=copy_fork_permissions | ||||
) | ||||
r1803 | ||||
r2359 | Session().commit() | |||
r1 | ||||
base_path = Repository.base_path() | ||||
source_repo_path = os.path.join(base_path, fork_of.repo_name) | ||||
# now create this repo on Filesystem | ||||
r2359 | RepoModel()._create_filesystem_repo( | |||
r1 | repo_name=repo_name, | |||
repo_type=repo_type, | ||||
r2359 | repo_group=RepoModel()._get_repo_group(repo_group), | |||
r1 | clone_uri=source_repo_path, | |||
) | ||||
repo = Repository.get_by_repo_name(repo_name_full) | ||||
r4445 | hooks_base.create_repository(created_by=owner.username, **repo.get_dict()) | |||
r1 | ||||
# update repo commit caches initially | ||||
config = repo._config | ||||
config.set('extensions', 'largefiles', '') | ||||
repo.update_commit_cache(config=config) | ||||
# set new created state | ||||
repo.set_state(Repository.STATE_CREATED) | ||||
r1803 | ||||
repo_id = repo.repo_id | ||||
repo_data = repo.get_api_data() | ||||
r1829 | audit_logger.store( | |||
'repo.fork', action_data={'data': repo_data}, | ||||
r1803 | user=cur_user, | |||
repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id)) | ||||
r2359 | Session().commit() | |||
r1 | except Exception as e: | |||
r2459 | log.warning('Exception occurred when forking repository, ' | |||
r2419 | 'doing cleanup...', exc_info=True) | |||
if isinstance(e, IntegrityError): | ||||
Session().rollback() | ||||
r1 | # rollback things manually ! | |||
repo = Repository.get_by_repo_name(repo_name_full) | ||||
if repo: | ||||
Repository.delete(repo.repo_id) | ||||
r2359 | Session().commit() | |||
RepoModel()._delete_filesystem_repo(repo) | ||||
r2419 | log.info('Cleanup of repo %s finished', repo_name_full) | |||
r1 | raise | |||
return True | ||||
r2359 | ||||
r4842 | @async_task(ignore_result=True, base=RequestContextTask) | |||
r2432 | def repo_maintenance(repoid): | |||
from rhodecode.lib import repo_maintenance as repo_maintenance_lib | ||||
log = get_logger(repo_maintenance) | ||||
repo = Repository.get_by_id_or_repo_name(repoid) | ||||
if repo: | ||||
maintenance = repo_maintenance_lib.RepoMaintenance() | ||||
tasks = maintenance.get_tasks_for_repo(repo) | ||||
log.debug('Executing %s tasks on repo `%s`', tasks, repoid) | ||||
executed_types = maintenance.execute(repo) | ||||
log.debug('Got execution results %s', executed_types) | ||||
else: | ||||
log.debug('Repo `%s` not found or without a clone_url', repoid) | ||||
r4842 | @async_task(ignore_result=True, base=RequestContextTask) | |||
r4634 | def check_for_update(send_email_notification=True, email_recipients=None): | |||
r2431 | from rhodecode.model.update import UpdateModel | |||
r4634 | from rhodecode.model.notification import EmailNotificationModel | |||
log = get_logger(check_for_update) | ||||
r2431 | update_url = UpdateModel().get_update_url() | |||
cur_ver = rhodecode.__version__ | ||||
try: | ||||
data = UpdateModel().get_update_data(update_url) | ||||
r4634 | ||||
current_ver = UpdateModel().get_stored_version(fallback=cur_ver) | ||||
latest_ver = data['versions'][0]['version'] | ||||
UpdateModel().store_version(latest_ver) | ||||
if send_email_notification: | ||||
log.debug('Send email notification is enabled. ' | ||||
'Current RhodeCode version: %s, latest known: %s', current_ver, latest_ver) | ||||
if UpdateModel().is_outdated(current_ver, latest_ver): | ||||
email_kwargs = { | ||||
'current_ver': current_ver, | ||||
'latest_ver': latest_ver, | ||||
} | ||||
(subject, email_body, email_body_plaintext) = EmailNotificationModel().render_email( | ||||
EmailNotificationModel.TYPE_UPDATE_AVAILABLE, **email_kwargs) | ||||
email_recipients = aslist(email_recipients, sep=',') or \ | ||||
[user.email for user in User.get_all_super_admins()] | ||||
run_task(send_email, email_recipients, subject, | ||||
email_body_plaintext, email_body) | ||||
r2431 | except Exception: | |||
r4842 | log.exception('Failed to check for update') | |||
raise | ||||
r4145 | ||||
r4735 | def sync_last_update_for_objects(*args, **kwargs): | |||
r4160 | skip_repos = kwargs.get('skip_repos') | |||
if not skip_repos: | ||||
repos = Repository.query() \ | ||||
.order_by(Repository.group_id.asc()) | ||||
for repo in repos: | ||||
repo.update_commit_cache() | ||||
skip_groups = kwargs.get('skip_groups') | ||||
if not skip_groups: | ||||
repo_groups = RepoGroup.query() \ | ||||
r5180 | .filter(RepoGroup.group_parent_id == null()) | |||
r4160 | ||||
for root_gr in repo_groups: | ||||
for repo_gr in reversed(root_gr.recursive_groups()): | ||||
repo_gr.update_commit_cache() | ||||
r4735 | ||||
r4842 | @async_task(ignore_result=True, base=RequestContextTask) | |||
r4735 | def sync_last_update(*args, **kwargs): | |||
sync_last_update_for_objects(*args, **kwargs) | ||||
r4842 | ||||
@async_task(ignore_result=False) | ||||
def beat_check(*args, **kwargs): | ||||
log = get_logger(beat_check) | ||||
log.info('%r: Got args: %r and kwargs %r', beat_check, args, kwargs) | ||||
return time.time() | ||||