# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

"""
The base Controller API
Provides the BaseController class for subclassing. And usage in different
controllers
"""

import logging
import socket
import base64

import markupsafe
import ipaddress

import paste.httpheaders
from paste.auth.basic import AuthBasicAuthenticator
from paste.httpexceptions import HTTPUnauthorized, HTTPForbidden, get_exception

import rhodecode
from rhodecode.authentication.base import VCS_TYPE
from rhodecode.lib import auth, utils2
from rhodecode.lib import helpers as h
from rhodecode.lib.auth import AuthUser, CookieStoreWrapper
from rhodecode.lib.exceptions import UserCreationError
from rhodecode.lib.utils import (password_changed, get_enabled_hook_classes)
from rhodecode.lib.utils2 import AttributeDict
from rhodecode.lib.str_utils import ascii_bytes, safe_int, safe_str
from rhodecode.lib.type_utils import aslist, str2bool
from rhodecode.lib.hash_utils import sha1
from rhodecode.model.db import Repository, User, ChangesetComment, UserBookmark
from rhodecode.model.notification import NotificationModel
from rhodecode.model.settings import VcsSettingsModel, SettingsModel

log = logging.getLogger(__name__)


def _filter_proxy(ip):
    """
    Passed in IP addresses in HEADERS can be in a special format of multiple
    ips. Those comma separated IPs are passed from various proxies in the
    chain of request processing. The left-most being the original client.
    We only care about the first IP which came from the org. client.

    :param ip: ip string from headers
    :return: the first (left-most) entry, stripped of whitespace, when `ip`
        contains commas; otherwise `ip` unchanged
    """
    if ',' in ip:
        _ips = ip.split(',')
        _first_ip = _ips[0].strip()
        log.debug('Got multiple IPs %s, using %s', ','.join(_ips), _first_ip)
        return _first_ip
    return ip


def _filter_port(ip):
    """
    Removes a port from ip, there are 4 main cases to handle here.
    - ipv4 eg. 127.0.0.1
    - ipv6 eg. ::1
    - ipv4+port eg. 127.0.0.1:8080
    - ipv6+port eg. [::1]:8080

    :param ip: ip string, optionally carrying a port
    :return: the ip string without the port part
    """
    def is_ipv6(ip_addr):
        # True when `ip_addr` parses as a bare IPv6 address
        if hasattr(socket, 'inet_pton'):
            try:
                socket.inet_pton(socket.AF_INET6, ip_addr)
            except socket.error:
                return False
        else:
            # fallback to ipaddress
            try:
                ipaddress.IPv6Address(safe_str(ip_addr))
            except Exception:
                return False
        return True

    if ':' not in ip:  # must be ipv4 pure ip
        return ip

    if '[' in ip and ']' in ip:  # ipv6 with port
        return ip.split(']')[0][1:].lower()

    # must be ipv6 or ipv4 with port
    if is_ipv6(ip):
        return ip
    else:
        ip, _port = ip.split(':')[:2]  # means ipv4+port
        return ip


def get_ip_addr(environ):
    """
    Return the client IP address for a WSGI request.

    Looks at `HTTP_X_REAL_IP` first, then `HTTP_X_FORWARDED_FOR`, and finally
    `REMOTE_ADDR` (defaulting to `0.0.0.0`). The chosen value is reduced to a
    single address without port.

    :param environ: WSGI environ mapping
    """
    proxy_key = 'HTTP_X_REAL_IP'
    proxy_key2 = 'HTTP_X_FORWARDED_FOR'
    def_key = 'REMOTE_ADDR'

    def ip_filters(ip_):
        return _filter_port(_filter_proxy(ip_))

    ip = environ.get(proxy_key)
    if ip:
        return ip_filters(ip)

    ip = environ.get(proxy_key2)
    if ip:
        return ip_filters(ip)

    ip = environ.get(def_key, '0.0.0.0')
    return ip_filters(ip)


