base.py
607 lines
| 21.4 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
""" | ||||
The base Controller API.

Provides the BaseController class for subclassing and for use in the
different controllers.
""" | ||||
import logging | ||||
import socket | ||||
r5085 | import base64 | |||
r1 | ||||
r1949 | import markupsafe | |||
r1 | import ipaddress | |||
r5085 | import paste.httpheaders | |||
r1 | from paste.auth.basic import AuthBasicAuthenticator | |||
from paste.httpexceptions import HTTPUnauthorized, HTTPForbidden, get_exception | ||||
import rhodecode | ||||
from rhodecode.authentication.base import VCS_TYPE | ||||
from rhodecode.lib import auth, utils2 | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.lib.auth import AuthUser, CookieStoreWrapper | ||||
from rhodecode.lib.exceptions import UserCreationError | ||||
r2358 | from rhodecode.lib.utils import (password_changed, get_enabled_hook_classes) | |||
r5085 | from rhodecode.lib.utils2 import AttributeDict | |||
from rhodecode.lib.str_utils import ascii_bytes, safe_int, safe_str | ||||
from rhodecode.lib.type_utils import aslist, str2bool | ||||
from rhodecode.lib.hash_utils import sha1 | ||||
r3424 | from rhodecode.model.db import Repository, User, ChangesetComment, UserBookmark | |||
r1 | from rhodecode.model.notification import NotificationModel | |||
from rhodecode.model.settings import VcsSettingsModel, SettingsModel | ||||
log = logging.getLogger(__name__) | ||||
def _filter_proxy(ip): | ||||
""" | ||||
Passed in IP addresses in HEADERS can be in a special format of multiple | ||||
ips. Those comma separated IPs are passed from various proxies in the | ||||
chain of request processing. The left-most being the original client. | ||||
We only care about the first IP which came from the org. client. | ||||
:param ip: ip string from headers | ||||
""" | ||||
if ',' in ip: | ||||
_ips = ip.split(',') | ||||
_first_ip = _ips[0].strip() | ||||
log.debug('Got multiple IPs %s, using %s', ','.join(_ips), _first_ip) | ||||
return _first_ip | ||||
return ip | ||||
def _filter_port(ip): | ||||
""" | ||||
Removes a port from ip, there are 4 main cases to handle here. | ||||
- ipv4 eg. 127.0.0.1 | ||||
- ipv6 eg. ::1 | ||||
- ipv4+port eg. 127.0.0.1:8080 | ||||
- ipv6+port eg. [::1]:8080 | ||||
:param ip: | ||||
""" | ||||
def is_ipv6(ip_addr): | ||||
if hasattr(socket, 'inet_pton'): | ||||
try: | ||||
socket.inet_pton(socket.AF_INET6, ip_addr) | ||||
except socket.error: | ||||
return False | ||||
else: | ||||
# fallback to ipaddress | ||||
try: | ||||
r4959 | ipaddress.IPv6Address(safe_str(ip_addr)) | |||
r1 | except Exception: | |||
return False | ||||
return True | ||||
if ':' not in ip: # must be ipv4 pure ip | ||||
return ip | ||||
if '[' in ip and ']' in ip: # ipv6 with port | ||||
return ip.split(']')[0][1:].lower() | ||||
# must be ipv6 or ipv4 with port | ||||
if is_ipv6(ip): | ||||
return ip | ||||
else: | ||||
ip, _port = ip.split(':')[:2] # means ipv4+port | ||||
return ip | ||||
def get_ip_addr(environ):
    """
    Determine the client IP address from a WSGI environ.

    The keys `HTTP_X_REAL_IP`, `HTTP_X_FORWARDED_FOR` and `REMOTE_ADDR` are
    consulted in that order; the first non-empty proxy header wins. When
    `REMOTE_ADDR` is missing too, `0.0.0.0` is used. The chosen value is
    passed through `_filter_proxy` and then `_filter_port`.

    :param environ: WSGI environ mapping
    """
    for header_key in ('HTTP_X_REAL_IP', 'HTTP_X_FORWARDED_FOR'):
        header_ip = environ.get(header_key)
        if header_ip:
            return _filter_port(_filter_proxy(header_ip))

    remote_addr = environ.get('REMOTE_ADDR', '0.0.0.0')
    return _filter_port(_filter_proxy(remote_addr))
