##// END OF EJS Templates
chore(configs): optimize configs for docker env
chore(configs): optimize configs for docker env

File last commit:

r5298:25044729 default
r5335:518d5c3d default
Show More
tasks.py
448 lines | 15.0 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2012-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
RhodeCode task module, containing all tasks that are supposed to be run
by the celery daemon
"""
import os
scheduler: added DB models and db parsers for the RhodeCode scheduler....
r2406 import time
project: added all source files and assets
r1
mailing: switched from homebrew lib to pyramid_mailer with python3 compatability
r3113 from pyramid_mailer.mailer import Mailer
from pyramid_mailer.message import Message
emails: set References header for threading in mail user agents even with different subjects...
r4447 from email.utils import formatdate
mailing: switched from homebrew lib to pyramid_mailer with python3 compatability
r3113
dan
celery: fixed bug where celery was not running
r265 import rhodecode
audit-logs: moved async tasks from old deprecated action_logger.
r1803 from rhodecode.lib import audit_logger
automation: enabled automated check for new versions.
r4634 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask, run_task
comments: added rcextensions hoooks for comment editing, and renamed methods to remove odd log_ prefix which...
r4445 from rhodecode.lib import hooks_base
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
r5298 from rhodecode.lib.utils import adopt_for_celery
automation: enabled automated check for new versions.
r4634 from rhodecode.lib.utils2 import safe_int, str2bool, aslist
metrics: use prom metrics, and added some additional metrics
r4803 from rhodecode.lib.statsd_client import StatsdClient
dan
automation: updated update task to update repositories and groups....
r4160 from rhodecode.model.db import (
lint: use null() to compare to == None for linters to be happy
r5180 true, null, Session, IntegrityError, Repository, RepoGroup, User)
api: added proper full permission flush on API calls when creating repos and repo groups....
r4697 from rhodecode.model.permission import PermissionModel
project: added all source files and assets
r1
celery: celery 4.X support. Fixes #4169...
@async_task(ignore_result=True, base=RequestContextTask)
def send_email(recipients, subject, body='', html_body='', email_config=None,
               extra_headers=None):
    """
    Sends an email with defined parameters from the .ini files.

    :param recipients: list of recipients, or a comma separated string of
        addresses. If this is empty, the address from the `email_to` config
        key plus the emails of all admin users are used instead
    :param subject: subject of the mail, it gets prefixed with the
        `email_prefix` config value
    :param body: body of the mail
    :param html_body: html version of body
    :param email_config: specify custom configuration for mailer, defaults
        to `rhodecode.CONFIG`
    :param extra_headers: specify custom headers. A `thread_ids` entry is
        converted into a `References` header. The passed dict is never
        modified.
    :return: True if sending finished without an error, False if the SMTP
        server is not configured or sending raised an exception
    """
    log = get_logger(send_email)

    email_config = email_config or rhodecode.CONFIG

    mail_server = email_config.get('smtp_server') or None
    if mail_server is None:
        log.error("SMTP server information missing. Sending email failed. "
                  "Make sure that `smtp_server` variable is configured "
                  "inside the .ini file")
        return False

    subject = "%s %s" % (email_config.get('email_prefix', ''), subject)

    if recipients:
        if isinstance(recipients, str):
            # tolerate whitespace and stray commas, e.g. "a@x.com, b@x.com,"
            recipients = [
                address.strip()
                for address in recipients.split(',')
                if address.strip()
            ]
    else:
        # if recipients are not defined we send to email_config + all admins
        admins = [
            u.email
            for u in User.query().filter(User.admin == true()).all()
            if u.email
        ]
        recipients = []
        config_email = email_config.get('email_to')
        if config_email:
            recipients += [config_email]
        recipients += admins

    # translate our LEGACY config into the one that pyramid_mailer supports
    email_conf = dict(
        host=mail_server,
        port=email_config.get('smtp_port', 25),
        username=email_config.get('smtp_username'),
        password=email_config.get('smtp_password'),

        tls=str2bool(email_config.get('smtp_use_tls')),
        ssl=str2bool(email_config.get('smtp_use_ssl')),

        # SSL key file
        # keyfile='',

        # SSL certificate file
        # certfile='',

        # Location of maildir
        # queue_path='',

        default_sender=email_config.get('app_email_from', 'RhodeCode-noreply@rhodecode.com'),

        debug=str2bool(email_config.get('smtp_debug')),
        # /usr/sbin/sendmail Sendmail executable
        # sendmail_app='',

        # {sendmail_app} -t -i -f {sender} Template for sendmail execution
        # sendmail_template='',
    )

    # work on a private copy: the headers are modified below and the caller's
    # dict must stay untouched (it is the very same object when the task is
    # executed in-process)
    extra_headers = dict(extra_headers) if extra_headers is not None else {}

    extra_headers.setdefault('Date', formatdate(time.time()))

    if 'thread_ids' in extra_headers:
        thread_ids = extra_headers.pop('thread_ids')
        extra_headers['References'] = ' '.join('<{}>'.format(t) for t in thread_ids)

    try:
        mailer = Mailer(**email_conf)

        message = Message(subject=subject,
                          sender=email_conf['default_sender'],
                          recipients=recipients,
                          body=body, html=html_body,
                          extra_headers=extra_headers)
        mailer.send_immediately(message)
        statsd = StatsdClient.statsd
        if statsd:
            statsd.incr('rhodecode_email_sent_total')

    except Exception:
        log.exception('Mail sending failed')
        return False
    return True
celery: celery 4.X support. Fixes #4169...
@async_task(ignore_result=True, base=RequestContextTask)
def create_repo(form_data, cur_user):
    """
    Create a repository in the database and on the filesystem.

    The database entry is created and committed first, then the filesystem
    repository is created, the `create_repository` hook is called, the commit
    cache is refreshed, the state is switched to `Repository.STATE_CREATED`
    and a `repo.create` audit entry is stored. On any exception the database
    entry and the filesystem repository (when the entry exists) are removed
    and the exception is re-raised.

    :param form_data: dict with the repository creation data; `repo_name`,
        `repo_name_full`, `repo_type`, `repo_description`, `repo_private`
        and `repo_group` are required keys
    :param cur_user: user reference resolved via `UserModel()._get_user`,
        used as the repository owner
    :return: True on success
    """
    from rhodecode.model.repo import RepoModel
    from rhodecode.model.user import UserModel
    from rhodecode.model.scm import ScmModel
    from rhodecode.model.settings import SettingsModel

    log = get_logger(create_repo)

    cur_user = UserModel()._get_user(cur_user)
    owner = cur_user

    short_name = form_data['repo_name']
    full_name = form_data['repo_name_full']
    scm_type = form_data['repo_type']
    repo_description = form_data['repo_description']
    is_private = form_data['repo_private']
    remote_uri = form_data.get('clone_uri')
    group_id = safe_int(form_data['repo_group'])
    with_fork_permissions = form_data.get('copy_permissions')
    with_group_permissions = form_data.get('repo_copy_permissions')
    parent_fork = form_data.get('fork_parent_id')
    initial_state = form_data.get('repo_state', Repository.STATE_PENDING)

    # repo creation defaults, private and repo_type are filled in form
    defaults = SettingsModel().get_default_repo_settings(strip_prefix=True)
    statistics_flag = form_data.get(
        'enable_statistics', defaults.get('repo_enable_statistics'))
    locking_flag = form_data.get(
        'enable_locking', defaults.get('repo_enable_locking'))
    downloads_flag = form_data.get(
        'enable_downloads', defaults.get('repo_enable_downloads'))

    # set landing rev based on default branches for SCM
    landing_ref, _label = ScmModel.backend_landing_ref(scm_type)

    db_entry_kwargs = dict(
        repo_name=full_name,
        repo_type=scm_type,
        description=repo_description,
        owner=owner,
        private=is_private,
        clone_uri=remote_uri,
        repo_group=group_id,
        landing_rev=landing_ref,
        fork_of=parent_fork,
        copy_fork_permissions=with_fork_permissions,
        copy_group_permissions=with_group_permissions,
        enable_statistics=statistics_flag,
        enable_locking=locking_flag,
        enable_downloads=downloads_flag,
        state=initial_state,
    )

    try:
        RepoModel()._create_repo(**db_entry_kwargs)
        Session().commit()

        # now create this repo on Filesystem
        RepoModel()._create_filesystem_repo(
            repo_name=short_name,
            repo_type=scm_type,
            repo_group=RepoModel()._get_repo_group(group_id),
            clone_uri=remote_uri,
        )
        db_repo = Repository.get_by_repo_name(full_name)
        hooks_base.create_repository(
            created_by=owner.username, **db_repo.get_dict())

        # update repo commit caches initially
        db_repo.update_commit_cache()

        # set new created state
        db_repo.set_state(Repository.STATE_CREATED)

        new_repo_id = db_repo.repo_id
        api_data = db_repo.get_api_data()
        audit_logger.store(
            'repo.create', action_data={'data': api_data},
            user=cur_user,
            repo=audit_logger.RepoWrap(
                repo_name=short_name, repo_id=new_repo_id))

        Session().commit()

        PermissionModel().trigger_permission_flush()

    except Exception as exc:
        log.warning('Exception occurred when creating repository, '
                    'doing cleanup...', exc_info=True)
        # the session has to be rolled back before it can be used again
        if isinstance(exc, IntegrityError):
            Session().rollback()

        # rollback things manually !
        leftover = Repository.get_by_repo_name(full_name)
        if leftover:
            Repository.delete(leftover.repo_id)
            Session().commit()
            RepoModel()._delete_filesystem_repo(leftover)
        log.info('Cleanup of repo %s finished', full_name)
        raise

    return True
celery: celery 4.X support. Fixes #4169...
@async_task(ignore_result=True, base=RequestContextTask)
def create_repo_fork(form_data, cur_user):
    """
    Creates a fork of repository using internal VCS methods

    The fork database entry is created and committed first, then the
    filesystem repository is created using the path of the parent repository
    as its clone source. Afterwards the `create_repository` hook is called,
    the commit cache is refreshed, the state is switched to
    `Repository.STATE_CREATED` and a `repo.fork` audit entry is stored.
    On any exception the database entry and the filesystem repository (when
    the entry exists) are removed and the exception is re-raised.

    :param form_data: dict with the fork creation data; `repo_name`,
        `repo_name_full`, `repo_type`, `description`, `private`,
        `repo_group` and `landing_rev` are required keys
    :param cur_user: user reference resolved via `UserModel()._get_user`,
        used as the fork owner
    :return: True on success
    """
    from rhodecode.model.repo import RepoModel
    from rhodecode.model.user import UserModel

    log = get_logger(create_repo_fork)

    cur_user = UserModel()._get_user(cur_user)
    owner = cur_user

    fork_name = form_data['repo_name']  # fork in this case
    fork_name_full = form_data['repo_name_full']
    scm_type = form_data['repo_type']
    fork_description = form_data['description']
    is_private = form_data['private']
    remote_uri = form_data.get('clone_uri')
    group_id = safe_int(form_data['repo_group'])
    landing_ref = form_data['landing_rev']
    with_fork_permissions = form_data.get('copy_permissions')
    parent_id = safe_int(form_data.get('fork_parent_id'))

    try:
        parent_repo = RepoModel()._get_repo(parent_id)

        RepoModel()._create_repo(
            repo_name=fork_name_full,
            repo_type=scm_type,
            description=fork_description,
            owner=owner,
            private=is_private,
            clone_uri=remote_uri,
            repo_group=group_id,
            landing_rev=landing_ref,
            fork_of=parent_repo,
            copy_fork_permissions=with_fork_permissions
        )

        Session().commit()

        storage_root = Repository.base_path()
        parent_repo_path = os.path.join(storage_root, parent_repo.repo_name)

        # now create this repo on Filesystem
        RepoModel()._create_filesystem_repo(
            repo_name=fork_name,
            repo_type=scm_type,
            repo_group=RepoModel()._get_repo_group(group_id),
            clone_uri=parent_repo_path,
        )
        db_fork = Repository.get_by_repo_name(fork_name_full)
        hooks_base.create_repository(
            created_by=owner.username, **db_fork.get_dict())

        # update repo commit caches initially
        fork_config = db_fork._config
        fork_config.set('extensions', 'largefiles', '')
        db_fork.update_commit_cache(config=fork_config)

        # set new created state
        db_fork.set_state(Repository.STATE_CREATED)

        new_repo_id = db_fork.repo_id
        api_data = db_fork.get_api_data()
        audit_logger.store(
            'repo.fork', action_data={'data': api_data},
            user=cur_user,
            repo=audit_logger.RepoWrap(
                repo_name=fork_name, repo_id=new_repo_id))

        Session().commit()
    except Exception as exc:
        log.warning('Exception occurred when forking repository, '
                    'doing cleanup...', exc_info=True)
        # the session has to be rolled back before it can be used again
        if isinstance(exc, IntegrityError):
            Session().rollback()

        # rollback things manually !
        leftover = Repository.get_by_repo_name(fork_name_full)
        if leftover:
            Repository.delete(leftover.repo_id)
            Session().commit()
            RepoModel()._delete_filesystem_repo(leftover)
        log.info('Cleanup of repo %s finished', fork_name_full)
        raise

    return True
celery: celery 4.X support. Fixes #4169...
r2359
translations: moved methods from new request subscriber to actual methods of custom class....
@async_task(ignore_result=True, base=RequestContextTask)
def repo_maintenance(repoid):
    """
    Run the maintenance tasks for a single repository.

    :param repoid: repository id or name, looked up via
        `Repository.get_by_id_or_repo_name`; when nothing is found only a
        debug message is logged
    """
    from rhodecode.lib import repo_maintenance as repo_maintenance_lib
    log = get_logger(repo_maintenance)

    repo = Repository.get_by_id_or_repo_name(repoid)
    if not repo:
        log.debug('Repo `%s` not found or without a clone_url', repoid)
        return

    runner = repo_maintenance_lib.RepoMaintenance()
    planned_tasks = runner.get_tasks_for_repo(repo)
    log.debug('Executing %s tasks on repo `%s`', planned_tasks, repoid)

    results = runner.execute(repo)
    log.debug('Got execution results %s', results)
translations: moved methods from new request subscriber to actual methods of custom class....
@async_task(ignore_result=True, base=RequestContextTask)
def check_for_update(send_email_notification=True, email_recipients=None):
    """
    Fetch the update data, store the latest known version and optionally
    send an "update available" email.

    :param send_email_notification: when true, an email is sent if the
        stored version is outdated compared to the latest one
    :param email_recipients: comma separated recipients (parsed with
        `aslist`); when this yields nothing, the emails of all super admins
        are used
    :raises Exception: any error raised during the check is logged and
        re-raised
    """
    from rhodecode.model.update import UpdateModel
    from rhodecode.model.notification import EmailNotificationModel

    log = get_logger(check_for_update)
    update_url = UpdateModel().get_update_url()
    cur_ver = rhodecode.__version__

    try:
        update_data = UpdateModel().get_update_data(update_url)

        current_ver = UpdateModel().get_stored_version(fallback=cur_ver)
        latest_ver = update_data['versions'][0]['version']
        UpdateModel().store_version(latest_ver)

        if not send_email_notification:
            return

        log.debug('Send email notification is enabled. '
                  'Current RhodeCode version: %s, latest known: %s', current_ver, latest_ver)
        if not UpdateModel().is_outdated(current_ver, latest_ver):
            return

        template_kwargs = {
            'current_ver': current_ver,
            'latest_ver': latest_ver,
        }
        rendered = EmailNotificationModel().render_email(
            EmailNotificationModel.TYPE_UPDATE_AVAILABLE, **template_kwargs)
        (subject, email_body, email_body_plaintext) = rendered

        email_recipients = aslist(email_recipients, sep=',')
        if not email_recipients:
            # nobody configured explicitly, fall back to all super admins
            email_recipients = [
                admin.email for admin in User.get_all_super_admins()]

        run_task(send_email, email_recipients, subject,
                 email_body_plaintext, email_body)

    except Exception:
        log.exception('Failed to check for update')
        raise
automation: moved update groups task into celery task
r4145
celerylib: fixed broken tasks for auto-update
def sync_last_update_for_objects(*args, **kwargs):
    """
    Refresh the commit cache of repositories and repository groups.

    Positional arguments are accepted but not used.

    :param skip_repos: keyword flag, when true repositories are not updated
    :param skip_groups: keyword flag, when true repository groups are not
        updated
    """
    if not kwargs.get('skip_repos'):
        all_repos = Repository.query().order_by(Repository.group_id.asc())
        for db_repo in all_repos:
            db_repo.update_commit_cache()

    if not kwargs.get('skip_groups'):
        # start from the top-level groups (those without a parent)
        top_level_groups = RepoGroup.query().filter(
            RepoGroup.group_parent_id == null())

        for top_group in top_level_groups:
            for group in reversed(top_group.recursive_groups()):
                group.update_commit_cache()
celerylib: fixed broken tasks for auto-update
r4735
translations: moved methods from new request subscriber to actual methods of custom class....
@async_task(ignore_result=True, base=RequestContextTask)
def sync_last_update(*args, **kwargs):
    """
    Task wrapper that forwards all arguments to
    `sync_last_update_for_objects`; nothing is returned.
    """
    sync_last_update_for_objects(*args, **kwargs)
translations: moved methods from new request subscriber to actual methods of custom class....
r4842
@async_task(ignore_result=False)
def beat_check(*args, **kwargs):
    """
    Log the received arguments and return the current time.

    :return: the value of `time.time()` taken after logging
    """
    logger = get_logger(beat_check)
    logger.info('%r: Got args: %r and kwargs %r', beat_check, args, kwargs)
    now = time.time()
    return now
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
r5298
@async_task
@adopt_for_celery
def repo_size(extras):
    """
    Call `rhodecode.lib.hooks_base.repo_size` with `extras` and return its
    result.
    """
    # aliased so the hook is clearly distinct from this task of the same name
    from rhodecode.lib.hooks_base import repo_size as run_hook
    return run_hook(extras)
@async_task
@adopt_for_celery
def pre_pull(extras):
    """
    Call `rhodecode.lib.hooks_base.pre_pull` with `extras` and return its
    result.
    """
    # aliased so the hook is clearly distinct from this task of the same name
    from rhodecode.lib.hooks_base import pre_pull as run_hook
    return run_hook(extras)
@async_task
@adopt_for_celery
def post_pull(extras):
    """
    Call `rhodecode.lib.hooks_base.post_pull` with `extras` and return its
    result.
    """
    # aliased so the hook is clearly distinct from this task of the same name
    from rhodecode.lib.hooks_base import post_pull as run_hook
    return run_hook(extras)
@async_task
@adopt_for_celery
def pre_push(extras):
    """
    Call `rhodecode.lib.hooks_base.pre_push` with `extras` and return its
    result.
    """
    # aliased so the hook is clearly distinct from this task of the same name
    from rhodecode.lib.hooks_base import pre_push as run_hook
    return run_hook(extras)
@async_task
@adopt_for_celery
def post_push(extras):
    """
    Call `rhodecode.lib.hooks_base.post_push` with `extras` and return its
    result.
    """
    # aliased so the hook is clearly distinct from this task of the same name
    from rhodecode.lib.hooks_base import post_push as run_hook
    return run_hook(extras)