##// END OF EJS Templates
tests: fixed test suite for celery adoption
tests: fixed test suite for celery adoption

File last commit:

r5607:39b20522 default
r5607:39b20522 default
Show More
utils.py
206 lines | 5.8 KiB | text/x-python | PythonLexer
# Copyright (C) 2016-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
import os
import string
import functools
import collections
import urllib.request
import urllib.parse
import urllib.error
# Module-level logger; its name is this module's ``__name__`` prefixed
# with 'rhodecode.'.
log = logging.getLogger('rhodecode.' + __name__)
class HookResponse:
    """
    Pair of a ``status`` value and an ``output`` value.

    Two responses can be added together, and a response is truthy only
    when its status equals ``0``.
    """

    def __init__(self, status, output):
        self.status, self.output = status, output

    def __add__(self, other):
        """
        Combine with `other`: the larger status wins, outputs are joined.

        An `other` lacking ``status`` / ``output`` attributes contributes
        a status of ``0`` and an empty output respectively.
        """
        combined_status = max(self.status, getattr(other, 'status', 0))
        combined_output = self.output + getattr(other, 'output', '')
        return HookResponse(combined_status, combined_output)

    def __bool__(self):
        # a status of 0 is the only truthy one
        return self.status == 0

    def to_json(self):
        """Return the response as a plain ``status`` / ``output`` dict."""
        return dict(status=self.status, output=self.output)

    def __repr__(self):
        return repr(self.to_json())
class DotDict(dict):
    """
    ``dict`` subclass whose items can also be read, assigned and deleted
    using attribute syntax (``d.key`` next to ``d['key']``).

    Real attributes of the object always take precedence: a name is looked
    up as a dictionary key only when normal attribute lookup fails.
    """

    def __contains__(self, k):
        """
        Return True when `k` is a dictionary key or an attribute name.

        Any error raised during the check (e.g. an unhashable or
        non-string `k`) is reported as "not contained".
        """
        try:
            return dict.__contains__(self, k) or hasattr(self, k)
        except Exception:
            return False

    # only called if k not found in normal places
    def __getattr__(self, k):
        """
        Resolve attribute `k`, falling back to the dictionary item `k`.

        :raises AttributeError: when `k` is neither an attribute nor a key
        """
        try:
            return object.__getattribute__(self, k)
        except AttributeError:
            try:
                return self[k]
            except KeyError:
                raise AttributeError(k) from None

    def __setattr__(self, k, v):
        """
        Assign `v` to attribute `k`.

        Names that already resolve as real attributes are set on the
        object; every other name is stored as a dictionary item.

        :raises AttributeError: when storing the dictionary item fails
        """
        try:
            object.__getattribute__(self, k)
        except AttributeError:
            try:
                self[k] = v
            except Exception as exc:
                # Only ordinary errors are translated; BaseException
                # subclasses such as KeyboardInterrupt must propagate.
                raise AttributeError(k) from exc
        else:
            object.__setattr__(self, k, v)

    def __delattr__(self, k):
        """
        Delete attribute `k`, falling back to the dictionary item `k`.

        :raises AttributeError: when `k` is neither an attribute nor a key
        """
        try:
            object.__getattribute__(self, k)
        except AttributeError:
            try:
                del self[k]
            except KeyError:
                raise AttributeError(k) from None
        else:
            object.__delattr__(self, k)

    def toDict(self):
        """Return the result of passing this object to ``unserialize``."""
        return unserialize(self)

    def __repr__(self):
        """Render as ``ClassName(key=value, ...)`` with the keys sorted."""
        try:
            keys = sorted(self)
        except TypeError:
            # Keys of mixed types cannot be compared with each other;
            # order them by their string form so repr() never raises.
            keys = sorted(self, key=str)
        args = ', '.join(['{}={!r}'.format(key, self[key]) for key in keys])
        return '{}({})'.format(self.__class__.__name__, args)

    @staticmethod
    def fromDict(d):
        """Return the result of passing `d` to ``serialize``."""
        return serialize(d)
def serialize(x):
    """
    Recursively convert every ``dict`` found inside `x` into a ``DotDict``.

    Lists and tuples are rebuilt with the same type and converted
    elements; any other value is returned unchanged.

    :param x: value to convert
    """
    if isinstance(x, dict):
        return DotDict((k, serialize(v)) for k, v in x.items())
    if isinstance(x, (list, tuple)):
        items = (serialize(v) for v in x)
        # namedtuple classes take their fields as separate positional
        # arguments, so they have to be rebuilt through ``_make``.
        if isinstance(x, tuple) and hasattr(x, '_make'):
            return type(x)._make(items)
        return type(x)(items)
    return x
