|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
"""
|
|
|
Utilities library for RhodeCode
|
|
|
"""
|
|
|
|
|
|
import datetime
|
|
|
import decorator
|
|
|
import logging
|
|
|
import os
|
|
|
import re
|
|
|
import sys
|
|
|
import shutil
|
|
|
import socket
|
|
|
import tempfile
|
|
|
import traceback
|
|
|
import tarfile
|
|
|
import warnings
|
|
|
from os.path import join as jn
|
|
|
|
|
|
import paste
|
|
|
import pkg_resources
|
|
|
from webhelpers2.text import collapse, strip_tags, convert_accented_entities, convert_misc_entities
|
|
|
|
|
|
from mako import exceptions
|
|
|
|
|
|
from rhodecode.lib.hash_utils import sha256_safe, md5, sha1
|
|
|
from rhodecode.lib.str_utils import safe_bytes, safe_str
|
|
|
from rhodecode.lib.vcs.backends.base import Config
|
|
|
from rhodecode.lib.vcs.exceptions import VCSError
|
|
|
from rhodecode.lib.vcs.utils.helpers import get_scm, get_scm_backend
|
|
|
from rhodecode.lib.ext_json import sjson as json
|
|
|
from rhodecode.model import meta
|
|
|
from rhodecode.model.db import (
|
|
|
Repository, User, RhodeCodeUi, UserLog, RepoGroup, UserGroup)
|
|
|
from rhodecode.model.meta import Session
|
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)

# Matches directory names of removed repositories: the ``rm__`` prefix
# followed by three numeric fields (8, 6 and 6 digits) and ``__``.
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}__.*')

# Characters that are not allowed in slug names for repositories or
# repository groups. The string is already escaped, so it can be embedded
# directly into a regular expression character class.
SLUG_BAD_CHARS = re.escape(r'`?=[]\;\'"<>,/~!@#$%^&*()+{}|:')

# Regex that matches a single forbidden character in repo/group slugs; on top
# of SLUG_BAD_CHARS it also covers the listed control character ranges.
SLUG_BAD_CHAR_RE = re.compile(rf'[{SLUG_BAD_CHARS}\x00-\x08\x0b-\x0c\x0e-\x1f]')

# Regex that matches a single allowed character in repo/group slugs.
SLUG_GOOD_CHAR_RE = re.compile(rf'[^{SLUG_BAD_CHARS}]')

# Regex that matches whole repo/group slugs.
SLUG_RE = re.compile(rf'[^{SLUG_BAD_CHARS}]+')

# Parsed license data, filled lazily by read_opensource_licenses().
_license_cache = None
|
|
|
|
|
|
|
|
|
def repo_name_slug(value):
    """
    Build the slug for a repository name.

    Called on each creation/modification of a repository so that bad
    characters never end up in repository names.

    :param value: raw name to turn into a slug
    :return: the name with tags stripped, entities converted, forbidden
        characters removed and whitespace runs replaced by ``-``
    """
    separator = '-'

    # tags and entities are handled by the webhelpers2 text helpers
    cleaned = convert_misc_entities(
        convert_accented_entities(strip_tags(value)))

    # drop forbidden characters first, then turn whitespace runs into dashes
    cleaned = SLUG_BAD_CHAR_RE.sub('', cleaned)
    cleaned = re.sub(r'[\s]+', '-', cleaned)

    return collapse(cleaned, separator)
|
|
|
|
|
|
|
|
|
#==============================================================================
|
|
|
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
|
|
|
#==============================================================================
|
|
|
def get_repo_slug(request):
    """
    Extract the repository name for permission checks from a request.

    :param request: request object; ``db_repo_name`` is preferred when the
        attribute exists, otherwise ``matchdict['repo_name']`` is used
    :return: repository name without trailing slashes; an empty string when
        the request carries neither source, or the falsy value found
    """
    if hasattr(request, 'db_repo_name'):
        # if our request has a db reference set, use it for the name; this
        # translates the example.com/_<id> into proper repo names
        repo_slug = request.db_repo_name
    else:
        # pyramid route parameters
        route_params = getattr(request, 'matchdict', None)
        repo_slug = route_params.get('repo_name') if route_params else ''

    return repo_slug.rstrip('/') if repo_slug else repo_slug