def get_server_ip_addr(environ, log_errors=True):
    """
    Resolve `SERVER_NAME` from the environ into an IP address.

    :param environ: WSGI environ mapping
    :param log_errors: when True a failed lookup is logged with traceback
    :return: the resolved IP, or the raw `SERVER_NAME` value when the lookup
        fails
    """
    hostname = environ.get('SERVER_NAME')
    try:
        return socket.gethostbyname(hostname)
    except Exception as e:
        if log_errors:
            # in some cases this lookup is not possible, and we don't want to
            # make it an exception in logs
            log.exception('Could not retrieve server ip address: %s', e)
        return hostname


def get_server_port(environ):
    """Return the `SERVER_PORT` value from the WSGI environ (or None)."""
    return environ.get('SERVER_PORT')


def get_user_agent(environ):
    """Return the `HTTP_USER_AGENT` value from the WSGI environ (or None)."""
    return environ.get('HTTP_USER_AGENT')


def vcs_operation_context(
        environ, repo_name, username, action, scm, check_locking=True,
        is_shadow_repo=False, check_branch_perms=False, detect_force_push=False):
    """
    Generate the context for a vcs operation, e.g. push or pull.

    This context is passed over the layers so that hooks triggered by the
    vcs operation know details like the user, the user's IP address etc.

    :param environ: WSGI environ of the request
    :param repo_name: name of the repository the operation targets
    :param username: name of the user performing the operation
    :param action: the vcs action, passed to the locking check and hooks
    :param scm: scm identifier stored in the context
    :param check_locking: Allows to switch off the computation of the locking
        data. This serves mainly the need of the simplevcs middleware to be
        able to disable this for certain operations.
    :param is_shadow_repo: stored in the context as-is
    :param check_branch_perms: stored in the context as-is
    :param detect_force_push: stored in the context as-is
    :return: dict with the collected operation data
    """
    # Tri-state value: False: unlock, None: nothing, True: lock
    make_lock = None
    locked_by = [None, None, None]
    is_anonymous = username == User.DEFAULT_USER
    user = User.get_by_username(username)
    if not is_anonymous and check_locking:
        log.debug('Checking locking on repository "%s"', repo_name)
        repo = Repository.get_by_repo_name(repo_name)
        make_lock, __, locked_by = repo.get_locking_state(
            action, user.user_id)
    user_id = user.user_id
    settings_model = VcsSettingsModel(repo=repo_name)
    ui_settings = settings_model.get_ui_settings()

    # NOTE(marcink): This should be also in sync with
    # rhodecode/apps/ssh_support/lib/backends/base.py:update_environment scm_data
    store = [x for x in ui_settings if x.key == '/']
    repo_store = ''
    if store:
        repo_store = store[0].value

    scm_data = {
        'ip': get_ip_addr(environ),
        'username': username,
        'user_id': user_id,
        'action': action,
        'repository': repo_name,
        'scm': scm,
        'config': rhodecode.CONFIG['__file__'],
        'repo_store': repo_store,
        'make_lock': make_lock,
        'locked_by': locked_by,
        'server_url': utils2.get_server_url(environ),
        'user_agent': get_user_agent(environ),
        'hooks': get_enabled_hook_classes(ui_settings),
        'is_shadow_repo': is_shadow_repo,
        'detect_force_push': detect_force_push,
        'check_branch_perms': check_branch_perms,
    }
    return scm_data