def unserialize(x):
    """
    Recursively convert every ``dict`` (or ``dict`` subclass) found inside
    `x` into a plain ``dict``.

    Lists and tuples are rebuilt with the same type and converted
    elements; any other value is returned unchanged.

    :param x: value to convert
    """
    if isinstance(x, dict):
        return {k: unserialize(v) for k, v in x.items()}
    if isinstance(x, (list, tuple)):
        items = (unserialize(v) for v in x)
        # namedtuple classes take their fields as separate positional
        # arguments, so they have to be rebuilt through ``_make``.
        if isinstance(x, tuple) and hasattr(x, '_make'):
            return type(x)._make(items)
        return type(x)(items)
    return x
def _verify_kwargs(func_name, expected_parameters, kwargs):
"""
Verify that exactly `expected_parameters` are passed in as `kwargs`.
"""
expected_parameters = set(expected_parameters)
kwargs_keys = set(kwargs.keys())
if kwargs_keys != expected_parameters:
missing_kwargs = expected_parameters - kwargs_keys
unexpected_kwargs = kwargs_keys - expected_parameters
raise AssertionError(
"func:%s: missing parameters: %r, unexpected parameters: %s" %
(func_name, missing_kwargs, unexpected_kwargs))
def has_kwargs(required_args):
    """
    decorator to verify extension calls arguments.

    The decorated function must be called with keyword arguments matching
    exactly the keys of `required_args`; otherwise ``_verify_kwargs``
    raises. Each accepted call is passed to ``maybe_log_call`` and logged
    at debug level before the wrapped function runs.

    :param required_args: mapping whose keys are the required keyword names
    """
    def wrap(func):
        # keep the wrapped function's __name__, __doc__ etc. on the wrapper
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            _verify_kwargs(func.__name__, required_args.keys(), kwargs)
            # in case there's `calls` defined on module we store the data
            maybe_log_call(func.__name__, args, kwargs)
            log.debug('Calling rcextensions function %s', func.__name__)
            return func(*args, **kwargs)
        return wrapper
    return wrap
def maybe_log_call(name, args, kwargs):
    """
    Record a call on the ``rcextensions`` module when it defines ``calls``.

    The ``(args, kwargs)`` pair is appended to ``calls[name]``; nothing
    happens when the module has no ``calls`` attribute.

    :param name: key under which the call is stored
    :param args: positional arguments of the call
    :param kwargs: keyword arguments of the call
    """
    from rhodecode.config import rcextensions

    if not hasattr(rcextensions, 'calls'):
        return

    # NOTE(review): presumably `calls` maps names to lists (e.g. a
    # defaultdict(list)) - confirm where it is defined.
    recorded = rcextensions.calls
    recorded[name].append((args, kwargs))
def str2bool(_str) -> bool:
    """
    returns True/False value from given string, it tries to translate the
    string into boolean

    ``None`` gives False. Values equal to ``True`` or ``False`` are
    converted with ``bool()``. Anything else is turned into a string,
    stripped and lower-cased, and is True only for one of
    ``t, true, y, yes, on, 1``.

    :param _str: string value to translate into boolean
    :rtype: boolean
    :returns: boolean from given string
    """
    if _str is None:
        return False
    if _str in (True, False):
        # ``1 == True`` and ``0 == False``, so numbers land here as well;
        # coerce so that a real bool is always returned.
        return bool(_str)
    normalized = str(_str).strip().lower()
    return normalized in ('t', 'true', 'y', 'yes', 'on', '1')
def aslist(obj, sep=None, strip=True):
    """
    Turn `obj` into a sequence.

    A string is split on `sep`; a list or tuple is returned as-is;
    ``None`` gives an empty list; any other value is wrapped in a
    one-element list.

    :param obj: value to convert
    :param sep: separator handed to ``str.split``
    :param strip: when true, strip whitespace from every split item
    """
    if obj is None:
        return []
    if isinstance(obj, str):
        parts = obj.split(sep)
        return [part.strip() for part in parts] if strip else parts
    if isinstance(obj, (list, tuple)):
        return obj
    return [obj]
class UrlTemplate(string.Template):
    """``string.Template`` whose substituted values are URL-quoted."""

    def safe_substitute(self, **kws):
        """
        Substitute `kws` into the template after URL-quoting each value.

        Every value is converted with ``str()`` and passed through
        ``urllib.parse.quote`` before the parent ``safe_substitute`` runs.
        """
        # url encode the kw for usage in url
        quoted = {}
        for name, value in kws.items():
            quoted[name] = urllib.parse.quote(str(value))
        return super().safe_substitute(**quoted)