|
|
|
|
|
|
|
|
|
def get_repo_group_slug(request):
    """
    Extract the repository group name for permission checks from a request.

    :param request: request object; ``db_repo_group.group_name`` is preferred
        when the ``db_repo_group`` attribute exists, otherwise
        ``matchdict['repo_group_name']`` is used
    :return: group name without trailing slashes; an empty string when the
        request carries neither source, or the falsy value found
    """
    if hasattr(request, 'db_repo_group'):
        # if our request has a db reference set, use it for the name; this
        # translates the example.com/_<id> into proper repo group names
        group_slug = request.db_repo_group.group_name
    else:
        # pyramid route parameters
        route_params = getattr(request, 'matchdict', None)
        group_slug = route_params.get('repo_group_name') if route_params else ''

    if not group_slug:
        return group_slug
    return group_slug.rstrip('/')
|
|
|
|
|
|
|
|
|
def get_user_group_slug(request):
    """
    Extract the user group name for permission checks from a request.

    :param request: request object; ``db_user_group.users_group_name`` is
        preferred when the ``db_user_group`` attribute exists, otherwise the
        group is looked up via ``matchdict['user_group_id']`` or, failing
        that, ``matchdict['user_group_name']``
    :return: the user group name; an empty string when the request carries
        no usable source; None when the lookup raises or nothing is found
    """
    if hasattr(request, 'db_user_group'):
        return request.db_user_group.users_group_name

    # pyramid route parameters
    route_params = getattr(request, 'matchdict', None)
    if not route_params:
        return ''

    group_ref = route_params.get('user_group_id')
    group_name = route_params.get('user_group_name')
    try:
        # the id takes precedence over the name
        if group_ref:
            group_ref = UserGroup.get(group_ref)
        elif group_name:
            group_ref = UserGroup.get_by_group_name(group_name)

        if group_ref:
            group_ref = group_ref.users_group_name
    except Exception:
        log.exception('Failed to get user group by id and name')
        # catch all failures here
        return None

    return group_ref
|
|
|
|
|
|
|
|
|
def get_filesystem_repos(path, recursive=False, skip_removed_repos=True):
    """
    Scan the given path for repositories.

    :param path: path to scan for repositories
    :param recursive: recursive search and return names with subdirs in front
    :param skip_removed_repos: when true, directories whose name matches
        ``REMOVED_REPO_PAT`` are ignored
    :return: generator of ``(name, scm_info)`` tuples; ``scm_info`` is the
        value returned by ``get_scm`` and ``name`` is its second item made
        relative to ``path``
    """
    # remove ending slash for better results
    path = path.rstrip(os.sep)
    log.debug('now scanning in %s location recursive:%s...', path, recursive)

    def _scan(directory):
        entries = get_dirpaths(directory)
        if not _is_dir_writable(directory):
            log.warning('repo path without write access: %s', directory)

        for entry in entries:
            entry_path = os.path.join(directory, entry)
            if os.path.isfile(entry_path):
                continue

            # skip removed repos
            if skip_removed_repos and REMOVED_REPO_PAT.match(entry):
                continue

            # skip .<something> dirs
            if entry.startswith('.'):
                continue

            try:
                scm_info = get_scm(entry_path)
                yield scm_info[1].split(path, 1)[-1].lstrip(os.sep), scm_info
            except VCSError:
                # not a repository itself: check if this dir contains other
                # repos, but only when a recursive scan was requested
                if recursive and os.path.isdir(entry_path):
                    yield from _scan(entry_path)

    return _scan(path)