class BasicAuth(AuthBasicAuthenticator):
    """
    HTTP Basic authenticator that delegates the credential check to
    `authfunc` and can answer failed attempts with a configurable HTTP
    error class instead of the default 401.
    """

    def __init__(self, realm, authfunc, registry, auth_http_code=None,
                 initial_call_detection=False, acl_repo_name=None, rc_realm=''):
        super().__init__(realm=realm, authfunc=authfunc)
        self.realm = realm
        self.rc_realm = rc_realm
        self.initial_call = initial_call_detection
        self.authfunc = authfunc
        self.registry = registry
        self.acl_repo_name = acl_repo_name
        self._rc_auth_http_code = auth_http_code

    def _get_response_from_code(self, http_code, fallback):
        """
        Return the paste exception class for `http_code`, or `fallback` when
        the code cannot be mapped.
        """
        try:
            return get_exception(safe_int(http_code))
        except Exception:
            log.exception('Failed to fetch response class for code %s, using fallback: %s', http_code, fallback)
            return fallback

    def get_rc_realm(self):
        """Return the configured `rc_realm` as a string."""
        return safe_str(self.rc_realm)

    def build_authentication(self):
        """
        Build the response that asks the client to authenticate.

        :return: `HTTPUnauthorized`, or the custom response class configured
            via `auth_http_code` once the initial call has passed, always
            carrying the `WWW-Authenticate` header
        """
        header = [('WWW-Authenticate', f'Basic realm="{self.realm}"')]

        # NOTE: the initial_Call detection seems to be not working/not needed with latest Mercurial
        # investigate if we still need it.
        if self._rc_auth_http_code and not self.initial_call:
            # return alternative HTTP code if alternative http return code
            # is specified in RhodeCode config, but ONLY if it's not the
            # FIRST call
            custom_response_klass = self._get_response_from_code(self._rc_auth_http_code, fallback=HTTPUnauthorized)
            log.debug('Using custom response class: %s', custom_response_klass)
            return custom_response_klass(headers=header)
        return HTTPUnauthorized(headers=header)

    def authenticate(self, environ):
        """
        Authenticate the request described by `environ` via HTTP Basic auth.

        :param environ: WSGI environ of the request
        :return: dict with `username` and `auth_data` on success, otherwise
            the response built by `build_authentication`
        """
        authorization = paste.httpheaders.AUTHORIZATION(environ)
        if not authorization:
            return self.build_authentication()

        # The header is client controlled; a value without a space separator
        # must result in an auth challenge, not in an unpacking error.
        auth_meth, _sep, auth_creds_b64 = authorization.partition(' ')
        if not _sep or 'basic' != auth_meth.lower():
            return self.build_authentication()

        try:
            raw_credentials = base64.b64decode(auth_creds_b64.strip())
        except (ValueError, TypeError):
            # binascii.Error is a ValueError subclass: malformed base64 sent
            # by the client is treated as missing credentials
            log.debug('Malformed base64 payload in Authorization header')
            return self.build_authentication()

        credentials = safe_str(raw_credentials)
        _parts = credentials.split(':', 1)
        if len(_parts) == 2:
            username, password = _parts
            auth_data = self.authfunc(
                username, password, environ, VCS_TYPE,
                registry=self.registry, acl_repo_name=self.acl_repo_name)
            if auth_data:
                return {'username': username, 'auth_data': auth_data}
            if username and password:
                # we mark that we actually executed authentication once, at
                # that point we can use the alternative auth code
                self.initial_call = False

        return self.build_authentication()

    __call__ = authenticate


def calculate_version_hash(config):
    """
    Return a short hash derived from the session secret and the version.

    :param config: settings mapping holding `beaker.session.secret`
    :return: first 8 characters of the sha1 over secret + version
    """
    # Settings are keyed by str; the previous bytes-key lookup never matched
    # a str key, so the secret was silently left out of the hash.
    secret = config.get('beaker.session.secret')
    if secret is None:
        # keep honoring a bytes key in case a caller provides one
        secret = config.get(b'beaker.session.secret', b'')
    if isinstance(secret, str):
        secret = secret.encode('utf-8')
    return sha1(secret + ascii_bytes(rhodecode.__version__))[:8]


def get_current_lang(request):
    """Return `request._LOCALE_` when set, else `request.locale_name`."""
    return getattr(request, '_LOCALE_', request.locale_name)