r1 | ||||
def get_server_ip_addr(environ, log_errors=True):
    """
    Resolve the `SERVER_NAME` of the environ into an IP address.

    :param environ: WSGI environ mapping
    :param log_errors: when set, a failed lookup is reported in the logs
    :return: the resolved address, or the unresolved `SERVER_NAME` value
        when the lookup fails
    """
    server_name = environ.get('SERVER_NAME')
    try:
        resolved = socket.gethostbyname(server_name)
    except Exception as exc:
        # in some cases this lookup is not possible, and callers can opt out
        # of having that reported as an exception in logs
        if log_errors:
            log.exception('Could not retrieve server ip address: %s', exc)
        return server_name
    return resolved
def get_server_port(environ):
    """
    Return the `SERVER_PORT` entry of the WSGI environ, `None` when absent.
    """
    server_port = environ.get('SERVER_PORT')
    return server_port
def get_user_agent(environ):
    """
    Return the `HTTP_USER_AGENT` entry of the WSGI environ, `None` when absent.
    """
    user_agent = environ.get('HTTP_USER_AGENT')
    return user_agent
def vcs_operation_context(
        environ, repo_name, username, action, scm, check_locking=True,
        is_shadow_repo=False, check_branch_perms=False, detect_force_push=False):
    """
    Build the context dictionary for a vcs operation, e.g. push or pull.

    This context is passed over the layers so that hooks triggered by the
    vcs operation know details like the user, the user's IP address etc.

    :param check_locking: Allows to switch off the computation of the locking
        data. This serves mainly the need of the simplevcs middleware to be
        able to disable this for certain operations.
    :return: dict with the operation details (`scm_data`)
    """
    is_anonymous = username == User.DEFAULT_USER
    user = User.get_by_username(username)

    # Tri-state value: False: unlock, None: nothing, True: lock
    make_lock = None
    locked_by = [None, None, None]
    if check_locking and not is_anonymous:
        log.debug('Checking locking on repository "%s"', repo_name)
        repo = Repository.get_by_repo_name(repo_name)
        make_lock, __, locked_by = repo.get_locking_state(action, user.user_id)

    user_id = user.user_id
    ui_settings = VcsSettingsModel(repo=repo_name).get_ui_settings()

    # NOTE(marcink): This should be also in sync with
    # rhodecode/apps/ssh_support/lib/backends/base.py:update_environment scm_data
    root_entries = [entry for entry in ui_settings if entry.key == '/']
    repo_store = root_entries[0].value if root_entries else ''

    return {
        'ip': get_ip_addr(environ),
        'username': username,
        'user_id': user_id,
        'action': action,
        'repository': repo_name,
        'scm': scm,
        'config': rhodecode.CONFIG['__file__'],
        'repo_store': repo_store,
        'make_lock': make_lock,
        'locked_by': locked_by,
        'server_url': utils2.get_server_url(environ),
        'user_agent': get_user_agent(environ),
        'hooks': get_enabled_hook_classes(ui_settings),
        'is_shadow_repo': is_shadow_repo,
        'detect_force_push': detect_force_push,
        'check_branch_perms': check_branch_perms,
    }