|
|
|
|
|
|
|
|
|
def get_dirpaths(p: str) -> list:
    """
    List the entries of directory ``p``.

    :param p: directory to list
    :return: entry names whose type equals the type of ``p``; an empty list
        when the directory cannot be listed
    """
    try:
        # OS-independent way of checking if we have at least read-only
        # access or not.
        entries = os.listdir(p)
    except OSError:
        log.warning('ignoring repo path without read access: %s', p)
        return []

    # os.listdir has a tweak: If a unicode is passed into it, then it tries to
    # decode paths and suddenly returns unicode objects itself. The items it
    # cannot decode are returned as strings and cause issues.
    #
    # Those paths are ignored here until a solid solution for path handling has
    # been built.
    wanted_type = type(p)

    kept = []
    for entry in entries:
        if type(entry) is wanted_type:
            kept.append(entry)
            continue
        log.error(
            "Ignoring path %s since it cannot be decoded into str.",
            # Using "repr" to make sure that we see the byte value in case
            # of support.
            repr(entry))

    return kept
|
|
|
|
|
|
|
|
|
def _is_dir_writable(path):
    """
    Probe if `path` is writable.

    Due to trouble on Cygwin / Windows, this is actually probing if it is
    possible to create a file inside of `path`, stat does not produce reliable
    results in this case.

    :param path: directory to probe
    :return: True when a temporary file could be created inside ``path``,
        False when that raised ``OSError``
    """
    try:
        # create the probe file and close it right away
        tempfile.TemporaryFile(dir=path).close()
    except OSError:
        return False
    return True
|
|
|
|
|
|
|
|
|
def is_valid_repo(repo_name, base_path, expect_scm=None, explicit_scm=None, config=None):
    """
    Check whether ``base_path/repo_name`` is a valid repository.

    :param repo_name: repository name, joined onto ``base_path``
    :param base_path: directory the repository name is relative to
    :param expect_scm: if given, the detected scm must equal this value
    :param explicit_scm: if given, the scm is not detected; the backend for
        this scm is instantiated on the path instead
    :param config: passed to the backend when ``explicit_scm`` is used
    :return: True if the path is a valid repository (and matches
        ``expect_scm`` when given), False otherwise
    """
    repo_path = os.path.join(safe_str(base_path), safe_str(repo_name))
    log.debug('Checking if `%s` is a valid path for repository. '
              'Explicit type: %s', repo_name, explicit_scm)

    try:
        if explicit_scm:
            backend_cls = get_scm_backend(explicit_scm)
            found_scms = [backend_cls(repo_path, config=config).alias]
        else:
            found_scms = get_scm(repo_path)
    except VCSError:
        log.debug('path: %s is not a valid repo !', repo_path)
        return False

    if expect_scm:
        return found_scms[0] == expect_scm

    log.debug('path: %s is an vcs object:%s', repo_path, found_scms)
    return True
|
|
|
|
|
|
|
|
|
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
    """
    Check whether ``base_path/repo_group_name`` is a repository group.

    :param repo_group_name: group name, joined onto ``base_path``
    :param base_path: directory the group name is relative to
    :param skip_path_check: when true, the final check that the path is an
        existing directory is skipped
    :return: True if the path qualifies as a repository group. False if it
        is a repository, if its parent directory equals ``base_path`` or is
        a vcs object, or if the directory check fails
    """
    group_path = os.path.join(safe_str(base_path), safe_str(repo_group_name))
    log.debug('Checking if `%s` is a valid path for repository group',
              repo_group_name)

    # check if it's not a repo
    if is_valid_repo(repo_group_name, base_path):
        log.debug('Repo called %s exist, it is not a valid repo group', repo_group_name)
        return False

    # we need to check bare git repos at higher level
    # since we might match branches/hooks/info/objects or possible
    # other things inside bare git repo
    parent_dir = os.path.dirname(group_path)
    if parent_dir == base_path:
        # skip root level repo check; we know root location CANNOT BE a repo group
        return False

    try:
        parent_scm = get_scm(parent_dir)
    except VCSError:
        # parent is not a vcs object, carry on with the path check
        pass
    else:
        log.debug('path: %s is a vcs object:%s, not valid repo group', group_path, parent_scm)
        return False

    # check if it's a valid path
    if skip_path_check or os.path.isdir(group_path):
        log.debug('path: %s is a valid repo group !', group_path)
        return True

    log.debug('path: %s is not a valid repo group !', group_path)
    return False
