##// END OF EJS Templates
tests: move vcs_operations into its own module.
marcink -
r2456:db312489 default
parent child Browse files
Show More
@@ -0,0 +1,244 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import os
22
23 import pytest
24
25 from rhodecode.lib.vcs.backends.git.repository import GitRepository
26 from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
27 from rhodecode.lib.vcs.nodes import FileNode
28 from rhodecode.model.meta import Session
29
30 from rhodecode.tests.vcs_operations import (
31 Command, _check_proper_clone, _check_proper_git_push, _check_proper_hg_push)
32
33
34 @pytest.mark.usefixtures("disable_locking")
35 class TestVCSOperationsSpecial(object):
36
37 def test_git_sets_default_branch_if_not_master(
38 self, backend_git, tmpdir, rc_web_server):
39 empty_repo = backend_git.create_repo()
40 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
41
42 cmd = Command(tmpdir.strpath)
43 cmd.execute('git clone', clone_url)
44
45 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
46 repo.in_memory_commit.add(FileNode('file', content=''))
47 repo.in_memory_commit.commit(
48 message='Commit on branch test',
49 author='Automatic test',
50 branch='test')
51
52 repo_cmd = Command(repo.path)
53 stdout, stderr = repo_cmd.execute('git push --verbose origin test')
54 _check_proper_git_push(
55 stdout, stderr, branch='test', should_set_default_branch=True)
56
57 stdout, stderr = cmd.execute(
58 'git clone', clone_url, empty_repo.repo_name + '-clone')
59 _check_proper_clone(stdout, stderr, 'git')
60
61 # Doing an explicit commit in order to get latest user logs on MySQL
62 Session().commit()
63
64 # def test_git_fetches_from_remote_repository_with_annotated_tags(
65 # self, backend_git, rc_web_server):
66 # # Note: This is a test specific to the git backend. It checks the
67 # # integration of fetching from a remote repository which contains
68 # # annotated tags.
69 #
70 # # Dulwich shows this specific behavior only when
71 # # operating against a remote repository.
72 # source_repo = backend_git['annotated-tag']
73 # target_vcs_repo = backend_git.create_repo().scm_instance()
74 # target_vcs_repo.fetch(rc_web_server.repo_clone_url(source_repo.repo_name))
75
76 def test_git_push_shows_pull_request_refs(self, backend_git, rc_web_server, tmpdir):
77 """
78 test if remote info about refs is visible
79 """
80 empty_repo = backend_git.create_repo()
81
82 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
83
84 cmd = Command(tmpdir.strpath)
85 cmd.execute('git clone', clone_url)
86
87 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
88 repo.in_memory_commit.add(FileNode('readme.md', content='## Hello'))
89 repo.in_memory_commit.commit(
90 message='Commit on branch Master',
91 author='Automatic test',
92 branch='master')
93
94 repo_cmd = Command(repo.path)
95 stdout, stderr = repo_cmd.execute('git push --verbose origin master')
96 _check_proper_git_push(stdout, stderr, branch='master')
97
98 ref = '{}/{}/pull-request/new?branch=master'.format(
99 rc_web_server.host_url(), empty_repo.repo_name)
100 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
101 assert 'remote: RhodeCode: push completed' in stderr
102
103 # push on the same branch
104 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
105 repo.in_memory_commit.add(FileNode('setup.py', content='print\n'))
106 repo.in_memory_commit.commit(
107 message='Commit2 on branch Master',
108 author='Automatic test2',
109 branch='master')
110
111 repo_cmd = Command(repo.path)
112 stdout, stderr = repo_cmd.execute('git push --verbose origin master')
113 _check_proper_git_push(stdout, stderr, branch='master')
114
115 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
116 assert 'remote: RhodeCode: push completed' in stderr
117
118 # new Branch
119 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
120 repo.in_memory_commit.add(FileNode('feature1.py', content='## Hello world'))
121 repo.in_memory_commit.commit(
122 message='Commit on branch feature',
123 author='Automatic test',
124 branch='feature')
125
126 repo_cmd = Command(repo.path)
127 stdout, stderr = repo_cmd.execute('git push --verbose origin feature')
128 _check_proper_git_push(stdout, stderr, branch='feature')
129
130 ref = '{}/{}/pull-request/new?branch=feature'.format(
131 rc_web_server.host_url(), empty_repo.repo_name)
132 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
133 assert 'remote: RhodeCode: push completed' in stderr
134
135 def test_hg_push_shows_pull_request_refs(self, backend_hg, rc_web_server, tmpdir):
136 empty_repo = backend_hg.create_repo()
137
138 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
139
140 cmd = Command(tmpdir.strpath)
141 cmd.execute('hg clone', clone_url)
142
143 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
144 repo.in_memory_commit.add(FileNode(u'readme.md', content=u'## Hello'))
145 repo.in_memory_commit.commit(
146 message=u'Commit on branch default',
147 author=u'Automatic test',
148 branch='default')
149
150 repo_cmd = Command(repo.path)
151 repo_cmd.execute('hg checkout default')
152
153 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
154 _check_proper_hg_push(stdout, stderr, branch='default')
155
156 ref = '{}/{}/pull-request/new?branch=default'.format(
157 rc_web_server.host_url(), empty_repo.repo_name)
158 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
159 assert 'remote: RhodeCode: push completed' in stdout
160
161 # push on the same branch
162 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
163 repo.in_memory_commit.add(FileNode(u'setup.py', content=u'print\n'))
164 repo.in_memory_commit.commit(
165 message=u'Commit2 on branch default',
166 author=u'Automatic test2',
167 branch=u'default')
168
169 repo_cmd = Command(repo.path)
170 repo_cmd.execute('hg checkout default')
171
172 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
173 _check_proper_hg_push(stdout, stderr, branch='default')
174
175 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
176 assert 'remote: RhodeCode: push completed' in stdout
177
178 # new Branch
179 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
180 repo.in_memory_commit.add(FileNode(u'feature1.py', content=u'## Hello world'))
181 repo.in_memory_commit.commit(
182 message=u'Commit on branch feature',
183 author=u'Automatic test',
184 branch=u'feature')
185
186 repo_cmd = Command(repo.path)
187 repo_cmd.execute('hg checkout feature')
188
189 stdout, stderr = repo_cmd.execute('hg push --new-branch --verbose', clone_url)
190 _check_proper_hg_push(stdout, stderr, branch='feature')
191
192 ref = '{}/{}/pull-request/new?branch=feature'.format(
193 rc_web_server.host_url(), empty_repo.repo_name)
194 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
195 assert 'remote: RhodeCode: push completed' in stdout
196
197 def test_hg_push_shows_pull_request_refs_book(self, backend_hg, rc_web_server, tmpdir):
198 empty_repo = backend_hg.create_repo()
199
200 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
201
202 cmd = Command(tmpdir.strpath)
203 cmd.execute('hg clone', clone_url)
204
205 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
206 repo.in_memory_commit.add(FileNode(u'readme.md', content=u'## Hello'))
207 repo.in_memory_commit.commit(
208 message=u'Commit on branch default',
209 author=u'Automatic test',
210 branch='default')
211
212 repo_cmd = Command(repo.path)
213 repo_cmd.execute('hg checkout default')
214
215 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
216 _check_proper_hg_push(stdout, stderr, branch='default')
217
218 ref = '{}/{}/pull-request/new?branch=default'.format(
219 rc_web_server.host_url(), empty_repo.repo_name)
220 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
221 assert 'remote: RhodeCode: push completed' in stdout
222
223 # add bookmark
224 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
225 repo.in_memory_commit.add(FileNode(u'setup.py', content=u'print\n'))
226 repo.in_memory_commit.commit(
227 message=u'Commit2 on branch default',
228 author=u'Automatic test2',
229 branch=u'default')
230
231 repo_cmd = Command(repo.path)
232 repo_cmd.execute('hg checkout default')
233 repo_cmd.execute('hg bookmark feature2')
234 stdout, stderr = repo_cmd.execute('hg push -B feature2 --verbose', clone_url)
235 _check_proper_hg_push(stdout, stderr, branch='default')
236
237 ref = '{}/{}/pull-request/new?branch=default'.format(
238 rc_web_server.host_url(), empty_repo.repo_name)
239 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
240 ref = '{}/{}/pull-request/new?bookmark=feature2'.format(
241 rc_web_server.host_url(), empty_repo.repo_name)
242 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
243 assert 'remote: RhodeCode: push completed' in stdout
244 assert 'exporting bookmark feature2' in stdout
@@ -1,410 +1,408 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import json
23 import time
23 24 import platform
24 25 import socket
25 26 import tempfile
27 import subprocess32
26 28
27 import subprocess32
28 import time
29 29 from urllib2 import urlopen, URLError
30 30
31 31 import configobj
32 32 import pytest
33 33
34 import pyramid.paster
35 34
36 35 from rhodecode.lib.pyramid_utils import get_app_config
37 36 from rhodecode.tests.fixture import TestINI
38 import rhodecode
39 from rhodecode.tests.other.vcs_operations.conftest import get_host_url, get_port
37 from rhodecode.tests.vcs_operations.conftest import get_host_url, get_port
40 38
41 39 VCSSERVER_LOG = os.path.join(tempfile.gettempdir(), 'rc-vcsserver.log')
42 40
43 41
44 42 def _parse_json(value):
45 43 return json.loads(value) if value else None
46 44
47 45
48 46 def pytest_addoption(parser):
49 47 parser.addoption(
50 48 '--test-loglevel', dest='test_loglevel',
51 49 help="Set default Logging level for tests, warn (default), info, debug")
52 50 group = parser.getgroup('pylons')
53 51 group.addoption(
54 52 '--with-pylons', dest='pyramid_config',
55 53 help="Set up a Pylons environment with the specified config file.")
56 54 group.addoption(
57 55 '--ini-config-override', action='store', type=_parse_json,
58 56 default=None, dest='pyramid_config_override', help=(
59 57 "Overrides the .ini file settings. Should be specified in JSON"
60 58 " format, e.g. '{\"section\": {\"parameter\": \"value\", ...}}'"
61 59 )
62 60 )
63 61 parser.addini(
64 62 'pyramid_config',
65 63 "Set up a Pyramid environment with the specified config file.")
66 64
67 65 vcsgroup = parser.getgroup('vcs')
68 66 vcsgroup.addoption(
69 67 '--without-vcsserver', dest='with_vcsserver', action='store_false',
70 68 help="Do not start the VCSServer in a background process.")
71 69 vcsgroup.addoption(
72 70 '--with-vcsserver-http', dest='vcsserver_config_http',
73 71 help="Start the HTTP VCSServer with the specified config file.")
74 72 vcsgroup.addoption(
75 73 '--vcsserver-protocol', dest='vcsserver_protocol',
76 74 help="Start the VCSServer with HTTP protocol support.")
77 75 vcsgroup.addoption(
78 76 '--vcsserver-config-override', action='store', type=_parse_json,
79 77 default=None, dest='vcsserver_config_override', help=(
80 78 "Overrides the .ini file settings for the VCSServer. "
81 79 "Should be specified in JSON "
82 80 "format, e.g. '{\"section\": {\"parameter\": \"value\", ...}}'"
83 81 )
84 82 )
85 83 vcsgroup.addoption(
86 84 '--vcsserver-port', action='store', type=int,
87 85 default=None, help=(
88 86 "Allows to set the port of the vcsserver. Useful when testing "
89 87 "against an already running server and random ports cause "
90 88 "trouble."))
91 89 parser.addini(
92 90 'vcsserver_config_http',
93 91 "Start the HTTP VCSServer with the specified config file.")
94 92 parser.addini(
95 93 'vcsserver_protocol',
96 94 "Start the VCSServer with HTTP protocol support.")
97 95
98 96
99 97 @pytest.fixture(scope="session")
100 98 def vcs_server_config_override(request):
101 99 """
102 100 Allows injecting the overrides by specifying this inside test class
103 101 """
104 102
105 103 return ()
106 104
107 105
108 106 @pytest.fixture(scope='session')
109 107 def vcsserver(request, vcsserver_port, vcsserver_factory, vcs_server_config_override):
110 108 """
111 109 Session scope VCSServer.
112 110
113 111 Tests wich need the VCSServer have to rely on this fixture in order
114 112 to ensure it will be running.
115 113
116 114 For specific needs, the fixture vcsserver_factory can be used. It allows to
117 115 adjust the configuration file for the test run.
118 116
119 117 Command line args:
120 118
121 119 --without-vcsserver: Allows to switch this fixture off. You have to
122 120 manually start the server.
123 121
124 122 --vcsserver-port: Will expect the VCSServer to listen on this port.
125 123 """
126 124
127 125 if not request.config.getoption('with_vcsserver'):
128 126 return None
129 127
130 128 use_http = _use_vcs_http_server(request.config)
131 129 return vcsserver_factory(
132 130 request, use_http=use_http, vcsserver_port=vcsserver_port,
133 131 overrides=vcs_server_config_override)
134 132
135 133
136 134 @pytest.fixture(scope='session')
137 135 def vcsserver_factory(tmpdir_factory):
138 136 """
139 137 Use this if you need a running vcsserver with a special configuration.
140 138 """
141 139
142 140 def factory(request, use_http=True, overrides=(), vcsserver_port=None):
143 141
144 142 if vcsserver_port is None:
145 143 vcsserver_port = get_available_port()
146 144
147 145 overrides = list(overrides)
148 146 if use_http:
149 147 overrides.append({'server:main': {'port': vcsserver_port}})
150 148 else:
151 149 overrides.append({'DEFAULT': {'port': vcsserver_port}})
152 150
153 151 if is_cygwin():
154 152 platform_override = {'DEFAULT': {
155 153 'beaker.cache.repo_object.type': 'nocache'}}
156 154 overrides.append(platform_override)
157 155
158 156 option_name = 'vcsserver_config_http' if use_http else ''
159 157 override_option_name = 'vcsserver_config_override'
160 158 config_file = get_config(
161 159 request.config, option_name=option_name,
162 160 override_option_name=override_option_name, overrides=overrides,
163 161 basetemp=tmpdir_factory.getbasetemp().strpath,
164 162 prefix='test_vcs_')
165 163
166 164 print("Using the VCSServer configuration:{}".format(config_file))
167 165 ServerClass = HttpVCSServer if use_http else None
168 166 server = ServerClass(config_file)
169 167 server.start()
170 168
171 169 @request.addfinalizer
172 170 def cleanup():
173 171 server.shutdown()
174 172
175 173 server.wait_until_ready()
176 174 return server
177 175
178 176 return factory
179 177
180 178
181 179 def is_cygwin():
182 180 return 'cygwin' in platform.system().lower()
183 181
184 182
185 183 def _use_vcs_http_server(config):
186 184 protocol_option = 'vcsserver_protocol'
187 185 protocol = (
188 186 config.getoption(protocol_option) or
189 187 config.getini(protocol_option) or
190 188 'http')
191 189 return protocol == 'http'
192 190
193 191
194 192 def _use_log_level(config):
195 193 level = config.getoption('test_loglevel') or 'warn'
196 194 return level.upper()
197 195
198 196
199 197 class VCSServer(object):
200 198 """
201 199 Represents a running VCSServer instance.
202 200 """
203 201
204 202 _args = []
205 203
206 204 def start(self):
207 205 print("Starting the VCSServer: {}".format(self._args))
208 206 self.process = subprocess32.Popen(self._args)
209 207
210 208 def wait_until_ready(self, timeout=30):
211 209 raise NotImplementedError()
212 210
213 211 def shutdown(self):
214 212 self.process.kill()
215 213
216 214
217 215 class HttpVCSServer(VCSServer):
218 216 """
219 217 Represents a running VCSServer instance.
220 218 """
221 219 def __init__(self, config_file):
222 220 self.config_file = config_file
223 221 config_data = configobj.ConfigObj(config_file)
224 222 self._config = config_data['server:main']
225 223
226 224 args = ['gunicorn', '--paste', config_file]
227 225 self._args = args
228 226
229 227 @property
230 228 def http_url(self):
231 229 template = 'http://{host}:{port}/'
232 230 return template.format(**self._config)
233 231
234 232 def start(self):
235 233 env = os.environ.copy()
236 234 host_url = 'http://' + get_host_url(self.config_file)
237 235
238 236 rc_log = list(VCSSERVER_LOG.partition('.log'))
239 237 rc_log.insert(1, get_port(self.config_file))
240 238 rc_log = ''.join(rc_log)
241 239
242 240 server_out = open(rc_log, 'w')
243 241
244 242 command = ' '.join(self._args)
245 243 print('rhodecode-vcsserver starting at: {}'.format(host_url))
246 244 print('rhodecode-vcsserver command: {}'.format(command))
247 245 print('rhodecode-vcsserver logfile: {}'.format(rc_log))
248 246 self.process = subprocess32.Popen(
249 247 self._args, bufsize=0, env=env, stdout=server_out, stderr=server_out)
250 248
251 249 def wait_until_ready(self, timeout=30):
252 250 host = self._config['host']
253 251 port = self._config['port']
254 252 status_url = 'http://{host}:{port}/status'.format(host=host, port=port)
255 253 start = time.time()
256 254
257 255 while time.time() - start < timeout:
258 256 try:
259 257 urlopen(status_url)
260 258 break
261 259 except URLError:
262 260 time.sleep(0.2)
263 261 else:
264 262 pytest.exit(
265 263 "Starting the VCSServer failed or took more than {} "
266 264 "seconds. cmd: `{}`".format(timeout, ' '.join(self._args)))
267 265
268 266 def shutdown(self):
269 267 self.process.kill()
270 268
271 269
272 270 @pytest.fixture(scope='session')
273 271 def ini_config(request, tmpdir_factory, rcserver_port, vcsserver_port):
274 272 option_name = 'pyramid_config'
275 273 log_level = _use_log_level(request.config)
276 274
277 275 overrides = [
278 276 {'server:main': {'port': rcserver_port}},
279 277 {'app:main': {
280 278 'vcs.server': 'localhost:%s' % vcsserver_port,
281 279 # johbo: We will always start the VCSServer on our own based on the
282 280 # fixtures of the test cases. For the test run it must always be
283 281 # off in the INI file.
284 282 'vcs.start_server': 'false',
285 283 }},
286 284
287 285 {'handler_console': {
288 286 'class ': 'StreamHandler',
289 287 'args ': '(sys.stderr,)',
290 288 'level': log_level,
291 289 }},
292 290
293 291 ]
294 292 if _use_vcs_http_server(request.config):
295 293 overrides.append({
296 294 'app:main': {
297 295 'vcs.server.protocol': 'http',
298 296 'vcs.scm_app_implementation': 'http',
299 297 'vcs.hooks.protocol': 'http',
300 298 }
301 299 })
302 300
303 301 filename = get_config(
304 302 request.config, option_name=option_name,
305 303 override_option_name='{}_override'.format(option_name),
306 304 overrides=overrides,
307 305 basetemp=tmpdir_factory.getbasetemp().strpath,
308 306 prefix='test_rce_')
309 307 return filename
310 308
311 309
312 310 @pytest.fixture(scope='session')
313 311 def ini_settings(ini_config):
314 312 ini_path = ini_config
315 313 return get_app_config(ini_path)
316 314
317 315
318 316 @pytest.fixture(scope='session')
319 317 def rcserver_port(request):
320 318 port = get_available_port()
321 319 print('Using rcserver port {}'.format(port))
322 320 return port
323 321
324 322
325 323 @pytest.fixture(scope='session')
326 324 def vcsserver_port(request):
327 325 port = request.config.getoption('--vcsserver-port')
328 326 if port is None:
329 327 port = get_available_port()
330 328 print('Using vcsserver port {}'.format(port))
331 329 return port
332 330
333 331
334 332 def get_available_port():
335 333 family = socket.AF_INET
336 334 socktype = socket.SOCK_STREAM
337 335 host = '127.0.0.1'
338 336
339 337 mysocket = socket.socket(family, socktype)
340 338 mysocket.bind((host, 0))
341 339 port = mysocket.getsockname()[1]
342 340 mysocket.close()
343 341 del mysocket
344 342 return port
345 343
346 344
347 345 @pytest.fixture(scope='session')
348 346 def available_port_factory():
349 347 """
350 348 Returns a callable which returns free port numbers.
351 349 """
352 350 return get_available_port
353 351
354 352
355 353 @pytest.fixture
356 354 def available_port(available_port_factory):
357 355 """
358 356 Gives you one free port for the current test.
359 357
360 358 Uses "available_port_factory" to retrieve the port.
361 359 """
362 360 return available_port_factory()
363 361
364 362
365 363 @pytest.fixture(scope='session')
366 364 def testini_factory(tmpdir_factory, ini_config):
367 365 """
368 366 Factory to create an INI file based on TestINI.
369 367
370 368 It will make sure to place the INI file in the correct directory.
371 369 """
372 370 basetemp = tmpdir_factory.getbasetemp().strpath
373 371 return TestIniFactory(basetemp, ini_config)
374 372
375 373
376 374 class TestIniFactory(object):
377 375
378 376 def __init__(self, basetemp, template_ini):
379 377 self._basetemp = basetemp
380 378 self._template_ini = template_ini
381 379
382 380 def __call__(self, ini_params, new_file_prefix='test'):
383 381 ini_file = TestINI(
384 382 self._template_ini, ini_params=ini_params,
385 383 new_file_prefix=new_file_prefix, dir=self._basetemp)
386 384 result = ini_file.create()
387 385 return result
388 386
389 387
390 388 def get_config(
391 389 config, option_name, override_option_name, overrides=None,
392 390 basetemp=None, prefix='test'):
393 391 """
394 392 Find a configuration file and apply overrides for the given `prefix`.
395 393 """
396 394 config_file = (
397 395 config.getoption(option_name) or config.getini(option_name))
398 396 if not config_file:
399 397 pytest.exit(
400 398 "Configuration error, could not extract {}.".format(option_name))
401 399
402 400 overrides = overrides or []
403 401 config_override = config.getoption(override_option_name)
404 402 if config_override:
405 403 overrides.append(config_override)
406 404 temp_ini_file = TestINI(
407 405 config_file, ini_params=overrides, new_file_prefix=prefix,
408 406 dir=basetemp)
409 407
410 408 return temp_ini_file.create()
@@ -1,257 +1,262 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import shutil
23 23 import datetime
24 24
25 25 import pytest
26 26
27 27 from rhodecode.lib.vcs.backends import get_backend
28 28 from rhodecode.lib.vcs.backends.base import Config
29 29 from rhodecode.lib.vcs.nodes import FileNode
30 30 from rhodecode.tests import get_new_dir
31 31 from rhodecode.tests.utils import check_skip_backends, check_xfail_backends
32 32
33 33
34 @pytest.fixture(scope="session")
35 def vcs_server_config_override():
36 return ({'server:main': {'workers': 1}},)
37
38
34 39 @pytest.fixture()
35 40 def vcs_repository_support(
36 41 request, backend_alias, baseapp, _vcs_repo_container):
37 42 """
38 43 Provide a test repository for the test run.
39 44
40 45 Depending on the value of `recreate_repo_per_test` a new repo for each
41 46 test will be created.
42 47
43 48 The parameter `--backends` can be used to limit this fixture to specific
44 49 backend implementations.
45 50 """
46 51 cls = request.cls
47 52
48 53 check_skip_backends(request.node, backend_alias)
49 54 check_xfail_backends(request.node, backend_alias)
50 55
51 56 if _should_create_repo_per_test(cls):
52 57 _vcs_repo_container = _create_vcs_repo_container(request)
53 58
54 59 repo = _vcs_repo_container.get_repo(cls, backend_alias=backend_alias)
55 60
56 61 # TODO: johbo: Supporting old test class api, think about removing this
57 62 cls.repo = repo
58 63 cls.repo_path = repo.path
59 64 cls.default_branch = repo.DEFAULT_BRANCH_NAME
60 65 cls.Backend = cls.backend_class = repo.__class__
61 66 cls.imc = repo.in_memory_commit
62 67
63 68 return backend_alias, repo
64 69
65 70
66 71 @pytest.fixture(scope='class')
67 72 def _vcs_repo_container(request):
68 73 """
69 74 Internal fixture intended to help support class based scoping on demand.
70 75 """
71 76 return _create_vcs_repo_container(request)
72 77
73 78
74 79 def _create_vcs_repo_container(request):
75 80 repo_container = VcsRepoContainer()
76 81 if not request.config.getoption('--keep-tmp-path'):
77 82 request.addfinalizer(repo_container.cleanup)
78 83 return repo_container
79 84
80 85
81 86 class VcsRepoContainer(object):
82 87
83 88 def __init__(self):
84 89 self._cleanup_paths = []
85 90 self._repos = {}
86 91
87 92 def get_repo(self, test_class, backend_alias):
88 93 if backend_alias not in self._repos:
89 94 repo = _create_empty_repository(test_class, backend_alias)
90 95
91 96 self._cleanup_paths.append(repo.path)
92 97 self._repos[backend_alias] = repo
93 98 return self._repos[backend_alias]
94 99
95 100 def cleanup(self):
96 101 for repo_path in reversed(self._cleanup_paths):
97 102 shutil.rmtree(repo_path)
98 103
99 104
100 105 def _should_create_repo_per_test(cls):
101 106 return getattr(cls, 'recreate_repo_per_test', False)
102 107
103 108
104 109 def _create_empty_repository(cls, backend_alias=None):
105 110 Backend = get_backend(backend_alias or cls.backend_alias)
106 111 repo_path = get_new_dir(str(time.time()))
107 112 repo = Backend(repo_path, create=True)
108 113 if hasattr(cls, '_get_commits'):
109 114 commits = cls._get_commits()
110 115 cls.tip = _add_commits_to_repo(repo, commits)
111 116
112 117 return repo
113 118
114 119
115 120 @pytest.fixture
116 121 def config():
117 122 """
118 123 Instance of a repository config.
119 124
120 125 The instance contains only one value:
121 126
122 127 - Section: "section-a"
123 128 - Key: "a-1"
124 129 - Value: "value-a-1"
125 130
126 131 The intended usage is for cases where a config instance is needed but no
127 132 specific content is required.
128 133 """
129 134 config = Config()
130 135 config.set('section-a', 'a-1', 'value-a-1')
131 136 return config
132 137
133 138
134 139 def _add_commits_to_repo(repo, commits):
135 140 imc = repo.in_memory_commit
136 141 tip = None
137 142
138 143 for commit in commits:
139 144 for node in commit.get('added', []):
140 145 imc.add(FileNode(node.path, content=node.content))
141 146 for node in commit.get('changed', []):
142 147 imc.change(FileNode(node.path, content=node.content))
143 148 for node in commit.get('removed', []):
144 149 imc.remove(FileNode(node.path))
145 150
146 151 tip = imc.commit(
147 152 message=unicode(commit['message']),
148 153 author=unicode(commit['author']),
149 154 date=commit['date'],
150 155 branch=commit.get('branch'))
151 156
152 157 return tip
153 158
154 159
155 160 @pytest.fixture
156 161 def vcs_repo(request, backend_alias):
157 162 Backend = get_backend(backend_alias)
158 163 repo_path = get_new_dir(str(time.time()))
159 164 repo = Backend(repo_path, create=True)
160 165
161 166 @request.addfinalizer
162 167 def cleanup():
163 168 shutil.rmtree(repo_path)
164 169
165 170 return repo
166 171
167 172
168 173 @pytest.fixture
169 174 def generate_repo_with_commits(vcs_repo):
170 175 """
171 176 Creates a fabric to generate N comits with some file nodes on a randomly
172 177 generated repository
173 178 """
174 179
175 180 def commit_generator(num):
176 181 start_date = datetime.datetime(2010, 1, 1, 20)
177 182 for x in xrange(num):
178 183 yield {
179 184 'message': 'Commit %d' % x,
180 185 'author': 'Joe Doe <joe.doe@example.com>',
181 186 'date': start_date + datetime.timedelta(hours=12 * x),
182 187 'added': [
183 188 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
184 189 ],
185 190 'modified': [
186 191 FileNode('file_%d.txt' % x,
187 192 content='Foobar %d modified' % (x-1)),
188 193 ]
189 194 }
190 195
191 196 def commit_maker(num=5):
192 197 _add_commits_to_repo(vcs_repo, commit_generator(num))
193 198 return vcs_repo
194 199
195 200 return commit_maker
196 201
197 202
198 203 @pytest.fixture
199 204 def hg_repo(request, vcs_repo):
200 205 repo = vcs_repo
201 206
202 207 commits = repo._get_commits()
203 208 _add_commits_to_repo(repo, commits)
204 209
205 210 return repo
206 211
207 212
208 213 @pytest.fixture
209 214 def hg_commit(hg_repo):
210 215 return hg_repo.get_commit()
211 216
212 217
213 218 class BackendTestMixin(object):
214 219 """
215 220 This is a backend independent test case class which should be created
216 221 with ``type`` method.
217 222
218 223 It is required to set following attributes at subclass:
219 224
220 225 - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
221 226 - ``repo_path``: path to the repository which would be created for set of
222 227 tests
223 228 - ``recreate_repo_per_test``: If set to ``False``, repo would NOT be
224 229 created
225 230 before every single test. Defaults to ``True``.
226 231 """
227 232 recreate_repo_per_test = True
228 233
229 234 @classmethod
230 235 def _get_commits(cls):
231 236 commits = [
232 237 {
233 238 'message': u'Initial commit',
234 239 'author': u'Joe Doe <joe.doe@example.com>',
235 240 'date': datetime.datetime(2010, 1, 1, 20),
236 241 'added': [
237 242 FileNode('foobar', content='Foobar'),
238 243 FileNode('foobar2', content='Foobar II'),
239 244 FileNode('foo/bar/baz', content='baz here!'),
240 245 ],
241 246 },
242 247 {
243 248 'message': u'Changes...',
244 249 'author': u'Jane Doe <jane.doe@example.com>',
245 250 'date': datetime.datetime(2010, 1, 1, 21),
246 251 'added': [
247 252 FileNode('some/new.txt', content='news...'),
248 253 ],
249 254 'changed': [
250 255 FileNode('foobar', 'Foobar I'),
251 256 ],
252 257 'removed': [],
253 258 },
254 259 ]
255 260 return commits
256 261
257 262
1 NO CONTENT: file renamed from rhodecode/tests/other/vcs_operations/__init__.py to rhodecode/tests/vcs_operations/__init__.py
@@ -1,271 +1,276 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 py.test config for test suite for making push/pull operations.
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import ConfigParser
31 31 import os
32 32 import subprocess32
33 33 import tempfile
34 34 import textwrap
35 35 import pytest
36 36
37 37 import rhodecode
38 38 from rhodecode.model.db import Repository
39 39 from rhodecode.model.meta import Session
40 40 from rhodecode.model.settings import SettingsModel
41 41 from rhodecode.tests import (
42 42 GIT_REPO, HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,)
43 43 from rhodecode.tests.fixture import Fixture
44 44 from rhodecode.tests.utils import is_url_reachable, wait_for_url
45 45
46 46 RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log')
47 47 REPO_GROUP = 'a_repo_group'
48 48 HG_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, HG_REPO)
49 49 GIT_REPO_WITH_GROUP = '%s/%s' % (REPO_GROUP, GIT_REPO)
50 50
51 51
52 52 def assert_no_running_instance(url):
53 53 if is_url_reachable(url):
54 54 print("Hint: Usually this means another instance of Enterprise "
55 55 "is running in the background.")
56 56 pytest.fail(
57 57 "Port is not free at %s, cannot start web interface" % url)
58 58
59 59
60 60 def get_port(pyramid_config):
61 61 config = ConfigParser.ConfigParser()
62 62 config.read(pyramid_config)
63 63 return config.get('server:main', 'port')
64 64
65 65
66 66 def get_host_url(pyramid_config):
67 67 """Construct the host url using the port in the test configuration."""
68 68 return '127.0.0.1:%s' % get_port(pyramid_config)
69 69
70 70
71 71 class RcWebServer(object):
72 72 """
73 73 Represents a running RCE web server used as a test fixture.
74 74 """
75 75 def __init__(self, pyramid_config, log_file):
76 76 self.pyramid_config = pyramid_config
77 77 self.log_file = log_file
78 78
79 79 def repo_clone_url(self, repo_name, **kwargs):
80 80 params = {
81 81 'user': TEST_USER_ADMIN_LOGIN,
82 82 'passwd': TEST_USER_ADMIN_PASS,
83 83 'host': get_host_url(self.pyramid_config),
84 84 'cloned_repo': repo_name,
85 85 }
86 86 params.update(**kwargs)
87 87 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s' % params
88 88 return _url
89 89
90 90 def host_url(self):
91 91 return 'http://' + get_host_url(self.pyramid_config)
92 92
93 93 def get_rc_log(self):
94 94 with open(self.log_file) as f:
95 95 return f.read()
96 96
97 97
98 98 @pytest.fixture(scope="module")
99 99 def rcextensions(request, baseapp, tmpdir_factory):
100 100 """
101 101 Installs a testing rcextensions pack to ensure they work as expected.
102 102 """
103 103 init_content = textwrap.dedent("""
104 104 # Forward import the example rcextensions to make it
105 105 # active for our tests.
106 106 from rhodecode.tests.other.example_rcextensions import *
107 107 """)
108 108
109 109 # Note: rcextensions are looked up based on the path of the ini file
110 110 root_path = tmpdir_factory.getbasetemp()
111 111 rcextensions_path = root_path.join('rcextensions')
112 112 init_path = rcextensions_path.join('__init__.py')
113 113
114 114 if rcextensions_path.check():
115 115 pytest.fail(
116 116 "Path for rcextensions already exists, please clean up before "
117 117 "test run this path: %s" % (rcextensions_path, ))
118 118 return
119 119
120 120 request.addfinalizer(rcextensions_path.remove)
121 121 init_path.write_binary(init_content, ensure=True)
122 122
123 123
124 124 @pytest.fixture(scope="module")
125 125 def repos(request, baseapp):
126 126 """Create a copy of each test repo in a repo group."""
127 127 fixture = Fixture()
128 128 repo_group = fixture.create_repo_group(REPO_GROUP)
129 129 repo_group_id = repo_group.group_id
130 130 fixture.create_fork(HG_REPO, HG_REPO,
131 131 repo_name_full=HG_REPO_WITH_GROUP,
132 132 repo_group=repo_group_id)
133 133 fixture.create_fork(GIT_REPO, GIT_REPO,
134 134 repo_name_full=GIT_REPO_WITH_GROUP,
135 135 repo_group=repo_group_id)
136 136
137 137 @request.addfinalizer
138 138 def cleanup():
139 139 fixture.destroy_repo(HG_REPO_WITH_GROUP)
140 140 fixture.destroy_repo(GIT_REPO_WITH_GROUP)
141 141 fixture.destroy_repo_group(repo_group_id)
142 142
143 143
144 @pytest.fixture(scope="session")
145 def vcs_server_config_override():
146 return ({'server:main': {'workers': 2}},)
147
148
144 149 @pytest.fixture(scope="module")
145 150 def rc_web_server_config(testini_factory):
146 151 """
147 152 Configuration file used for the fixture `rc_web_server`.
148 153 """
149 154 CUSTOM_PARAMS = [
150 155 {'handler_console': {'level': 'DEBUG'}},
151 156 ]
152 157 return testini_factory(CUSTOM_PARAMS)
153 158
154 159
155 160 @pytest.fixture(scope="module")
156 161 def rc_web_server(
157 162 request, baseapp, rc_web_server_config, repos, rcextensions):
158 163 """
159 164 Run the web server as a subprocess.
160 165
161 166 Since we have already a running vcsserver, this is not spawned again.
162 167 """
163 168 env = os.environ.copy()
164 169 env['RC_NO_TMP_PATH'] = '1'
165 170
166 171 rc_log = list(RC_LOG.partition('.log'))
167 172 rc_log.insert(1, get_port(rc_web_server_config))
168 173 rc_log = ''.join(rc_log)
169 174
170 175 server_out = open(rc_log, 'w')
171 176
172 177 host_url = 'http://' + get_host_url(rc_web_server_config)
173 178 assert_no_running_instance(host_url)
174 179 command = ['gunicorn', '--worker-class', 'gevent', '--paste', rc_web_server_config]
175 180
176 181 print('rhodecode-web starting at: {}'.format(host_url))
177 182 print('rhodecode-web command: {}'.format(command))
178 183 print('rhodecode-web logfile: {}'.format(rc_log))
179 184
180 185 proc = subprocess32.Popen(
181 186 command, bufsize=0, env=env, stdout=server_out, stderr=server_out)
182 187
183 188 wait_for_url(host_url, timeout=30)
184 189
185 190 @request.addfinalizer
186 191 def stop_web_server():
187 192 # TODO: Find out how to integrate with the reporting of py.test to
188 193 # make this information available.
189 194 print("\nServer log file written to %s" % (rc_log, ))
190 195 proc.kill()
191 196 server_out.flush()
192 197 server_out.close()
193 198
194 199 return RcWebServer(rc_web_server_config, log_file=rc_log)
195 200
196 201
197 202 @pytest.fixture
198 203 def disable_locking(baseapp):
199 204 r = Repository.get_by_repo_name(GIT_REPO)
200 205 Repository.unlock(r)
201 206 r.enable_locking = False
202 207 Session().add(r)
203 208 Session().commit()
204 209
205 210 r = Repository.get_by_repo_name(HG_REPO)
206 211 Repository.unlock(r)
207 212 r.enable_locking = False
208 213 Session().add(r)
209 214 Session().commit()
210 215
211 216
212 217 @pytest.fixture
213 218 def enable_auth_plugins(request, baseapp, csrf_token):
214 219 """
215 220 Return a factory object that when called, allows to control which
216 221 authentication plugins are enabled.
217 222 """
218 223 def _enable_plugins(plugins_list, override=None):
219 224 override = override or {}
220 225 params = {
221 226 'auth_plugins': ','.join(plugins_list),
222 227 }
223 228
224 229 # helper translate some names to others
225 230 name_map = {
226 231 'token': 'authtoken'
227 232 }
228 233
229 234 for module in plugins_list:
230 235 plugin_name = module.partition('#')[-1]
231 236 if plugin_name in name_map:
232 237 plugin_name = name_map[plugin_name]
233 238 enabled_plugin = 'auth_%s_enabled' % plugin_name
234 239 cache_ttl = 'auth_%s_cache_ttl' % plugin_name
235 240
236 241 # default params that are needed for each plugin,
237 242 # `enabled` and `cache_ttl`
238 243 params.update({
239 244 enabled_plugin: True,
240 245 cache_ttl: 0
241 246 })
242 247 if override.get:
243 248 params.update(override.get(module, {}))
244 249
245 250 validated_params = params
246 251 for k, v in validated_params.items():
247 252 setting = SettingsModel().create_or_update_setting(k, v)
248 253 Session().add(setting)
249 254 Session().commit()
250 255
251 256 def cleanup():
252 257 _enable_plugins(['egg:rhodecode-enterprise-ce#rhodecode'])
253 258
254 259 request.addfinalizer(cleanup)
255 260
256 261 return _enable_plugins
257 262
258 263
259 264 @pytest.fixture
260 265 def fs_repo_only(request, rhodecode_fixtures):
261 266 def fs_repo_fabric(repo_name, repo_type):
262 267 rhodecode_fixtures.create_repo(repo_name, repo_type=repo_type)
263 268 rhodecode_fixtures.destroy_repo(repo_name, fs_remove=False)
264 269
265 270 def cleanup():
266 271 rhodecode_fixtures.destroy_repo(repo_name, fs_remove=True)
267 272 rhodecode_fixtures.destroy_repo_on_filesystem(repo_name)
268 273
269 274 request.addfinalizer(cleanup)
270 275
271 276 return fs_repo_fabric
@@ -1,59 +1,59 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import pytest
31 31
32 32 from rhodecode.tests import (GIT_REPO, HG_REPO)
33 from rhodecode.tests.other.vcs_operations import Command
33 from rhodecode.tests.vcs_operations import Command
34 34
35 35
36 36 # override rc_web_server_config fixture with custom INI
37 37 @pytest.fixture(scope='module')
38 38 def rc_web_server_config(testini_factory):
39 39 CUSTOM_PARAMS = [
40 40 {'app:main': {'auth_ret_code': '403'}},
41 41 {'app:main': {'auth_ret_code_detection': 'true'}},
42 42 ]
43 43 return testini_factory(CUSTOM_PARAMS)
44 44
45 45
46 46 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
47 47 class TestVCSOperationsOnCustomIniConfig(object):
48 48
49 49 def test_clone_wrong_credentials_hg_ret_code(self, rc_web_server, tmpdir):
50 50 clone_url = rc_web_server.repo_clone_url(HG_REPO, passwd='bad!')
51 51 stdout, stderr = Command('/tmp').execute(
52 52 'hg clone', clone_url, tmpdir.strpath)
53 53 assert 'abort: HTTP Error 403: Forbidden' in stderr
54 54
55 55 def test_clone_wrong_credentials_git_ret_code(self, rc_web_server, tmpdir):
56 56 clone_url = rc_web_server.repo_clone_url(GIT_REPO, passwd='bad!')
57 57 stdout, stderr = Command('/tmp').execute(
58 58 'git clone', clone_url, tmpdir.strpath)
59 59 assert 'The requested URL returned error: 403' in stderr
@@ -1,59 +1,59 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import pytest
31 31
32 32 from rhodecode.tests import (GIT_REPO, HG_REPO)
33 from rhodecode.tests.other.vcs_operations import Command
33 from rhodecode.tests.vcs_operations import Command
34 34
35 35
36 36 # override rc_web_server_config fixture with custom INI
37 37 @pytest.fixture(scope='module')
38 38 def rc_web_server_config(testini_factory):
39 39 CUSTOM_PARAMS = [
40 40 {'app:main': {'auth_ret_code': '404'}},
41 41 {'app:main': {'auth_ret_code_detection': 'false'}},
42 42 ]
43 43 return testini_factory(CUSTOM_PARAMS)
44 44
45 45
46 46 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
47 47 class TestVCSOperationsOnCustomIniConfig(object):
48 48
49 49 def test_clone_wrong_credentials_hg_ret_code(self, rc_web_server, tmpdir):
50 50 clone_url = rc_web_server.repo_clone_url(HG_REPO, passwd='bad!')
51 51 stdout, stderr = Command('/tmp').execute(
52 52 'hg clone', clone_url, tmpdir.strpath)
53 53 assert 'abort: HTTP Error 404: Not Found' in stderr
54 54
55 55 def test_clone_wrong_credentials_git_ret_code(self, rc_web_server, tmpdir):
56 56 clone_url = rc_web_server.repo_clone_url(GIT_REPO, passwd='bad!')
57 57 stdout, stderr = Command('/tmp').execute(
58 58 'git clone', clone_url, tmpdir.strpath)
59 59 assert 'not found' in stderr
@@ -1,59 +1,59 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import pytest
31 31
32 32 from rhodecode.tests import (GIT_REPO, HG_REPO)
33 from rhodecode.tests.other.vcs_operations import Command
33 from rhodecode.tests.vcs_operations import Command
34 34
35 35
36 36 # override rc_web_server_config fixture with custom INI
37 37 @pytest.fixture(scope='module')
38 38 def rc_web_server_config(testini_factory):
39 39 CUSTOM_PARAMS = [
40 40 {'app:main': {'auth_ret_code': '600'}},
41 41 {'app:main': {'auth_ret_code_detection': 'false'}},
42 42 ]
43 43 return testini_factory(CUSTOM_PARAMS)
44 44
45 45
46 46 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
47 47 class TestVCSOperationsOnCustomIniConfig(object):
48 48
49 49 def test_clone_wrong_credentials_hg_ret_code(self, rc_web_server, tmpdir):
50 50 clone_url = rc_web_server.repo_clone_url(HG_REPO, passwd='bad!')
51 51 stdout, stderr = Command('/tmp').execute(
52 52 'hg clone', clone_url, tmpdir.strpath)
53 53 assert 'abort: HTTP Error 403: Forbidden' in stderr
54 54
55 55 def test_clone_wrong_credentials_git_ret_code(self, rc_web_server, tmpdir):
56 56 clone_url = rc_web_server.repo_clone_url(GIT_REPO, passwd='bad!')
57 57 stdout, stderr = Command('/tmp').execute(
58 58 'git clone', clone_url, tmpdir.strpath)
59 59 assert 'The requested URL returned error: 403' in stderr
@@ -1,79 +1,79 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import os
31 31 import pytest
32 32
33 33 from rhodecode.lib.vcs.backends.git.repository import GitRepository
34 34 from rhodecode.lib.vcs.nodes import FileNode
35 35 from rhodecode.tests import GIT_REPO
36 from rhodecode.tests.other.vcs_operations import Command
36 from rhodecode.tests.vcs_operations import Command
37 37 from .test_vcs_operations import _check_proper_clone, _check_proper_git_push
38 38
39 39
40 40 # override rc_web_server_config fixture with custom INI
41 41 @pytest.fixture(scope="module")
42 42 def rc_web_server_config(testini_factory):
43 43 """
44 44 Configuration file used for the fixture `rc_web_server`.
45 45 """
46 46 CUSTOM_PARAMS = [
47 47 {'handler_console': {'level': 'DEBUG'}},
48 48 ]
49 49 return testini_factory(CUSTOM_PARAMS)
50 50
51 51
52 52 def test_git_clone_with_small_push_buffer(backend_git, rc_web_server, tmpdir):
53 53 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
54 54 cmd = Command('/tmp')
55 55 stdout, stderr = cmd.execute(
56 56 'git -c http.postBuffer=1024 clone', clone_url, tmpdir.strpath)
57 57 _check_proper_clone(stdout, stderr, 'git')
58 58 cmd.assert_returncode_success()
59 59
60 60
61 61 def test_git_push_with_small_push_buffer(backend_git, rc_web_server, tmpdir):
62 62 empty_repo = backend_git.create_repo()
63 63
64 64 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
65 65
66 66 cmd = Command(tmpdir.strpath)
67 67 cmd.execute('git clone', clone_url)
68 68
69 69 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
70 70 repo.in_memory_commit.add(FileNode('readme.md', content='## Hello'))
71 71 repo.in_memory_commit.commit(
72 72 message='Commit on branch Master',
73 73 author='Automatic test',
74 74 branch='master')
75 75
76 76 repo_cmd = Command(repo.path)
77 77 stdout, stderr = repo_cmd.execute(
78 78 'git -c http.postBuffer=1024 push --verbose origin master')
79 79 _check_proper_git_push(stdout, stderr, branch='master')
@@ -1,682 +1,460 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30
31 import os
32 31 import time
33 32
34 33 import pytest
35 34
36 from rhodecode.lib.vcs.backends.git.repository import GitRepository
37 from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
38 from rhodecode.lib.vcs.nodes import FileNode
39 35 from rhodecode.model.auth_token import AuthTokenModel
40 36 from rhodecode.model.db import Repository, UserIpMap, CacheKey
41 37 from rhodecode.model.meta import Session
42 38 from rhodecode.model.repo import RepoModel
43 39 from rhodecode.model.user import UserModel
44 40 from rhodecode.tests import (GIT_REPO, HG_REPO, TEST_USER_ADMIN_LOGIN)
45 41
46 from rhodecode.tests.other.vcs_operations import (
42 from rhodecode.tests.vcs_operations import (
47 43 Command, _check_proper_clone, _check_proper_git_push,
48 _check_proper_hg_push, _add_files_and_push,
49 HG_REPO_WITH_GROUP, GIT_REPO_WITH_GROUP)
50
51
52 @pytest.fixture(scope="session")
53 def vcs_server_config_override():
54 return ({'server:main': {'workers': 2}},)
44 _add_files_and_push, HG_REPO_WITH_GROUP, GIT_REPO_WITH_GROUP)
55 45
56 46
57 47 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
58 48 class TestVCSOperations(object):
59 49
60 50 def test_clone_hg_repo_by_admin(self, rc_web_server, tmpdir):
61 51 clone_url = rc_web_server.repo_clone_url(HG_REPO)
62 52 stdout, stderr = Command('/tmp').execute(
63 53 'hg clone', clone_url, tmpdir.strpath)
64 54 _check_proper_clone(stdout, stderr, 'hg')
65 55
66 56 def test_clone_git_repo_by_admin(self, rc_web_server, tmpdir):
67 57 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
68 58 cmd = Command('/tmp')
69 59 stdout, stderr = cmd.execute('git clone', clone_url, tmpdir.strpath)
70 60 _check_proper_clone(stdout, stderr, 'git')
71 61 cmd.assert_returncode_success()
72 62
73 63 def test_clone_git_repo_by_admin_with_git_suffix(self, rc_web_server, tmpdir):
74 64 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
75 65 cmd = Command('/tmp')
76 66 stdout, stderr = cmd.execute('git clone', clone_url+".git", tmpdir.strpath)
77 67 _check_proper_clone(stdout, stderr, 'git')
78 68 cmd.assert_returncode_success()
79 69
80 70 def test_clone_hg_repo_by_id_by_admin(self, rc_web_server, tmpdir):
81 71 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
82 72 clone_url = rc_web_server.repo_clone_url('_%s' % repo_id)
83 73 stdout, stderr = Command('/tmp').execute(
84 74 'hg clone', clone_url, tmpdir.strpath)
85 75 _check_proper_clone(stdout, stderr, 'hg')
86 76
87 77 def test_clone_git_repo_by_id_by_admin(self, rc_web_server, tmpdir):
88 78 repo_id = Repository.get_by_repo_name(GIT_REPO).repo_id
89 79 clone_url = rc_web_server.repo_clone_url('_%s' % repo_id)
90 80 cmd = Command('/tmp')
91 81 stdout, stderr = cmd.execute('git clone', clone_url, tmpdir.strpath)
92 82 _check_proper_clone(stdout, stderr, 'git')
93 83 cmd.assert_returncode_success()
94 84
95 85 def test_clone_hg_repo_with_group_by_admin(self, rc_web_server, tmpdir):
96 86 clone_url = rc_web_server.repo_clone_url(HG_REPO_WITH_GROUP)
97 87 stdout, stderr = Command('/tmp').execute(
98 88 'hg clone', clone_url, tmpdir.strpath)
99 89 _check_proper_clone(stdout, stderr, 'hg')
100 90
101 91 def test_clone_git_repo_with_group_by_admin(self, rc_web_server, tmpdir):
102 92 clone_url = rc_web_server.repo_clone_url(GIT_REPO_WITH_GROUP)
103 93 cmd = Command('/tmp')
104 94 stdout, stderr = cmd.execute('git clone', clone_url, tmpdir.strpath)
105 95 _check_proper_clone(stdout, stderr, 'git')
106 96 cmd.assert_returncode_success()
107 97
108 98 def test_clone_git_repo_shallow_by_admin(self, rc_web_server, tmpdir):
109 99 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
110 100 cmd = Command('/tmp')
111 101 stdout, stderr = cmd.execute(
112 102 'git clone --depth=1', clone_url, tmpdir.strpath)
113 103
114 104 assert '' == stdout
115 105 assert 'Cloning into' in stderr
116 106 cmd.assert_returncode_success()
117 107
118 108 def test_clone_wrong_credentials_hg(self, rc_web_server, tmpdir):
119 109 clone_url = rc_web_server.repo_clone_url(HG_REPO, passwd='bad!')
120 110 stdout, stderr = Command('/tmp').execute(
121 111 'hg clone', clone_url, tmpdir.strpath)
122 112 assert 'abort: authorization failed' in stderr
123 113
124 114 def test_clone_wrong_credentials_git(self, rc_web_server, tmpdir):
125 115 clone_url = rc_web_server.repo_clone_url(GIT_REPO, passwd='bad!')
126 116 stdout, stderr = Command('/tmp').execute(
127 117 'git clone', clone_url, tmpdir.strpath)
128 118 assert 'fatal: Authentication failed' in stderr
129 119
130 120 def test_clone_git_dir_as_hg(self, rc_web_server, tmpdir):
131 121 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
132 122 stdout, stderr = Command('/tmp').execute(
133 123 'hg clone', clone_url, tmpdir.strpath)
134 124 assert 'HTTP Error 404: Not Found' in stderr
135 125
136 126 def test_clone_hg_repo_as_git(self, rc_web_server, tmpdir):
137 127 clone_url = rc_web_server.repo_clone_url(HG_REPO)
138 128 stdout, stderr = Command('/tmp').execute(
139 129 'git clone', clone_url, tmpdir.strpath)
140 130 assert 'not found' in stderr
141 131
142 132 def test_clone_non_existing_path_hg(self, rc_web_server, tmpdir):
143 133 clone_url = rc_web_server.repo_clone_url('trololo')
144 134 stdout, stderr = Command('/tmp').execute(
145 135 'hg clone', clone_url, tmpdir.strpath)
146 136 assert 'HTTP Error 404: Not Found' in stderr
147 137
148 138 def test_clone_non_existing_path_git(self, rc_web_server, tmpdir):
149 139 clone_url = rc_web_server.repo_clone_url('trololo')
150 140 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
151 141 assert 'not found' in stderr
152 142
153 143 def test_clone_existing_path_hg_not_in_database(
154 144 self, rc_web_server, tmpdir, fs_repo_only):
155 145
156 146 db_name = fs_repo_only('not-in-db-hg', repo_type='hg')
157 147 clone_url = rc_web_server.repo_clone_url(db_name)
158 148 stdout, stderr = Command('/tmp').execute(
159 149 'hg clone', clone_url, tmpdir.strpath)
160 150 assert 'HTTP Error 404: Not Found' in stderr
161 151
162 152 def test_clone_existing_path_git_not_in_database(
163 153 self, rc_web_server, tmpdir, fs_repo_only):
164 154 db_name = fs_repo_only('not-in-db-git', repo_type='git')
165 155 clone_url = rc_web_server.repo_clone_url(db_name)
166 156 stdout, stderr = Command('/tmp').execute(
167 157 'git clone', clone_url, tmpdir.strpath)
168 158 assert 'not found' in stderr
169 159
170 160 def test_clone_existing_path_hg_not_in_database_different_scm(
171 161 self, rc_web_server, tmpdir, fs_repo_only):
172 162 db_name = fs_repo_only('not-in-db-git', repo_type='git')
173 163 clone_url = rc_web_server.repo_clone_url(db_name)
174 164 stdout, stderr = Command('/tmp').execute(
175 165 'hg clone', clone_url, tmpdir.strpath)
176 166 assert 'HTTP Error 404: Not Found' in stderr
177 167
178 168 def test_clone_existing_path_git_not_in_database_different_scm(
179 169 self, rc_web_server, tmpdir, fs_repo_only):
180 170 db_name = fs_repo_only('not-in-db-hg', repo_type='hg')
181 171 clone_url = rc_web_server.repo_clone_url(db_name)
182 172 stdout, stderr = Command('/tmp').execute(
183 173 'git clone', clone_url, tmpdir.strpath)
184 174 assert 'not found' in stderr
185 175
186 176 def test_clone_non_existing_store_path_hg(self, rc_web_server, tmpdir, user_util):
187 177 repo = user_util.create_repo()
188 178 clone_url = rc_web_server.repo_clone_url(repo.repo_name)
189 179
190 180 # Damage repo by removing it's folder
191 181 RepoModel()._delete_filesystem_repo(repo)
192 182
193 183 stdout, stderr = Command('/tmp').execute(
194 184 'hg clone', clone_url, tmpdir.strpath)
195 185 assert 'HTTP Error 404: Not Found' in stderr
196 186
197 187 def test_clone_non_existing_store_path_git(self, rc_web_server, tmpdir, user_util):
198 188 repo = user_util.create_repo(repo_type='git')
199 189 clone_url = rc_web_server.repo_clone_url(repo.repo_name)
200 190
201 191 # Damage repo by removing it's folder
202 192 RepoModel()._delete_filesystem_repo(repo)
203 193
204 194 stdout, stderr = Command('/tmp').execute(
205 195 'git clone', clone_url, tmpdir.strpath)
206 196 assert 'not found' in stderr
207 197
208 198 def test_push_new_file_hg(self, rc_web_server, tmpdir):
209 199 clone_url = rc_web_server.repo_clone_url(HG_REPO)
210 200 stdout, stderr = Command('/tmp').execute(
211 201 'hg clone', clone_url, tmpdir.strpath)
212 202
213 203 stdout, stderr = _add_files_and_push(
214 204 'hg', tmpdir.strpath, clone_url=clone_url)
215 205
216 206 assert 'pushing to' in stdout
217 207 assert 'size summary' in stdout
218 208
219 209 def test_push_new_file_git(self, rc_web_server, tmpdir):
220 210 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
221 211 stdout, stderr = Command('/tmp').execute(
222 212 'git clone', clone_url, tmpdir.strpath)
223 213
224 214 # commit some stuff into this repo
225 215 stdout, stderr = _add_files_and_push(
226 216 'git', tmpdir.strpath, clone_url=clone_url)
227 217
228 218 _check_proper_git_push(stdout, stderr)
229 219
230 220 def test_push_invalidates_cache_hg(self, rc_web_server, tmpdir):
231 221 key = CacheKey.query().filter(CacheKey.cache_key == HG_REPO).scalar()
232 222 if not key:
233 223 key = CacheKey(HG_REPO, HG_REPO)
234 224
235 225 key.cache_active = True
236 226 Session().add(key)
237 227 Session().commit()
238 228
239 229 clone_url = rc_web_server.repo_clone_url(HG_REPO)
240 230 stdout, stderr = Command('/tmp').execute(
241 231 'hg clone', clone_url, tmpdir.strpath)
242 232
243 233 stdout, stderr = _add_files_and_push(
244 234 'hg', tmpdir.strpath, clone_url=clone_url, files_no=1)
245 235
246 236 key = CacheKey.query().filter(CacheKey.cache_key == HG_REPO).one()
247 237 assert key.cache_active is False
248 238
249 239 def test_push_invalidates_cache_git(self, rc_web_server, tmpdir):
250 240 key = CacheKey.query().filter(CacheKey.cache_key == GIT_REPO).scalar()
251 241 if not key:
252 242 key = CacheKey(GIT_REPO, GIT_REPO)
253 243
254 244 key.cache_active = True
255 245 Session().add(key)
256 246 Session().commit()
257 247
258 248 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
259 249 stdout, stderr = Command('/tmp').execute(
260 250 'git clone', clone_url, tmpdir.strpath)
261 251
262 252 # commit some stuff into this repo
263 253 stdout, stderr = _add_files_and_push(
264 254 'git', tmpdir.strpath, clone_url=clone_url, files_no=1)
265 255 _check_proper_git_push(stdout, stderr)
266 256
267 257 key = CacheKey.query().filter(CacheKey.cache_key == GIT_REPO).one()
268 258
269 259 assert key.cache_active is False
270 260
271 261 def test_push_wrong_credentials_hg(self, rc_web_server, tmpdir):
272 262 clone_url = rc_web_server.repo_clone_url(HG_REPO)
273 263 stdout, stderr = Command('/tmp').execute(
274 264 'hg clone', clone_url, tmpdir.strpath)
275 265
276 266 push_url = rc_web_server.repo_clone_url(
277 267 HG_REPO, user='bad', passwd='name')
278 268 stdout, stderr = _add_files_and_push(
279 269 'hg', tmpdir.strpath, clone_url=push_url)
280 270
281 271 assert 'abort: authorization failed' in stderr
282 272
283 273 def test_push_wrong_credentials_git(self, rc_web_server, tmpdir):
284 274 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
285 275 stdout, stderr = Command('/tmp').execute(
286 276 'git clone', clone_url, tmpdir.strpath)
287 277
288 278 push_url = rc_web_server.repo_clone_url(
289 279 GIT_REPO, user='bad', passwd='name')
290 280 stdout, stderr = _add_files_and_push(
291 281 'git', tmpdir.strpath, clone_url=push_url)
292 282
293 283 assert 'fatal: Authentication failed' in stderr
294 284
295 285 def test_push_back_to_wrong_url_hg(self, rc_web_server, tmpdir):
296 286 clone_url = rc_web_server.repo_clone_url(HG_REPO)
297 287 stdout, stderr = Command('/tmp').execute(
298 288 'hg clone', clone_url, tmpdir.strpath)
299 289
300 290 stdout, stderr = _add_files_and_push(
301 291 'hg', tmpdir.strpath,
302 292 clone_url=rc_web_server.repo_clone_url('not-existing'))
303 293
304 294 assert 'HTTP Error 404: Not Found' in stderr
305 295
306 296 def test_push_back_to_wrong_url_git(self, rc_web_server, tmpdir):
307 297 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
308 298 stdout, stderr = Command('/tmp').execute(
309 299 'git clone', clone_url, tmpdir.strpath)
310 300
311 301 stdout, stderr = _add_files_and_push(
312 302 'git', tmpdir.strpath,
313 303 clone_url=rc_web_server.repo_clone_url('not-existing'))
314 304
315 305 assert 'not found' in stderr
316 306
317 307 def test_ip_restriction_hg(self, rc_web_server, tmpdir):
318 308 user_model = UserModel()
319 309 try:
320 310 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
321 311 Session().commit()
322 312 time.sleep(2)
323 313 clone_url = rc_web_server.repo_clone_url(HG_REPO)
324 314 stdout, stderr = Command('/tmp').execute(
325 315 'hg clone', clone_url, tmpdir.strpath)
326 316 assert 'abort: HTTP Error 403: Forbidden' in stderr
327 317 finally:
328 318 # release IP restrictions
329 319 for ip in UserIpMap.getAll():
330 320 UserIpMap.delete(ip.ip_id)
331 321 Session().commit()
332 322
333 323 time.sleep(2)
334 324
335 325 stdout, stderr = Command('/tmp').execute(
336 326 'hg clone', clone_url, tmpdir.strpath)
337 327 _check_proper_clone(stdout, stderr, 'hg')
338 328
339 329 def test_ip_restriction_git(self, rc_web_server, tmpdir):
340 330 user_model = UserModel()
341 331 try:
342 332 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
343 333 Session().commit()
344 334 time.sleep(2)
345 335 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
346 336 stdout, stderr = Command('/tmp').execute(
347 337 'git clone', clone_url, tmpdir.strpath)
348 338 msg = "The requested URL returned error: 403"
349 339 assert msg in stderr
350 340 finally:
351 341 # release IP restrictions
352 342 for ip in UserIpMap.getAll():
353 343 UserIpMap.delete(ip.ip_id)
354 344 Session().commit()
355 345
356 346 time.sleep(2)
357 347
358 348 cmd = Command('/tmp')
359 349 stdout, stderr = cmd.execute('git clone', clone_url, tmpdir.strpath)
360 350 cmd.assert_returncode_success()
361 351 _check_proper_clone(stdout, stderr, 'git')
362 352
363 353 def test_clone_by_auth_token(
364 354 self, rc_web_server, tmpdir, user_util, enable_auth_plugins):
365 355 enable_auth_plugins(['egg:rhodecode-enterprise-ce#token',
366 356 'egg:rhodecode-enterprise-ce#rhodecode'])
367 357
368 358 user = user_util.create_user()
369 359 token = user.auth_tokens[1]
370 360
371 361 clone_url = rc_web_server.repo_clone_url(
372 362 HG_REPO, user=user.username, passwd=token)
373 363
374 364 stdout, stderr = Command('/tmp').execute(
375 365 'hg clone', clone_url, tmpdir.strpath)
376 366 _check_proper_clone(stdout, stderr, 'hg')
377 367
378 368 def test_clone_by_auth_token_expired(
379 369 self, rc_web_server, tmpdir, user_util, enable_auth_plugins):
380 370 enable_auth_plugins(['egg:rhodecode-enterprise-ce#token',
381 371 'egg:rhodecode-enterprise-ce#rhodecode'])
382 372
383 373 user = user_util.create_user()
384 374 auth_token = AuthTokenModel().create(
385 375 user.user_id, 'test-token', -10, AuthTokenModel.cls.ROLE_VCS)
386 376 token = auth_token.api_key
387 377
388 378 clone_url = rc_web_server.repo_clone_url(
389 379 HG_REPO, user=user.username, passwd=token)
390 380
391 381 stdout, stderr = Command('/tmp').execute(
392 382 'hg clone', clone_url, tmpdir.strpath)
393 383 assert 'abort: authorization failed' in stderr
394 384
395 385 def test_clone_by_auth_token_bad_role(
396 386 self, rc_web_server, tmpdir, user_util, enable_auth_plugins):
397 387 enable_auth_plugins(['egg:rhodecode-enterprise-ce#token',
398 388 'egg:rhodecode-enterprise-ce#rhodecode'])
399 389
400 390 user = user_util.create_user()
401 391 auth_token = AuthTokenModel().create(
402 392 user.user_id, 'test-token', -1, AuthTokenModel.cls.ROLE_API)
403 393 token = auth_token.api_key
404 394
405 395 clone_url = rc_web_server.repo_clone_url(
406 396 HG_REPO, user=user.username, passwd=token)
407 397
408 398 stdout, stderr = Command('/tmp').execute(
409 399 'hg clone', clone_url, tmpdir.strpath)
410 400 assert 'abort: authorization failed' in stderr
411 401
412 402 def test_clone_by_auth_token_user_disabled(
413 403 self, rc_web_server, tmpdir, user_util, enable_auth_plugins):
414 404 enable_auth_plugins(['egg:rhodecode-enterprise-ce#token',
415 405 'egg:rhodecode-enterprise-ce#rhodecode'])
416 406 user = user_util.create_user()
417 407 user.active = False
418 408 Session().add(user)
419 409 Session().commit()
420 410 token = user.auth_tokens[1]
421 411
422 412 clone_url = rc_web_server.repo_clone_url(
423 413 HG_REPO, user=user.username, passwd=token)
424 414
425 415 stdout, stderr = Command('/tmp').execute(
426 416 'hg clone', clone_url, tmpdir.strpath)
427 417 assert 'abort: authorization failed' in stderr
428 418
429 419 def test_clone_by_auth_token_with_scope(
430 420 self, rc_web_server, tmpdir, user_util, enable_auth_plugins):
431 421 enable_auth_plugins(['egg:rhodecode-enterprise-ce#token',
432 422 'egg:rhodecode-enterprise-ce#rhodecode'])
433 423 user = user_util.create_user()
434 424 auth_token = AuthTokenModel().create(
435 425 user.user_id, 'test-token', -1, AuthTokenModel.cls.ROLE_VCS)
436 426 token = auth_token.api_key
437 427
438 428 # manually set scope
439 429 auth_token.repo = Repository.get_by_repo_name(HG_REPO)
440 430 Session().add(auth_token)
441 431 Session().commit()
442 432
443 433 clone_url = rc_web_server.repo_clone_url(
444 434 HG_REPO, user=user.username, passwd=token)
445 435
446 436 stdout, stderr = Command('/tmp').execute(
447 437 'hg clone', clone_url, tmpdir.strpath)
448 438 _check_proper_clone(stdout, stderr, 'hg')
449 439
450 440 def test_clone_by_auth_token_with_wrong_scope(
451 441 self, rc_web_server, tmpdir, user_util, enable_auth_plugins):
452 442 enable_auth_plugins(['egg:rhodecode-enterprise-ce#token',
453 443 'egg:rhodecode-enterprise-ce#rhodecode'])
454 444 user = user_util.create_user()
455 445 auth_token = AuthTokenModel().create(
456 446 user.user_id, 'test-token', -1, AuthTokenModel.cls.ROLE_VCS)
457 447 token = auth_token.api_key
458 448
459 449 # manually set scope
460 450 auth_token.repo = Repository.get_by_repo_name(GIT_REPO)
461 451 Session().add(auth_token)
462 452 Session().commit()
463 453
464 454 clone_url = rc_web_server.repo_clone_url(
465 455 HG_REPO, user=user.username, passwd=token)
466 456
467 457 stdout, stderr = Command('/tmp').execute(
468 458 'hg clone', clone_url, tmpdir.strpath)
469 459 assert 'abort: authorization failed' in stderr
470 460
471
472 @pytest.mark.usefixtures("disable_locking")
473 class TestVCSOperationsSpecial(object):
474
475 def test_git_sets_default_branch_if_not_master(
476 self, backend_git, tmpdir, rc_web_server):
477 empty_repo = backend_git.create_repo()
478 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
479
480 cmd = Command(tmpdir.strpath)
481 cmd.execute('git clone', clone_url)
482
483 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
484 repo.in_memory_commit.add(FileNode('file', content=''))
485 repo.in_memory_commit.commit(
486 message='Commit on branch test',
487 author='Automatic test',
488 branch='test')
489
490 repo_cmd = Command(repo.path)
491 stdout, stderr = repo_cmd.execute('git push --verbose origin test')
492 _check_proper_git_push(
493 stdout, stderr, branch='test', should_set_default_branch=True)
494
495 stdout, stderr = cmd.execute(
496 'git clone', clone_url, empty_repo.repo_name + '-clone')
497 _check_proper_clone(stdout, stderr, 'git')
498
499 # Doing an explicit commit in order to get latest user logs on MySQL
500 Session().commit()
501
502 def test_git_fetches_from_remote_repository_with_annotated_tags(
503 self, backend_git, rc_web_server):
504 # Note: This is a test specific to the git backend. It checks the
505 # integration of fetching from a remote repository which contains
506 # annotated tags.
507
508 # Dulwich shows this specific behavior only when
509 # operating against a remote repository.
510 source_repo = backend_git['annotated-tag']
511 target_vcs_repo = backend_git.create_repo().scm_instance()
512 target_vcs_repo.fetch(rc_web_server.repo_clone_url(source_repo.repo_name))
513
514 def test_git_push_shows_pull_request_refs(self, backend_git, rc_web_server, tmpdir):
515 """
516 test if remote info about refs is visible
517 """
518 empty_repo = backend_git.create_repo()
519
520 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
521
522 cmd = Command(tmpdir.strpath)
523 cmd.execute('git clone', clone_url)
524
525 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
526 repo.in_memory_commit.add(FileNode('readme.md', content='## Hello'))
527 repo.in_memory_commit.commit(
528 message='Commit on branch Master',
529 author='Automatic test',
530 branch='master')
531
532 repo_cmd = Command(repo.path)
533 stdout, stderr = repo_cmd.execute('git push --verbose origin master')
534 _check_proper_git_push(stdout, stderr, branch='master')
535
536 ref = '{}/{}/pull-request/new?branch=master'.format(
537 rc_web_server.host_url(), empty_repo.repo_name)
538 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
539 assert 'remote: RhodeCode: push completed' in stderr
540
541 # push on the same branch
542 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
543 repo.in_memory_commit.add(FileNode('setup.py', content='print\n'))
544 repo.in_memory_commit.commit(
545 message='Commit2 on branch Master',
546 author='Automatic test2',
547 branch='master')
548
549 repo_cmd = Command(repo.path)
550 stdout, stderr = repo_cmd.execute('git push --verbose origin master')
551 _check_proper_git_push(stdout, stderr, branch='master')
552
553 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
554 assert 'remote: RhodeCode: push completed' in stderr
555
556 # new Branch
557 repo = GitRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
558 repo.in_memory_commit.add(FileNode('feature1.py', content='## Hello world'))
559 repo.in_memory_commit.commit(
560 message='Commit on branch feature',
561 author='Automatic test',
562 branch='feature')
563
564 repo_cmd = Command(repo.path)
565 stdout, stderr = repo_cmd.execute('git push --verbose origin feature')
566 _check_proper_git_push(stdout, stderr, branch='feature')
567
568 ref = '{}/{}/pull-request/new?branch=feature'.format(
569 rc_web_server.host_url(), empty_repo.repo_name)
570 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stderr
571 assert 'remote: RhodeCode: push completed' in stderr
572
573 def test_hg_push_shows_pull_request_refs(self, backend_hg, rc_web_server, tmpdir):
574 empty_repo = backend_hg.create_repo()
575
576 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
577
578 cmd = Command(tmpdir.strpath)
579 cmd.execute('hg clone', clone_url)
580
581 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
582 repo.in_memory_commit.add(FileNode(u'readme.md', content=u'## Hello'))
583 repo.in_memory_commit.commit(
584 message=u'Commit on branch default',
585 author=u'Automatic test',
586 branch='default')
587
588 repo_cmd = Command(repo.path)
589 repo_cmd.execute('hg checkout default')
590
591 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
592 _check_proper_hg_push(stdout, stderr, branch='default')
593
594 ref = '{}/{}/pull-request/new?branch=default'.format(
595 rc_web_server.host_url(), empty_repo.repo_name)
596 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
597 assert 'remote: RhodeCode: push completed' in stdout
598
599 # push on the same branch
600 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
601 repo.in_memory_commit.add(FileNode(u'setup.py', content=u'print\n'))
602 repo.in_memory_commit.commit(
603 message=u'Commit2 on branch default',
604 author=u'Automatic test2',
605 branch=u'default')
606
607 repo_cmd = Command(repo.path)
608 repo_cmd.execute('hg checkout default')
609
610 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
611 _check_proper_hg_push(stdout, stderr, branch='default')
612
613 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
614 assert 'remote: RhodeCode: push completed' in stdout
615
616 # new Branch
617 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
618 repo.in_memory_commit.add(FileNode(u'feature1.py', content=u'## Hello world'))
619 repo.in_memory_commit.commit(
620 message=u'Commit on branch feature',
621 author=u'Automatic test',
622 branch=u'feature')
623
624 repo_cmd = Command(repo.path)
625 repo_cmd.execute('hg checkout feature')
626
627 stdout, stderr = repo_cmd.execute('hg push --new-branch --verbose', clone_url)
628 _check_proper_hg_push(stdout, stderr, branch='feature')
629
630 ref = '{}/{}/pull-request/new?branch=feature'.format(
631 rc_web_server.host_url(), empty_repo.repo_name)
632 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
633 assert 'remote: RhodeCode: push completed' in stdout
634
635 def test_hg_push_shows_pull_request_refs_book(self, backend_hg, rc_web_server, tmpdir):
636 empty_repo = backend_hg.create_repo()
637
638 clone_url = rc_web_server.repo_clone_url(empty_repo.repo_name)
639
640 cmd = Command(tmpdir.strpath)
641 cmd.execute('hg clone', clone_url)
642
643 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
644 repo.in_memory_commit.add(FileNode(u'readme.md', content=u'## Hello'))
645 repo.in_memory_commit.commit(
646 message=u'Commit on branch default',
647 author=u'Automatic test',
648 branch='default')
649
650 repo_cmd = Command(repo.path)
651 repo_cmd.execute('hg checkout default')
652
653 stdout, stderr = repo_cmd.execute('hg push --verbose', clone_url)
654 _check_proper_hg_push(stdout, stderr, branch='default')
655
656 ref = '{}/{}/pull-request/new?branch=default'.format(
657 rc_web_server.host_url(), empty_repo.repo_name)
658 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
659 assert 'remote: RhodeCode: push completed' in stdout
660
661 # add bookmark
662 repo = MercurialRepository(os.path.join(tmpdir.strpath, empty_repo.repo_name))
663 repo.in_memory_commit.add(FileNode(u'setup.py', content=u'print\n'))
664 repo.in_memory_commit.commit(
665 message=u'Commit2 on branch default',
666 author=u'Automatic test2',
667 branch=u'default')
668
669 repo_cmd = Command(repo.path)
670 repo_cmd.execute('hg checkout default')
671 repo_cmd.execute('hg bookmark feature2')
672 stdout, stderr = repo_cmd.execute('hg push -B feature2 --verbose', clone_url)
673 _check_proper_hg_push(stdout, stderr, branch='default')
674
675 ref = '{}/{}/pull-request/new?branch=default'.format(
676 rc_web_server.host_url(), empty_repo.repo_name)
677 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
678 ref = '{}/{}/pull-request/new?bookmark=feature2'.format(
679 rc_web_server.host_url(), empty_repo.repo_name)
680 assert 'remote: RhodeCode: open pull request link: {}'.format(ref) in stdout
681 assert 'remote: RhodeCode: push completed' in stdout
682 assert 'exporting bookmark feature2' in stdout
@@ -1,222 +1,222 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30
31 31 import pytest
32 32
33 33 from rhodecode.model.db import User, Repository
34 34 from rhodecode.model.meta import Session
35 35 from rhodecode.model.repo import RepoModel
36 36
37 37 from rhodecode.tests import (
38 38 GIT_REPO, HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN,
39 39 TEST_USER_REGULAR_PASS)
40 from rhodecode.tests.other.vcs_operations import (
40 from rhodecode.tests.vcs_operations import (
41 41 Command, _check_proper_clone, _check_proper_git_push, _add_files_and_push)
42 42
43 43
44 44 # override rc_web_server_config fixture with custom INI
45 45 @pytest.fixture(scope='module')
46 46 def rc_web_server_config(testini_factory):
47 47 CUSTOM_PARAMS = [
48 48 {'app:main': {'lock_ret_code': '423'}},
49 49 ]
50 50 return testini_factory(CUSTOM_PARAMS)
51 51
52 52
53 53 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
54 54 class TestVCSOperationsOnCustomIniConfig(object):
55 55
56 56 def test_clone_and_create_lock_hg(self, rc_web_server, tmpdir):
57 57 # enable locking
58 58 r = Repository.get_by_repo_name(HG_REPO)
59 59 r.enable_locking = True
60 60 Session().add(r)
61 61 Session().commit()
62 62 # clone
63 63 clone_url = rc_web_server.repo_clone_url(HG_REPO)
64 64 stdout, stderr = Command('/tmp').execute(
65 65 'hg clone', clone_url, tmpdir.strpath)
66 66
67 67 # check if lock was made
68 68 r = Repository.get_by_repo_name(HG_REPO)
69 69 assert r.locked[0] == User.get_by_username(
70 70 TEST_USER_ADMIN_LOGIN).user_id
71 71
72 72 def test_clone_and_create_lock_git(self, rc_web_server, tmpdir):
73 73 # enable locking
74 74 r = Repository.get_by_repo_name(GIT_REPO)
75 75 r.enable_locking = True
76 76 Session().add(r)
77 77 Session().commit()
78 78 # clone
79 79 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
80 80 stdout, stderr = Command('/tmp').execute(
81 81 'git clone', clone_url, tmpdir.strpath)
82 82
83 83 # check if lock was made
84 84 r = Repository.get_by_repo_name(GIT_REPO)
85 85 assert r.locked[0] == User.get_by_username(
86 86 TEST_USER_ADMIN_LOGIN).user_id
87 87
88 88 def test_clone_after_repo_was_locked_hg(self, rc_web_server, tmpdir):
89 89 # lock repo
90 90 r = Repository.get_by_repo_name(HG_REPO)
91 91 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
92 92 # pull fails since repo is locked
93 93 clone_url = rc_web_server.repo_clone_url(HG_REPO)
94 94 stdout, stderr = Command('/tmp').execute(
95 95 'hg clone', clone_url, tmpdir.strpath)
96 96 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
97 97 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
98 98 assert msg in stderr
99 99
100 100 def test_clone_after_repo_was_locked_git(self, rc_web_server, tmpdir):
101 101 # lock repo
102 102 r = Repository.get_by_repo_name(GIT_REPO)
103 103 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
104 104 # pull fails since repo is locked
105 105 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
106 106 stdout, stderr = Command('/tmp').execute(
107 107 'git clone', clone_url, tmpdir.strpath)
108 108
109 109 lock_msg = (
110 110 'remote: ERROR: Repository `vcs_test_git` locked by user ' +
111 111 '`test_admin`. Reason:`lock_auto`')
112 112 assert lock_msg in stderr
113 113 assert 'remote: Pre pull hook failed: aborting' in stderr
114 114 assert 'fatal: remote did not send all necessary objects' in stderr
115 115
116 116 def test_push_on_locked_repo_by_other_user_hg(self, rc_web_server, tmpdir):
117 117 clone_url = rc_web_server.repo_clone_url(HG_REPO)
118 118 stdout, stderr = Command('/tmp').execute(
119 119 'hg clone', clone_url, tmpdir.strpath)
120 120
121 121 # lock repo
122 122 r = Repository.get_by_repo_name(HG_REPO)
123 123 # let this user actually push !
124 124 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
125 125 perm='repository.write')
126 126 Session().commit()
127 127 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
128 128
129 129 # push fails repo is locked by other user !
130 130 push_url = rc_web_server.repo_clone_url(
131 131 HG_REPO,
132 132 user=TEST_USER_REGULAR_LOGIN, passwd=TEST_USER_REGULAR_PASS)
133 133 stdout, stderr = _add_files_and_push(
134 134 'hg', tmpdir.strpath, clone_url=push_url)
135 135 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
136 136 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
137 137 assert msg in stderr
138 138
139 139 def test_push_on_locked_repo_by_other_user_git(
140 140 self, rc_web_server, tmpdir):
141 141 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
142 142 stdout, stderr = Command('/tmp').execute(
143 143 'git clone', clone_url, tmpdir.strpath)
144 144
145 145 # lock repo
146 146 r = Repository.get_by_repo_name(GIT_REPO)
147 147 # let this user actually push !
148 148 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
149 149 perm='repository.write')
150 150 Session().commit()
151 151 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
152 152
153 153 # push fails repo is locked by other user!
154 154 push_url = rc_web_server.repo_clone_url(
155 155 GIT_REPO,
156 156 user=TEST_USER_REGULAR_LOGIN, passwd=TEST_USER_REGULAR_PASS)
157 157 stdout, stderr = _add_files_and_push(
158 158 'git', tmpdir.strpath, clone_url=push_url)
159 159
160 160 err = 'Repository `%s` locked by user `%s`' % (
161 161 GIT_REPO, TEST_USER_ADMIN_LOGIN)
162 162 # err = 'RPC failed; result=22, HTTP code = 423'
163 163 assert err in stderr
164 164
165 165 def test_push_unlocks_repository_hg(self, rc_web_server, tmpdir):
166 166 # enable locking
167 167 r = Repository.get_by_repo_name(HG_REPO)
168 168 r.enable_locking = True
169 169 Session().add(r)
170 170 Session().commit()
171 171
172 172 clone_url = rc_web_server.repo_clone_url(HG_REPO)
173 173 stdout, stderr = Command('/tmp').execute(
174 174 'hg clone', clone_url, tmpdir.strpath)
175 175 _check_proper_clone(stdout, stderr, 'hg')
176 176
177 177 # check for lock repo after clone
178 178 r = Repository.get_by_repo_name(HG_REPO)
179 179 uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
180 180 assert r.locked[0] == uid
181 181
182 182 # push is ok and repo is now unlocked
183 183 stdout, stderr = _add_files_and_push(
184 184 'hg', tmpdir.strpath, clone_url=clone_url)
185 185 assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
186 186 # we need to cleanup the Session Here !
187 187 Session.remove()
188 188 r = Repository.get_by_repo_name(HG_REPO)
189 189 assert r.locked == [None, None, None]
190 190
191 191 def test_push_unlocks_repository_git(self, rc_web_server, tmpdir):
192 192
193 193 # Note: Did a first debugging session. Seems that
194 194 # Repository.get_locking_state is called twice. The second call
195 195 # has the action "pull" and does not reset the lock.
196 196
197 197 # enable locking
198 198 r = Repository.get_by_repo_name(GIT_REPO)
199 199 r.enable_locking = True
200 200 Session().add(r)
201 201 Session().commit()
202 202
203 203 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
204 204 stdout, stderr = Command('/tmp').execute(
205 205 'git clone', clone_url, tmpdir.strpath)
206 206 _check_proper_clone(stdout, stderr, 'git')
207 207
208 208 # check for lock repo after clone
209 209 r = Repository.get_by_repo_name(GIT_REPO)
210 210 assert r.locked[0] == User.get_by_username(
211 211 TEST_USER_ADMIN_LOGIN).user_id
212 212
213 213 # push is ok and repo is now unlocked
214 214 stdout, stderr = _add_files_and_push(
215 215 'git', tmpdir.strpath, clone_url=clone_url)
216 216 _check_proper_git_push(stdout, stderr)
217 217
218 218 # assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
219 219 # we need to cleanup the Session Here !
220 220 Session.remove()
221 221 r = Repository.get_by_repo_name(GIT_REPO)
222 222 assert r.locked == [None, None, None]
@@ -1,130 +1,130 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30
31 31 import pytest
32 32
33 33 from rhodecode.model.db import User, Repository
34 34 from rhodecode.model.meta import Session
35 35 from rhodecode.model.repo import RepoModel
36 36
37 37 from rhodecode.tests import (
38 38 GIT_REPO, HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN,
39 39 TEST_USER_REGULAR_PASS)
40 from rhodecode.tests.other.vcs_operations import Command, _add_files_and_push
40 from rhodecode.tests.vcs_operations import Command, _add_files_and_push
41 41
42 42
43 43 # override rc_web_server_config fixture with custom INI
44 44 @pytest.fixture(scope='module')
45 45 def rc_web_server_config(testini_factory):
46 46 CUSTOM_PARAMS = [
47 47 {'app:main': {'lock_ret_code': '400'}},
48 48 ]
49 49 return testini_factory(CUSTOM_PARAMS)
50 50
51 51
52 52 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
53 53 class TestVCSOperationsOnCustomIniConfig(object):
54 54
55 55 def test_clone_after_repo_was_locked_hg(self, rc_web_server, tmpdir):
56 56 # lock repo
57 57 r = Repository.get_by_repo_name(HG_REPO)
58 58 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
59 59 # pull fails since repo is locked
60 60 clone_url = rc_web_server.repo_clone_url(HG_REPO)
61 61 stdout, stderr = Command('/tmp').execute(
62 62 'hg clone', clone_url, tmpdir.strpath)
63 63 msg = ("""abort: HTTP Error 400: Repository `%s` locked by user `%s`"""
64 64 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
65 65 assert msg in stderr
66 66
67 67 def test_clone_after_repo_was_locked_git(self, rc_web_server, tmpdir):
68 68 # lock repo
69 69 r = Repository.get_by_repo_name(GIT_REPO)
70 70 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
71 71 # pull fails since repo is locked
72 72 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
73 73 stdout, stderr = Command('/tmp').execute(
74 74 'git clone', clone_url, tmpdir.strpath)
75 75
76 76 lock_msg = (
77 77 'remote: ERROR: Repository `vcs_test_git` locked by user ' +
78 78 '`test_admin`. Reason:`lock_auto`')
79 79 assert lock_msg in stderr
80 80 assert 'remote: Pre pull hook failed: aborting' in stderr
81 81 assert 'fatal: remote did not send all necessary objects' in stderr
82 82
83 83 def test_push_on_locked_repo_by_other_user_hg(self, rc_web_server, tmpdir):
84 84 clone_url = rc_web_server.repo_clone_url(HG_REPO)
85 85 stdout, stderr = Command('/tmp').execute(
86 86 'hg clone', clone_url, tmpdir.strpath)
87 87
88 88 # lock repo
89 89 r = Repository.get_by_repo_name(HG_REPO)
90 90 # let this user actually push !
91 91 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
92 92 perm='repository.write')
93 93 Session().commit()
94 94 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
95 95
96 96 # push fails repo is locked by other user !
97 97 push_url = rc_web_server.repo_clone_url(
98 98 HG_REPO,
99 99 user=TEST_USER_REGULAR_LOGIN, passwd=TEST_USER_REGULAR_PASS)
100 100 stdout, stderr = _add_files_and_push(
101 101 'hg', tmpdir.strpath, clone_url=push_url)
102 102 msg = ("""abort: HTTP Error 400: Repository `%s` locked by user `%s`"""
103 103 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
104 104 assert msg in stderr
105 105
106 106 def test_push_on_locked_repo_by_other_user_git(
107 107 self, rc_web_server, tmpdir):
108 108 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
109 109 stdout, stderr = Command('/tmp').execute(
110 110 'git clone', clone_url, tmpdir.strpath)
111 111
112 112 # lock repo
113 113 r = Repository.get_by_repo_name(GIT_REPO)
114 114 # let this user actually push !
115 115 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
116 116 perm='repository.write')
117 117 Session().commit()
118 118 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
119 119
120 120 # push fails repo is locked by other user!
121 121 push_url = rc_web_server.repo_clone_url(
122 122 GIT_REPO,
123 123 user=TEST_USER_REGULAR_LOGIN, passwd=TEST_USER_REGULAR_PASS)
124 124 stdout, stderr = _add_files_and_push(
125 125 'git', tmpdir.strpath, clone_url=push_url)
126 126
127 127 err = 'Repository `%s` locked by user `%s`' % (
128 128 GIT_REPO, TEST_USER_ADMIN_LOGIN)
129 129
130 130 assert err in stderr
@@ -1,146 +1,141 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Test suite for making push/pull operations, on specially modified INI files
23 23
24 24 .. important::
25 25
26 26 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
27 27 to redirect things to stderr instead of stdout.
28 28 """
29 29
30 30 import pytest
31 31 import requests
32 32
33 33 from rhodecode import events
34 34 from rhodecode.model.db import Integration
35 35 from rhodecode.model.integration import IntegrationModel
36 36 from rhodecode.model.meta import Session
37 37
38 38 from rhodecode.tests import GIT_REPO, HG_REPO
39 from rhodecode.tests.other.vcs_operations import Command, _add_files_and_push
39 from rhodecode.tests.vcs_operations import Command, _add_files_and_push
40 40 from rhodecode.integrations.types.webhook import WebhookIntegrationType
41 41
42 42
43 43 def check_connection():
44 44 try:
45 45 response = requests.get('http://httpbin.org')
46 46 return response.status_code == 200
47 47 except Exception as e:
48 48 print(e)
49 49
50 50 return False
51 51
52 52
53 53 connection_available = pytest.mark.skipif(
54 54 not check_connection(), reason="No outside internet connection available")
55 55
56 56
57 57 @pytest.fixture
58 58 def enable_webhook_push_integration(request):
59 59 integration = Integration()
60 60 integration.integration_type = WebhookIntegrationType.key
61 61 Session().add(integration)
62 62
63 63 settings = dict(
64 64 url='http://httpbin.org',
65 65 secret_token='secret',
66 66 username=None,
67 67 password=None,
68 68 custom_header_key=None,
69 69 custom_header_val=None,
70 70 method_type='get',
71 71 events=[events.RepoPushEvent.name],
72 72 log_data=True
73 73 )
74 74
75 75 IntegrationModel().update_integration(
76 76 integration,
77 77 name='IntegrationWebhookTest',
78 78 enabled=True,
79 79 settings=settings,
80 80 repo=None,
81 81 repo_group=None,
82 82 child_repos_only=False,
83 83 )
84 84 Session().commit()
85 85 integration_id = integration.integration_id
86 86
87 87 @request.addfinalizer
88 88 def cleanup():
89 89 integration = Integration.get(integration_id)
90 90 Session().delete(integration)
91 91 Session().commit()
92 92
93 93
94 @pytest.fixture(scope="session")
95 def vcs_server_config_override():
96 return ({'server:main': {'workers': 2}},)
97
98
99 94 @pytest.mark.usefixtures(
100 95 "disable_locking", "disable_anonymous_user",
101 96 "enable_webhook_push_integration")
102 97 class TestVCSOperationsOnCustomIniConfig(object):
103 98
104 99 def test_push_tag_with_commit_hg(self, rc_web_server, tmpdir):
105 100 clone_url = rc_web_server.repo_clone_url(HG_REPO)
106 101 stdout, stderr = Command('/tmp').execute(
107 102 'hg clone', clone_url, tmpdir.strpath)
108 103
109 104 push_url = rc_web_server.repo_clone_url(HG_REPO)
110 105 _add_files_and_push(
111 106 'hg', tmpdir.strpath, clone_url=push_url,
112 107 tags=[{'name': 'v1.0.0', 'commit': 'added tag v1.0.0'}])
113 108
114 109 rc_log = rc_web_server.get_rc_log()
115 110 assert 'ERROR' not in rc_log
116 111 assert "'name': u'v1.0.0'" in rc_log
117 112
118 113 def test_push_tag_with_commit_git(
119 114 self, rc_web_server, tmpdir):
120 115 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
121 116 stdout, stderr = Command('/tmp').execute(
122 117 'git clone', clone_url, tmpdir.strpath)
123 118
124 119 push_url = rc_web_server.repo_clone_url(GIT_REPO)
125 120 _add_files_and_push(
126 121 'git', tmpdir.strpath, clone_url=push_url,
127 122 tags=[{'name': 'v1.0.0', 'commit': 'added tag v1.0.0'}])
128 123
129 124 rc_log = rc_web_server.get_rc_log()
130 125 assert 'ERROR' not in rc_log
131 126 assert "'name': u'v1.0.0'" in rc_log
132 127
133 128 def test_push_tag_with_no_commit_git(
134 129 self, rc_web_server, tmpdir):
135 130 clone_url = rc_web_server.repo_clone_url(GIT_REPO)
136 131 stdout, stderr = Command('/tmp').execute(
137 132 'git clone', clone_url, tmpdir.strpath)
138 133
139 134 push_url = rc_web_server.repo_clone_url(GIT_REPO)
140 135 _add_files_and_push(
141 136 'git', tmpdir.strpath, clone_url=push_url,
142 137 tags=[{'name': 'v1.0.0', 'commit': 'added tag v1.0.0'}])
143 138
144 139 rc_log = rc_web_server.get_rc_log()
145 140 assert 'ERROR' not in rc_log
146 141 assert "'name': u'v1.0.0'" in rc_log
General Comments 0
You need to be logged in to leave comments. Login now