r1 | ||||
class BasicAuth(AuthBasicAuthenticator):
    """
    HTTP Basic authenticator built on paste's `AuthBasicAuthenticator`.

    Adds a configurable HTTP code for the authentication challenge and hands
    the registry and acl repository name over to the authentication function.
    """

    def __init__(self, realm, authfunc, registry, auth_http_code=None,
                 initial_call_detection=False, acl_repo_name=None, rc_realm=''):
        """
        :param realm: realm announced in the `WWW-Authenticate` header
        :param authfunc: callable doing the actual authentication, called as
            `authfunc(username, password, environ, VCS_TYPE, registry=...,
            acl_repo_name=...)`
        :param registry: passed to `authfunc` as `registry`
        :param auth_http_code: alternative HTTP code for the challenge
            response, used only when this is not the initial call
        :param initial_call_detection: initial value of the `initial_call` flag
        :param acl_repo_name: passed to `authfunc` as `acl_repo_name`
        :param rc_realm: value exposed by `get_rc_realm`
        """
        super().__init__(realm=realm, authfunc=authfunc)
        self.realm = realm
        self.rc_realm = rc_realm
        self.initial_call = initial_call_detection
        self.authfunc = authfunc
        self.registry = registry
        self.acl_repo_name = acl_repo_name
        self._rc_auth_http_code = auth_http_code

    def _get_response_from_code(self, http_code, fallback):
        """
        Look up the response class for `http_code` via `get_exception`.

        :return: the class found, or `fallback` when the lookup raises
        """
        try:
            return get_exception(safe_int(http_code))
        except Exception:
            log.exception('Failed to fetch response class for code %s, using fallback: %s', http_code, fallback)
            return fallback

    def get_rc_realm(self):
        """
        Return `rc_realm` converted with `safe_str`.
        """
        return safe_str(self.rc_realm)

    def build_authentication(self):
        """
        Build the authentication challenge response.

        :return: `HTTPUnauthorized` carrying the `WWW-Authenticate` header, or
            the response class configured via `auth_http_code` when this is
            not the initial call
        """
        header = [('WWW-Authenticate', f'Basic realm="{self.realm}"')]

        # NOTE: the initial_call detection seems to be not working/not needed with latest Mercurial
        # investigate if we still need it.
        if self._rc_auth_http_code and not self.initial_call:
            # return alternative HTTP code if alternative http return code
            # is specified in RhodeCode config, but ONLY if it's not the
            # FIRST call
            custom_response_klass = self._get_response_from_code(self._rc_auth_http_code, fallback=HTTPUnauthorized)
            log.debug('Using custom response class: %s', custom_response_klass)
            return custom_response_klass(headers=header)
        return HTTPUnauthorized(headers=header)

    def authenticate(self, environ):
        """
        Authenticate a request based on its `Authorization` header.

        A missing, non-basic or malformed header results in the
        authentication challenge instead of an unhandled error.

        :param environ: WSGI environ of the request
        :return: dict with `username` and `auth_data` on success, otherwise
            the response from `build_authentication`
        """
        authorization = paste.httpheaders.AUTHORIZATION(environ)
        if not authorization:
            return self.build_authentication()

        # partition never fails; a header without a credentials part just
        # yields an empty credentials string
        auth_meth, _sep, auth_creds_b64 = authorization.partition(' ')
        if 'basic' != auth_meth.lower():
            return self.build_authentication()

        try:
            raw_credentials = base64.b64decode(auth_creds_b64.strip())
        except ValueError:
            # binascii.Error (bad padding etc.) is a ValueError subclass; the
            # header is client supplied so garbage must not end as a crash
            log.debug('Malformed base64 credentials in Authorization header')
            return self.build_authentication()

        credentials = safe_str(raw_credentials)
        _parts = credentials.split(':', 1)
        if len(_parts) == 2:
            username, password = _parts
            auth_data = self.authfunc(
                username, password, environ, VCS_TYPE,
                registry=self.registry, acl_repo_name=self.acl_repo_name)
            if auth_data:
                return {'username': username, 'auth_data': auth_data}
            if username and password:
                # we mark that we actually executed authentication once, at
                # that point we can use the alternative auth code
                self.initial_call = False

        return self.build_authentication()

    __call__ = authenticate
def calculate_version_hash(config):
    """
    Compute a short hash out of the session secret and the RhodeCode version.

    Mixing the instance secret in means the value does not leak the version,
    while it stays consistent for a given secret and version.

    :param config: settings mapping, `beaker.session.secret` is read from it
    :return: first 8 items of the `sha1` helper result
    """
    # The settings mapping is read with text keys everywhere else in this
    # module; looking the secret up with a bytes key only would never find it.
    secret = config.get('beaker.session.secret')
    if secret is None:
        # still honor a bytes key for mappings that carry one
        secret = config.get(b'beaker.session.secret', b'')
    if isinstance(secret, str):
        secret = secret.encode('utf-8')
    return sha1(secret + ascii_bytes(rhodecode.__version__))[:8]
r1893 | ||||
def get_current_lang(request):
    """
    Return the language of the request.

    The `_LOCALE_` attribute of the request is preferred; `locale_name` is
    the fallback (and is always read from the request).
    """
    fallback_locale = request.locale_name
    return getattr(request, '_LOCALE_', fallback_locale)