|
|
|
|
|
|
|
|
|
def ask_ok(prompt, retries=4, complaint='[y]es or [n]o please!'):
    """
    Ask a yes/no question on the console until a recognized answer is given.

    :param prompt: text passed to ``input``
    :param retries: number of unrecognized answers tolerated before giving up
    :param complaint: text printed after each unrecognized answer
    :return: True for a yes-like answer, False for a no-like answer
    :raises OSError: when the retries are used up
    """
    yes_answers = ('y', 'ye', 'yes')
    no_answers = ('n', 'no', 'nop', 'nope')
    attempts_left = retries

    while True:
        answer = input(prompt).lower()
        if answer in yes_answers:
            return True
        if answer in no_answers:
            return False

        attempts_left -= 1
        if attempts_left < 0:
            raise OSError
        print(complaint)
|
|
|
|
|
|
# Section names propagated from mercurial documentation.
ui_sections = [
    'alias',
    'auth',
    'decode/encode',
    'defaults',
    'diff',
    'email',
    'extensions',
    'format',
    'merge-patterns',
    'merge-tools',
    'hooks',
    'http_proxy',
    'smtp',
    'patch',
    'paths',
    'profiling',
    'server',
    'trusted',
    'ui',
    'web',
]
|
|
|
|
|
|
|
|
|
def config_data_from_db(clear_session=True, repo=None):
    """
    Read the ui settings from the database and return them as configuration
    tuples.

    :param clear_session: when true, ``meta.Session.remove()`` is called
        after the settings were read
    :param repo: passed to ``VcsSettingsModel`` to select the settings
    :return: list of ``(section, key, value)`` tuples for all active
        settings, without the hook entries of disabled hook classes
    """
    from rhodecode.model.settings import VcsSettingsModel

    entries = []

    sa = meta.Session()
    vcs_settings = VcsSettingsModel(repo=repo, sa=sa)
    ui_settings = vcs_settings.get_ui_settings()

    # raw values of the active settings, only used for the debug message
    active_raw = []
    for ui in ui_settings:
        if ui.active:
            active_raw.append((ui.section, ui.key, ui.value))
            entries.append(
                (safe_str(ui.section), safe_str(ui.key), safe_str(ui.value)))
        if ui.key == 'push_ssl':
            # force set push_ssl requirement to False, rhodecode
            # handles that
            entries.append((safe_str(ui.section), safe_str(ui.key), False))

    log.debug(
        'settings ui from db@repo[%s]: %s',
        repo,
        ','.join(['[{}] {}={}'.format(*s) for s in active_raw]))

    if clear_session:
        meta.Session.remove()

    # TODO: mikhail: probably it makes no sense to re-read hooks information.
    # It's already there and activated/deactivated
    enabled_hook_classes = get_enabled_hook_classes(ui_settings)
    skip_entries = []
    if 'pull' not in enabled_hook_classes:
        skip_entries.append(('hooks', RhodeCodeUi.HOOK_PRE_PULL))
    if 'push' not in enabled_hook_classes:
        skip_entries.extend([
            ('hooks', RhodeCodeUi.HOOK_PRE_PUSH),
            ('hooks', RhodeCodeUi.HOOK_PRETX_PUSH),
            ('hooks', RhodeCodeUi.HOOK_PUSH_KEY),
        ])

    # entries are compared by their (section, key) prefix
    return [entry for entry in entries if entry[:2] not in skip_entries]
|
|
|
|
|
|
|
|
|
def make_db_config(clear_session=True, repo=None):
    """
    Build a :class:`Config` instance from the values stored in the database.

    :param clear_session: forwarded to :func:`config_data_from_db`
    :param repo: forwarded to :func:`config_data_from_db`
    :return: :class:`Config` with every ``(section, option, value)`` tuple set
    """
    db_config = Config()
    entries = config_data_from_db(clear_session=clear_session, repo=repo)
    for section, option, value in entries:
        db_config.set(section, option, value)
    return db_config