def attach_context_attributes(context, request, user_id=None, is_api=None):
    """
    Attach variables into template context called `c`.

    :param context: template context object the attributes are set on
    :param request: current request
    :param user_id: when given, unread notifications and bookmarks of that
        user are loaded into the context
    :param is_api: marks the call as an API call (no pyramid session access
        for user session attributes, no CSRF token); when None it is
        detected from the presence of `request.rpc_user`
    """
    config = request.registry.settings

    rc_config = SettingsModel().get_all_settings(cache=True, from_request=False)
    context.rc_config = rc_config
    context.rhodecode_version = rhodecode.__version__
    context.rhodecode_edition = config.get('rhodecode.edition')
    context.rhodecode_edition_id = config.get('rhodecode.edition_id')
    # unique secret + version does not leak the version but keep consistency
    context.rhodecode_version_hash = calculate_version_hash(config)

    # Default language set for the incoming request
    context.language = get_current_lang(request)

    # Visual options
    context.visual = AttributeDict({})

    # DB stored Visual Items
    context.visual.show_public_icon = str2bool(
        rc_config.get('rhodecode_show_public_icon'))
    context.visual.show_private_icon = str2bool(
        rc_config.get('rhodecode_show_private_icon'))
    context.visual.stylify_metatags = str2bool(
        rc_config.get('rhodecode_stylify_metatags'))
    context.visual.dashboard_items = safe_int(
        rc_config.get('rhodecode_dashboard_items', 100))
    context.visual.admin_grid_items = safe_int(
        rc_config.get('rhodecode_admin_grid_items', 100))
    context.visual.show_revision_number = str2bool(
        rc_config.get('rhodecode_show_revision_number', True))
    context.visual.show_sha_length = safe_int(
        rc_config.get('rhodecode_show_sha_length', 100))
    context.visual.repository_fields = str2bool(
        rc_config.get('rhodecode_repository_fields'))
    context.visual.show_version = str2bool(
        rc_config.get('rhodecode_show_version'))
    context.visual.use_gravatar = str2bool(
        rc_config.get('rhodecode_use_gravatar'))
    context.visual.gravatar_url = rc_config.get('rhodecode_gravatar_url')
    context.visual.default_renderer = rc_config.get(
        'rhodecode_markup_renderer', 'rst')
    context.visual.comment_types = ChangesetComment.COMMENT_TYPES
    context.visual.rhodecode_support_url = (
        rc_config.get('rhodecode_support_url')
        or h.route_url('rhodecode_support'))

    context.visual.affected_files_cut_off = 60

    context.pre_code = rc_config.get('rhodecode_pre_code')
    context.post_code = rc_config.get('rhodecode_post_code')
    context.rhodecode_name = rc_config.get('rhodecode_title')
    context.default_encodings = aslist(config.get('default_encoding'), sep=',')
    # if we have specified default_encoding in the request, it has more
    # priority
    if request.GET.get('default_encoding'):
        context.default_encodings.insert(0, request.GET.get('default_encoding'))
    context.clone_uri_tmpl = rc_config.get('rhodecode_clone_uri_tmpl')
    context.clone_uri_id_tmpl = rc_config.get('rhodecode_clone_uri_id_tmpl')
    context.clone_uri_ssh_tmpl = rc_config.get('rhodecode_clone_uri_ssh_tmpl')

    # INI stored
    context.labs_active = str2bool(
        config.get('labs_settings_active', 'false'))
    context.ssh_enabled = str2bool(
        config.get('ssh.generate_authorized_keyfile', 'false'))
    context.ssh_key_generator_enabled = str2bool(
        config.get('ssh.enable_ui_key_generator', 'true'))

    context.visual.allow_custom_hooks_settings = str2bool(
        config.get('allow_custom_hooks_settings', True))
    context.debug_style = str2bool(config.get('debug_style', False))

    context.rhodecode_instanceid = config.get('instance_id')

    context.visual.cut_off_limit_diff = safe_int(
        config.get('cut_off_limit_diff'), default=0)
    context.visual.cut_off_limit_file = safe_int(
        config.get('cut_off_limit_file'), default=0)

    context.license = AttributeDict({})
    context.license.hide_license_info = str2bool(
        config.get('license.hide_license_info', False))

    # AppEnlight
    context.appenlight_enabled = config.get('appenlight', False)
    context.appenlight_api_public_key = config.get(
        'appenlight.api_public_key', '')
    context.appenlight_server_url = config.get('appenlight.server_url', '')

    # only the two known diff modes are accepted from the query string
    diffmode = {
        "unified": "unified",
        "sideside": "sideside"
    }.get(request.GET.get('diffmode'))

    # Auto-detect API calls only when the caller did not state it; the
    # previous `is not None` check discarded explicit values and never
    # detected anything for the default.
    if is_api is None:
        is_api = hasattr(request, 'rpc_user')

    session_attrs = {
        # defaults
        "clone_url_format": "http",
        "diffmode": "sideside",
        "license_fingerprint": request.session.get('license_fingerprint')
    }

    if not is_api:
        # don't access pyramid session for API calls
        if diffmode and diffmode != request.session.get('rc_user_session_attr.diffmode'):
            request.session['rc_user_session_attr.diffmode'] = diffmode

        # session settings per user
        pref = 'rc_user_session_attr.'
        for k, v in list(request.session.items()):
            if k and k.startswith(pref):
                k = k[len(pref):]
                session_attrs[k] = v

    context.user_session_attrs = session_attrs

    # JS template context
    context.template_context = {
        'repo_name': None,
        'repo_type': None,
        'repo_landing_commit': None,
        'rhodecode_user': {
            'username': None,
            'email': None,
            'notification_status': False
        },
        'session_attrs': session_attrs,
        'visual': {
            'default_renderer': None
        },
        'commit_data': {
            'commit_id': None
        },
        'pull_request_data': {'pull_request_id': None},
        'timeago': {
            'refresh_time': 120 * 1000,
            'cutoff_limit': 1000 * 60 * 60 * 24 * 7
        },
        'pyramid_dispatch': {

        },
        'extra': {'plugins': {}}
    }
    # END CONFIG VARS
    if is_api:
        csrf_token = None
    else:
        csrf_token = auth.get_csrf_token(session=request.session)

    context.csrf_token = csrf_token
    context.backends = list(rhodecode.BACKENDS.keys())

    unread_count = 0
    user_bookmark_list = []
    if user_id:
        unread_count = NotificationModel().get_unread_cnt_for_user(user_id)
        user_bookmark_list = UserBookmark.get_bookmarks_for_user(user_id)
    context.unread_notifications = unread_count
    context.bookmark_items = user_bookmark_list

    # web case
    if hasattr(request, 'user'):
        context.auth_user = request.user
        context.rhodecode_user = request.user

    # api case
    if hasattr(request, 'rpc_user'):
        context.auth_user = request.rpc_user
        context.rhodecode_user = request.rpc_user

    # attach the whole call context to the request
    request.set_call_context(context)