r1904 | ||||
def attach_context_attributes(context, request, user_id=None, is_api=None):
    """
    Attach variables into template context called `c`.

    :param context: template context object that receives the attributes
    :param request: current request
    :param user_id: when given, the unread notifications count and the
        bookmarks of that user are attached as well
    :param is_api: marks the call as an API call. When left as `None` it is
        detected by the presence of `request.rpc_user`. For API calls the
        per-user session attributes are not read and no CSRF token is created.
    """
    config = request.registry.settings

    rc_config = SettingsModel().get_all_settings(cache=True, from_request=False)
    context.rc_config = rc_config
    context.rhodecode_version = rhodecode.__version__
    context.rhodecode_edition = config.get('rhodecode.edition')
    context.rhodecode_edition_id = config.get('rhodecode.edition_id')
    # unique secret + version does not leak the version but keep consistency
    context.rhodecode_version_hash = calculate_version_hash(config)

    # Default language set for the incoming request
    context.language = get_current_lang(request)

    # Visual options
    context.visual = AttributeDict({})

    # DB stored Visual Items
    context.visual.show_public_icon = str2bool(
        rc_config.get('rhodecode_show_public_icon'))
    context.visual.show_private_icon = str2bool(
        rc_config.get('rhodecode_show_private_icon'))
    context.visual.stylify_metatags = str2bool(
        rc_config.get('rhodecode_stylify_metatags'))
    context.visual.dashboard_items = safe_int(
        rc_config.get('rhodecode_dashboard_items', 100))
    context.visual.admin_grid_items = safe_int(
        rc_config.get('rhodecode_admin_grid_items', 100))
    context.visual.show_revision_number = str2bool(
        rc_config.get('rhodecode_show_revision_number', True))
    context.visual.show_sha_length = safe_int(
        rc_config.get('rhodecode_show_sha_length', 100))
    context.visual.repository_fields = str2bool(
        rc_config.get('rhodecode_repository_fields'))
    context.visual.show_version = str2bool(
        rc_config.get('rhodecode_show_version'))
    context.visual.use_gravatar = str2bool(
        rc_config.get('rhodecode_use_gravatar'))
    context.visual.gravatar_url = rc_config.get('rhodecode_gravatar_url')
    context.visual.default_renderer = rc_config.get(
        'rhodecode_markup_renderer', 'rst')
    context.visual.comment_types = ChangesetComment.COMMENT_TYPES
    context.visual.rhodecode_support_url = \
        rc_config.get('rhodecode_support_url') or h.route_url('rhodecode_support')

    context.visual.affected_files_cut_off = 60

    context.pre_code = rc_config.get('rhodecode_pre_code')
    context.post_code = rc_config.get('rhodecode_post_code')
    context.rhodecode_name = rc_config.get('rhodecode_title')
    context.default_encodings = aslist(config.get('default_encoding'), sep=',')
    # if we have specified default_encoding in the request, it has more
    # priority
    if request.GET.get('default_encoding'):
        context.default_encodings.insert(0, request.GET.get('default_encoding'))
    context.clone_uri_tmpl = rc_config.get('rhodecode_clone_uri_tmpl')
    context.clone_uri_id_tmpl = rc_config.get('rhodecode_clone_uri_id_tmpl')
    context.clone_uri_ssh_tmpl = rc_config.get('rhodecode_clone_uri_ssh_tmpl')

    # INI stored
    context.labs_active = str2bool(
        config.get('labs_settings_active', 'false'))
    context.ssh_enabled = str2bool(
        config.get('ssh.generate_authorized_keyfile', 'false'))
    context.ssh_key_generator_enabled = str2bool(
        config.get('ssh.enable_ui_key_generator', 'true'))

    context.visual.allow_custom_hooks_settings = str2bool(
        config.get('allow_custom_hooks_settings', True))
    context.debug_style = str2bool(config.get('debug_style', False))

    context.rhodecode_instanceid = config.get('instance_id')

    context.visual.cut_off_limit_diff = safe_int(
        config.get('cut_off_limit_diff'), default=0)
    context.visual.cut_off_limit_file = safe_int(
        config.get('cut_off_limit_file'), default=0)

    context.license = AttributeDict({})
    context.license.hide_license_info = str2bool(
        config.get('license.hide_license_info', False))

    # AppEnlight
    context.appenlight_enabled = config.get('appenlight', False)
    context.appenlight_api_public_key = config.get(
        'appenlight.api_public_key', '')
    context.appenlight_server_url = config.get('appenlight.server_url', '')

    # only the two known diff modes are accepted, anything else becomes None
    diffmode = {
        "unified": "unified",
        "sideside": "sideside"
    }.get(request.GET.get('diffmode'))

    if is_api is None:
        # the caller made no explicit choice, detect API calls by the
        # presence of the rpc user; an explicitly passed value is respected
        is_api = hasattr(request, 'rpc_user')

    # NOTE(review): `license_fingerprint` is read from the session even for
    # API calls, unlike the per-user attributes below - confirm this is wanted
    session_attrs = {
        # defaults
        "clone_url_format": "http",
        "diffmode": "sideside",
        "license_fingerprint": request.session.get('license_fingerprint')
    }

    if not is_api:
        # don't access pyramid session for API calls
        if diffmode and diffmode != request.session.get('rc_user_session_attr.diffmode'):
            request.session['rc_user_session_attr.diffmode'] = diffmode

        # session settings per user; the key prefix is stripped so that the
        # stored values override the defaults above
        pref = 'rc_user_session_attr.'
        for k, v in list(request.session.items()):
            if k and k.startswith(pref):
                session_attrs[k[len(pref):]] = v

    context.user_session_attrs = session_attrs

    # JS template context
    context.template_context = {
        'repo_name': None,
        'repo_type': None,
        'repo_landing_commit': None,
        'rhodecode_user': {
            'username': None,
            'email': None,
            'notification_status': False
        },
        'session_attrs': session_attrs,
        'visual': {
            'default_renderer': None
        },
        'commit_data': {
            'commit_id': None
        },
        'pull_request_data': {'pull_request_id': None},
        'timeago': {
            'refresh_time': 120 * 1000,
            'cutoff_limit': 1000 * 60 * 60 * 24 * 7
        },
        'pyramid_dispatch': {

        },
        'extra': {'plugins': {}}
    }
    # END CONFIG VARS
    if is_api:
        csrf_token = None
    else:
        csrf_token = auth.get_csrf_token(session=request.session)

    context.csrf_token = csrf_token
    context.backends = list(rhodecode.BACKENDS.keys())

    unread_count = 0
    user_bookmark_list = []
    if user_id:
        unread_count = NotificationModel().get_unread_cnt_for_user(user_id)
        user_bookmark_list = UserBookmark.get_bookmarks_for_user(user_id)
    context.unread_notifications = unread_count
    context.bookmark_items = user_bookmark_list

    # web case
    if hasattr(request, 'user'):
        context.auth_user = request.user
        context.rhodecode_user = request.user

    # api case
    if hasattr(request, 'rpc_user'):
        context.auth_user = request.rpc_user
        context.rhodecode_user = request.rpc_user

    # attach the whole call context to the request
    request.set_call_context(context)