|
|
|
|
|
|
|
|
|
def get_enabled_hook_classes(ui_settings):
    """
    Return the enabled hook classes.

    :param ui_settings: List of ui_settings as returned
        by :meth:`VcsSettingsModel.get_ui_settings`; each item is unpacked
        as ``(section, key, value, active)``

    :return: a list with the enabled hook classes (``'push'``, ``'pull'``,
        ``'repo_size'``). The order is not guaranteed.
    :rtype: list
    """
    active_keys = [
        key for section, key, _value, active in ui_settings
        if section == 'hooks' and active]

    # hook key -> hook class; keys missing here are not reported
    class_by_key = {
        RhodeCodeUi.HOOK_PUSH: 'push',
        RhodeCodeUi.HOOK_PULL: 'pull',
        RhodeCodeUi.HOOK_REPO_SIZE: 'repo_size'
    }

    return [class_by_key[key] for key in active_keys if key in class_by_key]
|
|
|
|
|
|
|
|
|
def set_rhodecode_config(config):
    """
    Update the pyramid config with the settings stored in the database.

    :param config: mapping-like config object; every application setting is
        written into it by key
    """
    from rhodecode.model.settings import SettingsModel

    db_settings = SettingsModel().get_all_settings()
    for name, setting_value in list(db_settings.items()):
        config[name] = setting_value
|
|
|
|
|
|
|
|
|
def get_rhodecode_realm():
    """
    Return the rhodecode realm stored in the database.

    :return: value of the ``realm`` setting, converted with ``safe_str``
    """
    from rhodecode.model.settings import SettingsModel

    realm_setting = SettingsModel().get_setting_by_name('realm')
    realm_value = realm_setting.app_settings_value
    return safe_str(realm_value)
|
|
|
|
|
|
|
|
|
def get_rhodecode_base_path():
    """
    Return the base path, i.e. the filesystem path which points to the
    repository store.

    :return: value of ``rhodecode.CONFIG['default_base_path']``
    """
    import rhodecode

    app_config = rhodecode.CONFIG
    return app_config['default_base_path']
|
|
|
|
|
|
|
|
|
def map_groups(path):
    """
    Create all nested repository groups that the given repository is inside.

    Parent-child relationships are set up between the groups, and default
    permissions are created for every newly created group.

    :param path: full path to repository; it is split on
        ``Repository.NAME_SEP`` and the last element is taken as the
        repository itself
    :return: the group handled last, or None when the path has no group part
    """
    from rhodecode.model.repo_group import RepoGroupModel

    sa = meta.Session()

    # last element is repo in nested groups structure
    group_parts = path.split(Repository.NAME_SEP)[:-1]

    rgm = RepoGroupModel(sa)
    owner = User.get_first_super_admin()

    parent = None
    group = None
    for lvl, _part in enumerate(group_parts):
        # full name of the group at this nesting level
        group_name = '/'.join(group_parts[:lvl + 1])
        group = RepoGroup.get_by_group_name(group_name)

        # skip folders that are now removed repos
        if REMOVED_REPO_PAT.match(group_name):
            break

        if group is None:
            log.debug('creating group level: %s group_name: %s',
                      lvl, group_name)
            group = RepoGroup(group_name, parent)
            group.group_description = '%s group' % group_name
            group.user = owner
            sa.add(group)
            sa.add(rgm._create_default_perms(group))
            sa.flush()

        # the next level is nested inside this group
        parent = group

    return group
