import time
import logging
import operator
from pyramid import compat
from pyramid.httpexceptions import HTTPFound, HTTPForbidden, HTTPBadRequest
from rhodecode.lib import helpers as h, diffs
from rhodecode.lib.utils2 import (
StrictAttributeDict, safe_int, datetime_to_time, safe_unicode)
from rhodecode.lib.vcs.exceptions import RepositoryRequirementError
from rhodecode.model import repo
from rhodecode.model import repo_group
from rhodecode.model import user_group
from rhodecode.model import user
from rhodecode.model.db import User
from rhodecode.model.scm import ScmModel
from rhodecode.model.settings import VcsSettingsModel
log = logging.getLogger(__name__)
ADMIN_PREFIX = '/_admin'
# group name can have a slash in them, but they must not end with a slash
'group_name': r'.*?[^/]',
'repo_group_name': r'.*?[^/]',
# repo names can have a slash in them, but they must not end with a slash
'repo_name': r'.*?[^/]',
# file path eats up everything at the end
'f_path': r'.*',
# reference types
'source_ref_type': '(branch|book|tag|rev|\%\(source_ref_type\)s)',
'target_ref_type': '(branch|book|tag|rev|\%\(target_ref_type\)s)',
def add_route_with_slash(config,name, pattern, **kw):
config.add_route(name, pattern, **kw)
if not pattern.endswith('/'):
config.add_route(name + '_slash', pattern + '/', **kw)
def add_route_requirements(route_path, requirements=None):
Adds regex requirements to pyramid routes using a mapping dict
requirements = requirements or URL_NAME_REQUIREMENTS
for key, regex in requirements.items():
route_path = route_path.replace('{%s}' % key, '{%s:%s}' % (key, regex))
return route_path
def get_format_ref_id(repo):
"""Returns a `repo` specific reference formatter function"""
if h.is_svn(repo):
return _format_ref_id_svn
return _format_ref_id
def _format_ref_id(name, raw_id):
"""Default formatting of a given reference `name`"""
return name
def _format_ref_id_svn(name, raw_id):
"""Special way of formatting a reference for Subversion including path"""
return '%s@%s' % (name, raw_id)
class TemplateArgs(StrictAttributeDict):
class BaseAppView(object):
def __init__(self, context, request):
self.request = request
self.context = context
self.session = request.session
if not hasattr(request, 'user'):
# NOTE(marcink): edge case, we ended up in matched route
# but probably of web-app context, e.g API CALL/VCS CALL
if hasattr(request, 'vcs_call') or hasattr(request, 'rpc_method'):
log.warning('Unable to process request `%s` in this scope', request)
raise HTTPBadRequest()
self._rhodecode_user = request.user # auth user
self._rhodecode_db_user = self._rhodecode_user.get_instance()
self._maybe_needs_password_change(, self._rhodecode_db_user)
def _maybe_needs_password_change(self, view_name, user_obj):
log.debug('Checking if user %s needs password change on view %s',
user_obj, view_name)
skip_user_views = [
'logout', 'login',
'my_account_password', 'my_account_password_update'
if not user_obj:
if user_obj.username == User.DEFAULT_USER:
now = time.time()
should_change = user_obj.user_data.get('force_password_change')
change_after = safe_int(should_change) or 0
if should_change and now > change_after:
log.debug('User %s requires password change', user_obj)
h.flash('You are required to change your password', 'warning',
if view_name not in skip_user_views:
raise HTTPFound(
def _log_creation_exception(self, e, repo_name):
_ = self.request.translate
reason = None
if len(e.args) == 2:
reason = e.args[1]
'Exception creating a repository: invalid certificate')
msg = (_('Error creating repository %s: invalid certificate')
% repo_name)
log.exception("Exception creating a repository")
msg = (_('Error creating repository %s')
% repo_name)
return msg
def _get_local_tmpl_context(self, include_app_defaults=True):
c = TemplateArgs()
c.auth_user = self.request.user
# TODO(marcink): migrate the usage of c.rhodecode_user to c.auth_user
c.rhodecode_user = self.request.user
if include_app_defaults:
from rhodecode.lib.base import attach_context_attributes
attach_context_attributes(c, self.request, self.request.user.user_id)
return c
def _get_template_context(self, tmpl_args, **kwargs):
local_tmpl_args = {
'defaults': {},
'errors': {},
'c': tmpl_args
return local_tmpl_args
def load_default_context(self):
c = self._get_local_tmpl_context()
c.custom_var = 'foobar'
return c
raise NotImplementedError('Needs implementation in view class')
class RepoAppView(BaseAppView):
def __init__(self, context, request):
super(RepoAppView, self).__init__(context, request)
self.db_repo = request.db_repo
self.db_repo_name = self.db_repo.repo_name
self.db_repo_pull_requests = ScmModel().get_pull_requests(self.db_repo)
def _handle_missing_requirements(self, error):
'Requirements are missing for repository %s: %s',
self.db_repo_name, safe_unicode(error))
def _get_local_tmpl_context(self, include_app_defaults=True):
_ = self.request.translate
c = super(RepoAppView, self)._get_local_tmpl_context(
# register common vars for this type of view
c.rhodecode_db_repo = self.db_repo
c.repo_name = self.db_repo_name
c.repository_pull_requests = self.db_repo_pull_requests
self.path_filter = PathFilter(None)
c.repository_requirements_missing = {}
self.rhodecode_vcs_repo = self.db_repo.scm_instance()
if self.rhodecode_vcs_repo:
path_perms = self.rhodecode_vcs_repo.get_path_permissions(
self.path_filter = PathFilter(path_perms)
except RepositoryRequirementError as e:
c.repository_requirements_missing = {'error': str(e)}
self.rhodecode_vcs_repo = None
c.path_filter = self.path_filter # used by atom_feed_entry.mako
if self.rhodecode_vcs_repo is None:
# unable to fetch this repo as vcs instance, report back to user
"The repository `%(repo_name)s` cannot be loaded in filesystem. "
"Please check if it exist, or is not damaged.") %
{'repo_name': c.repo_name},
category='error', ignore_duplicate=True)
if c.repository_requirements_missing:
route =
if route.startswith(('edit_repo', 'repo_summary')):
# allow summary and edit repo on missing requirements
return c
raise HTTPFound(
h.route_path('repo_summary', repo_name=self.db_repo_name))
else: # redirect if we don't show missing requirements
raise HTTPFound(h.route_path('home'))
c.has_origin_repo_read_perm = False
if self.db_repo.fork:
c.has_origin_repo_read_perm = h.HasRepoPermissionAny(
'repository.write', '', 'repository.admin')(
self.db_repo.fork.repo_name, 'summary fork link')
return c
def _get_f_path_unchecked(self, matchdict, default=None):
Should only be used by redirects, everything else should call _get_f_path
f_path = matchdict.get('f_path')
if f_path:
# fix for multiple initial slashes that causes errors for GIT
return f_path.lstrip('/')
return default
def _get_f_path(self, matchdict, default=None):
f_path_match = self._get_f_path_unchecked(matchdict, default)
return self.path_filter.assert_path_permissions(f_path_match)
def _get_general_setting(self, target_repo, settings_key, default=False):
settings_model = VcsSettingsModel(repo=target_repo)
settings = settings_model.get_general_settings()
return settings.get(settings_key, default)
class PathFilter(object):
# Expects and instance of BasePathPermissionChecker or None
def __init__(self, permission_checker):
self.permission_checker = permission_checker
def assert_path_permissions(self, path):
if path and self.permission_checker and not self.permission_checker.has_access(path):
raise HTTPForbidden()
return path
def filter_patchset(self, patchset):
if not self.permission_checker or not patchset:
return patchset, False
had_filtered = False
filtered_patchset = []
for patch in patchset:
filename = patch.get('filename', None)
if not filename or self.permission_checker.has_access(filename):
had_filtered = True
if had_filtered:
if isinstance(patchset, diffs.LimitedDiffContainer):
filtered_patchset = diffs.LimitedDiffContainer(patchset.diff_limit, patchset.cur_diff_size, filtered_patchset)
return filtered_patchset, True
return patchset, False
def render_patchset_filtered(self, diffset, patchset, source_ref=None, target_ref=None):
filtered_patchset, has_hidden_changes = self.filter_patchset(patchset)
result = diffset.render_patchset(
filtered_patchset, source_ref=source_ref, target_ref=target_ref)
result.has_hidden_changes = has_hidden_changes
return result
def get_raw_patch(self, diff_processor):
if self.permission_checker is None:
return diff_processor.as_raw()
elif self.permission_checker.has_full_access:
return diff_processor.as_raw()
return '# Repository has user-specific filters, raw patch generation is disabled.'
def is_enabled(self):
return self.permission_checker is not None
class RepoGroupAppView(BaseAppView):
def __init__(self, context, request):
super(RepoGroupAppView, self).__init__(context, request)
self.db_repo_group = request.db_repo_group
self.db_repo_group_name = self.db_repo_group.group_name
def _revoke_perms_on_yourself(self, form_result):
_updates = filter(lambda u: self._rhodecode_user.user_id == int(u[0]),
_additions = filter(lambda u: self._rhodecode_user.user_id == int(u[0]),
_deletions = filter(lambda u: self._rhodecode_user.user_id == int(u[0]),
admin_perm = 'group.admin'
if _updates and _updates[0][1] != admin_perm or \
_additions and _additions[0][1] != admin_perm or \
_deletions and _deletions[0][1] != admin_perm:
return True
return False
class UserGroupAppView(BaseAppView):
def __init__(self, context, request):
super(UserGroupAppView, self).__init__(context, request)
self.db_user_group = request.db_user_group
self.db_user_group_name = self.db_user_group.users_group_name
class UserAppView(BaseAppView):
def __init__(self, context, request):
super(UserAppView, self).__init__(context, request)
self.db_user = request.db_user
self.db_user_id = self.db_user.user_id
_ = self.request.translate
if not request.db_user_supports_default:
if self.db_user.username == User.DEFAULT_USER:
h.flash(_("Editing user `{}` is disabled.".format(
User.DEFAULT_USER)), category='warning')
raise HTTPFound(h.route_path('users'))
class DataGridAppView(object):
Common class to have re-usable grid rendering components
def _extract_ordering(self, request, column_map=None):
column_map = column_map or {}
column_index = safe_int(request.GET.get('order[0][column]'))
order_dir = request.GET.get(
'order[0][dir]', 'desc')
order_by = request.GET.get(
'columns[%s][data][sort]' % column_index, 'name_raw')
# translate datatable to DB columns
order_by = column_map.get(order_by) or order_by
search_q = request.GET.get('search[value]')
return search_q, order_by, order_dir
def _extract_chunk(self, request):
start = safe_int(request.GET.get('start'), 0)
length = safe_int(request.GET.get('length'), 25)
draw = safe_int(request.GET.get('draw'))
return draw, start, length
def _get_order_col(self, order_by, model):
if isinstance(order_by, compat.string_types):
return operator.attrgetter(order_by)(model)
except AttributeError:
return None
return order_by
class BaseReferencesView(RepoAppView):
Base for reference view for branches, tags and bookmarks.
def load_default_context(self):
c = self._get_local_tmpl_context()
return c
def load_refs_context(self, ref_items, partials_template):
_render = self.request.get_partial_renderer(partials_template)
pre_load = ["author", "date", "message"]
is_svn = h.is_svn(self.rhodecode_vcs_repo)
is_hg = h.is_hg(self.rhodecode_vcs_repo)
format_ref_id = get_format_ref_id(self.rhodecode_vcs_repo)
closed_refs = {}
if is_hg:
closed_refs = self.rhodecode_vcs_repo.branches_closed
data = []
for ref_name, commit_id in ref_items:
commit = self.rhodecode_vcs_repo.get_commit(
commit_id=commit_id, pre_load=pre_load)
closed = ref_name in closed_refs
# TODO: johbo: Unify generation of reference links
use_commit_id = '/' in ref_name or is_svn
if use_commit_id:
files_url = h.route_path(
f_path=ref_name if is_svn else '',
files_url = h.route_path(
f_path=ref_name if is_svn else '',
"name": _render('name', ref_name, files_url, closed),
"name_raw": ref_name,
"date": _render('date',,
"date_raw": datetime_to_time(,
"author": _render('author',,
"commit": _render(
'commit', commit.message, commit.raw_id, commit.idx),
"commit_raw": commit.idx,
"compare": _render(
'compare', format_ref_id(ref_name, commit.raw_id)),
return data
class RepoRoutePredicate(object):
def __init__(self, val, config):
self.val = val
def text(self):
return 'repo_route = %s' % self.val
phash = text
def __call__(self, info, request):
if hasattr(request, 'vcs_call'):
# skip vcs calls
repo_name = info['match']['repo_name']
repo_model = repo.RepoModel()
by_name_match = repo_model.get_by_repo_name(repo_name, cache=False)
def redirect_if_creating(route_info, db_repo):
skip_views = ['edit_repo_advanced_delete']
route = route_info['route']
# we should skip delete view so we can actually "remove" repositories
# if they get stuck in creating state.
if in skip_views:
if db_repo.repo_state in [repo.Repository.STATE_PENDING]:
repo_creating_url = request.route_path(
'repo_creating', repo_name=db_repo.repo_name)
raise HTTPFound(repo_creating_url)
if by_name_match:
# register this as request object we can re-use later
request.db_repo = by_name_match
redirect_if_creating(info, by_name_match)
return True
by_id_match = repo_model.get_repo_by_id(repo_name)
if by_id_match:
request.db_repo = by_id_match
redirect_if_creating(info, by_id_match)
return True
return False
class RepoForbidArchivedRoutePredicate(object):
def __init__(self, val, config):
self.val = val
def text(self):
return 'repo_forbid_archived = %s' % self.val
phash = text
def __call__(self, info, request):
_ = request.translate
rhodecode_db_repo = request.db_repo
'%s checking if archived flag for repo for %s',
self.__class__.__name__, rhodecode_db_repo.repo_name)
if rhodecode_db_repo.archived:
log.warning('Current view is not supported for archived repo:%s',
h.literal(_('Action not supported for archived repository.')),
summary_url = request.route_path(
'repo_summary', repo_name=rhodecode_db_repo.repo_name)
raise HTTPFound(summary_url)
return True
class RepoTypeRoutePredicate(object):
def __init__(self, val, config):
self.val = val or ['hg', 'git', 'svn']
def text(self):
return 'repo_accepted_type = %s' % self.val
phash = text
def __call__(self, info, request):
if hasattr(request, 'vcs_call'):
# skip vcs calls
rhodecode_db_repo = request.db_repo
'%s checking repo type for %s in %s',
self.__class__.__name__, rhodecode_db_repo.repo_type, self.val)
if rhodecode_db_repo.repo_type in self.val:
return True
log.warning('Current view is not supported for repo type:%s',
return False
class RepoGroupRoutePredicate(object):
def __init__(self, val, config):
self.val = val
def text(self):
return 'repo_group_route = %s' % self.val
phash = text
def __call__(self, info, request):
if hasattr(request, 'vcs_call'):
# skip vcs calls
repo_group_name = info['match']['repo_group_name']
repo_group_model = repo_group.RepoGroupModel()
by_name_match = repo_group_model.get_by_group_name(repo_group_name, cache=False)
if by_name_match:
# register this as request object we can re-use later
request.db_repo_group = by_name_match
return True
return False
class UserGroupRoutePredicate(object):
def __init__(self, val, config):
self.val = val
def text(self):
return 'user_group_route = %s' % self.val
phash = text
def __call__(self, info, request):
if hasattr(request, 'vcs_call'):
# skip vcs calls
user_group_id = info['match']['user_group_id']
user_group_model = user_group.UserGroup()
by_id_match = user_group_model.get(user_group_id, cache=False)
if by_id_match:
# register this as request object we can re-use later
request.db_user_group = by_id_match
return True
return False
class UserRoutePredicateBase(object):
supports_default = None
def __init__(self, val, config):
self.val = val
def text(self):
raise NotImplementedError()
def __call__(self, info, request):
if hasattr(request, 'vcs_call'):
# skip vcs calls
user_id = info['match']['user_id']
user_model = user.User()
by_id_match = user_model.get(user_id, cache=False)
if by_id_match:
# register this as request object we can re-use later
request.db_user = by_id_match
request.db_user_supports_default = self.supports_default
return True
return False
class UserRoutePredicate(UserRoutePredicateBase):
supports_default = False
def text(self):
return 'user_route = %s' % self.val
phash = text
class UserRouteWithDefaultPredicate(UserRoutePredicateBase):
supports_default = True
def text(self):
return 'user_with_default_route = %s' % self.val
phash = text
def includeme(config):
'repo_route', RepoRoutePredicate)
'repo_accepted_types', RepoTypeRoutePredicate)
'repo_forbid_when_archived', RepoForbidArchivedRoutePredicate)
'repo_group_route', RepoGroupRoutePredicate)
'user_group_route', UserGroupRoutePredicate)
'user_route_with_default', UserRouteWithDefaultPredicate)
'user_route', UserRoutePredicate)