r1794 | ||||
r765 | ||||
def get_auth_user(request):
    """
    Build the `AuthUser` for the given request.

    An auth token is looked up in the `auth_token` and the legacy `api_key`
    GET parameters, then in the `X-Rc-Auth-Token` header and finally in the
    `_auth_token` entry of the route matchdict. With a token the user is
    created from it, otherwise from the `rhodecode_user` data kept in the
    session.

    :param request: current request
    :return: tuple of (auth_user, auth_token)
    """
    wsgi_environ = request.environ
    user_session = request.session

    ip_addr = get_ip_addr(wsgi_environ)

    # make sure that we update permissions each time we call controller

    # ?auth_token=XXX
    token = request.GET.get('auth_token', '')
    if not token:
        # ?api_key=XXX !LEGACY
        token = request.GET.get('api_key', '')
    if not token:
        # or headers....
        token = request.headers.get('X-Rc-Auth-Token', '')

    if not token and request.matchdict:
        token = request.matchdict.get('_auth_token')
        if token:
            log.debug('Using URL extracted auth token `...%s`', token[-4:])

    if token:
        # when using API_KEY we assume user exists, and
        # doesn't need auth based on cookies.
        auth_user = AuthUser(api_key=token, ip_addr=ip_addr)
        authenticated = False
    else:
        cookie_store = CookieStoreWrapper(user_session.get('rhodecode_user'))
        try:
            auth_user = AuthUser(
                user_id=cookie_store.get('user_id', None), ip_addr=ip_addr)
        except UserCreationError as exc:
            h.flash(exc, 'error')
            # container auth or other auth functions that create users
            # on the fly can throw this exception signaling that there's
            # issue with user creation, explanation should be provided
            # in Exception itself. We then create a simple blank
            # AuthUser
            auth_user = AuthUser(ip_addr=ip_addr)

        # in case someone changes a password for user it triggers session
        # flush and forces a re-login
        if password_changed(auth_user, user_session):
            user_session.invalidate()
            cookie_store = CookieStoreWrapper(user_session.get('rhodecode_user'))
            auth_user = AuthUser(ip_addr=ip_addr)

        authenticated = cookie_store.get('is_authenticated')

    if not auth_user.is_authenticated and auth_user.is_user_object:
        # user is not authenticated and not empty
        auth_user.set_authenticated(authenticated)

    return auth_user, token