|
|
|
|
|
|
|
|
|
def repo2db_mapper(initial_repo_list, remove_obsolete=False, force_hooks_rebuild=False):
    """
    Map the repositories given in ``initial_repo_list`` into the database.

    Repositories without a database entry are created, and hooks are
    installed for every listed repository. If ``remove_obsolete`` is True,
    database entries of repositories and repository groups that are not in
    ``initial_repo_list`` are removed as well (with ``fs_remove=False``).

    :param initial_repo_list: mapping of repository name to the repository
        object found by scanning methods (its ``description`` and ``alias``
        are used on creation)
    :param remove_obsolete: check for obsolete entries in database
    :param force_hooks_rebuild: passed as ``force`` to ``install_hooks``
    :return: tuple ``(added, removed)``: names of the created repositories
        and names of the removed repositories and repository groups
    """
    from rhodecode.model.repo import RepoModel
    from rhodecode.model.repo_group import RepoGroupModel
    from rhodecode.model.settings import SettingsModel

    sa = meta.Session()
    repo_model = RepoModel()
    user = User.get_first_super_admin()
    added = []

    # creation defaults
    defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
    enable_statistics = defs.get('repo_enable_statistics')
    enable_locking = defs.get('repo_enable_locking')
    enable_downloads = defs.get('repo_enable_downloads')
    private = defs.get('repo_private')

    for name, repo in list(initial_repo_list.items()):
        group = map_groups(name)
        str_name = safe_str(name)
        db_repo = repo_model.get_by_repo_name(str_name)

        # found repo that is on filesystem not in RhodeCode database
        if not db_repo:
            log.info('repository `%s` not found in the database, creating now', name)
            added.append(name)
            desc = (repo.description
                    if repo.description != 'unknown'
                    else '%s repository' % name)

            db_repo = repo_model._create_repo(
                repo_name=name,
                repo_type=repo.alias,
                description=desc,
                repo_group=getattr(group, 'group_id', None),
                owner=user,
                enable_locking=enable_locking,
                enable_downloads=enable_downloads,
                enable_statistics=enable_statistics,
                private=private,
                state=Repository.STATE_CREATED
            )
            sa.commit()
            # we added that repo just now, and make sure we updated server info
            if db_repo.repo_type == 'git':
                git_repo = db_repo.scm_instance()
                # update repository server-info
                log.debug('Running update server info')
                git_repo._update_server_info(force=True)

            db_repo.update_commit_cache()

        # hooks are installed for every listed repository, new or existing
        config = db_repo._config
        config.set('extensions', 'largefiles', '')
        repo = db_repo.scm_instance(config=config)
        repo.install_hooks(force=force_hooks_rebuild)

    removed = []
    if not remove_obsolete:
        return added, removed

    # remove from database those repositories that are not in the filesystem
    for db_entry in sa.query(Repository).all():
        repo_name = db_entry.repo_name
        # direct mapping lookup; no per-row list of all keys is built
        if repo_name in initial_repo_list:
            continue

        log.debug("Removing non-existing repository found in db `%s`",
                  repo_name)
        try:
            RepoModel(sa).delete(db_entry, forks='detach', fs_remove=False)
            sa.commit()
            removed.append(repo_name)
        except Exception:
            # don't hold further removals on error
            log.exception(
                'Unable to remove repository `%s`', repo_name)
            sa.rollback()

    group_sep = RepoGroup.url_sep()

    def _parent_group(full_repo_name):
        # name of the group directly containing the repo, None at top level
        parts = full_repo_name.rsplit(group_sep, 1)
        return parts[0] if len(parts) == 2 else None

    # groups that directly contain one of the scanned repositories
    fs_group_names = {
        gr_name for gr_name in map(_parent_group, initial_repo_list)
        if gr_name}

    # remove from database those repository groups that are not in the
    # filesystem due to parent child relationships we need to delete them
    # in a specific order of most nested first
    all_groups = [x.group_name for x in sa.query(RepoGroup).all()]

    def nested_sort(gr):
        return len(gr.split('/'))

    for group_name in sorted(all_groups, key=nested_sort, reverse=True):
        if group_name in fs_group_names:
            continue

        repo_group = RepoGroup.get_by_group_name(group_name)
        # groups that still have children, or for which the filesystem check
        # is falsy, are kept
        if (repo_group.children.all() or
                not RepoGroupModel().check_exist_filesystem(
                    group_name=group_name, exc_on_failure=False)):
            continue

        log.info(
            'Removing non-existing repository group found in db `%s`',
            group_name)
        try:
            RepoGroupModel(sa).delete(group_name, fs_remove=False)
            sa.commit()
            removed.append(group_name)
        except Exception:
            # unlike repository removal above, a failure here is re-raised
            # after the rollback and stops further removals
            log.exception(
                'Unable to remove repository group `%s`',
                group_name)
            sa.rollback()
            raise

    return added, removed
