##// END OF EJS Templates
celery: ping db connection before task execution to recycle db connections.
celery: ping db connection before task execution to recycle db connections.

File last commit:

r3363:f08e98b1 default
r3390:02f7713a default
Show More
utils.py
781 lines | 25.7 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
docs: updated copyrights to 2019
r3363 # Copyright (C) 2010-2019 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
"""
Utilities library for RhodeCode
"""
import datetime
import decorator
import json
import logging
import os
import re
rcextensions: new builtin rcextensions....
r3133 import sys
project: added all source files and assets
r1 import shutil
import tempfile
import traceback
import tarfile
import warnings
system-info: added UUID placeholder for generating platform unique identifiers.
r1115 import hashlib
Martin Bornhold
admin: Add helper to get the registry from requests.
r298 from os.path import join as jn
project: added all source files and assets
r1
import paste
import pkg_resources
from webhelpers.text import collapse, remove_formatting, strip_tags
from mako import exceptions
Martin Bornhold
admin: Add helper to get the registry from requests.
r298 from pyramid.threadlocal import get_current_registry
debug: add new custom logging to track unique requests across systems.
r2794 from rhodecode.lib.request import Request
project: added all source files and assets
r1
from rhodecode.lib.vcs.backends.base import Config
from rhodecode.lib.vcs.exceptions import VCSError
from rhodecode.lib.vcs.utils.helpers import get_scm, get_scm_backend
from rhodecode.lib.utils2 import (
caches: don't use deprecated md5 for key calculation....
r2834 safe_str, safe_unicode, get_current_rhodecode_user, md5, sha1)
project: added all source files and assets
r1 from rhodecode.model import meta
from rhodecode.model.db import (
Repository, User, RhodeCodeUi, UserLog, RepoGroup, UserGroup)
from rhodecode.model.meta import Session
utils: make sure we don't import any models inside utils
r262
project: added all source files and assets
r1
log = logging.getLogger(__name__)
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}__.*')
Martin Bornhold
vcs: Clean up the shadow-repo-expose code and make some nicer comments.
r904 # String which contains characters that are not allowed in slug names for
# repositories or repository groups. It is properly escaped to use it in
# regular expressions.
slugs: make special chars not be replaced by '-'. THis produces a much...
r1147 SLUG_BAD_CHARS = re.escape('`?=[]\;\'"<>,/~!@#$%^&*()+{}|:')
Martin Bornhold
vcs: Clean up the shadow-repo-expose code and make some nicer comments.
r904
Martin Bornhold
shadow: Use only a single regular expression to generate and match repo/group slugs.
r901 # Regex that matches forbidden characters in repo/group slugs.
SLUG_BAD_CHAR_RE = re.compile('[{}]'.format(SLUG_BAD_CHARS))
Martin Bornhold
vcs: Clean up the shadow-repo-expose code and make some nicer comments.
r904
Martin Bornhold
shadow: Use only a single regular expression to generate and match repo/group slugs.
r901 # Regex that matches allowed characters in repo/group slugs.
SLUG_GOOD_CHAR_RE = re.compile('[^{}]'.format(SLUG_BAD_CHARS))
Martin Bornhold
vcs: Clean up the shadow-repo-expose code and make some nicer comments.
r904
Martin Bornhold
shadow: Use only a single regular expression to generate and match repo/group slugs.
r901 # Regex that matches whole repo/group slugs.
SLUG_RE = re.compile('[^{}]+'.format(SLUG_BAD_CHARS))
project: added all source files and assets
r1
Martin Bornhold
shadow: Use only a single regular expression to generate and match repo/group slugs.
r901 _license_cache = None
project: added all source files and assets
r1
def repo_name_slug(value):
"""
Return slug of name of repository
This function is called on each creation/modification
of repository to prevent bad names in repo
"""
Martin Bornhold
shadow: Use only a single regular expression to generate and match repo/group slugs.
r901 replacement_char = '-'
project: added all source files and assets
r1
slug = remove_formatting(value)
slugs: make special chars not be replaced by '-'. THis produces a much...
r1147 slug = SLUG_BAD_CHAR_RE.sub('', slug)
slug = re.sub('[\s]+', '-', slug)
Martin Bornhold
shadow: Use only a single regular expression to generate and match repo/group slugs.
r901 slug = collapse(slug, replacement_char)
project: added all source files and assets
r1 return slug
#==============================================================================
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
#==============================================================================
def get_repo_slug(request):
routing: optionally use explicit db object on route....
r1989 _repo = ''
pylons: remove pylons as dependency...
r2351 if hasattr(request, 'db_repo'):
# if our requests has set db reference use it for name, this
# translates the example.com/_<id> into proper repo names
_repo = request.db_repo.repo_name
elif getattr(request, 'matchdict', None):
# pyramid
_repo = request.matchdict.get('repo_name')
auth: make the perms decorators and function pyramid compatible.
r1494
project: added all source files and assets
r1 if _repo:
_repo = _repo.rstrip('/')
return _repo
def get_repo_group_slug(request):
routing: optionally use explicit db object on route....
r1989 _group = ''
pylons: remove pylons as dependency...
r2351 if hasattr(request, 'db_repo_group'):
# if our requests has set db reference use it for name, this
# translates the example.com/_<id> into proper repo group names
_group = request.db_repo_group.group_name
elif getattr(request, 'matchdict', None):
# pyramid
_group = request.matchdict.get('repo_group_name')
routing: optionally use explicit db object on route....
r1989
project: added all source files and assets
r1 if _group:
_group = _group.rstrip('/')
return _group
def get_user_group_slug(request):
user-groups: rewrote the app to pyramid...
r2068 _user_group = ''
pylons: remove pylons as dependency...
r2351 if hasattr(request, 'db_user_group'):
_user_group = request.db_user_group.users_group_name
elif getattr(request, 'matchdict', None):
# pyramid
_user_group = request.matchdict.get('user_group_id')
Bartłomiej Wołyńczyk
Public user group profile Task #5326
r2638 _user_group_name = request.matchdict.get('user_group_name')
pylons: remove pylons as dependency...
r2351 try:
Bartłomiej Wołyńczyk
Public user group profile Task #5326
r2638 if _user_group:
_user_group = UserGroup.get(_user_group)
elif _user_group_name:
_user_group = UserGroup.get_by_group_name(_user_group_name)
pylons: remove pylons as dependency...
r2351 if _user_group:
_user_group = _user_group.users_group_name
except Exception:
Bartłomiej Wołyńczyk
Public user group profile Task #5326
r2638 log.exception('Failed to get user group by id and name')
pylons: remove pylons as dependency...
r2351 # catch all failures here
return None
user-groups: rewrote the app to pyramid...
r2068
return _user_group
project: added all source files and assets
r1
def get_filesystem_repos(path, recursive=False, skip_removed_repos=True):
"""
Scans given path for repos and return (name,(type,path)) tuple
:param path: path to scan for repositories
:param recursive: recursive search and return names with subdirs in front
"""
# remove ending slash for better results
path = path.rstrip(os.sep)
log.debug('now scanning in %s location recursive:%s...', path, recursive)
def _get_repos(p):
dirpaths = _get_dirpaths(p)
if not _is_dir_writable(p):
log.warning('repo path without write access: %s', p)
for dirpath in dirpaths:
if os.path.isfile(os.path.join(p, dirpath)):
continue
cur_path = os.path.join(p, dirpath)
# skip removed repos
if skip_removed_repos and REMOVED_REPO_PAT.match(dirpath):
continue
#skip .<somethin> dirs
if dirpath.startswith('.'):
continue
try:
scm_info = get_scm(cur_path)
yield scm_info[1].split(path, 1)[-1].lstrip(os.sep), scm_info
except VCSError:
if not recursive:
continue
#check if this dir containts other repos for recursive scan
rec_path = os.path.join(p, dirpath)
if os.path.isdir(rec_path):
for inner_scm in _get_repos(rec_path):
yield inner_scm
return _get_repos(path)
def _get_dirpaths(p):
try:
# OS-independable way of checking if we have at least read-only
# access or not.
dirpaths = os.listdir(p)
except OSError:
log.warning('ignoring repo path without read access: %s', p)
return []
# os.listpath has a tweak: If a unicode is passed into it, then it tries to
# decode paths and suddenly returns unicode objects itself. The items it
# cannot decode are returned as strings and cause issues.
#
# Those paths are ignored here until a solid solution for path handling has
# been built.
expected_type = type(p)
def _has_correct_type(item):
if type(item) is not expected_type:
log.error(
u"Ignoring path %s since it cannot be decoded into unicode.",
# Using "repr" to make sure that we see the byte value in case
# of support.
repr(item))
return False
return True
dirpaths = [item for item in dirpaths if _has_correct_type(item)]
return dirpaths
def _is_dir_writable(path):
"""
Probe if `path` is writable.
Due to trouble on Cygwin / Windows, this is actually probing if it is
possible to create a file inside of `path`, stat does not produce reliable
results in this case.
"""
try:
with tempfile.TemporaryFile(dir=path):
pass
except OSError:
return False
return True
simplevcs: allow passing config into repo detection logic....
r2519 def is_valid_repo(repo_name, base_path, expect_scm=None, explicit_scm=None, config=None):
project: added all source files and assets
r1 """
Returns True if given path is a valid repository False otherwise.
If expect_scm param is given also, compare if given scm is the same
as expected from scm parameter. If explicit_scm is given don't try to
detect the scm, just use the given one to check if repo is valid
:param repo_name:
:param base_path:
:param expect_scm:
:param explicit_scm:
simplevcs: allow passing config into repo detection logic....
r2519 :param config:
project: added all source files and assets
r1
:return True: if given path is a valid repository
"""
full_path = os.path.join(safe_str(base_path), safe_str(repo_name))
logging: added additional log info to vcs detection util.
r1315 log.debug('Checking if `%s` is a valid path for repository. '
'Explicit type: %s', repo_name, explicit_scm)
project: added all source files and assets
r1
try:
if explicit_scm:
simplevcs: allow passing config into repo detection logic....
r2519 detected_scms = [get_scm_backend(explicit_scm)(
full_path, config=config).alias]
project: added all source files and assets
r1 else:
detected_scms = get_scm(full_path)
if expect_scm:
return detected_scms[0] == expect_scm
log.debug('path: %s is an vcs object:%s', full_path, detected_scms)
return True
except VCSError:
log.debug('path: %s is not a valid repo !', full_path)
return False
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
"""
Returns True if given path is a repository group, False otherwise
:param repo_name:
:param base_path:
"""
full_path = os.path.join(safe_str(base_path), safe_str(repo_group_name))
log.debug('Checking if `%s` is a valid path for repository group',
repo_group_name)
# check if it's not a repo
if is_valid_repo(repo_group_name, base_path):
logging: use lazy parameter evaluation in log calls.
r3061 log.debug('Repo called %s exist, it is not a valid repo group', repo_group_name)
project: added all source files and assets
r1 return False
try:
# we need to check bare git repos at higher level
# since we might match branches/hooks/info/objects or possible
# other things inside bare git repo
import/repo-checks: skip validating storage path in detection of SCMs.
r3037 maybe_repo = os.path.dirname(full_path)
if maybe_repo == base_path:
# skip root level repo check, we know root location CANNOT BE a repo group
return False
scm_ = get_scm(maybe_repo)
log.debug('path: %s is a vcs object:%s, not valid repo group', full_path, scm_)
project: added all source files and assets
r1 return False
except VCSError:
pass
# check if it's a valid path
if skip_path_check or os.path.isdir(full_path):
log.debug('path: %s is a valid repo group !', full_path)
return True
log.debug('path: %s is not a valid repo group !', full_path)
return False
setup: improve support for prompts, we should not take care of case sensitivity here
r505 def ask_ok(prompt, retries=4, complaint='[y]es or [n]o please!'):
project: added all source files and assets
r1 while True:
ok = raw_input(prompt)
setup: improve support for prompts, we should not take care of case sensitivity here
r505 if ok.lower() in ('y', 'ye', 'yes'):
project: added all source files and assets
r1 return True
setup: improve support for prompts, we should not take care of case sensitivity here
r505 if ok.lower() in ('n', 'no', 'nop', 'nope'):
project: added all source files and assets
r1 return False
retries = retries - 1
if retries < 0:
raise IOError
setup: improve support for prompts, we should not take care of case sensitivity here
r505 print(complaint)
project: added all source files and assets
r1
# propagated from mercurial documentation
ui_sections = [
'alias', 'auth',
'decode/encode', 'defaults',
'diff', 'email',
'extensions', 'format',
'merge-patterns', 'merge-tools',
'hooks', 'http_proxy',
'smtp', 'patch',
'paths', 'profiling',
'server', 'trusted',
'ui', 'web', ]
def config_data_from_db(clear_session=True, repo=None):
"""
Read the configuration data from the database and return configuration
tuples.
"""
utils: make sure we don't import any models inside utils
r262 from rhodecode.model.settings import VcsSettingsModel
project: added all source files and assets
r1 config = []
sa = meta.Session()
settings_model = VcsSettingsModel(repo=repo, sa=sa)
ui_settings = settings_model.get_ui_settings()
logging: dont spawn UI logs across multiple lines
r2640 ui_data = []
project: added all source files and assets
r1 for setting in ui_settings:
if setting.active:
logging: dont spawn UI logs across multiple lines
r2640 ui_data.append((setting.section, setting.key, setting.value))
project: added all source files and assets
r1 config.append((
safe_str(setting.section), safe_str(setting.key),
safe_str(setting.value)))
if setting.key == 'push_ssl':
# force set push_ssl requirement to False, rhodecode
# handles that
config.append((
safe_str(setting.section), safe_str(setting.key), False))
logging: dont spawn UI logs across multiple lines
r2640 log.debug(
'settings ui from db: %s',
','.join(map(lambda s: '[{}] {}={}'.format(*s), ui_data)))
project: added all source files and assets
r1 if clear_session:
meta.Session.remove()
# TODO: mikhail: probably it makes no sense to re-read hooks information.
# It's already there and activated/deactivated
skip_entries = []
enabled_hook_classes = get_enabled_hook_classes(ui_settings)
if 'pull' not in enabled_hook_classes:
skip_entries.append(('hooks', RhodeCodeUi.HOOK_PRE_PULL))
if 'push' not in enabled_hook_classes:
skip_entries.append(('hooks', RhodeCodeUi.HOOK_PRE_PUSH))
hooks: added new pretx hook to allow mercurial checks such as protected branches, or force push.
r1461 skip_entries.append(('hooks', RhodeCodeUi.HOOK_PRETX_PUSH))
hooks: expose refs on push....
r1755 skip_entries.append(('hooks', RhodeCodeUi.HOOK_PUSH_KEY))
project: added all source files and assets
r1
config = [entry for entry in config if entry[:2] not in skip_entries]
return config
def make_db_config(clear_session=True, repo=None):
"""
Create a :class:`Config` instance based on the values in the database.
"""
config = Config()
config_data = config_data_from_db(clear_session=clear_session, repo=repo)
for section, option, value in config_data:
config.set(section, option, value)
return config
def get_enabled_hook_classes(ui_settings):
"""
Return the enabled hook classes.
:param ui_settings: List of ui_settings as returned
by :meth:`VcsSettingsModel.get_ui_settings`
:return: a list with the enabled hook classes. The order is not guaranteed.
:rtype: list
"""
enabled_hooks = []
active_hook_keys = [
key for section, key, value, active in ui_settings
if section == 'hooks' and active]
hook_names = {
RhodeCodeUi.HOOK_PUSH: 'push',
RhodeCodeUi.HOOK_PULL: 'pull',
RhodeCodeUi.HOOK_REPO_SIZE: 'repo_size'
}
for key in active_hook_keys:
hook = hook_names.get(key)
if hook:
enabled_hooks.append(hook)
return enabled_hooks
def set_rhodecode_config(config):
"""
pylons: remove pylons as dependency...
r2351 Updates pyramid config with new settings from database
project: added all source files and assets
r1
:param config:
"""
utils: make sure we don't import any models inside utils
r262 from rhodecode.model.settings import SettingsModel
project: added all source files and assets
r1 app_settings = SettingsModel().get_all_settings()
for k, v in app_settings.items():
config[k] = v
Martin Bornhold
vcs: Add method to utils which returnd the rhodecode realm for HTTP basic auth.
r554 def get_rhodecode_realm():
"""
Return the rhodecode realm from database.
"""
from rhodecode.model.settings import SettingsModel
realm = SettingsModel().get_setting_by_name('realm')
return safe_str(realm.app_settings_value)
Martin Bornhold
vcs: Add method to return the base path to the repo store.
r555 def get_rhodecode_base_path():
"""
Returns the base path. The base path is the filesystem path which points
to the repository store.
"""
from rhodecode.model.settings import SettingsModel
paths_ui = SettingsModel().get_ui_by_section_and_key('paths', '/')
return safe_str(paths_ui.ui_value)
project: added all source files and assets
r1 def map_groups(path):
"""
Given a full path to a repository, create all nested groups that this
repo is inside. This function creates parent-child relationships between
groups and creates default perms for all new groups.
:param paths: full path to repository
"""
utils: make sure we don't import any models inside utils
r262 from rhodecode.model.repo_group import RepoGroupModel
project: added all source files and assets
r1 sa = meta.Session()
groups = path.split(Repository.NAME_SEP)
parent = None
group = None
# last element is repo in nested groups structure
groups = groups[:-1]
rgm = RepoGroupModel(sa)
refactor: renamed get_first_admin to get_first_super_admin which...
r278 owner = User.get_first_super_admin()
project: added all source files and assets
r1 for lvl, group_name in enumerate(groups):
group_name = '/'.join(groups[:lvl] + [group_name])
group = RepoGroup.get_by_group_name(group_name)
desc = '%s group' % group_name
# skip folders that are now removed repos
if REMOVED_REPO_PAT.match(group_name):
break
if group is None:
log.debug('creating group level: %s group_name: %s',
lvl, group_name)
group = RepoGroup(group_name, parent)
group.group_description = desc
group.user = owner
sa.add(group)
perm_obj = rgm._create_default_perms(group)
sa.add(perm_obj)
sa.flush()
parent = group
return group
def repo2db_mapper(initial_repo_list, remove_obsolete=False):
"""
maps all repos given in initial_repo_list, non existing repositories
are created, if remove_obsolete is True it also checks for db entries
that are not in initial_repo_list and removes them.
:param initial_repo_list: list of repositories found by scanning methods
:param remove_obsolete: check for obsolete entries in database
"""
from rhodecode.model.repo import RepoModel
utils: make sure we don't import any models inside utils
r262 from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.settings import SettingsModel
project: added all source files and assets
r1 sa = meta.Session()
repo_model = RepoModel()
refactor: renamed get_first_admin to get_first_super_admin which...
r278 user = User.get_first_super_admin()
project: added all source files and assets
r1 added = []
# creation defaults
defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
enable_statistics = defs.get('repo_enable_statistics')
enable_locking = defs.get('repo_enable_locking')
enable_downloads = defs.get('repo_enable_downloads')
private = defs.get('repo_private')
for name, repo in initial_repo_list.items():
group = map_groups(name)
unicode_name = safe_unicode(name)
db_repo = repo_model.get_by_repo_name(unicode_name)
# found repo that is on filesystem not in RhodeCode database
if not db_repo:
log.info('repository %s not found, creating now', name)
added.append(name)
desc = (repo.description
if repo.description != 'unknown'
else '%s repository' % name)
db_repo = repo_model._create_repo(
repo_name=name,
repo_type=repo.alias,
description=desc,
repo_group=getattr(group, 'group_id', None),
owner=user,
enable_locking=enable_locking,
enable_downloads=enable_downloads,
enable_statistics=enable_statistics,
private=private,
state=Repository.STATE_CREATED
)
sa.commit()
# we added that repo just now, and make sure we updated server info
if db_repo.repo_type == 'git':
git_repo = db_repo.scm_instance()
# update repository server-info
log.debug('Running update server info')
git_repo._update_server_info()
db_repo.update_commit_cache()
config = db_repo._config
config.set('extensions', 'largefiles', '')
svn: enable hooks and integration framework execution....
r2677 repo = db_repo.scm_instance(config=config)
repo.install_hooks()
project: added all source files and assets
r1
removed = []
if remove_obsolete:
# remove from database those repositories that are not in the filesystem
for repo in sa.query(Repository).all():
if repo.repo_name not in initial_repo_list.keys():
log.debug("Removing non-existing repository found in db `%s`",
repo.repo_name)
try:
RepoModel(sa).delete(repo, forks='detach', fs_remove=False)
sa.commit()
removed.append(repo.repo_name)
except Exception:
# don't hold further removals on error
log.error(traceback.format_exc())
sa.rollback()
def splitter(full_repo_name):
_parts = full_repo_name.rsplit(RepoGroup.url_sep(), 1)
gr_name = None
if len(_parts) == 2:
gr_name = _parts[0]
return gr_name
initial_repo_group_list = [splitter(x) for x in
initial_repo_list.keys() if splitter(x)]
# remove from database those repository groups that are not in the
# filesystem due to parent child relationships we need to delete them
# in a specific order of most nested first
all_groups = [x.group_name for x in sa.query(RepoGroup).all()]
nested_sort = lambda gr: len(gr.split('/'))
for group_name in sorted(all_groups, key=nested_sort, reverse=True):
if group_name not in initial_repo_group_list:
repo_group = RepoGroup.get_by_group_name(group_name)
if (repo_group.children.all() or
not RepoGroupModel().check_exist_filesystem(
group_name=group_name, exc_on_failure=False)):
continue
log.info(
'Removing non-existing repository group found in db `%s`',
group_name)
try:
RepoGroupModel(sa).delete(group_name, fs_remove=False)
sa.commit()
removed.append(group_name)
except Exception:
# don't hold further removals on error
log.exception(
'Unable to remove repository group `%s`',
group_name)
sa.rollback()
raise
return added, removed
def load_rcextensions(root_path):
import rhodecode
from rhodecode.config import conf
rcextensions: new builtin rcextensions....
r3133 path = os.path.join(root_path)
sys.path.append(path)
try:
rcextensions = __import__('rcextensions')
except ImportError:
log.warn('Unable to load rcextensions from %s', path)
rcextensions = None
if rcextensions:
log.debug('Found rcextensions module loaded %s...', rcextensions)
rhodecode.EXTENSIONS = rcextensions
project: added all source files and assets
r1
# Additional mappings that are not present in the pygments lexers
rcextensions: new builtin rcextensions....
r3133 conf.LANGUAGES_EXTENSIONS_MAP.update(
getattr(rhodecode.EXTENSIONS, 'EXTRA_MAPPINGS', {}))
project: added all source files and assets
r1
def get_custom_lexer(extension):
"""
returns a custom lexer if it is defined in rcextensions module, or None
if there's no custom lexer defined
"""
import rhodecode
from pygments import lexers
files: use custom lexer on mako files....
r1584
# custom override made by RhodeCode
if extension in ['mako']:
files: use html+mako lexer for .mako files.
r1592 return lexers.get_lexer_by_name('html+mako')
files: use custom lexer on mako files....
r1584
project: added all source files and assets
r1 # check if we didn't define this extension as other lexer
extensions = rhodecode.EXTENSIONS and getattr(rhodecode.EXTENSIONS, 'EXTRA_LEXERS', None)
if extensions and extension in rhodecode.EXTENSIONS.EXTRA_LEXERS:
_lexer_name = rhodecode.EXTENSIONS.EXTRA_LEXERS[extension]
return lexers.get_lexer_by_name(_lexer_name)
#==============================================================================
# TEST FUNCTIONS AND CREATORS
#==============================================================================
Martin Bornhold
pytest: Removed unused argument.
r214 def create_test_index(repo_location, config):
project: added all source files and assets
r1 """
Martin Bornhold
pytest: Update docstrings to reflect changes.
r216 Makes default test index.
project: added all source files and assets
r1 """
Martin Bornhold
pytest: Create test search index from rc_testdata package.
r212 import rc_testdata
rc_testdata.extract_search_index(
'vcs_search_index', os.path.dirname(config['search.location']))
project: added all source files and assets
r1
Martin Bornhold
pytest: Unify create_test_* functions.
r215 def create_test_directory(test_path):
"""
Martin Bornhold
pytest: Update docstrings to reflect changes.
r216 Create test directory if it doesn't exist.
Martin Bornhold
pytest: Unify create_test_* functions.
r215 """
if not os.path.isdir(test_path):
log.debug('Creating testdir %s', test_path)
os.makedirs(test_path)
def create_test_database(test_path, config):
project: added all source files and assets
r1 """
Martin Bornhold
pytest: Create test repositories from rc_testdata package.
r213 Makes a fresh database.
project: added all source files and assets
r1 """
from rhodecode.lib.db_manage import DbManage
# PART ONE create db
dbconf = config['sqlalchemy.db1.url']
log.debug('making test db %s', dbconf)
dbmanage = DbManage(log_sql=False, dbconf=dbconf, root=config['here'],
tests=True, cli_args={'force_ask': True})
dbmanage.create_tables(override=True)
dbmanage.set_db_version()
# for tests dynamically set new root paths based on generated content
Martin Bornhold
pytest: Unify create_test_* functions.
r215 dbmanage.create_settings(dbmanage.config_prompt(test_path))
project: added all source files and assets
r1 dbmanage.create_default_user()
dbmanage.create_test_admin_and_users()
dbmanage.create_permissions()
dbmanage.populate_default_permissions()
Session().commit()
Martin Bornhold
pytest: Create test repositories from rc_testdata package.
r213
Martin Bornhold
pytest: Unify create_test_* functions.
r215 def create_test_repositories(test_path, config):
Martin Bornhold
pytest: Create test repositories from rc_testdata package.
r213 """
Creates test repositories in the temporary directory. Repositories are
extracted from archives within the rc_testdata package.
"""
import rc_testdata
Martin Bornhold
pytest: Unify create_test_* functions.
r215 from rhodecode.tests import HG_REPO, GIT_REPO, SVN_REPO
Martin Bornhold
pytest: Create test repositories from rc_testdata package.
r213
project: added all source files and assets
r1 log.debug('making test vcs repositories')
db: Move initialization of test environment up to pyramid layer.
r116 idx_path = config['search.location']
data_path = config['cache_dir']
project: added all source files and assets
r1
db: Move initialization of test environment up to pyramid layer.
r116 # clean index and data
project: added all source files and assets
r1 if idx_path and os.path.exists(idx_path):
log.debug('remove %s', idx_path)
shutil.rmtree(idx_path)
if data_path and os.path.exists(data_path):
log.debug('remove %s', data_path)
shutil.rmtree(data_path)
Martin Bornhold
pytest: Unify create_test_* functions.
r215 rc_testdata.extract_hg_dump('vcs_test_hg', jn(test_path, HG_REPO))
rc_testdata.extract_git_dump('vcs_test_git', jn(test_path, GIT_REPO))
project: added all source files and assets
r1
# Note: Subversion is in the process of being integrated with the system,
# until we have a properly packed version of the test svn repository, this
# tries to copy over the repo from a package "rc_testdata"
svn_repo_path = rc_testdata.get_svn_repo_archive()
with tarfile.open(svn_repo_path) as tar:
Martin Bornhold
pytest: Unify create_test_* functions.
r215 tar.extractall(jn(test_path, SVN_REPO))
project: added all source files and assets
r1
def password_changed(auth_user, session):
Martin Bornhold
auth: Fix password_changed function, fixes #4043....
r482 # Never report password change in case of default user or anonymous user.
if auth_user.username == User.DEFAULT_USER or auth_user.user_id is None:
project: added all source files and assets
r1 return False
Martin Bornhold
auth: Fix password_changed function, fixes #4043....
r482
project: added all source files and assets
r1 password_hash = md5(auth_user.password) if auth_user.password else None
rhodecode_user = session.get('rhodecode_user', {})
session_password_hash = rhodecode_user.get('password', '')
return password_hash != session_password_hash
def read_opensource_licenses():
global _license_cache
if not _license_cache:
licenses = pkg_resources.resource_string(
Martin Bornhold
oss-licenses: Use only 'rhodecode' as pkg_resource manager string....
r205 'rhodecode', 'config/licenses.json')
project: added all source files and assets
r1 _license_cache = json.loads(licenses)
return _license_cache
Martin Bornhold
admin: Add helper to get the registry from requests.
r298
system-info: added UUID placeholder for generating platform unique identifiers.
r1115 def generate_platform_uuid():
"""
Generates platform UUID based on it's name
"""
import platform
try:
uuid_list = [platform.platform()]
return hashlib.sha256(':'.join(uuid_list)).hexdigest()
except Exception as e:
logging: use lazy parameter evaluation in log calls.
r3061 log.error('Failed to generate host uuid: %s', e)
system-info: added UUID placeholder for generating platform unique identifiers.
r1115 return 'UNDEFINED'