##// END OF EJS Templates
tests: rewrote code for running vcs_operations. Now it starts it's own...
marcink -
r2457:240dad60 default
parent child Browse files
Show More
@@ -0,0 +1,194 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21
22 import os
23 import time
24 import tempfile
25 import pytest
26 import subprocess32
27 import configobj
28
29 from urllib2 import urlopen, URLError
30 from pyramid.compat import configparser
31
32
33 from rhodecode.tests import TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
34 from rhodecode.tests.utils import is_url_reachable
35
36
37 def get_port(pyramid_config):
38 config = configparser.ConfigParser()
39 config.read(pyramid_config)
40 return config.get('server:main', 'port')
41
42
43 def get_host_url(pyramid_config):
44 """Construct the host url using the port in the test configuration."""
45 return '127.0.0.1:%s' % get_port(pyramid_config)
46
47
48 def assert_no_running_instance(url):
49 if is_url_reachable(url):
50 print("Hint: Usually this means another instance of server "
51 "is running in the background at %s." % url)
52 pytest.fail(
53 "Port is not free at %s, cannot start server at" % url)
54
55
56 class ServerBase(object):
57 _args = []
58 log_file_name = 'NOT_DEFINED.log'
59 status_url_tmpl = 'http://{host}:{port}'
60
61 def __init__(self, config_file, log_file):
62 self.config_file = config_file
63 config_data = configobj.ConfigObj(config_file)
64 self._config = config_data['server:main']
65
66 self._args = []
67 self.log_file = log_file or os.path.join(
68 tempfile.gettempdir(), self.log_file_name)
69 self.process = None
70 self.server_out = None
71 print("Using the {} configuration:{}".format(
72 self.__class__.__name__, config_file))
73
74 if not os.path.isfile(config_file):
75 raise RuntimeError('Failed to get config at {}'.format(config_file))
76
77 @property
78 def command(self):
79 return ' '.join(self._args)
80
81 @property
82 def http_url(self):
83 template = 'http://{host}:{port}/'
84 return template.format(**self._config)
85
86 def host_url(self):
87 return 'http://' + get_host_url(self.config_file)
88
89 def get_rc_log(self):
90 with open(self.log_file) as f:
91 return f.read()
92
93 def wait_until_ready(self, timeout=15):
94 host = self._config['host']
95 port = self._config['port']
96 status_url = self.status_url_tmpl.format(host=host, port=port)
97 start = time.time()
98
99 while time.time() - start < timeout:
100 try:
101 urlopen(status_url)
102 break
103 except URLError:
104 time.sleep(0.2)
105 else:
106 pytest.exit(
107 "Starting the {} failed or took more than {} "
108 "seconds. cmd: `{}`".format(
109 self.__class__.__name__, timeout, self.command))
110
111 def shutdown(self):
112 self.process.kill()
113 self.server_out.flush()
114 self.server_out.close()
115
116 def get_log_file_with_port(self):
117 log_file = list(self.log_file.partition('.log'))
118 log_file.insert(1, get_port(self.config_file))
119 log_file = ''.join(log_file)
120 return log_file
121
122
123 class RcVCSServer(ServerBase):
124 """
125 Represents a running VCSServer instance.
126 """
127
128 log_file_name = 'rc-vcsserver.log'
129 status_url_tmpl = 'http://{host}:{port}/status'
130
131 def __init__(self, config_file, log_file=None):
132 super(RcVCSServer, self).__init__(config_file, log_file)
133 self._args = [
134 'gunicorn', '--paste', self.config_file]
135
136 def start(self):
137 env = os.environ.copy()
138
139 self.log_file = self.get_log_file_with_port()
140 self.server_out = open(self.log_file, 'w')
141
142 host_url = self.host_url()
143 assert_no_running_instance(host_url)
144
145 print('rhodecode-vcsserver starting at: {}'.format(host_url))
146 print('rhodecode-vcsserver command: {}'.format(self.command))
147 print('rhodecode-vcsserver logfile: {}'.format(self.log_file))
148
149 self.process = subprocess32.Popen(
150 self._args, bufsize=0, env=env,
151 stdout=self.server_out, stderr=self.server_out)
152
153
154 class RcWebServer(ServerBase):
155 """
156 Represents a running RCE web server used as a test fixture.
157 """
158
159 log_file_name = 'rc-web.log'
160 status_url_tmpl = 'http://{host}:{port}/_admin/ops/ping'
161
162 def __init__(self, config_file, log_file=None):
163 super(RcWebServer, self).__init__(config_file, log_file)
164 self._args = [
165 'gunicorn', '--worker-class', 'gevent', '--paste', config_file]
166
167 def start(self):
168 env = os.environ.copy()
169 env['RC_NO_TMP_PATH'] = '1'
170
171 self.log_file = self.get_log_file_with_port()
172 self.server_out = open(self.log_file, 'w')
173
174 host_url = self.host_url()
175 assert_no_running_instance(host_url)
176
177 print('rhodecode-web starting at: {}'.format(host_url))
178 print('rhodecode-web command: {}'.format(self.command))
179 print('rhodecode-web logfile: {}'.format(self.log_file))
180
181 self.process = subprocess32.Popen(
182 self._args, bufsize=0, env=env,
183 stdout=self.server_out, stderr=self.server_out)
184
185 def repo_clone_url(self, repo_name, **kwargs):
186 params = {
187 'user': TEST_USER_ADMIN_LOGIN,
188 'passwd': TEST_USER_ADMIN_PASS,
189 'host': get_host_url(self.config_file),
190 'cloned_repo': repo_name,
191 }
192 params.update(**kwargs)
193 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s' % params
194 return _url
@@ -1,83 +1,82 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.tests.utils import CustomTestApp
24 24 from rhodecode.lib.middleware.utils import scm_app_http, scm_app
25 25 from rhodecode.lib.vcs.conf import settings
26 26
27 27
28 28 def vcs_http_app(vcsserver_http_echo_app):
29 29 """
30 30 VcsHttpProxy wrapped in WebTest.
31 31 """
32 32 git_url = vcsserver_http_echo_app.http_url + 'stream/git/'
33 33 vcs_http_proxy = scm_app_http.VcsHttpProxy(
34 34 git_url, 'stub_path', 'stub_name', None)
35 35 app = CustomTestApp(vcs_http_proxy)
36 36 return app
37 37
38 38
39 39 @pytest.fixture(scope='module')
40 40 def vcsserver_http_echo_app(request, vcsserver_factory):
41 41 """
42 42 A running VCSServer with the EchoApp activated via HTTP.
43 43 """
44 44 vcsserver = vcsserver_factory(
45 45 request=request,
46 use_http=True,
47 46 overrides=[{'app:main': {'dev.use_echo_app': 'true'}}])
48 47 return vcsserver
49 48
50 49
51 50 @pytest.fixture(scope='session')
52 51 def data():
53 52 one_kb = "x" * 1024
54 53 return one_kb * 1024 * 10
55 54
56 55
57 56 def test_reuse_app_no_data(repeat, vcsserver_http_echo_app):
58 57 app = vcs_http_app(vcsserver_http_echo_app)
59 58 for x in xrange(repeat / 10):
60 59 response = app.post('/')
61 60 assert response.status_code == 200
62 61
63 62
64 63 def test_reuse_app_with_data(data, repeat, vcsserver_http_echo_app):
65 64 app = vcs_http_app(vcsserver_http_echo_app)
66 65 for x in xrange(repeat / 10):
67 66 response = app.post('/', params=data)
68 67 assert response.status_code == 200
69 68
70 69
71 70 def test_create_app_per_request_no_data(repeat, vcsserver_http_echo_app):
72 71 for x in xrange(repeat / 10):
73 72 app = vcs_http_app(vcsserver_http_echo_app)
74 73 response = app.post('/')
75 74 assert response.status_code == 200
76 75
77 76
78 77 def test_create_app_per_request_with_data(
79 78 data, repeat, vcsserver_http_echo_app):
80 79 for x in xrange(repeat / 10):
81 80 app = vcs_http_app(vcsserver_http_echo_app)
82 81 response = app.post('/', params=data)
83 82 assert response.status_code == 200
@@ -1,408 +1,297 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import os
22 21 import json
23 import time
24 22 import platform
25 23 import socket
26 import tempfile
27 import subprocess32
28 24
29 from urllib2 import urlopen, URLError
30
31 import configobj
32 25 import pytest
33 26
34
35 27 from rhodecode.lib.pyramid_utils import get_app_config
36 28 from rhodecode.tests.fixture import TestINI
37 from rhodecode.tests.vcs_operations.conftest import get_host_url, get_port
38
39 VCSSERVER_LOG = os.path.join(tempfile.gettempdir(), 'rc-vcsserver.log')
29 from rhodecode.tests.server_utils import RcVCSServer
40 30
41 31
42 32 def _parse_json(value):
43 33 return json.loads(value) if value else None
44 34
45 35
46 36 def pytest_addoption(parser):
47 37 parser.addoption(
48 38 '--test-loglevel', dest='test_loglevel',
49 39 help="Set default Logging level for tests, warn (default), info, debug")
50 40 group = parser.getgroup('pylons')
51 41 group.addoption(
52 42 '--with-pylons', dest='pyramid_config',
53 43 help="Set up a Pylons environment with the specified config file.")
54 44 group.addoption(
55 45 '--ini-config-override', action='store', type=_parse_json,
56 46 default=None, dest='pyramid_config_override', help=(
57 47 "Overrides the .ini file settings. Should be specified in JSON"
58 48 " format, e.g. '{\"section\": {\"parameter\": \"value\", ...}}'"
59 49 )
60 50 )
61 51 parser.addini(
62 52 'pyramid_config',
63 53 "Set up a Pyramid environment with the specified config file.")
64 54
65 55 vcsgroup = parser.getgroup('vcs')
66 56 vcsgroup.addoption(
67 57 '--without-vcsserver', dest='with_vcsserver', action='store_false',
68 58 help="Do not start the VCSServer in a background process.")
69 59 vcsgroup.addoption(
70 60 '--with-vcsserver-http', dest='vcsserver_config_http',
71 61 help="Start the HTTP VCSServer with the specified config file.")
72 62 vcsgroup.addoption(
73 63 '--vcsserver-protocol', dest='vcsserver_protocol',
74 64 help="Start the VCSServer with HTTP protocol support.")
75 65 vcsgroup.addoption(
76 66 '--vcsserver-config-override', action='store', type=_parse_json,
77 67 default=None, dest='vcsserver_config_override', help=(
78 68 "Overrides the .ini file settings for the VCSServer. "
79 69 "Should be specified in JSON "
80 70 "format, e.g. '{\"section\": {\"parameter\": \"value\", ...}}'"
81 71 )
82 72 )
83 73 vcsgroup.addoption(
84 74 '--vcsserver-port', action='store', type=int,
85 75 default=None, help=(
86 76 "Allows to set the port of the vcsserver. Useful when testing "
87 77 "against an already running server and random ports cause "
88 78 "trouble."))
89 79 parser.addini(
90 80 'vcsserver_config_http',
91 81 "Start the HTTP VCSServer with the specified config file.")
92 82 parser.addini(
93 83 'vcsserver_protocol',
94 84 "Start the VCSServer with HTTP protocol support.")
95 85
96 86
97 @pytest.fixture(scope="session")
98 def vcs_server_config_override(request):
99 """
100 Allows injecting the overrides by specifying this inside test class
101 """
102
103 return ()
104
105
106 87 @pytest.fixture(scope='session')
107 def vcsserver(request, vcsserver_port, vcsserver_factory, vcs_server_config_override):
88 def vcsserver(request, vcsserver_port, vcsserver_factory):
108 89 """
109 90 Session scope VCSServer.
110 91
111 92 Tests wich need the VCSServer have to rely on this fixture in order
112 93 to ensure it will be running.
113 94
114 95 For specific needs, the fixture vcsserver_factory can be used. It allows to
115 96 adjust the configuration file for the test run.
116 97
117 98 Command line args:
118 99
119 100 --without-vcsserver: Allows to switch this fixture off. You have to
120 101 manually start the server.
121 102
122 103 --vcsserver-port: Will expect the VCSServer to listen on this port.
123 104 """
124 105
125 106 if not request.config.getoption('with_vcsserver'):
126 107 return None
127 108
128 use_http = _use_vcs_http_server(request.config)
129 109 return vcsserver_factory(
130 request, use_http=use_http, vcsserver_port=vcsserver_port,
131 overrides=vcs_server_config_override)
110 request, vcsserver_port=vcsserver_port)
132 111
133 112
134 113 @pytest.fixture(scope='session')
135 114 def vcsserver_factory(tmpdir_factory):
136 115 """
137 116 Use this if you need a running vcsserver with a special configuration.
138 117 """
139 118
140 def factory(request, use_http=True, overrides=(), vcsserver_port=None):
119 def factory(request, overrides=(), vcsserver_port=None,
120 log_file=None):
141 121
142 122 if vcsserver_port is None:
143 123 vcsserver_port = get_available_port()
144 124
145 125 overrides = list(overrides)
146 if use_http:
147 126 overrides.append({'server:main': {'port': vcsserver_port}})
148 else:
149 overrides.append({'DEFAULT': {'port': vcsserver_port}})
150 127
151 128 if is_cygwin():
152 129 platform_override = {'DEFAULT': {
153 130 'beaker.cache.repo_object.type': 'nocache'}}
154 131 overrides.append(platform_override)
155 132
156 option_name = 'vcsserver_config_http' if use_http else ''
133 option_name = 'vcsserver_config_http'
157 134 override_option_name = 'vcsserver_config_override'
158 135 config_file = get_config(
159 136 request.config, option_name=option_name,
160 137 override_option_name=override_option_name, overrides=overrides,
161 138 basetemp=tmpdir_factory.getbasetemp().strpath,
162 139 prefix='test_vcs_')
163 140
164 print("Using the VCSServer configuration:{}".format(config_file))
165 ServerClass = HttpVCSServer if use_http else None
166 server = ServerClass(config_file)
141 server = RcVCSServer(config_file, log_file)
167 142 server.start()
168 143
169 144 @request.addfinalizer
170 145 def cleanup():
171 146 server.shutdown()
172 147
173 148 server.wait_until_ready()
174 149 return server
175 150
176 151 return factory
177 152
178 153
179 154 def is_cygwin():
180 155 return 'cygwin' in platform.system().lower()
181 156
182 157
183 def _use_vcs_http_server(config):
184 protocol_option = 'vcsserver_protocol'
185 protocol = (
186 config.getoption(protocol_option) or
187 config.getini(protocol_option) or
188 'http')
189 return protocol == 'http'
190
191
192 158 def _use_log_level(config):
193 159 level = config.getoption('test_loglevel') or 'warn'
194 160 return level.upper()
195 161
196 162
197 class VCSServer(object):
198 """
199 Represents a running VCSServer instance.
200 """
201
202 _args = []
203
204 def start(self):
205 print("Starting the VCSServer: {}".format(self._args))
206 self.process = subprocess32.Popen(self._args)
207
208 def wait_until_ready(self, timeout=30):
209 raise NotImplementedError()
210
211 def shutdown(self):
212 self.process.kill()
213
214
215 class HttpVCSServer(VCSServer):
216 """
217 Represents a running VCSServer instance.
218 """
219 def __init__(self, config_file):
220 self.config_file = config_file
221 config_data = configobj.ConfigObj(config_file)
222 self._config = config_data['server:main']
223
224 args = ['gunicorn', '--paste', config_file]
225 self._args = args
226
227 @property
228 def http_url(self):
229 template = 'http://{host}:{port}/'
230 return template.format(**self._config)
231
232 def start(self):
233 env = os.environ.copy()
234 host_url = 'http://' + get_host_url(self.config_file)
235
236 rc_log = list(VCSSERVER_LOG.partition('.log'))
237 rc_log.insert(1, get_port(self.config_file))
238 rc_log = ''.join(rc_log)
239
240 server_out = open(rc_log, 'w')
241
242 command = ' '.join(self._args)
243 print('rhodecode-vcsserver starting at: {}'.format(host_url))
244 print('rhodecode-vcsserver command: {}'.format(command))
245 print('rhodecode-vcsserver logfile: {}'.format(rc_log))
246 self.process = subprocess32.Popen(
247 self._args, bufsize=0, env=env, stdout=server_out, stderr=server_out)
248
249 def wait_until_ready(self, timeout=30):
250 host = self._config['host']
251 port = self._config['port']
252 status_url = 'http://{host}:{port}/status'.format(host=host, port=port)
253 start = time.time()
254
255 while time.time() - start < timeout:
256 try:
257 urlopen(status_url)
258 break
259 except URLError:
260 time.sleep(0.2)
261 else:
262 pytest.exit(
263 "Starting the VCSServer failed or took more than {} "
264 "seconds. cmd: `{}`".format(timeout, ' '.join(self._args)))
265
266 def shutdown(self):
267 self.process.kill()
268
269
270 163 @pytest.fixture(scope='session')
271 164 def ini_config(request, tmpdir_factory, rcserver_port, vcsserver_port):
272 165 option_name = 'pyramid_config'
273 166 log_level = _use_log_level(request.config)
274 167
275 168 overrides = [
276 169 {'server:main': {'port': rcserver_port}},
277 170 {'app:main': {
278 171 'vcs.server': 'localhost:%s' % vcsserver_port,
279 172 # johbo: We will always start the VCSServer on our own based on the
280 173 # fixtures of the test cases. For the test run it must always be
281 174 # off in the INI file.
282 175 'vcs.start_server': 'false',
176
177 'vcs.server.protocol': 'http',
178 'vcs.scm_app_implementation': 'http',
179 'vcs.hooks.protocol': 'http',
283 180 }},
284 181
285 182 {'handler_console': {
286 183 'class ': 'StreamHandler',
287 184 'args ': '(sys.stderr,)',
288 185 'level': log_level,
289 186 }},
290 187
291 188 ]
292 if _use_vcs_http_server(request.config):
293 overrides.append({
294 'app:main': {
295 'vcs.server.protocol': 'http',
296 'vcs.scm_app_implementation': 'http',
297 'vcs.hooks.protocol': 'http',
298 }
299 })
300 189
301 190 filename = get_config(
302 191 request.config, option_name=option_name,
303 192 override_option_name='{}_override'.format(option_name),
304 193 overrides=overrides,
305 194 basetemp=tmpdir_factory.getbasetemp().strpath,
306 195 prefix='test_rce_')
307 196 return filename
308 197
309 198
310 199 @pytest.fixture(scope='session')
311 200 def ini_settings(ini_config):
312 201 ini_path = ini_config
313 202 return get_app_config(ini_path)
314 203
315 204
205 def get_available_port():
206 family = socket.AF_INET
207 socktype = socket.SOCK_STREAM
208 host = '127.0.0.1'
209
210 mysocket = socket.socket(family, socktype)
211 mysocket.bind((host, 0))
212 port = mysocket.getsockname()[1]
213 mysocket.close()
214 del mysocket
215 return port
216
217
316 218 @pytest.fixture(scope='session')
317 219 def rcserver_port(request):
318 220 port = get_available_port()
319 221 print('Using rcserver port {}'.format(port))
320 222 return port
321 223
322 224
323 225 @pytest.fixture(scope='session')
324 226 def vcsserver_port(request):
325 227 port = request.config.getoption('--vcsserver-port')
326 228 if port is None:
327 229 port = get_available_port()
328 230 print('Using vcsserver port {}'.format(port))
329 231 return port
330 232
331 233
332 def get_available_port():
333 family = socket.AF_INET
334 socktype = socket.SOCK_STREAM
335 host = '127.0.0.1'
336
337 mysocket = socket.socket(family, socktype)
338 mysocket.bind((host, 0))
339 port = mysocket.getsockname()[1]
340 mysocket.close()
341 del mysocket
342 return port
343
344
345 234 @pytest.fixture(scope='session')
346 235 def available_port_factory():
347 236 """
348 237 Returns a callable which returns free port numbers.
349 238 """
350 239 return get_available_port
351 240
352 241
353 242 @pytest.fixture
354 243 def available_port(available_port_factory):
355 244 """
356 245 Gives you one free port for the current test.
357 246
358 247 Uses "available_port_factory" to retrieve the port.
359 248 """
360 249 return available_port_factory()
361 250
362 251
363 252 @pytest.fixture(scope='session')
364 253 def testini_factory(tmpdir_factory, ini_config):
365 254 """
366 255 Factory to create an INI file based on TestINI.
367 256
368 257 It will make sure to place the INI file in the correct directory.
369 258 """
370 259 basetemp = tmpdir_factory.getbasetemp().strpath
371 260 return TestIniFactory(basetemp, ini_config)
372 261
373 262
374 263 class TestIniFactory(object):
375 264
376 265 def __init__(self, basetemp, template_ini):
377 266 self._basetemp = basetemp
378 267 self._template_ini = template_ini
379 268
380 269 def __call__(self, ini_params, new_file_prefix='test'):
381 270 ini_file = TestINI(
382 271 self._template_ini, ini_params=ini_params,
383 272 new_file_prefix=new_file_prefix, dir=self._basetemp)
384 273 result = ini_file.create()
385 274 return result
386 275
387 276
388 277 def get_config(
389 278 config, option_name, override_option_name, overrides=None,
390 279 basetemp=None, prefix='test'):
391 280 """
392 281 Find a configuration file and apply overrides for the given `prefix`.
393 282 """
394 283 config_file = (
395 284 config.getoption(option_name) or config.getini(option_name))
396 285 if not config_file:
397 286 pytest.exit(
398 287 "Configuration error, could not extract {}.".format(option_name))
399 288
400 289 overrides = overrides or []
401 290 config_override = config.getoption(override_option_name)
402 291 if config_override:
403 292 overrides.append(config_override)
404 293 temp_ini_file = TestINI(
405 294 config_file, ini_params=overrides, new_file_prefix=prefix,
406 295 dir=basetemp)
407 296
408 297 return temp_ini_file.create()
@@ -1,262 +1,257 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import shutil
23 23 import datetime
24 24
25 25 import pytest
26 26
27 27 from rhodecode.lib.vcs.backends import get_backend
28 28 from rhodecode.lib.vcs.backends.base import Config
29 29 from rhodecode.lib.vcs.nodes import FileNode
30 30 from rhodecode.tests import get_new_dir
31 31 from rhodecode.tests.utils import check_skip_backends, check_xfail_backends
32 32
33 33
34 @pytest.fixture(scope="session")
35 def vcs_server_config_override():
36 return ({'server:main': {'workers': 1}},)
37
38
39 34 @pytest.fixture()
40 35 def vcs_repository_support(
41 36 request, backend_alias, baseapp, _vcs_repo_container):
42 37 """
43 38 Provide a test repository for the test run.
44 39
45 40 Depending on the value of `recreate_repo_per_test` a new repo for each
46 41 test will be created.
47 42
48 43 The parameter `--backends` can be used to limit this fixture to specific
49 44 backend implementations.
50 45 """
51 46 cls = request.cls
52 47
53 48 check_skip_backends(request.node, backend_alias)
54 49 check_xfail_backends(request.node, backend_alias)
55 50
56 51 if _should_create_repo_per_test(cls):
57 52 _vcs_repo_container = _create_vcs_repo_container(request)
58 53
59 54 repo = _vcs_repo_container.get_repo(cls, backend_alias=backend_alias)
60 55
61 56 # TODO: johbo: Supporting old test class api, think about removing this
62 57 cls.repo = repo
63 58 cls.repo_path = repo.path
64 59 cls.default_branch = repo.DEFAULT_BRANCH_NAME
65 60 cls.Backend = cls.backend_class = repo.__class__
66 61 cls.imc = repo.in_memory_commit
67 62
68 63 return backend_alias, repo
69 64
70 65
71 66 @pytest.fixture(scope='class')
72 67 def _vcs_repo_container(request):
73 68 """
74 69 Internal fixture intended to help support class based scoping on demand.
75 70 """
76 71 return _create_vcs_repo_container(request)
77 72
78 73
79 74 def _create_vcs_repo_container(request):
80 75 repo_container = VcsRepoContainer()
81 76 if not request.config.getoption('--keep-tmp-path'):
82 77 request.addfinalizer(repo_container.cleanup)
83 78 return repo_container
84 79
85 80
86 81 class VcsRepoContainer(object):
87 82
88 83 def __init__(self):
89 84 self._cleanup_paths = []
90 85 self._repos = {}
91 86
92 87 def get_repo(self, test_class, backend_alias):
93 88 if backend_alias not in self._repos:
94 89 repo = _create_empty_repository(test_class, backend_alias)
95 90
96 91 self._cleanup_paths.append(repo.path)
97 92 self._repos[backend_alias] = repo
98 93 return self._repos[backend_alias]
99 94
100 95 def cleanup(self):
101 96 for repo_path in reversed(self._cleanup_paths):
102 97 shutil.rmtree(repo_path)
103 98
104 99
105 100 def _should_create_repo_per_test(cls):
106 101 return getattr(cls, 'recreate_repo_per_test', False)
107 102
108 103
109 104 def _create_empty_repository(cls, backend_alias=None):
110 105 Backend = get_backend(backend_alias or cls.backend_alias)
111 106 repo_path = get_new_dir(str(time.time()))
112 107 repo = Backend(repo_path, create=True)
113 108 if hasattr(cls, '_get_commits'):
114 109 commits = cls._get_commits()
115 110 cls.tip = _add_commits_to_repo(repo, commits)
116 111
117 112 return repo
118 113
119 114
120 115 @pytest.fixture
121 116 def config():
122 117 """
123 118 Instance of a repository config.
124 119
125 120 The instance contains only one value:
126 121
127 122 - Section: "section-a"
128 123 - Key: "a-1"
129 124 - Value: "value-a-1"
130 125
131 126 The intended usage is for cases where a config instance is needed but no
132 127 specific content is required.
133 128 """
134 129 config = Config()
135 130 config.set('section-a', 'a-1', 'value-a-1')
136 131 return config
137 132
138 133
139 134 def _add_commits_to_repo(repo, commits):
140 135 imc = repo.in_memory_commit
141 136 tip = None
142 137
143 138 for commit in commits:
144 139 for node in commit.get('added', []):
145 140 imc.add(FileNode(node.path, content=node.content))
146 141 for node in commit.get('changed', []):
147 142 imc.change(FileNode(node.path, content=node.content))
148 143 for node in commit.get('removed', []):
149 144 imc.remove(FileNode(node.path))
150 145
151 146 tip = imc.commit(
152 147 message=unicode(commit['message']),
153 148 author=unicode(commit['author']),
154 149 date=commit['date'],
155 150 branch=commit.get('branch'))
156 151
157 152 return tip
158 153
159 154
160 155 @pytest.fixture
161 156 def vcs_repo(request, backend_alias):
162 157 Backend = get_backend(backend_alias)
163 158 repo_path = get_new_dir(str(time.time()))
164 159 repo = Backend(repo_path, create=True)
165 160
166 161 @request.addfinalizer
167 162 def cleanup():
168 163 shutil.rmtree(repo_path)
169 164
170 165 return repo
171 166
172 167
173 168 @pytest.fixture
174 169 def generate_repo_with_commits(vcs_repo):
175 170 """
176 171 Creates a fabric to generate N comits with some file nodes on a randomly
177 172 generated repository
178 173 """
179 174
180 175 def commit_generator(num):
181 176 start_date = datetime.datetime(2010, 1, 1, 20)
182 177 for x in xrange(num):
183 178 yield {
184 179 'message': 'Commit %d' % x,
185 180 'author': 'Joe Doe <joe.doe@example.com>',
186 181 'date': start_date + datetime.timedelta(hours=12 * x),
187 182 'added': [
188 183 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
189 184 ],
190 185 'modified': [
191 186 FileNode('file_%d.txt' % x,
192 187 content='Foobar %d modified' % (x-1)),
193 188 ]
194 189 }
195 190
196 191 def commit_maker(num=5):
197 192 _add_commits_to_repo(vcs_repo, commit_generator(num))
198 193 return vcs_repo
199 194
200 195 return commit_maker
201 196
202 197
203 198 @pytest.fixture
204 199 def hg_repo(request, vcs_repo):
205 200 repo = vcs_repo
206 201
207 202 commits = repo._get_commits()
208 203 _add_commits_to_repo(repo, commits)
209 204
210 205 return repo
211 206
212 207
213 208 @pytest.fixture
214 209 def hg_commit(hg_repo):
215 210 return hg_repo.get_commit()
216 211
217 212
218 213 class BackendTestMixin(object):
219 214 """
220 215 This is a backend independent test case class which should be created
221 216 with ``type`` method.
222 217
223 218 It is required to set following attributes at subclass:
224 219
225 220 - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
226 221 - ``repo_path``: path to the repository which would be created for set of
227 222 tests
228 223 - ``recreate_repo_per_test``: If set to ``False``, repo would NOT be
229 224 created
230 225 before every single test. Defaults to ``True``.
231 226 """
232 227 recreate_repo_per_test = True
233 228
234 229 @classmethod
235 230 def _get_commits(cls):
236 231 commits = [
237 232 {
238 233 'message': u'Initial commit',
239 234 'author': u'Joe Doe <joe.doe@example.com>',
240 235 'date': datetime.datetime(2010, 1, 1, 20),
241 236 'added': [
242 237 FileNode('foobar', content='Foobar'),
243 238 FileNode('foobar2', content='Foobar II'),
244 239 FileNode('foo/bar/baz', content='baz here!'),
245 240 ],
246 241 },
247 242 {
248 243 'message': u'Changes...',
249 244 'author': u'Jane Doe <jane.doe@example.com>',
250 245 'date': datetime.datetime(2010, 1, 1, 21),
251 246 'added': [
252 247 FileNode('some/new.txt', content='news...'),
253 248 ],
254 249 'changed': [
255 250 FileNode('foobar', 'Foobar I'),
256 251 ],
257 252 'removed': [],
258 253 },
259 254 ]
260 255 return commits
261 256
262 257
@@ -1,276 +1,262 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 py.test config for test suite for making push/pull operations.
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 import ConfigParser
31 30 import os
32 import subprocess32
33 31 import tempfile
34 32 import textwrap
35 33 import pytest
36 34
37 import rhodecode
35 from rhodecode import events
36 from rhodecode.model.db import Integration
37 from rhodecode.model.integration import IntegrationModel
38 38 from rhodecode.model.db import Repository
39 39 from rhodecode.model.meta import Session
40 40 from rhodecode.model.settings import SettingsModel
41 from rhodecode.tests import (
42 GIT_REPO, HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,)
41 from rhodecode.integrations.types.webhook import WebhookIntegrationType
42
43 from rhodecode.tests import GIT_REPO, HG_REPO
43 44 from rhodecode.tests.fixture import Fixture
44 from rhodecode.tests.utils import is_url_reachable, wait_for_url
45 from rhodecode.tests.server_utils import RcWebServer
45 46
46 RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log')
47 47 REPO_GROUP = 'a_repo_group'
48 48 HG_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, HG_REPO)
49 49 GIT_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, GIT_REPO)
50 50
51 51
52 def assert_no_running_instance(url):
53 if is_url_reachable(url):
54 print("Hint: Usually this means another instance of Enterprise "
55 "is running in the background.")
56 pytest.fail(
57 "Port is not free at %s, cannot start web interface" % url)
58
59
60 def get_port(pyramid_config):
61 config = ConfigParser.ConfigParser()
62 config.read(pyramid_config)
63 return config.get('server:main', 'port')
64
65
66 def get_host_url(pyramid_config):
67 """Construct the host url using the port in the test configuration."""
68 return '127.0.0.1:%s' % get_port(pyramid_config)
69
70
71 class RcWebServer(object):
72 """
73 Represents a running RCE web server used as a test fixture.
74 """
75 def __init__(self, pyramid_config, log_file):
76 self.pyramid_config = pyramid_config
77 self.log_file = log_file
78
79 def repo_clone_url(self, repo_name, **kwargs):
80 params = {
81 'user': TEST_USER_ADMIN_LOGIN,
82 'passwd': TEST_USER_ADMIN_PASS,
83 'host': get_host_url(self.pyramid_config),
84 'cloned_repo': repo_name,
85 }
86 params.update(**kwargs)
87 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s' % params
88 return _url
89
90 def host_url(self):
91 return 'http://' + get_host_url(self.pyramid_config)
92
93 def get_rc_log(self):
94 with open(self.log_file) as f:
95 return f.read()
96
97
98 52 @pytest.fixture(scope="module")
99 def rcextensions(request, baseapp, tmpdir_factory):
53 def rcextensions(request, db_connection, tmpdir_factory):
100 54 """
101 55 Installs a testing rcextensions pack to ensure they work as expected.
102 56 """
103 57 init_content = textwrap.dedent("""
104 58 # Forward import the example rcextensions to make it
105 59 # active for our tests.
106 60 from rhodecode.tests.other.example_rcextensions import *
107 61 """)
108 62
109 63 # Note: rcextensions are looked up based on the path of the ini file
110 64 root_path = tmpdir_factory.getbasetemp()
111 65 rcextensions_path = root_path.join('rcextensions')
112 66 init_path = rcextensions_path.join('__init__.py')
113 67
114 68 if rcextensions_path.check():
115 69 pytest.fail(
116 70 "Path for rcextensions already exists, please clean up before "
117 71 "test run this path: %s" % (rcextensions_path, ))
118 72 return
119 73
120 74 request.addfinalizer(rcextensions_path.remove)
121 75 init_path.write_binary(init_content, ensure=True)
122 76
123 77
124 78 @pytest.fixture(scope="module")
125 def repos(request, baseapp):
79 def repos(request, db_connection):
126 80 """Create a copy of each test repo in a repo group."""
127 81 fixture = Fixture()
128 82 repo_group = fixture.create_repo_group(REPO_GROUP)
129 83 repo_group_id = repo_group.group_id
130 84 fixture.create_fork(HG_REPO, HG_REPO,
131 85 repo_name_full=HG_REPO_WITH_GROUP,
132 86 repo_group=repo_group_id)
133 87 fixture.create_fork(GIT_REPO, GIT_REPO,
134 88 repo_name_full=GIT_REPO_WITH_GROUP,
135 89 repo_group=repo_group_id)
136 90
137 91 @request.addfinalizer
138 92 def cleanup():
139 93 fixture.destroy_repo(HG_REPO_WITH_GROUP)
140 94 fixture.destroy_repo(GIT_REPO_WITH_GROUP)
141 95 fixture.destroy_repo_group(repo_group_id)
142 96
143 97
144 @pytest.fixture(scope="session")
145 def vcs_server_config_override():
146 return ({'server:main': {'workers': 2}},)
98 @pytest.fixture(scope="module")
99 def rc_web_server_config_modification():
100 return []
147 101
148 102
149 103 @pytest.fixture(scope="module")
150 def rc_web_server_config(testini_factory):
104 def rc_web_server_config_factory(testini_factory, rc_web_server_config_modification):
151 105 """
152 106 Configuration file used for the fixture `rc_web_server`.
153 107 """
154 CUSTOM_PARAMS = [
108
109 def factory(vcsserver_port):
110 custom_params = [
155 111 {'handler_console': {'level': 'DEBUG'}},
112 {'app:main': {'vcs.server': 'localhost:%s' % vcsserver_port}}
156 113 ]
157 return testini_factory(CUSTOM_PARAMS)
114 custom_params.extend(rc_web_server_config_modification)
115 return testini_factory(custom_params)
116 return factory
158 117
159 118
160 119 @pytest.fixture(scope="module")
161 120 def rc_web_server(
162 request, baseapp, rc_web_server_config, repos, rcextensions):
163 """
164 Run the web server as a subprocess.
165
166 Since we have already a running vcsserver, this is not spawned again.
121 request, vcsserver_factory, available_port_factory,
122 rc_web_server_config_factory, repos, rcextensions):
167 123 """
168 env = os.environ.copy()
169 env['RC_NO_TMP_PATH'] = '1'
124 Run the web server as a subprocess. with it's own instance of vcsserver
125 """
170 126
171 rc_log = list(RC_LOG.partition('.log'))
172 rc_log.insert(1, get_port(rc_web_server_config))
173 rc_log = ''.join(rc_log)
127 vcsserver_port = available_port_factory()
128 print('Using vcsserver ops test port {}'.format(vcsserver_port))
174 129
175 server_out = open(rc_log, 'w')
176
177 host_url = 'http://' + get_host_url(rc_web_server_config)
178 assert_no_running_instance(host_url)
179 command = ['gunicorn', '--worker-class', 'gevent', '--paste', rc_web_server_config]
130 vcs_log = os.path.join(tempfile.gettempdir(), 'rc_op_vcs.log')
131 vcsserver_factory(
132 request, vcsserver_port=vcsserver_port,
133 log_file=vcs_log,
134 overrides=({'server:main': {'workers': 2}},))
180 135
181 print('rhodecode-web starting at: {}'.format(host_url))
182 print('rhodecode-web command: {}'.format(command))
183 print('rhodecode-web logfile: {}'.format(rc_log))
184
185 proc = subprocess32.Popen(
186 command, bufsize=0, env=env, stdout=server_out, stderr=server_out)
187
188 wait_for_url(host_url, timeout=30)
136 rc_log = os.path.join(tempfile.gettempdir(), 'rc_op_web.log')
137 rc_web_server_config = rc_web_server_config_factory(
138 vcsserver_port=vcsserver_port)
139 server = RcWebServer(rc_web_server_config, log_file=rc_log)
140 server.start()
189 141
190 142 @request.addfinalizer
191 def stop_web_server():
192 # TODO: Find out how to integrate with the reporting of py.test to
193 # make this information available.
194 print("\nServer log file written to %s" % (rc_log, ))
195 proc.kill()
196 server_out.flush()
197 server_out.close()
143 def cleanup():
144 server.shutdown()
198 145
199 return RcWebServer(rc_web_server_config, log_file=rc_log)
146 server.wait_until_ready()
147 return server
200 148
201 149
202 150 @pytest.fixture
203 151 def disable_locking(baseapp):
204 152 r = Repository.get_by_repo_name(GIT_REPO)
205 153 Repository.unlock(r)
206 154 r.enable_locking = False
207 155 Session().add(r)
208 156 Session().commit()
209 157
210 158 r = Repository.get_by_repo_name(HG_REPO)
211 159 Repository.unlock(r)
212 160 r.enable_locking = False
213 161 Session().add(r)
214 162 Session().commit()
215 163
216 164
217 165 @pytest.fixture
218 166 def enable_auth_plugins(request, baseapp, csrf_token):
219 167 """
220 168 Return a factory object that when called, allows to control which
221 169 authentication plugins are enabled.
222 170 """
223 171 def _enable_plugins(plugins_list, override=None):
224 172 override = override or {}
225 173 params = {
226 174 'auth_plugins': ','.join(plugins_list),
227 175 }
228 176
229 177 # helper translate some names to others
230 178 name_map = {
231 179 'token': 'authtoken'
232 180 }
233 181
234 182 for module in plugins_list:
235 183 plugin_name = module.partition('#')[-1]
236 184 if plugin_name in name_map:
237 185 plugin_name = name_map[plugin_name]
238 186 enabled_plugin = 'auth_%s_enabled' % plugin_name
239 187 cache_ttl = 'auth_%s_cache_ttl' % plugin_name
240 188
241 189 # default params that are needed for each plugin,
242 190 # `enabled` and `cache_ttl`
243 191 params.update({
244 192 enabled_plugin: True,
245 193 cache_ttl: 0
246 194 })
247 195 if override.get:
248 196 params.update(override.get(module, {}))
249 197
250 198 validated_params = params
251 199 for k, v in validated_params.items():
252 200 setting = SettingsModel().create_or_update_setting(k, v)
253 201 Session().add(setting)
254 202 Session().commit()
255 203
256 204 def cleanup():
257 205 _enable_plugins(['egg:rhodecode-enterprise-ce#rhodecode'])
258 206
259 207 request.addfinalizer(cleanup)
260 208
261 209 return _enable_plugins
262 210
263 211
264 212 @pytest.fixture
265 213 def fs_repo_only(request, rhodecode_fixtures):
266 214 def fs_repo_fabric(repo_name, repo_type):
267 215 rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type)
268 216 rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False)
269 217
270 218 def cleanup():
271 219 rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True)
272 220 rhodecode_fixtures.destroy_repo_on_filesystem(repo_name)
273 221
274 222 request.addfinalizer(cleanup)
275 223
276 224 return fs_repo_fabric
225
226
227 @pytest.fixture
228 def enable_webhook_push_integration(request):
229 integration = Integration()
230 integration.integration_type = WebhookIntegrationType.key
231 Session().add(integration)
232
233 settings = dict(
234 url='http://httpbin.org',
235 secret_token='secret',
236 username=None,
237 password=None,
238 custom_header_key=None,
239 custom_header_val=None,
240 method_type='get',
241 events=[events.RepoPushEvent.name],
242 log_data=True
243 )
244
245 IntegrationModel().update_integration(
246 integration,
247 name='IntegrationWebhookTest',
248 enabled=True,
249 settings=settings,
250 repo=None,
251 repo_group=None,
252 child_repos_only=False,
253 )
254 Session().commit()
255 integration_id = integration.integration_id
256
257 @request.addfinalizer
258 def cleanup():
259 integration = Integration.get(integration_id)
260 Session().delete(integration)
261 Session().commit()
262
@@ -1,59 +1,57 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import pytest
31 31
32 32 from rhodecode.tests import (GIT_REPO, HG_REPO)
33 33 from rhodecode.tests.vcs_operations import Command
34 34
35 35
36 # override rc_web_server_config fixture with custom INI
37 @pytest.fixture(scope='module')
38 def rc_web_server_config(testini_factory):
39 CUSTOM_PARAMS = [
36 @pytest.fixture(scope="module")
37 def rc_web_server_config_modification():
38 return [
40 39 {'app:main': {'auth_ret_code': '403'}},
41 40 {'app:main': {'auth_ret_code_detection': 'true'}},
42 41 ]
43 return testini_factory(CUSTOM_PARAMS)
44 42
45 43
46 44 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
47 45 class TestVCSOperationsOnCustomIniConfig(object):
48 46
49 47 def test_clone_wrong_credentials_hg_ret_code(self, rc_web_server, tmpdir):
50 48 clone_url = rc_web_server.repo_clone_url(HG_REPO, passwd='bad!')
51 49 stdout, stderr = Command('/tmp').execute(
52 50 'hg clone', clone_url, tmpdir.strpath)
53 51 assert 'abort: HTTP Error 403: Forbidden' in stderr
54 52
55 53 def test_clone_wrong_credentials_git_ret_code(self, rc_web_server, tmpdir):
56 54 clone_url = rc_web_server.repo_clone_url(GIT_REPO, passwd='bad!')
57 55 stdout, stderr = Command('/tmp').execute(
58 56 'git clone', clone_url, tmpdir.strpath)
59 57 assert 'The requested URL returned error: 403' in stderr
@@ -1,59 +1,57 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import pytest
31 31
32 32 from rhodecode.tests import (GIT_REPO, HG_REPO)
33 33 from rhodecode.tests.vcs_operations import Command
34 34
35 35
36 # override rc_web_server_config fixture with custom INI
37 @pytest.fixture(scope='module')
38 def rc_web_server_config(testini_factory):
39 CUSTOM_PARAMS = [
36 @pytest.fixture(scope="module")
37 def rc_web_server_config_modification():
38 return [
40 39 {'app:main': {'auth_ret_code': '404'}},
41 40 {'app:main': {'auth_ret_code_detection': 'false'}},
42 41 ]
43 return testini_factory(CUSTOM_PARAMS)
44 42
45 43
46 44 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
47 45 class TestVCSOperationsOnCustomIniConfig(object):
48 46
49 47 def test_clone_wrong_credentials_hg_ret_code(self, rc_web_server, tmpdir):
50 48 clone_url = rc_web_server.repo_clone_url(HG_REPO, passwd='bad!')
51 49 stdout, stderr = Command('/tmp').execute(
52 50 'hg clone', clone_url, tmpdir.strpath)
53 51 assert 'abort: HTTP Error 404: Not Found' in stderr
54 52
55 53 def test_clone_wrong_credentials_git_ret_code(self, rc_web_server, tmpdir):
56 54 clone_url = rc_web_server.repo_clone_url(GIT_REPO, passwd='bad!')
57 55 stdout, stderr = Command('/tmp').execute(
58 56 'git clone', clone_url, tmpdir.strpath)
59 57 assert 'not found' in stderr
@@ -1,59 +1,57 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import pytest
31 31
32 32 from rhodecode.tests import (GIT_REPO, HG_REPO)
33 33 from rhodecode.tests.vcs_operations import Command
34 34
35 35
36 # override rc_web_server_config fixture with custom INI
37 @pytest.fixture(scope='module')
38 def rc_web_server_config(testini_factory):
39 CUSTOM_PARAMS = [
36 @pytest.fixture(scope="module")
37 def rc_web_server_config_modification():
38 return [
40 39 {'app:main': {'auth_ret_code': '600'}},
41 40 {'app:main': {'auth_ret_code_detection': 'false'}},
42 41 ]
43 return testini_factory(CUSTOM_PARAMS)
44 42
45 43
46 44 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
47 45 class TestVCSOperationsOnCustomIniConfig(object):
48 46
49 47 def test_clone_wrong_credentials_hg_ret_code(self, rc_web_server, tmpdir):
50 48 clone_url = rc_web_server.repo_clone_url(HG_REPO, passwd='bad!')
51 49 stdout, stderr = Command('/tmp').execute(
52 50 'hg clone', clone_url, tmpdir.strpath)
53 51 assert 'abort: HTTP Error 403: Forbidden' in stderr
54 52
55 53 def test_clone_wrong_credentials_git_ret_code(self, rc_web_server, tmpdir):
56 54 clone_url = rc_web_server.repo_clone_url(GIT_REPO, passwd='bad!')
57 55 stdout, stderr = Command('/tmp').execute(
58 56 'git clone', clone_url, tmpdir.strpath)
59 57 assert 'The requested URL returned error: 403' in stderr
@@ -1,79 +1,67 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import os
31 31 import pytest
32 32
33 33 from rhodecode.lib.vcs.backends.git.repository import GitRepository
34 34 from rhodecode.lib.vcs.nodes import FileNode
35 35 from rhodecode.tests import GIT_REPO
36 36 from rhodecode.tests.vcs_operations import Command
37 37 from .test_vcs_operations import _check_proper_clone, _check_proper_git_push
38 38
39 39
40 # override rc_web_server_config fixture with custom INI
41 @pytest.fixture(scope="module")
42 def rc_web_server_config(testini_factory):
43 """
44 Configuration file used for the fixture `rc_web_server`.
45 """
46 CUSTOM_PARAMS = [
47 {'handler_console': {'level': 'DEBUG'}},
48 ]
49 return testini_factory(CUSTOM_PARAMS)
50
51
52 40 def test_git_clone_with_small_push_buffer(backend_git, rc_web_server, tmpdir):
53 41 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
54 42 cmd = Command('/tmp')
55 43 stdout, stderr = cmd.execute(
56 44 'git -c http.postBuffer=1024 clone', clone_url, tmpdir.strpath)
57 45 _check_proper_clone(stdout, stderr, 'git')
58 46 cmd.assert_returncode_success()
59 47
60 48
61 49 def test_git_push_with_small_push_buffer(backend_git, rc_web_server, tmpdir):
62 50 empty_repo = backend_git.create_repo()
63 51
64 52 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
65 53
66 54 cmd = Command(tmpdir.strpath)
67 55 cmd.execute('git clone', clone_url)
68 56
69 57 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
70 58 repo.in_memory_commit.add(FileNode('readme.md', content='## Hello'))
71 59 repo.in_memory_commit.commit(
72 60 message='Commit on branch Master',
73 61 author='Automatic test',
74 62 branch='master')
75 63
76 64 repo_cmd = Command(repo.path)
77 65 stdout, stderr = repo_cmd.execute(
78 66 'git -c http.postBuffer=1024 push --verbose origin master')
79 67 _check_proper_git_push(stdout, stderr, branch='master')
@@ -1,222 +1,220 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30
31 31 import pytest
32 32
33 33 from rhodecode.model.db import User, Repository
34 34 from rhodecode.model.meta import Session
35 35 from rhodecode.model.repo import RepoModel
36 36
37 37 from rhodecode.tests import (
38 38 GIT_REPO, HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN,
39 39 TEST_USER_REGULAR_PASS)
40 40 from rhodecode.tests.vcs_operations import (
41 41 Command, _check_proper_clone, _check_proper_git_push, _add_files_and_push)
42 42
43 43
44 # override rc_web_server_config fixture with custom INI
45 @pytest.fixture(scope='module')
46 def rc_web_server_config(testini_factory):
47 CUSTOM_PARAMS = [
44 @pytest.fixture(scope="module")
45 def rc_web_server_config_modification():
46 return [
48 47 {'app:main': {'lock_ret_code': '423'}},
49 48 ]
50 return testini_factory(CUSTOM_PARAMS)
51 49
52 50
53 51 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
54 52 class TestVCSOperationsOnCustomIniConfig(object):
55 53
56 54 def test_clone_and_create_lock_hg(self, rc_web_server, tmpdir):
57 55 # enable locking
58 56 r = Repository.get_by_repo_name(HG_REPO)
59 57 r.enable_locking = True
60 58 Session().add(r)
61 59 Session().commit()
62 60 # clone
63 61 clone_url = rc_web_server.repo_clone_url(HG_REPO)
64 62 stdout, stderr = Command('/tmp').execute(
65 63 'hg clone', clone_url, tmpdir.strpath)
66 64
67 65 # check if lock was made
68 66 r = Repository.get_by_repo_name(HG_REPO)
69 67 assert r.locked[0] == User.get_by_username(
70 68 TEST_USER_ADMIN_LOGIN).user_id
71 69
72 70 def test_clone_and_create_lock_git(self, rc_web_server, tmpdir):
73 71 # enable locking
74 72 r = Repository.get_by_repo_name(GIT_REPO)
75 73 r.enable_locking = True
76 74 Session().add(r)
77 75 Session().commit()
78 76 # clone
79 77 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
80 78 stdout, stderr = Command('/tmp').execute(
81 79 'git clone', clone_url, tmpdir.strpath)
82 80
83 81 # check if lock was made
84 82 r = Repository.get_by_repo_name(GIT_REPO)
85 83 assert r.locked[0] == User.get_by_username(
86 84 TEST_USER_ADMIN_LOGIN).user_id
87 85
88 86 def test_clone_after_repo_was_locked_hg(self, rc_web_server, tmpdir):
89 87 # lock repo
90 88 r = Repository.get_by_repo_name(HG_REPO)
91 89 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
92 90 # pull fails since repo is locked
93 91 clone_url = rc_web_server.repo_clone_url(HG_REPO)
94 92 stdout, stderr = Command('/tmp').execute(
95 93 'hg clone', clone_url, tmpdir.strpath)
96 94 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
97 95 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
98 96 assert msg in stderr
99 97
100 98 def test_clone_after_repo_was_locked_git(self, rc_web_server, tmpdir):
101 99 # lock repo
102 100 r = Repository.get_by_repo_name(GIT_REPO)
103 101 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
104 102 # pull fails since repo is locked
105 103 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
106 104 stdout, stderr = Command('/tmp').execute(
107 105 'git clone', clone_url, tmpdir.strpath)
108 106
109 107 lock_msg = (
110 108 'remote: ERROR: Repository `vcs_test_git` locked by user ' +
111 109 '`test_admin`. Reason:`lock_auto`')
112 110 assert lock_msg in stderr
113 111 assert 'remote: Pre pull hook failed: aborting' in stderr
114 112 assert 'fatal: remote did not send all necessary objects' in stderr
115 113
116 114 def test_push_on_locked_repo_by_other_user_hg(self, rc_web_server, tmpdir):
117 115 clone_url = rc_web_server.repo_clone_url(HG_REPO)
118 116 stdout, stderr = Command('/tmp').execute(
119 117 'hg clone', clone_url, tmpdir.strpath)
120 118
121 119 # lock repo
122 120 r = Repository.get_by_repo_name(HG_REPO)
123 121 # let this user actually push !
124 122 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
125 123 perm='repository.write')
126 124 Session().commit()
127 125 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
128 126
129 127 # push fails repo is locked by other user !
130 128 push_url = rc_web_server.repo_clone_url(
131 129 HG_REPO,
132 130 user=TEST_USER_REGULAR_LOGIN, passwd=TEST_USER_REGULAR_PASS)
133 131 stdout, stderr = _add_files_and_push(
134 132 'hg', tmpdir.strpath, clone_url=push_url)
135 133 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
136 134 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
137 135 assert msg in stderr
138 136
139 137 def test_push_on_locked_repo_by_other_user_git(
140 138 self, rc_web_server, tmpdir):
141 139 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
142 140 stdout, stderr = Command('/tmp').execute(
143 141 'git clone', clone_url, tmpdir.strpath)
144 142
145 143 # lock repo
146 144 r = Repository.get_by_repo_name(GIT_REPO)
147 145 # let this user actually push !
148 146 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
149 147 perm='repository.write')
150 148 Session().commit()
151 149 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
152 150
153 151 # push fails repo is locked by other user!
154 152 push_url = rc_web_server.repo_clone_url(
155 153 GIT_REPO,
156 154 user=TEST_USER_REGULAR_LOGIN, passwd=TEST_USER_REGULAR_PASS)
157 155 stdout, stderr = _add_files_and_push(
158 156 'git', tmpdir.strpath, clone_url=push_url)
159 157
160 158 err = 'Repository `%s` locked by user `%s`' % (
161 159 GIT_REPO, TEST_USER_ADMIN_LOGIN)
162 160 # err = 'RPC failed; result=22, HTTP code = 423'
163 161 assert err in stderr
164 162
165 163 def test_push_unlocks_repository_hg(self, rc_web_server, tmpdir):
166 164 # enable locking
167 165 r = Repository.get_by_repo_name(HG_REPO)
168 166 r.enable_locking = True
169 167 Session().add(r)
170 168 Session().commit()
171 169
172 170 clone_url = rc_web_server.repo_clone_url(HG_REPO)
173 171 stdout, stderr = Command('/tmp').execute(
174 172 'hg clone', clone_url, tmpdir.strpath)
175 173 _check_proper_clone(stdout, stderr, 'hg')
176 174
177 175 # check for lock repo after clone
178 176 r = Repository.get_by_repo_name(HG_REPO)
179 177 uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
180 178 assert r.locked[0] == uid
181 179
182 180 # push is ok and repo is now unlocked
183 181 stdout, stderr = _add_files_and_push(
184 182 'hg', tmpdir.strpath, clone_url=clone_url)
185 183 assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
186 184 # we need to cleanup the Session Here !
187 185 Session.remove()
188 186 r = Repository.get_by_repo_name(HG_REPO)
189 187 assert r.locked == [None, None, None]
190 188
191 189 def test_push_unlocks_repository_git(self, rc_web_server, tmpdir):
192 190
193 191 # Note: Did a first debugging session. Seems that
194 192 # Repository.get_locking_state is called twice. The second call
195 193 # has the action "pull" and does not reset the lock.
196 194
197 195 # enable locking
198 196 r = Repository.get_by_repo_name(GIT_REPO)
199 197 r.enable_locking = True
200 198 Session().add(r)
201 199 Session().commit()
202 200
203 201 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
204 202 stdout, stderr = Command('/tmp').execute(
205 203 'git clone', clone_url, tmpdir.strpath)
206 204 _check_proper_clone(stdout, stderr, 'git')
207 205
208 206 # check for lock repo after clone
209 207 r = Repository.get_by_repo_name(GIT_REPO)
210 208 assert r.locked[0] == User.get_by_username(
211 209 TEST_USER_ADMIN_LOGIN).user_id
212 210
213 211 # push is ok and repo is now unlocked
214 212 stdout, stderr = _add_files_and_push(
215 213 'git', tmpdir.strpath, clone_url=clone_url)
216 214 _check_proper_git_push(stdout, stderr)
217 215
218 216 # assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
219 217 # we need to cleanup the Session Here !
220 218 Session.remove()
221 219 r = Repository.get_by_repo_name(GIT_REPO)
222 220 assert r.locked == [None, None, None]
@@ -1,130 +1,128 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30
31 31 import pytest
32 32
33 33 from rhodecode.model.db import User, Repository
34 34 from rhodecode.model.meta import Session
35 35 from rhodecode.model.repo import RepoModel
36 36
37 37 from rhodecode.tests import (
38 38 GIT_REPO, HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN,
39 39 TEST_USER_REGULAR_PASS)
40 40 from rhodecode.tests.vcs_operations import Command, _add_files_and_push
41 41
42 42
43 # override rc_web_server_config fixture with custom INI
44 @pytest.fixture(scope='module')
45 def rc_web_server_config(testini_factory):
46 CUSTOM_PARAMS = [
43 @pytest.fixture(scope="module")
44 def rc_web_server_config_modification():
45 return [
47 46 {'app:main': {'lock_ret_code': '400'}},
48 47 ]
49 return testini_factory(CUSTOM_PARAMS)
50 48
51 49
52 50 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
53 51 class TestVCSOperationsOnCustomIniConfig(object):
54 52
55 53 def test_clone_after_repo_was_locked_hg(self, rc_web_server, tmpdir):
56 54 # lock repo
57 55 r = Repository.get_by_repo_name(HG_REPO)
58 56 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
59 57 # pull fails since repo is locked
60 58 clone_url = rc_web_server.repo_clone_url(HG_REPO)
61 59 stdout, stderr = Command('/tmp').execute(
62 60 'hg clone', clone_url, tmpdir.strpath)
63 61 msg = ("""abort: HTTP Error 400: Repository `%s` locked by user `%s`"""
64 62 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
65 63 assert msg in stderr
66 64
67 65 def test_clone_after_repo_was_locked_git(self, rc_web_server, tmpdir):
68 66 # lock repo
69 67 r = Repository.get_by_repo_name(GIT_REPO)
70 68 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
71 69 # pull fails since repo is locked
72 70 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
73 71 stdout, stderr = Command('/tmp').execute(
74 72 'git clone', clone_url, tmpdir.strpath)
75 73
76 74 lock_msg = (
77 75 'remote: ERROR: Repository `vcs_test_git` locked by user ' +
78 76 '`test_admin`. Reason:`lock_auto`')
79 77 assert lock_msg in stderr
80 78 assert 'remote: Pre pull hook failed: aborting' in stderr
81 79 assert 'fatal: remote did not send all necessary objects' in stderr
82 80
83 81 def test_push_on_locked_repo_by_other_user_hg(self, rc_web_server, tmpdir):
84 82 clone_url = rc_web_server.repo_clone_url(HG_REPO)
85 83 stdout, stderr = Command('/tmp').execute(
86 84 'hg clone', clone_url, tmpdir.strpath)
87 85
88 86 # lock repo
89 87 r = Repository.get_by_repo_name(HG_REPO)
90 88 # let this user actually push !
91 89 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
92 90 perm='repository.write')
93 91 Session().commit()
94 92 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
95 93
96 94 # push fails repo is locked by other user !
97 95 push_url = rc_web_server.repo_clone_url(
98 96 HG_REPO,
99 97 user=TEST_USER_REGULAR_LOGIN, passwd=TEST_USER_REGULAR_PASS)
100 98 stdout, stderr = _add_files_and_push(
101 99 'hg', tmpdir.strpath, clone_url=push_url)
102 100 msg = ("""abort: HTTP Error 400: Repository `%s` locked by user `%s`"""
103 101 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
104 102 assert msg in stderr
105 103
106 104 def test_push_on_locked_repo_by_other_user_git(
107 105 self, rc_web_server, tmpdir):
108 106 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
109 107 stdout, stderr = Command('/tmp').execute(
110 108 'git clone', clone_url, tmpdir.strpath)
111 109
112 110 # lock repo
113 111 r = Repository.get_by_repo_name(GIT_REPO)
114 112 # let this user actually push !
115 113 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
116 114 perm='repository.write')
117 115 Session().commit()
118 116 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
119 117
120 118 # push fails repo is locked by other user!
121 119 push_url = rc_web_server.repo_clone_url(
122 120 GIT_REPO,
123 121 user=TEST_USER_REGULAR_LOGIN, passwd=TEST_USER_REGULAR_PASS)
124 122 stdout, stderr = _add_files_and_push(
125 123 'git', tmpdir.strpath, clone_url=push_url)
126 124
127 125 err = 'Repository `%s` locked by user `%s`' % (
128 126 GIT_REPO, TEST_USER_ADMIN_LOGIN)
129 127
130 128 assert err in stderr
@@ -1,244 +1,244 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22
23 23 import pytest
24 24
25 25 from rhodecode.lib.vcs.backends.git.repository import GitRepository
26 26 from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
27 27 from rhodecode.lib.vcs.nodes import FileNode
28 28 from rhodecode.model.meta import Session
29 29
30 30 from rhodecode.tests.vcs_operations import (
31 31 Command, _check_proper_clone, _check_proper_git_push, _check_proper_hg_push)
32 32
33 33
34 34 @pytest.mark.usefixtures("disable_locking")
35 35 class TestVCSOperationsSpecial(object):
36 36
37 37 def test_git_sets_default_branch_if_not_master(
38 38 self, backend_git, tmpdir, rc_web_server):
39 39 empty_repo = backend_git.create_repo()
40 40 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
41 41
42 42 cmd = Command(tmpdir.strpath)
43 43 cmd.execute('git clone', clone_url)
44 44
45 45 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
46 46 repo.in_memory_commit.add(FileNode('file', content=''))
47 47 repo.in_memory_commit.commit(
48 48 message='Commit on branch test',
49 49 author='Automatic test',
50 50 branch='test')
51 51
52 52 repo_cmd = Command(repo.path)
53 53 stdout, stderr = repo_cmd.execute('git push --verbose origin test')
54 54 _check_proper_git_push(
55 55 stdout, stderr, branch='test', should_set_default_branch=True)
56 56
57 57 stdout, stderr = cmd.execute(
58 58 'git clone', clone_url, empty_repo.repo_name + '-clone')
59 59 _check_proper_clone(stdout, stderr, 'git')
60 60
61 61 # Doing an explicit commit in order to get latest user logs on MySQL
62 62 Session().commit()
63 63
64 # def test_git_fetches_from_remote_repository_with_annotated_tags(
65 # self, backend_git, rc_web_server):
66 # # Note: This is a test specific to the git backend. It checks the
67 # # integration of fetching from a remote repository which contains
68 # # annotated tags.
69 #
70 # # Dulwich shows this specific behavior only when
71 # # operating against a remote repository.
72 # source_repo = backend_git['annotated-tag']
73 # target_vcs_repo = backend_git.create_repo().scm_instance()
74 # target_vcs_repo.fetch(rc_web_server.repo_clone_url(source_repo.repo_name))
64 def test_git_fetches_from_remote_repository_with_annotated_tags(
65 self, backend_git, rc_web_server):
66 # Note: This is a test specific to the git backend. It checks the
67 # integration of fetching from a remote repository which contains
68 # annotated tags.
69
70 # Dulwich shows this specific behavior only when
71 # operating against a remote repository.
72 source_repo = backend_git['annotated-tag']
73 target_vcs_repo = backend_git.create_repo().scm_instance()
74 target_vcs_repo.fetch(rc_web_server.repo_clone_url(source_repo.repo_name))
75 75
76 76 def test_git_push_shows_pull_request_refs(self, backend_git, rc_web_server, tmpdir):
77 77 """
78 78 test if remote info about refs is visible
79 79 """
80 80 empty_repo = backend_git.create_repo()
81 81
82 82 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
83 83
84 84 cmd = Command(tmpdir.strpath)
85 85 cmd.execute('git clone', clone_url)
86 86
87 87 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
88 88 repo.in_memory_commit.add(FileNode('readme.md', content='## Hello'))
89 89 repo.in_memory_commit.commit(
90 90 message='Commit on branch Master',
91 91 author='Automatic test',
92 92 branch='master')
93 93
94 94 repo_cmd = Command(repo.path)
95 95 stdout, stderr = repo_cmd.execute('git push --verbose origin master')
96 96 _check_proper_git_push(stdout, stderr, branch='master')
97 97
98 98 ref = '{}/{}/pull-request/new?branch=master'.format(
99 99 rc_web_server.host_url(), empty_repo.repo_name)
100 100 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
101 101 assert 'remote: RhodeCode: push completed' in stderr
102 102
103 103 # push on the same branch
104 104 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
105 105 repo.in_memory_commit.add(FileNode('setup.py', content='print\n'))
106 106 repo.in_memory_commit.commit(
107 107 message='Commit2 on branch Master',
108 108 author='Automatic test2',
109 109 branch='master')
110 110
111 111 repo_cmd = Command(repo.path)
112 112 stdout, stderr = repo_cmd.execute('git push --verbose origin master')
113 113 _check_proper_git_push(stdout, stderr, branch='master')
114 114
115 115 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
116 116 assert 'remote: RhodeCode: push completed' in stderr
117 117
118 118 # new Branch
119 119 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
120 120 repo.in_memory_commit.add(FileNode('feature1.py', content='## Hello world'))
121 121 repo.in_memory_commit.commit(
122 122 message='Commit on branch feature',
123 123 author='Automatic test',
124 124 branch='feature')
125 125
126 126 repo_cmd = Command(repo.path)
127 127 stdout, stderr = repo_cmd.execute('git push --verbose origin feature')
128 128 _check_proper_git_push(stdout, stderr, branch='feature')
129 129
130 130 ref = '{}/{}/pull-request/new?branch=feature'.format(
131 131 rc_web_server.host_url(), empty_repo.repo_name)
132 132 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
133 133 assert 'remote: RhodeCode: push completed' in stderr
134 134
135 135 def test_hg_push_shows_pull_request_refs(self, backend_hg, rc_web_server, tmpdir):
136 136 empty_repo = backend_hg.create_repo()
137 137
138 138 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
139 139
140 140 cmd = Command(tmpdir.strpath)
141 141 cmd.execute('hg clone', clone_url)
142 142
143 143 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
144 144 repo.in_memory_commit.add(FileNode(u'readme.md', content=u'## Hello'))
145 145 repo.in_memory_commit.commit(
146 146 message=u'Commit on branch default',
147 147 author=u'Automatic test',
148 148 branch='default')
149 149
150 150 repo_cmd = Command(repo.path)
151 151 repo_cmd.execute('hg checkout default')
152 152
153 153 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
154 154 _check_proper_hg_push(stdout, stderr, branch='default')
155 155
156 156 ref = '{}/{}/pull-request/new?branch=default'.format(
157 157 rc_web_server.host_url(), empty_repo.repo_name)
158 158 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
159 159 assert 'remote: RhodeCode: push completed' in stdout
160 160
161 161 # push on the same branch
162 162 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
163 163 repo.in_memory_commit.add(FileNode(u'setup.py', content=u'print\n'))
164 164 repo.in_memory_commit.commit(
165 165 message=u'Commit2 on branch default',
166 166 author=u'Automatic test2',
167 167 branch=u'default')
168 168
169 169 repo_cmd = Command(repo.path)
170 170 repo_cmd.execute('hg checkout default')
171 171
172 172 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
173 173 _check_proper_hg_push(stdout, stderr, branch='default')
174 174
175 175 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
176 176 assert 'remote: RhodeCode: push completed' in stdout
177 177
178 178 # new Branch
179 179 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
180 180 repo.in_memory_commit.add(FileNode(u'feature1.py', content=u'## Hello world'))
181 181 repo.in_memory_commit.commit(
182 182 message=u'Commit on branch feature',
183 183 author=u'Automatic test',
184 184 branch=u'feature')
185 185
186 186 repo_cmd = Command(repo.path)
187 187 repo_cmd.execute('hg checkout feature')
188 188
189 189 stdout, stderr = repo_cmd.execute('hg push --new-branch --verbose', clone_url)
190 190 _check_proper_hg_push(stdout, stderr, branch='feature')
191 191
192 192 ref = '{}/{}/pull-request/new?branch=feature'.format(
193 193 rc_web_server.host_url(), empty_repo.repo_name)
194 194 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
195 195 assert 'remote: RhodeCode: push completed' in stdout
196 196
197 197 def test_hg_push_shows_pull_request_refs_book(self, backend_hg, rc_web_server, tmpdir):
198 198 empty_repo = backend_hg.create_repo()
199 199
200 200 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
201 201
202 202 cmd = Command(tmpdir.strpath)
203 203 cmd.execute('hg clone', clone_url)
204 204
205 205 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
206 206 repo.in_memory_commit.add(FileNode(u'readme.md', content=u'## Hello'))
207 207 repo.in_memory_commit.commit(
208 208 message=u'Commit on branch default',
209 209 author=u'Automatic test',
210 210 branch='default')
211 211
212 212 repo_cmd = Command(repo.path)
213 213 repo_cmd.execute('hg checkout default')
214 214
215 215 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
216 216 _check_proper_hg_push(stdout, stderr, branch='default')
217 217
218 218 ref = '{}/{}/pull-request/new?branch=default'.format(
219 219 rc_web_server.host_url(), empty_repo.repo_name)
220 220 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
221 221 assert 'remote: RhodeCode: push completed' in stdout
222 222
223 223 # add bookmark
224 224 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
225 225 repo.in_memory_commit.add(FileNode(u'setup.py', content=u'print\n'))
226 226 repo.in_memory_commit.commit(
227 227 message=u'Commit2 on branch default',
228 228 author=u'Automatic test2',
229 229 branch=u'default')
230 230
231 231 repo_cmd = Command(repo.path)
232 232 repo_cmd.execute('hg checkout default')
233 233 repo_cmd.execute('hg bookmark feature2')
234 234 stdout, stderr = repo_cmd.execute('hg push -B feature2 --verbose', clone_url)
235 235 _check_proper_hg_push(stdout, stderr, branch='default')
236 236
237 237 ref = '{}/{}/pull-request/new?branch=default'.format(
238 238 rc_web_server.host_url(), empty_repo.repo_name)
239 239 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
240 240 ref = '{}/{}/pull-request/new?bookmark=feature2'.format(
241 241 rc_web_server.host_url(), empty_repo.repo_name)
242 242 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
243 243 assert 'remote: RhodeCode: push completed' in stdout
244 244 assert 'exporting bookmark feature2' in stdout
@@ -1,141 +1,98 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import pytest
31 31 import requests
32 32
33 from rhodecode import events
34 from rhodecode.model.db import Integration
35 from rhodecode.model.integration import IntegrationModel
36 from rhodecode.model.meta import Session
37
38 33 from rhodecode.tests import GIT_REPO, HG_REPO
39 34 from rhodecode.tests.vcs_operations import Command, _add_files_and_push
40 from rhodecode.integrations.types.webhook import WebhookIntegrationType
41 35
42 36
43 37 def check_connection():
44 38 try:
45 39 response = requests.get('http://httpbin.org')
46 40 return response.status_code == 200
47 41 except Exception as e:
48 42 print(e)
49 43
50 44 return False
51 45
52 46
53 47 connection_available = pytest.mark.skipif(
54 48 not check_connection(), reason="No outside internet connection available")
55 49
56 50
57 @pytest.fixture
58 def enable_webhook_push_integration(request):
59 integration = Integration()
60 integration.integration_type = WebhookIntegrationType.key
61 Session().add(integration)
62
63 settings = dict(
64 url='http://httpbin.org',
65 secret_token='secret',
66 username=None,
67 password=None,
68 custom_header_key=None,
69 custom_header_val=None,
70 method_type='get',
71 events=[events.RepoPushEvent.name],
72 log_data=True
73 )
74
75 IntegrationModel().update_integration(
76 integration,
77 name='IntegrationWebhookTest',
78 enabled=True,
79 settings=settings,
80 repo=None,
81 repo_group=None,
82 child_repos_only=False,
83 )
84 Session().commit()
85 integration_id = integration.integration_id
86
87 @request.addfinalizer
88 def cleanup():
89 integration = Integration.get(integration_id)
90 Session().delete(integration)
91 Session().commit()
92
93
94 51 @pytest.mark.usefixtures(
95 52 "disable_locking", "disable_anonymous_user",
96 53 "enable_webhook_push_integration")
97 54 class TestVCSOperationsOnCustomIniConfig(object):
98 55
99 56 def test_push_tag_with_commit_hg(self, rc_web_server, tmpdir):
100 57 clone_url = rc_web_server.repo_clone_url(HG_REPO)
101 58 stdout, stderr = Command('/tmp').execute(
102 59 'hg clone', clone_url, tmpdir.strpath)
103 60
104 61 push_url = rc_web_server.repo_clone_url(HG_REPO)
105 62 _add_files_and_push(
106 63 'hg', tmpdir.strpath, clone_url=push_url,
107 64 tags=[{'name': 'v1.0.0', 'commit': 'added tag v1.0.0'}])
108 65
109 66 rc_log = rc_web_server.get_rc_log()
110 67 assert 'ERROR' not in rc_log
111 68 assert "'name': u'v1.0.0'" in rc_log
112 69
113 70 def test_push_tag_with_commit_git(
114 71 self, rc_web_server, tmpdir):
115 72 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
116 73 stdout, stderr = Command('/tmp').execute(
117 74 'git clone', clone_url, tmpdir.strpath)
118 75
119 76 push_url = rc_web_server.repo_clone_url(GIT_REPO)
120 77 _add_files_and_push(
121 78 'git', tmpdir.strpath, clone_url=push_url,
122 79 tags=[{'name': 'v1.0.0', 'commit': 'added tag v1.0.0'}])
123 80
124 81 rc_log = rc_web_server.get_rc_log()
125 82 assert 'ERROR' not in rc_log
126 83 assert "'name': u'v1.0.0'" in rc_log
127 84
128 85 def test_push_tag_with_no_commit_git(
129 86 self, rc_web_server, tmpdir):
130 87 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
131 88 stdout, stderr = Command('/tmp').execute(
132 89 'git clone', clone_url, tmpdir.strpath)
133 90
134 91 push_url = rc_web_server.repo_clone_url(GIT_REPO)
135 92 _add_files_and_push(
136 93 'git', tmpdir.strpath, clone_url=push_url,
137 94 tags=[{'name': 'v1.0.0', 'commit': 'added tag v1.0.0'}])
138 95
139 96 rc_log = rc_web_server.get_rc_log()
140 97 assert 'ERROR' not in rc_log
141 98 assert "'name': u'v1.0.0'" in rc_log
General Comments 0
You need to be logged in to leave comments. Login now