|
|
|
|
|
|
|
|
|
def load_rcextensions(root_path):
    """
    Import the ``rcextensions`` module found under ``root_path``.

    ``root_path`` is put on ``sys.path`` (once, repeated calls do not add
    duplicate entries). When the import succeeds the module is stored as
    ``rhodecode.EXTENSIONS``. In every case ``LANGUAGES_EXTENSIONS_MAP`` is
    updated with the ``EXTRA_MAPPINGS`` of ``rhodecode.EXTENSIONS``, if any.

    :param root_path: directory that contains the ``rcextensions`` package
    """
    import rhodecode
    from rhodecode.config import conf

    path = os.path.join(root_path)
    if path not in sys.path:
        sys.path.append(path)

    try:
        rcextensions = __import__('rcextensions')
    except ImportError:
        # only worth a warning when the directory is there but not importable
        if os.path.isdir(os.path.join(path, 'rcextensions')):
            log.warning('Unable to load rcextensions from %s', path)
        rcextensions = None

    if rcextensions:
        log.info('Loaded rcextensions from %s...', rcextensions)
        rhodecode.EXTENSIONS = rcextensions

    # Additional mappings that are not present in the pygments lexers
    conf.LANGUAGES_EXTENSIONS_MAP.update(
        getattr(rhodecode.EXTENSIONS, 'EXTRA_MAPPINGS', {}))
|
|
|
|
|
|
|
|
|
def get_custom_lexer(extension):
    """
    Return the custom lexer for a file extension.

    :param extension: file extension to look up
    :return: the ``html+mako`` lexer for ``mako``, the lexer named in
        ``EXTRA_LEXERS`` of the rcextensions module when the extension is
        defined there, otherwise None
    """
    import rhodecode
    from pygments import lexers

    # custom override made by RhodeCode
    if extension == 'mako':
        return lexers.get_lexer_by_name('html+mako')

    # check if we didn't define this extension as other lexer
    extra_lexers = rhodecode.EXTENSIONS and getattr(
        rhodecode.EXTENSIONS, 'EXTRA_LEXERS', None)
    if not extra_lexers or extension not in extra_lexers:
        return None

    return lexers.get_lexer_by_name(extra_lexers[extension])
|
|
|
|
|
|
|
|
|
#==============================================================================
|
|
|
# TEST FUNCTIONS AND CREATORS
|
|
|
#==============================================================================
|
|
|
def create_test_index(repo_location, config):
    """
    Make the default test index.

    :param repo_location: not used by this function
    :param config: mapping with the ``search.location`` key; the index is
        extracted into the parent directory of that location
    :raises ImportError: when the ``rc_testdata`` package is missing
    """
    try:
        import rc_testdata
    except ImportError:
        raise ImportError('Failed to import rc_testdata, '
                          'please make sure this package is installed from requirements_test.txt')

    index_parent_dir = os.path.dirname(config['search.location'])
    rc_testdata.extract_search_index('vcs_search_index', index_parent_dir)
|
|
|
|
|
|
|
|
|
def create_test_directory(test_path):
    """
    Create the test directory, including parents, if it doesn't exist.

    :param test_path: directory to create
    """
    if os.path.isdir(test_path):
        # already there, nothing to do
        return

    log.debug('Creating testdir %s', test_path)
    os.makedirs(test_path)
|
|
|
|
|
|
|
|
|
def create_test_database(test_path, config):
    """
    Make a fresh database for tests.

    :param test_path: path handed to ``DbManage.config_prompt`` so that the
        stored root paths point at the generated test content
    :param config: mapping with the ``sqlalchemy.db1.url`` and ``here`` keys;
        it is also passed to ``get_encryption_key``
    """
    from rhodecode.lib.db_manage import DbManage
    from rhodecode.lib.utils2 import get_encryption_key

    # PART ONE create db
    db_url = config['sqlalchemy.db1.url']
    encryption_key = get_encryption_key(config)

    log.debug('making test db %s', db_url)

    manager = DbManage(
        log_sql=False,
        dbconf=db_url,
        root=config['here'],
        tests=True,
        cli_args={'force_ask': True},
        enc_key=encryption_key)
    manager.create_tables(override=True)
    manager.set_db_version()

    # for tests dynamically set new root paths based on generated content
    path_settings = manager.config_prompt(test_path)
    manager.create_settings(path_settings)

    manager.create_default_user()
    manager.create_test_admin_and_users()
    manager.create_permissions()
    manager.populate_default_permissions()
    Session().commit()