r1 | ||||
def h_filter(s):
    """
    Custom filter for Mako templates.

    Mako by standard uses `markupsafe.escape`; this wrapper additionally
    turns `None` into an empty `markupsafe.Markup` instead of escaping it.
    """
    return markupsafe.Markup() if s is None else markupsafe.escape(s)
def add_events_routes(config):
    """
    Register the routes that can be used in events.

    Some events are triggered outside of pyramid context, so the request has
    to be bootstrapped with some routing registered.

    :param config: configurator object, its `add_route` is called per route
    """
    from rhodecode.apps._base import ADMIN_PREFIX

    # (route name, url pattern), registered in exactly this order
    event_routes = (
        ('home', '/'),
        ('main_page_repos_data', '/_home_repos'),
        ('main_page_repo_groups_data', '/_home_repo_groups'),

        ('login', ADMIN_PREFIX + '/login'),
        ('logout', ADMIN_PREFIX + '/logout'),
        ('repo_summary', '/{repo_name}'),
        ('repo_summary_explicit', '/{repo_name}/summary'),
        ('repo_group_home', '/{repo_group_name}'),

        ('pullrequest_show', '/{repo_name}/pull-request/{pull_request_id}'),
        ('pull_requests_global', '/pull-request/{pull_request_id}'),

        ('repo_commit', '/{repo_name}/changeset/{commit_id}'),
        ('repo_files', '/{repo_name}/files/{commit_id}/{f_path}'),

        ('hovercard_user', '/_hovercard/user/{user_id}'),
        ('hovercard_user_group', '/_hovercard/user_group/{user_group_id}'),
        ('hovercard_pull_request', '/_hovercard/pull_request/{pull_request_id}'),
        ('hovercard_repo_commit', '/_hovercard/commit/{repo_name}/{commit_id}'),
    )
    for route_name, route_pattern in event_routes:
        config.add_route(name=route_name, pattern=route_pattern)
r2016 | ||||
def bootstrap_config(request, registry_name='RcTestRegistry'):
    """
    Set up a `pyramid.testing` configuration for the given request.

    Default settings are applied, the mako, beaker and cache packages are
    included and the event routes are registered.

    :param request: request passed on to `pyramid.testing.setUp`
    :param registry_name: name of the registry created for the configuration
    :return: the configurator returned by `pyramid.testing.setUp`
    """
    from rhodecode.config.config_maker import sanitize_settings_and_apply_defaults
    import pyramid.testing

    test_registry = pyramid.testing.Registry(registry_name)
    global_config = {'__file__': ''}

    config = pyramid.testing.setUp(registry=test_registry, request=request)
    sanitize_settings_and_apply_defaults(global_config, config.registry.settings)

    included_packages = (
        # allow pyramid lookup in testing
        'pyramid_mako',
        'rhodecode.lib.rc_beaker',
        'rhodecode.lib.rc_cache',
        'rhodecode.lib.rc_cache.archive_cache',
    )
    for package in included_packages:
        config.include(package)

    add_events_routes(config)
    return config
def bootstrap_request(**kwargs):
    """
    Returns a thin version of Request Object that is used in non-web context
    like testing/celery.

    The `application_url`, `host` and `domain` keyword arguments are taken
    out of `kwargs` (with example.com defaults) and set on the request class;
    the remaining keyword arguments go to the request constructor.
    """
    import pyramid.testing
    from rhodecode.lib.request import ThinRequest as _ThinRequest

    app_url = kwargs.pop('application_url', 'http://example.com')
    host_name = kwargs.pop('host', 'example.com:80')
    domain_name = kwargs.pop('domain', 'example.com')

    class ThinRequest(_ThinRequest):
        application_url = app_url
        host = host_name
        domain = domain_name

    class ThinSession(pyramid.testing.DummySession):
        # saving is a no-op for this session
        def save(*arg, **kw):
            pass

    thin_request = ThinRequest(**kwargs)
    thin_request.session = ThinSession()
    return thin_request