def get_auth_user(request):
    """
    Build the `AuthUser` for the given request.

    An auth token taken from the query string (`auth_token`, legacy
    `api_key`), the `X-Rc-Auth-Token` header or the matched route takes
    precedence; otherwise the user is restored from the session cookie store.

    :param request: current request
    :return: tuple of (auth_user, auth_token)
    """
    environ = request.environ
    session = request.session

    ip_addr = get_ip_addr(environ)

    # make sure that we update permissions each time we call controller
    _auth_token = (
        # ?auth_token=XXX
        request.GET.get('auth_token', '')
        # ?api_key=XXX !LEGACY
        or request.GET.get('api_key', '')
        # or headers....
        or request.headers.get('X-Rc-Auth-Token', '')
    )
    if not _auth_token and request.matchdict:
        url_auth_token = request.matchdict.get('_auth_token')
        _auth_token = url_auth_token
        if _auth_token:
            log.debug('Using URL extracted auth token `...%s`', _auth_token[-4:])

    if _auth_token:
        # when using API_KEY we assume user exists, and
        # doesn't need auth based on cookies.
        auth_user = AuthUser(api_key=_auth_token, ip_addr=ip_addr)
        authenticated = False
    else:
        cookie_store = CookieStoreWrapper(session.get('rhodecode_user'))
        try:
            auth_user = AuthUser(user_id=cookie_store.get('user_id', None),
                                 ip_addr=ip_addr)
        except UserCreationError as e:
            h.flash(e, 'error')
            # container auth or other auth functions that create users
            # on the fly can throw this exception signaling that there's
            # issue with user creation, explanation should be provided
            # in Exception itself. We then create a simple blank
            # AuthUser
            auth_user = AuthUser(ip_addr=ip_addr)

        # in case someone changes a password for user it triggers session
        # flush and forces a re-login
        if password_changed(auth_user, session):
            session.invalidate()
            cookie_store = CookieStoreWrapper(session.get('rhodecode_user'))
            auth_user = AuthUser(ip_addr=ip_addr)

        authenticated = cookie_store.get('is_authenticated')

    if not auth_user.is_authenticated and auth_user.is_user_object:
        # user is not authenticated and not empty
        auth_user.set_authenticated(authenticated)

    return auth_user, _auth_token