|
|
|
|
|
|
|
|
|
def create_test_repositories(test_path, config):
    """
    Create the test repositories below ``test_path``.

    The repositories are extracted from archives within the rc_testdata
    package. Existing ``search.location`` and ``cache_dir`` directories are
    removed first.

    :param test_path: directory the repositories are extracted into
    :param config: mapping with the ``search.location`` and ``cache_dir`` keys
    """
    import rc_testdata
    from rhodecode.tests import HG_REPO, GIT_REPO, SVN_REPO

    log.debug('making test vcs repositories')

    # clean index and data
    for stale_path in (config['search.location'], config['cache_dir']):
        if stale_path and os.path.exists(stale_path):
            log.debug('remove %s', stale_path)
            shutil.rmtree(stale_path)

    rc_testdata.extract_hg_dump('vcs_test_hg', jn(test_path, HG_REPO))
    rc_testdata.extract_git_dump('vcs_test_git', jn(test_path, GIT_REPO))

    # Note: Subversion is in the process of being integrated with the system,
    # until we have a properly packed version of the test svn repository, this
    # tries to copy over the repo from a package "rc_testdata"
    svn_archive = rc_testdata.get_svn_repo_archive()
    svn_target = jn(test_path, SVN_REPO)
    with tarfile.open(svn_archive) as archive:
        archive.extractall(svn_target)
|
|
|
|
|
|
|
|
|
def password_changed(auth_user, session):
    """
    Tell whether the user's password differs from the one in the session.

    :param auth_user: user object; ``username``, ``user_id`` and
        ``password`` are read
    :param session: mapping; the ``password`` entry of its
        ``rhodecode_user`` dict is compared
    :return: False for the default user or a user without id, otherwise
        whether the md5 of the current password (None if the user has no
        password) differs from the session value (``''`` if missing)
    """
    # Never report password change in case of default user or anonymous user.
    if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None:
        return False

    current_hash = None
    if auth_user.password:
        current_hash = md5(safe_bytes(auth_user.password))

    session_user = session.get('rhodecode_user', {})
    stored_hash = session_user.get('password', '')
    return current_hash != stored_hash
|
|
|
|
|
|
|
|
|
def read_opensource_licenses():
    """
    Return the parsed content of ``config/licenses.json``.

    The parsed data is kept in the module-level ``_license_cache``, so the
    resource is read again only while the cached value is falsy.

    :return: the data loaded from the ``rhodecode`` package resource
    """
    global _license_cache

    if _license_cache:
        return _license_cache

    raw_licenses = pkg_resources.resource_string(
        'rhodecode', 'config/licenses.json')
    _license_cache = json.loads(raw_licenses)
    return _license_cache
|
|
|
|
|
|
|
|
|
def generate_platform_uuid():
    """
    Generate the platform UUID based on its name.

    :return: ``sha256_safe`` of the ``platform.platform()`` string, or
        ``'UNDEFINED'`` when generating it fails
    """
    import platform

    try:
        name_parts = [platform.platform()]
        return sha256_safe(':'.join(name_parts))
    except Exception as exc:
        log.error('Failed to generate host uuid: %s', exc)
        return 'UNDEFINED'
|
|
|
|
|
|
|
|
|
def send_test_email(recipients, email_body='TEST EMAIL'):
    """
    Simple code for generating test emails.
    Usage::

        from rhodecode.lib import utils
        utils.send_test_email(recipients)

    :param recipients: passed as the recipients to ``tasks.send_email``
    :param email_body: text used for both body arguments of the email
    """
    from rhodecode.lib.celerylib import tasks, run_task

    # the same text is handed over for both body arguments
    plaintext_body = email_body
    subject = f'SUBJECT FROM: {socket.gethostname()}'
    tasks.send_email(recipients, subject, plaintext_body, email_body)
|
|
|
|