# Copyright (C) 2016-2024 RhodeCode GmbH # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License, version 3 # (only), as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # This program is dual-licensed. If you wish to learn more about the # RhodeCode Enterprise Edition, including its added features, Support services, # and proprietary license terms, please see https://rhodecode.com/licenses/ import logging import os import string import functools import collections import urllib.request import urllib.parse import urllib.error log = logging.getLogger('rhodecode.' + __name__) class HookResponse: def __init__(self, status, output): self.status = status self.output = output def __add__(self, other): other_status = getattr(other, 'status', 0) new_status = max(self.status, other_status) other_output = getattr(other, 'output', '') new_output = self.output + other_output return HookResponse(new_status, new_output) def __bool__(self): return self.status == 0 def to_json(self): return {'status': self.status, 'output': self.output} def __repr__(self): return self.to_json().__repr__() class DotDict(dict): def __contains__(self, k): try: return dict.__contains__(self, k) or hasattr(self, k) except Exception: return False # only called if k not found in normal places def __getattr__(self, k): try: return object.__getattribute__(self, k) except AttributeError: try: return self[k] except KeyError: raise AttributeError(k) def __setattr__(self, k, v): try: object.__getattribute__(self, k) except AttributeError: try: self[k] = v except: raise AttributeError(k) else: object.__setattr__(self, k, v) def __delattr__(self, k): try: object.__getattribute__(self, k) except AttributeError: try: del self[k] except KeyError: raise AttributeError(k) else: object.__delattr__(self, k) def toDict(self): return unserialize(self) def __repr__(self): keys = list(self.keys()) keys.sort() args = ', '.join(['{}={!r}'.format(key, self[key]) for key in keys]) return '{}({})'.format(self.__class__.__name__, args) @staticmethod def fromDict(d): return serialize(d) def serialize(x): if isinstance(x, dict): return DotDict((k, serialize(v)) for k, v in x.items()) elif isinstance(x, (list, tuple)): return type(x)(serialize(v) for v in x) else: return x def unserialize(x): if isinstance(x, dict): return {k: unserialize(v) for k, v in x.items()} elif isinstance(x, (list, tuple)): return type(x)(unserialize(v) for v in x) else: return x def _verify_kwargs(func_name, expected_parameters, kwargs): """ Verify that exactly `expected_parameters` are passed in as `kwargs`. """ expected_parameters = set(expected_parameters) kwargs_keys = set(kwargs.keys()) if kwargs_keys != expected_parameters: missing_kwargs = expected_parameters - kwargs_keys unexpected_kwargs = kwargs_keys - expected_parameters raise AssertionError( "func:%s: missing parameters: %r, unexpected parameters: %s" % (func_name, missing_kwargs, unexpected_kwargs)) def has_kwargs(required_args): """ decorator to verify extension calls arguments. :param required_args: """ def wrap(func): def wrapper(*args, **kwargs): _verify_kwargs(func.__name__, required_args.keys(), kwargs) # in case there's `calls` defined on module we store the data maybe_log_call(func.__name__, args, kwargs) log.debug('Calling rcextensions function %s', func.__name__) return func(*args, **kwargs) return wrapper return wrap def maybe_log_call(name, args, kwargs): from rhodecode.config import rcextensions if hasattr(rcextensions, 'calls'): calls = rcextensions.calls calls[name].append((args, kwargs)) def str2bool(_str) -> bool: """ returns True/False value from given string, it tries to translate the string into boolean :param _str: string value to translate into boolean :rtype: boolean :returns: boolean from given string """ if _str is None: return False if _str in (True, False): return _str _str = str(_str).strip().lower() return _str in ('t', 'true', 'y', 'yes', 'on', '1') def aslist(obj, sep=None, strip=True): """ Returns given string separated by sep as list :param obj: :param sep: :param strip: """ if isinstance(obj, (str,)): lst = obj.split(sep) if strip: lst = [v.strip() for v in lst] return lst elif isinstance(obj, (list, tuple)): return obj elif obj is None: return [] else: return [obj] class UrlTemplate(string.Template): def safe_substitute(self, **kws): # url encode the kw for usage in url kws = {k: urllib.parse.quote(str(v)) for k, v in kws.items()} return super(UrlTemplate, self).safe_substitute(**kws)