##// END OF EJS Templates
vcsserver: fixed settings maker tests
super-admin -
r1022:ff345e8f default
parent child Browse files
Show More
@@ -1,177 +1,190 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import textwrap
23 23 import string
24 24 import functools
25 25 import logging
26 26 import tempfile
27 27 import logging.config
28 28 log = logging.getLogger(__name__)
29 29
30 30
31 31 def str2bool(_str):
32 32 """
33 33 returns True/False value from given string, it tries to translate the
34 34 string into boolean
35 35
36 36 :param _str: string value to translate into boolean
37 37 :rtype: boolean
38 38 :returns: boolean from given string
39 39 """
40 40 if _str is None:
41 41 return False
42 42 if _str in (True, False):
43 43 return _str
44 44 _str = str(_str).strip().lower()
45 45 return _str in ('t', 'true', 'y', 'yes', 'on', '1')
46 46
47 47
48 48 def aslist(obj, sep=None, strip=True):
49 49 """
50 50 Returns given string separated by sep as list
51 51
52 52 :param obj:
53 53 :param sep:
54 54 :param strip:
55 55 """
56 56 if isinstance(obj, (basestring,)):
57 if obj in ['', ""]:
58 return []
59
57 60 lst = obj.split(sep)
58 61 if strip:
59 62 lst = [v.strip() for v in lst]
60 63 return lst
61 64 elif isinstance(obj, (list, tuple)):
62 65 return obj
63 66 elif obj is None:
64 67 return []
65 68 else:
66 69 return [obj]
67 70
68 71
69 72 class SettingsMaker(object):
70 73
71 74 def __init__(self, app_settings):
72 75 self.settings = app_settings
73 76
74 77 @classmethod
75 78 def _bool_func(cls, input_val):
76 79 if isinstance(input_val, unicode):
77 80 input_val = input_val.encode('utf8')
78 81 return str2bool(input_val)
79 82
80 83 @classmethod
81 84 def _int_func(cls, input_val):
82 85 return int(input_val)
83 86
84 87 @classmethod
85 88 def _list_func(cls, input_val, sep=','):
86 89 return aslist(input_val, sep=sep)
87 90
88 91 @classmethod
89 92 def _string_func(cls, input_val, lower=True):
90 93 if lower:
91 94 input_val = input_val.lower()
92 95 return input_val
93 96
94 97 @classmethod
95 98 def _float_func(cls, input_val):
96 99 return float(input_val)
97 100
98 101 @classmethod
99 102 def _dir_func(cls, input_val, ensure_dir=False, mode=0o755):
100 103
101 104 # ensure we have our dir created
102 105 if not os.path.isdir(input_val) and ensure_dir:
103 106 os.makedirs(input_val, mode=mode)
104 107
105 108 if not os.path.isdir(input_val):
106 109 raise Exception('Dir at {} does not exist'.format(input_val))
107 110 return input_val
108 111
109 112 @classmethod
110 113 def _file_path_func(cls, input_val, ensure_dir=False, mode=0o755):
111 114 dirname = os.path.dirname(input_val)
112 115 cls._dir_func(dirname, ensure_dir=ensure_dir)
113 116 return input_val
114 117
115 118 @classmethod
116 119 def _key_transformator(cls, key):
117 120 return "{}_{}".format('RC'.upper(), key.upper().replace('.', '_').replace('-', '_'))
118 121
119 def enable_logging(self, logging_conf=None):
122 def enable_logging(self, logging_conf=None, level='INFO', formatter='generic'):
120 123 """
121 124 Helper to enable debug on running instance
122 125 :return:
123 126 """
127
124 128 if not str2bool(self.settings.get('logging.autoconfigure')):
125 129 log.info('logging configuration based on main .ini file')
126 130 return
127 131
128 132 if logging_conf is None:
129 133 logging_conf = self.settings.get('logging.logging_conf_file') or ''
130 134
131 135 if not os.path.isfile(logging_conf):
132 136 log.error('Unable to setup logging based on %s, file does not exist...', logging_conf)
133 137 return
134 138
135 139 with open(logging_conf, 'rb') as f:
136 140 ini_template = textwrap.dedent(f.read())
137 141 ini_template = string.Template(ini_template).safe_substitute(
142 RC_LOGGING_LEVEL=os.environ.get('RC_LOGGING_LEVEL', '') or level,
143 RC_LOGGING_FORMATTER=os.environ.get('RC_LOGGING_FORMATTER', '') or formatter
144 )
145
146
147 with open(logging_conf, 'rb') as f:
148 ini_template = textwrap.dedent(f.read())
149 ini_template = string.Template(ini_template).safe_substitute(
138 150 RC_LOGGING_LEVEL=os.environ.get('RC_LOGGING_LEVEL', '') or 'INFO',
139 151 RC_LOGGING_FORMATTER=os.environ.get('RC_LOGGING_FORMATTER', '') or 'generic'
140 152 )
141 153
142 154 with tempfile.NamedTemporaryFile(prefix='rc_logging_', suffix='.ini', delete=False) as f:
143 155 log.info('Saved Temporary LOGGING config at %s', f.name)
144 156 f.write(ini_template)
145 157
146 158 logging.config.fileConfig(f.name)
147 159 os.remove(f.name)
148 160
149 161 def make_setting(self, key, default, lower=False, default_when_empty=False, parser=None):
150 162
151 163 input_val = self.settings.get(key, default)
152 164
153 165 if default_when_empty and not input_val:
154 166 # use default value when value is set in the config but it is empty
155 167 input_val = default
156 168
157 169 parser_func = {
158 170 'bool': self._bool_func,
159 171 'int': self._int_func,
160 172 'list': self._list_func,
161 173 'list:newline': functools.partial(self._list_func, sep='/n'),
174 'list:spacesep': functools.partial(self._list_func, sep=' '),
162 175 'string': functools.partial(self._string_func, lower=lower),
163 176 'dir': self._dir_func,
164 177 'dir:ensured': functools.partial(self._dir_func, ensure_dir=True),
165 178 'file': self._file_path_func,
166 179 'file:ensured': functools.partial(self._file_path_func, ensure_dir=True),
167 180 None: lambda i: i
168 181 }[parser]
169 182
170 183 # now maybe we have this KEY in env, search and use the value with higher priority.
171 184 transformed_key = self._key_transformator(key)
172 185 envvar_value = os.environ.get(transformed_key)
173 186 if envvar_value:
174 187 log.debug('using `%s` key instead of `%s` key for config', transformed_key, key)
175 188 input_val = envvar_value
176 189 self.settings[key] = parser_func(input_val)
177 190 return self.settings[key]
@@ -1,39 +1,42 b''
1 1 """
2 2 Tests used to profile the HTTP based implementation.
3 3 """
4 4
5 5 import pytest
6 6 import webtest
7 7
8 8 from vcsserver.http_main import main
9 9
10 10
11 11 @pytest.fixture
12 12 def vcs_app():
13 13 stub_settings = {
14 14 'dev.use_echo_app': 'true',
15 15 'locale': 'en_US.UTF-8',
16 16 }
17 vcs_app = main({}, **stub_settings)
17 stub_global_conf = {
18 '__file__': ''
19 }
20 vcs_app = main(stub_global_conf, **stub_settings)
18 21 app = webtest.TestApp(vcs_app)
19 22 return app
20 23
21 24
22 25 @pytest.fixture(scope='module')
23 26 def data():
24 27 one_kb = 'x' * 1024
25 28 return one_kb * 1024 * 10
26 29
27 30
28 31 def test_http_app_streaming_with_data(data, repeat, vcs_app):
29 32 app = vcs_app
30 33 for x in xrange(repeat / 10):
31 34 response = app.post('/stream/git/', params=data)
32 35 assert response.status_code == 200
33 36
34 37
35 38 def test_http_app_streaming_no_data(repeat, vcs_app):
36 39 app = vcs_app
37 40 for x in xrange(repeat / 10):
38 41 response = app.post('/stream/git/')
39 42 assert response.status_code == 200
@@ -1,57 +1,57 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import mock
19 19 import pytest
20 20
21 21 from vcsserver import http_main
22 22 from vcsserver.base import obfuscate_qs
23 23
24 24
25 25 @mock.patch('vcsserver.http_main.VCS', mock.Mock())
26 26 @mock.patch('vcsserver.hgpatches.patch_largefiles_capabilities')
27 27 def test_applies_largefiles_patch(patch_largefiles_capabilities):
28 http_main.main({})
28 http_main.main({'__file__': ''})
29 29 patch_largefiles_capabilities.assert_called_once_with()
30 30
31 31
32 32 @mock.patch('vcsserver.http_main.VCS', mock.Mock())
33 33 @mock.patch('vcsserver.http_main.MercurialFactory', None)
34 34 @mock.patch(
35 35 'vcsserver.hgpatches.patch_largefiles_capabilities',
36 36 mock.Mock(side_effect=Exception("Must not be called")))
37 37 def test_applies_largefiles_patch_only_if_mercurial_is_available():
38 http_main.main({})
38 http_main.main({'__file__': ''})
39 39
40 40
41 41 @pytest.mark.parametrize('given, expected', [
42 42 ('bad', 'bad'),
43 43 ('query&foo=bar', 'query&foo=bar'),
44 44 ('equery&auth_token=bar', 'equery&auth_token=*****'),
45 45 ('a;b;c;query&foo=bar&auth_token=secret',
46 46 'a&b&c&query&foo=bar&auth_token=*****'),
47 47 ('', ''),
48 48 (None, None),
49 49 ('foo=bar', 'foo=bar'),
50 50 ('auth_token=secret', 'auth_token=*****'),
51 51 ('auth_token=secret&api_key=secret2',
52 52 'auth_token=*****&api_key=*****'),
53 53 ('auth_token=secret&api_key=secret2&param=value',
54 54 'auth_token=*****&api_key=*****&param=value'),
55 55 ])
56 56 def test_obfuscate_qs(given, expected):
57 57 assert expected == obfuscate_qs(given)
General Comments 0
You need to be logged in to leave comments. Login now