# Copyright (C) 2011-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ """ Some simple helper functions """ import collections import datetime import dateutil.relativedelta import logging import re import sys import time import urllib.request import urllib.parse import urllib.error import urlobject import uuid import getpass import socket import errno import random import functools from contextlib import closing import pygments.lexers import sqlalchemy import sqlalchemy.event import sqlalchemy.engine.url import sqlalchemy.exc import sqlalchemy.sql import webob from pyramid.settings import asbool import rhodecode from rhodecode.translation import _, _pluralize from rhodecode.lib.str_utils import safe_str, safe_int, safe_bytes from rhodecode.lib.hash_utils import md5, md5_safe, sha1, sha1_safe from rhodecode.lib.type_utils import aslist, str2bool, StrictAttributeDict, AttributeDict def __get_lem(extra_mapping=None): """ Get language extension map based on what's inside pygments lexers """ d = collections.defaultdict(lambda: []) def __clean(s): s = s.lstrip('*') s = s.lstrip('.') if s.find('[') != -1: exts = [] start, stop = s.find('['), s.find(']') for suffix in s[start + 1:stop]: exts.append(s[:s.find('[')] + suffix) return [e.lower() for e in exts] else: return [s.lower()] for lx, t in sorted(pygments.lexers.LEXERS.items()): m = list(map(__clean, t[-2])) if m: m = functools.reduce(lambda x, y: x + y, m) for ext in m: desc = lx.replace('Lexer', '') d[ext].append(desc) data = dict(d) extra_mapping = extra_mapping or {} if extra_mapping: for k, v in list(extra_mapping.items()): if k not in data: # register new mapping2lexer data[k] = [v] return data def convert_line_endings(line: str, mode) -> str: """ Converts a given line "line end" accordingly to given mode Available modes are:: 0 - Unix 1 - Mac 2 - DOS :param line: given line to convert :param mode: mode to convert to :return: converted line according to mode """ if mode == 0: line = line.replace('\r\n', '\n') line = line.replace('\r', '\n') elif mode == 1: line = line.replace('\r\n', '\r') line = line.replace('\n', '\r') elif mode == 2: line = re.sub('\r(?!\n)|(? int: """ Detects line break for given line, if line break couldn't be found given default value is returned :param line: str line :param default: default :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS """ if line.endswith('\r\n'): return 2 elif line.endswith('\n'): return 0 elif line.endswith('\r'): return 1 else: return default def remove_suffix(s, suffix): if s.endswith(suffix): s = s[:-1 * len(suffix)] return s def remove_prefix(s, prefix): if s.startswith(prefix): s = s[len(prefix):] return s def find_calling_context(ignore_modules=None, depth=4, output_writer=None, indent=True): """ Look through the calling stack and return the frame which called this function and is part of core module ( ie. rhodecode.* ) :param ignore_modules: list of modules to ignore eg. ['rhodecode.lib'] :param depth: :param output_writer: :param indent: usage:: from rhodecode.lib.utils2 import find_calling_context calling_context = find_calling_context(ignore_modules=[ 'rhodecode.lib.caching_query', 'rhodecode.model.settings', ]) """ import inspect if not output_writer: try: from rich import print as pprint except ImportError: pprint = print output_writer = pprint frame = inspect.currentframe() cc = [] try: for i in range(depth): # current frame + 3 callers frame = frame.f_back if not frame: break info = inspect.getframeinfo(frame) name = frame.f_globals.get('__name__') if name not in ignore_modules: cc.insert(0, f'CALL_CONTEXT:{i}: file {info.filename}:{info.lineno} -> {info.function}') finally: # Avoids a reference cycle del frame output_writer('* INFO: This code was called from: *') for cnt, frm_info in enumerate(cc): if not indent: cnt = 1 output_writer(' ' * cnt + frm_info) def ping_connection(connection, branch): if branch: # "branch" refers to a sub-connection of a connection, # we don't want to bother pinging on these. return # turn off "close with result". This flag is only used with # "connectionless" execution, otherwise will be False in any case save_should_close_with_result = connection.should_close_with_result connection.should_close_with_result = False try: # run a SELECT 1. use a core select() so that # the SELECT of a scalar value without a table is # appropriately formatted for the backend connection.scalar(sqlalchemy.sql.select([1])) except sqlalchemy.exc.DBAPIError as err: # catch SQLAlchemy's DBAPIError, which is a wrapper # for the DBAPI's exception. It includes a .connection_invalidated # attribute which specifies if this connection is a "disconnect" # condition, which is based on inspection of the original exception # by the dialect in use. if err.connection_invalidated: # run the same SELECT again - the connection will re-validate # itself and establish a new connection. The disconnect detection # here also causes the whole connection pool to be invalidated # so that all stale connections are discarded. connection.scalar(sqlalchemy.sql.select([1])) else: raise finally: # restore "close with result" connection.should_close_with_result = save_should_close_with_result def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs): """Custom engine_from_config functions.""" log = logging.getLogger('sqlalchemy.engine') use_ping_connection = asbool(configuration.pop('sqlalchemy.db1.ping_connection', None)) debug = asbool(configuration.pop('sqlalchemy.db1.debug_query', None)) engine = sqlalchemy.engine_from_config(configuration, prefix, **kwargs) def color_sql(sql): color_seq = '\033[1;33m' # This is yellow: code 33 normal = '\x1b[0m' return ''.join([color_seq, sql, normal]) if use_ping_connection: log.debug('Adding ping_connection on the engine config.') sqlalchemy.event.listen(engine, "engine_connect", ping_connection) if debug: # attach events only for debug configuration def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): setattr(conn, 'query_start_time', time.time()) log.info(color_sql(">>>>> STARTING QUERY >>>>>")) find_calling_context(ignore_modules=[ 'rhodecode.lib.caching_query', 'rhodecode.model.settings', ], output_writer=log.info) def after_cursor_execute(conn, cursor, statement, parameters, context, executemany): delattr(conn, 'query_start_time') sqlalchemy.event.listen(engine, "before_cursor_execute", before_cursor_execute) sqlalchemy.event.listen(engine, "after_cursor_execute", after_cursor_execute) return engine def get_encryption_key(config) -> bytes: secret = config.get('rhodecode.encrypted_values.secret') default = config['beaker.session.secret'] enc_key = secret or default return safe_bytes(enc_key) def age(prevdate, now=None, show_short_version=False, show_suffix=True, short_format=False): """ Turns a datetime into an age string. If show_short_version is True, this generates a shorter string with an approximate age; ex. '1 day ago', rather than '1 day and 23 hours ago'. * IMPORTANT* Code of this function is written in special way so it's easier to backport it to javascript. If you mean to update it, please also update `jquery.timeago-extension.js` file :param prevdate: datetime object :param now: get current time, if not define we use `datetime.datetime.now()` :param show_short_version: if it should approximate the date and return a shorter string :param show_suffix: :param short_format: show short format, eg 2D instead of 2 days :rtype: unicode :returns: unicode words describing age """ def _get_relative_delta(now, prevdate): base = dateutil.relativedelta.relativedelta(now, prevdate) return { 'year': base.years, 'month': base.months, 'day': base.days, 'hour': base.hours, 'minute': base.minutes, 'second': base.seconds, } def _is_leap_year(year): return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0) def get_month(prevdate): return prevdate.month def get_year(prevdate): return prevdate.year now = now or datetime.datetime.now() order = ['year', 'month', 'day', 'hour', 'minute', 'second'] deltas = {} future = False if prevdate > now: now_old = now now = prevdate prevdate = now_old future = True if future: prevdate = prevdate.replace(microsecond=0) # Get date parts deltas for part in order: rel_delta = _get_relative_delta(now, prevdate) deltas[part] = rel_delta[part] # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00, # not 1 hour, -59 minutes and -59 seconds) offsets = [[5, 60], [4, 60], [3, 24]] for element in offsets: # seconds, minutes, hours num = element[0] length = element[1] part = order[num] carry_part = order[num - 1] if deltas[part] < 0: deltas[part] += length deltas[carry_part] -= 1 # Same thing for days except that the increment depends on the (variable) # number of days in the month month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] if deltas['day'] < 0: if get_month(prevdate) == 2 and _is_leap_year(get_year(prevdate)): deltas['day'] += 29 else: deltas['day'] += month_lengths[get_month(prevdate) - 1] deltas['month'] -= 1 if deltas['month'] < 0: deltas['month'] += 12 deltas['year'] -= 1 # Format the result if short_format: fmt_funcs = { 'year': lambda d: '%dy' % d, 'month': lambda d: '%dm' % d, 'day': lambda d: '%dd' % d, 'hour': lambda d: '%dh' % d, 'minute': lambda d: '%dmin' % d, 'second': lambda d: '%dsec' % d, } else: fmt_funcs = { 'year': lambda d: _pluralize('${num} year', '${num} years', d, mapping={'num': d}).interpolate(), 'month': lambda d: _pluralize('${num} month', '${num} months', d, mapping={'num': d}).interpolate(), 'day': lambda d: _pluralize('${num} day', '${num} days', d, mapping={'num': d}).interpolate(), 'hour': lambda d: _pluralize('${num} hour', '${num} hours', d, mapping={'num': d}).interpolate(), 'minute': lambda d: _pluralize('${num} minute', '${num} minutes', d, mapping={'num': d}).interpolate(), 'second': lambda d: _pluralize('${num} second', '${num} seconds', d, mapping={'num': d}).interpolate(), } i = 0 for part in order: value = deltas[part] if value != 0: if i < 5: sub_part = order[i + 1] sub_value = deltas[sub_part] else: sub_value = 0 if sub_value == 0 or show_short_version: _val = fmt_funcs[part](value) if future: if show_suffix: return _('in ${ago}', mapping={'ago': _val}) else: return _(_val) else: if show_suffix: return _('${ago} ago', mapping={'ago': _val}) else: return _(_val) val = fmt_funcs[part](value) val_detail = fmt_funcs[sub_part](sub_value) mapping = {'val': val, 'detail': val_detail} if short_format: datetime_tmpl = _('${val}, ${detail}', mapping=mapping) if show_suffix: datetime_tmpl = _('${val}, ${detail} ago', mapping=mapping) if future: datetime_tmpl = _('in ${val}, ${detail}', mapping=mapping) else: datetime_tmpl = _('${val} and ${detail}', mapping=mapping) if show_suffix: datetime_tmpl = _('${val} and ${detail} ago', mapping=mapping) if future: datetime_tmpl = _('in ${val} and ${detail}', mapping=mapping) return datetime_tmpl i += 1 return _('just now') def age_from_seconds(seconds): seconds = safe_int(seconds) or 0 prevdate = time_to_datetime(time.time() + seconds) return age(prevdate, show_suffix=False, show_short_version=True) def cleaned_uri(uri): """ Quotes '[' and ']' from uri if there is only one of them. according to RFC3986 we cannot use such chars in uri :param uri: :return: uri without this chars """ return urllib.parse.quote(uri, safe='@$:/') def credentials_filter(uri): """ Returns a url with removed credentials :param uri: """ import urlobject if isinstance(uri, rhodecode.lib.encrypt.InvalidDecryptedValue): return 'InvalidDecryptionKey' url_obj = urlobject.URLObject(cleaned_uri(uri)) url_obj = url_obj.without_password().without_username() return url_obj def get_host_info(request): """ Generate host info, to obtain full url e.g https://server.com use this `{scheme}://{netloc}` """ if not request: return {} qualified_home_url = request.route_url('home') parsed_url = urlobject.URLObject(qualified_home_url) decoded_path = safe_str(urllib.parse.unquote(parsed_url.path.rstrip('/'))) return { 'scheme': parsed_url.scheme, 'netloc': parsed_url.netloc+decoded_path, 'hostname': parsed_url.hostname, } def get_clone_url(request, uri_tmpl, repo_name, repo_id, repo_type, **override): qualified_home_url = request.route_url('home') parsed_url = urlobject.URLObject(qualified_home_url) decoded_path = safe_str(urllib.parse.unquote(parsed_url.path.rstrip('/'))) args = { 'scheme': parsed_url.scheme, 'user': '', 'sys_user': getpass.getuser(), # path if we use proxy-prefix 'netloc': parsed_url.netloc+decoded_path, 'hostname': parsed_url.hostname, 'prefix': decoded_path, 'repo': repo_name, 'repoid': str(repo_id), 'repo_type': repo_type } args.update(override) args['user'] = urllib.parse.quote(safe_str(args['user'])) for k, v in list(args.items()): tmpl_key = '{%s}' % k uri_tmpl = uri_tmpl.replace(tmpl_key, v) # special case for SVN clone url if repo_type == 'svn': uri_tmpl = uri_tmpl.replace('ssh://', 'svn+ssh://') # remove leading @ sign if it's present. Case of empty user url_obj = urlobject.URLObject(uri_tmpl) url = url_obj.with_netloc(url_obj.netloc.lstrip('@')) return safe_str(url) def get_commit_safe(repo, commit_id=None, commit_idx=None, pre_load=None, maybe_unreachable=False, reference_obj=None): """ Safe version of get_commit if this commit doesn't exists for a repository it returns a Dummy one instead :param repo: repository instance :param commit_id: commit id as str :param commit_idx: numeric commit index :param pre_load: optional list of commit attributes to load :param maybe_unreachable: translate unreachable commits on git repos :param reference_obj: explicitly search via a reference obj in git. E.g "branch:123" would mean branch "123" """ # TODO(skreft): remove these circular imports from rhodecode.lib.vcs.backends.base import BaseRepository, EmptyCommit from rhodecode.lib.vcs.exceptions import RepositoryError if not isinstance(repo, BaseRepository): raise Exception('You must pass an Repository ' 'object as first argument got %s', type(repo)) try: commit = repo.get_commit( commit_id=commit_id, commit_idx=commit_idx, pre_load=pre_load, maybe_unreachable=maybe_unreachable, reference_obj=reference_obj) except (RepositoryError, LookupError): commit = EmptyCommit() return commit def datetime_to_time(dt): if dt: return time.mktime(dt.timetuple()) def time_to_datetime(tm): if tm: if isinstance(tm, str): try: tm = float(tm) except ValueError: return return datetime.datetime.fromtimestamp(tm) def time_to_utcdatetime(tm): if tm: if isinstance(tm, str): try: tm = float(tm) except ValueError: return return datetime.datetime.utcfromtimestamp(tm) MENTIONS_REGEX = re.compile( # ^@ or @ without any special chars in front r'(?:^@|[^a-zA-Z0-9\-\_\.]@)' # main body starts with letter, then can be . - _ r'([a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+)', re.VERBOSE | re.MULTILINE) def extract_mentioned_users(s): """ Returns unique usernames from given string s that have @mention :param s: string to get mentions """ usrs = set() for username in MENTIONS_REGEX.findall(s): usrs.add(username) return sorted(list(usrs), key=lambda k: k.lower()) def fix_PATH(os_=None): """ Get current active python path, and append it to PATH variable to fix issues of subprocess calls and different python versions """ if os_ is None: import os else: os = os_ cur_path = os.path.split(sys.executable)[0] os_path = os.environ['PATH'] if not os.environ['PATH'].startswith(cur_path): os.environ['PATH'] = f'{cur_path}:{os_path}' def obfuscate_url_pw(engine): _url = engine or '' try: _url = sqlalchemy.engine.url.make_url(engine) except Exception: pass return repr(_url) def get_server_url(environ): req = webob.Request(environ) return req.host_url + req.script_name def unique_id(hexlen=32): alphabet = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz" return suuid(truncate_to=hexlen, alphabet=alphabet) def suuid(url=None, truncate_to=22, alphabet=None): """ Generate and return a short URL safe UUID. If the url parameter is provided, set the namespace to the provided URL and generate a UUID. :param url to get the uuid for :truncate_to: truncate the basic 22 UUID to shorter version The IDs won't be universally unique any longer, but the probability of a collision will still be very low. """ # Define our alphabet. _ALPHABET = alphabet or "23456789ABCDEFGHJKLMNPQRSTUVWXYZ" # If no URL is given, generate a random UUID. if url is None: unique_id = uuid.uuid4().int else: unique_id = uuid.uuid3(uuid.NAMESPACE_URL, url).int alphabet_length = len(_ALPHABET) output = [] while unique_id > 0: digit = unique_id % alphabet_length output.append(_ALPHABET[digit]) unique_id = int(unique_id / alphabet_length) return "".join(output)[:truncate_to] def get_current_rhodecode_user(request=None): """ Gets rhodecode user from request """ import pyramid.threadlocal pyramid_request = request or pyramid.threadlocal.get_current_request() # web case if pyramid_request and hasattr(pyramid_request, 'user'): return pyramid_request.user # api case if pyramid_request and hasattr(pyramid_request, 'rpc_user'): return pyramid_request.rpc_user return None def action_logger_generic(action, namespace=''): """ A generic logger for actions useful to the system overview, tries to find an acting user for the context of the call otherwise reports unknown user :param action: logging message eg 'comment 5 deleted' :param type: string :param namespace: namespace of the logging message eg. 'repo.comments' :param type: string """ logger_name = 'rhodecode.actions' if namespace: logger_name += '.' + namespace log = logging.getLogger(logger_name) # get a user if we can user = get_current_rhodecode_user() logfunc = log.info if not user: user = '' logfunc = log.warning logfunc(f'Logging action by {user}: {action}') def escape_split(text, sep=',', maxsplit=-1): r""" Allows for escaping of the separator: e.g. arg='foo\, bar' It should be noted that the way bash et. al. do command line parsing, those single quotes are required. """ escaped_sep = r'\%s' % sep if escaped_sep not in text: return text.split(sep, maxsplit) before, _mid, after = text.partition(escaped_sep) startlist = before.split(sep, maxsplit) # a regular split is fine here unfinished = startlist[-1] startlist = startlist[:-1] # recurse because there may be more escaped separators endlist = escape_split(after, sep, maxsplit) # finish building the escaped value. we use endlist[0] becaue the first # part of the string sent in recursion is the rest of the escaped value. unfinished += sep + endlist[0] return startlist + [unfinished] + endlist[1:] # put together all the parts class OptionalAttr(object): """ Special Optional Option that defines other attribute. Example:: def test(apiuser, userid=Optional(OAttr('apiuser')): user = Optional.extract(userid) # calls """ def __init__(self, attr_name): self.attr_name = attr_name def __repr__(self): return '' % self.attr_name def __call__(self): return self # alias OAttr = OptionalAttr class Optional(object): """ Defines an optional parameter:: param = param.getval() if isinstance(param, Optional) else param param = param() if isinstance(param, Optional) else param is equivalent of:: param = Optional.extract(param) """ def __init__(self, type_): self.type_ = type_ def __repr__(self): return '' % self.type_.__repr__() def __call__(self): return self.getval() def getval(self): """ returns value from this Optional instance """ if isinstance(self.type_, OAttr): # use params name return self.type_.attr_name return self.type_ @classmethod def extract(cls, val): """ Extracts value from Optional() instance :param val: :return: original value if it's not Optional instance else value of instance """ if isinstance(val, cls): return val.getval() return val def glob2re(pat): import fnmatch return fnmatch.translate(pat) def parse_byte_string(size_str): match = re.match(r'(\d+)(MB|KB)', size_str, re.IGNORECASE) if not match: raise ValueError(f'Given size:{size_str} is invalid, please make sure ' f'to use format of (MB|KB)') _parts = match.groups() num, type_ = _parts return int(num) * {'mb': 1024*1024, 'kb': 1024}[type_.lower()] class CachedProperty(object): """ Lazy Attributes. With option to invalidate the cache by running a method >>> class Foo(object): ... ... @CachedProperty ... def heavy_func(self): ... return 'super-calculation' ... ... foo = Foo() ... foo.heavy_func() # first computation ... foo.heavy_func() # fetch from cache ... foo._invalidate_prop_cache('heavy_func') # at this point calling foo.heavy_func() will be re-computed """ def __init__(self, func, func_name=None): if func_name is None: func_name = func.__name__ self.data = (func, func_name) functools.update_wrapper(self, func) def __get__(self, inst, class_): if inst is None: return self func, func_name = self.data value = func(inst) inst.__dict__[func_name] = value if '_invalidate_prop_cache' not in inst.__dict__: inst.__dict__['_invalidate_prop_cache'] = functools.partial( self._invalidate_prop_cache, inst) return value def _invalidate_prop_cache(self, inst, name): inst.__dict__.pop(name, None) def retry(func=None, exception=Exception, n_tries=5, delay=5, backoff=1, logger=True): """ Retry decorator with exponential backoff. Parameters ---------- func : typing.Callable, optional Callable on which the decorator is applied, by default None exception : Exception or tuple of Exceptions, optional Exception(s) that invoke retry, by default Exception n_tries : int, optional Number of tries before giving up, by default 5 delay : int, optional Initial delay between retries in seconds, by default 5 backoff : int, optional Backoff multiplier e.g. value of 2 will double the delay, by default 1 logger : bool, optional Option to log or print, by default False Returns ------- typing.Callable Decorated callable that calls itself when exception(s) occur. Examples -------- >>> import random >>> @retry(exception=Exception, n_tries=3) ... def test_random(text): ... x = random.random() ... if x < 0.5: ... raise Exception("Fail") ... else: ... print("Success: ", text) >>> test_random("It works!") """ if func is None: return functools.partial( retry, exception=exception, n_tries=n_tries, delay=delay, backoff=backoff, logger=logger, ) @functools.wraps(func) def wrapper(*args, **kwargs): _n_tries, n_delay = n_tries, delay log = logging.getLogger('rhodecode.retry') while _n_tries > 1: try: return func(*args, **kwargs) except exception as e: e_details = repr(e) msg = "Exception on calling func {func}: {e}, " \ "Retrying in {n_delay} seconds..."\ .format(func=func, e=e_details, n_delay=n_delay) if logger: log.warning(msg) else: print(msg) time.sleep(n_delay) _n_tries -= 1 n_delay *= backoff return func(*args, **kwargs) return wrapper def user_agent_normalizer(user_agent_raw, safe=True): log = logging.getLogger('rhodecode.user_agent_normalizer') ua = (user_agent_raw or '').strip().lower() ua = ua.replace('"', '') try: if 'mercurial/proto-1.0' in ua: ua = ua.replace('mercurial/proto-1.0', '') ua = ua.replace('(', '').replace(')', '').strip() ua = ua.replace('mercurial ', 'mercurial/') elif ua.startswith('git'): parts = ua.split(' ') if parts: ua = parts[0] ua = re.sub(r'\.windows\.\d', '', ua).strip() return ua except Exception: log.exception('Failed to parse scm user-agent') if not safe: raise return ua def get_available_port(min_port=40000, max_port=55555, use_range=False): hostname = '' for _check_port in range(min_port, max_port): pick_port = 0 if use_range: pick_port = random.randint(min_port, max_port) with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: try: s.bind((hostname, pick_port)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return s.getsockname()[1] except socket.error as e: if e.args[0] in [errno.EADDRINUSE, errno.ECONNREFUSED]: continue raise except OSError: continue