def h_filter(s):
    """
    Custom filter for Mako templates. Mako by standard uses `markupsafe.escape`
    we wrap this with additional functionality that converts None to empty
    strings

    :param s: value to escape
    :return: empty `Markup` for None, otherwise the escaped value
    """
    if s is None:
        return markupsafe.Markup()
    return markupsafe.escape(s)


def add_events_routes(config):
    """
    Adds routing that can be used in events. Because some events are triggered
    outside of pyramid context, we need to bootstrap request with some
    routing registered

    :param config: pyramid configurator the routes are added to
    """

    from rhodecode.apps._base import ADMIN_PREFIX

    config.add_route(name='home', pattern='/')
    config.add_route(name='main_page_repos_data', pattern='/_home_repos')
    config.add_route(name='main_page_repo_groups_data', pattern='/_home_repo_groups')

    config.add_route(name='login', pattern=ADMIN_PREFIX + '/login')
    config.add_route(name='logout', pattern=ADMIN_PREFIX + '/logout')
    config.add_route(name='repo_summary', pattern='/{repo_name}')
    config.add_route(name='repo_summary_explicit', pattern='/{repo_name}/summary')
    config.add_route(name='repo_group_home', pattern='/{repo_group_name}')

    config.add_route(name='pullrequest_show',
                     pattern='/{repo_name}/pull-request/{pull_request_id}')
    config.add_route(name='pull_requests_global',
                     pattern='/pull-request/{pull_request_id}')

    config.add_route(name='repo_commit',
                     pattern='/{repo_name}/changeset/{commit_id}')
    config.add_route(name='repo_files',
                     pattern='/{repo_name}/files/{commit_id}/{f_path}')

    config.add_route(name='hovercard_user',
                     pattern='/_hovercard/user/{user_id}')

    config.add_route(name='hovercard_user_group',
                     pattern='/_hovercard/user_group/{user_group_id}')

    config.add_route(name='hovercard_pull_request',
                     pattern='/_hovercard/pull_request/{pull_request_id}')

    config.add_route(name='hovercard_repo_commit',
                     pattern='/_hovercard/commit/{repo_name}/{commit_id}')


def bootstrap_config(request, registry_name='RcTestRegistry'):
    """
    Set up a pyramid testing configurator for use outside of a web request.

    :param request: request object bound to the configurator
    :param registry_name: name of the created testing registry
    :return: the configurator, with settings defaults applied, the mako,
        beaker and cache includes loaded and the event routes registered
    """
    from rhodecode.config.config_maker import sanitize_settings_and_apply_defaults
    import pyramid.testing

    registry = pyramid.testing.Registry(registry_name)

    global_config = {'__file__': ''}

    config = pyramid.testing.setUp(registry=registry, request=request)
    sanitize_settings_and_apply_defaults(global_config, config.registry.settings)

    # allow pyramid lookup in testing
    config.include('pyramid_mako')
    config.include('rhodecode.lib.rc_beaker')
    config.include('rhodecode.lib.rc_cache')
    config.include('rhodecode.lib.rc_cache.archive_cache')
    add_events_routes(config)

    return config


def bootstrap_request(**kwargs):
    """
    Returns a thin version of Request Object that is used in non-web context like testing/celery

    `application_url`, `host` and `domain` are taken out of `kwargs` (with
    example.com defaults); the remaining kwargs are passed to the request
    constructor.
    """

    import pyramid.testing
    from rhodecode.lib.request import ThinRequest as _ThinRequest

    class ThinRequest(_ThinRequest):
        application_url = kwargs.pop('application_url', 'http://example.com')
        host = kwargs.pop('host', 'example.com:80')
        domain = kwargs.pop('domain', 'example.com')

    class ThinSession(pyramid.testing.DummySession):
        def save(*arg, **kw):
            # nothing to persist for the dummy session
            pass

    request = ThinRequest(**kwargs)
    request.session = ThinSession()

    return request