##// END OF EJS Templates
fix(tests): fixed svn tests cases when interactive prompt was displayed
super-admin -
r5469:e7062d28 default
parent child Browse files
Show More
@@ -1,214 +1,220 b''
1
1
2 # Copyright (C) 2010-2023 RhodeCode GmbH
2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software: you can redistribute it and/or modify
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License, version 3
5 # it under the terms of the GNU Affero General Public License, version 3
6 # (only), as published by the Free Software Foundation.
6 # (only), as published by the Free Software Foundation.
7 #
7 #
8 # This program is distributed in the hope that it will be useful,
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
11 # GNU General Public License for more details.
12 #
12 #
13 # You should have received a copy of the GNU Affero General Public License
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 #
15 #
16 # This program is dual-licensed. If you wish to learn more about the
16 # This program is dual-licensed. If you wish to learn more about the
17 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19
19
20 """
20 """
21 Base for test suite for making push/pull operations.
21 Base for test suite for making push/pull operations.
22
22
23 .. important::
23 .. important::
24
24
25 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
25 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
26 to redirect things to stderr instead of stdout.
26 to redirect things to stderr instead of stdout.
27 """
27 """
28
28
29
29
30 import logging
30 import logging
31 import os
31 import os
32 import tempfile
32 import tempfile
33 import subprocess
33 import subprocess
34
34
35 from rhodecode.lib.str_utils import safe_str
35 from rhodecode.lib.str_utils import safe_str
36 from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO
36 from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO
37
37
38 DEBUG = True
38 DEBUG = True
39 RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log')
39 RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log')
40 REPO_GROUP = 'a_repo_group'
40 REPO_GROUP = 'a_repo_group'
41 HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
41 HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
42 GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
42 GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
43 SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'
43 SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'
44
44
45 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
46
46
47
47
48 class Command(object):
48 class Command(object):
49
49
50 def __init__(self, cwd):
50 def __init__(self, cwd):
51 self.cwd = cwd
51 self.cwd = cwd
52 self.process = None
52 self.process = None
53
53
54 def execute(self, cmd, *args):
54 def execute(self, cmd, *args):
55 """
55 """
56 Runs command on the system with given ``args``.
56 Runs command on the system with given ``args``.
57 """
57 """
58
58
59 command = cmd + ' ' + ' '.join(args)
59 command = cmd + ' ' + ' '.join(args)
60 if DEBUG:
60 if DEBUG:
61 log.debug('*** CMD %s ***', command)
61 log.debug('*** CMD %s ***', command)
62
62
63 env = dict(os.environ)
63 env = dict(os.environ)
64 # Delete coverage variables, as they make the test fail for Mercurial
64 # Delete coverage variables, as they make the test fail for Mercurial
65 for key in env.keys():
65 for key in env.keys():
66 if key.startswith('COV_CORE_'):
66 if key.startswith('COV_CORE_'):
67 del env[key]
67 del env[key]
68
68
69 self.process = subprocess.Popen(
69 self.process = subprocess.Popen(
70 command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
70 command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
71 cwd=self.cwd, env=env)
71 cwd=self.cwd, env=env)
72 stdout, stderr = self.process.communicate()
72 stdout, stderr = self.process.communicate()
73
73
74 stdout = safe_str(stdout)
74 stdout = safe_str(stdout)
75 stderr = safe_str(stderr)
75 stderr = safe_str(stderr)
76
76
77 if DEBUG:
77 if DEBUG:
78 log.debug('STDOUT:%s', stdout)
78 log.debug('STDOUT:%s', stdout)
79 log.debug('STDERR:%s', stderr)
79 log.debug('STDERR:%s', stderr)
80 return stdout, stderr
80 return stdout, stderr
81
81
82 def assert_returncode_success(self):
82 def assert_returncode_success(self):
83 assert self.process.returncode == 0
83 assert self.process.returncode == 0
84
84
85
85
86 def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs):
86 def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs):
87 full_name = 'Marcin KuΕΊminski'
87 full_name = 'Marcin KuΕΊminski'
88 email = 'me@email.com'
88 email = 'me@email.com'
89 git_ident = f"git config user.name {full_name} && git config user.email {email}"
89 git_ident = f"git config user.name {full_name} && git config user.email {email}"
90 cwd = path = os.path.join(dest)
90 cwd = path = os.path.join(dest)
91
91
92 tags = tags or []
92 tags = tags or []
93 name_sequence = next(tempfile._RandomNameSequence())
93 name_sequence = next(tempfile._RandomNameSequence())
94 added_file = os.path.join(path, f'{name_sequence}_setup.py')
94 added_file = os.path.join(path, f'{name_sequence}_setup.py')
95
95
96 Command(cwd).execute(f'touch {added_file}')
96 Command(cwd).execute(f'touch {added_file}')
97 Command(cwd).execute(f'{vcs} add {added_file}')
97 Command(cwd).execute(f'{vcs} add {added_file}')
98 author_str = 'Marcin KuΕΊminski <me@email.com>'
98 author_str = 'Marcin KuΕΊminski <me@email.com>'
99
99
100 for i in range(kwargs.get('files_no', 3)):
100 for i in range(kwargs.get('files_no', 3)):
101 cmd = f"""echo 'added_line{i}' >> {added_file}"""
101 cmd = f"""echo 'added_line{i}' >> {added_file}"""
102 Command(cwd).execute(cmd)
102 Command(cwd).execute(cmd)
103
103
104 if vcs == 'hg':
104 if vcs == 'hg':
105 cmd = f"""hg commit -m 'committed new {i}' -u '{author_str}' {added_file} """
105 cmd = f"""hg commit -m 'committed new {i}' -u '{author_str}' {added_file} """
106 elif vcs == 'git':
106 elif vcs == 'git':
107 cmd = f"""{git_ident} && git commit -m 'committed new {i}' {added_file}"""
107 cmd = f"""{git_ident} && git commit -m 'committed new {i}' {added_file}"""
108 Command(cwd).execute(cmd)
108 Command(cwd).execute(cmd)
109
109
110 for tag in tags:
110 for tag in tags:
111 if vcs == 'hg':
111 if vcs == 'hg':
112 Command(cwd).execute(
112 Command(cwd).execute(
113 f"""hg tag -m "{tag['commit']}" -u "{author_str}" """,
113 f"""hg tag -m "{tag['commit']}" -u "{author_str}" """,
114 tag['name'])
114 tag['name'])
115 elif vcs == 'git':
115 elif vcs == 'git':
116 if tag['commit']:
116 if tag['commit']:
117 # annotated tag
117 # annotated tag
118 _stdout, _stderr = Command(cwd).execute(
118 _stdout, _stderr = Command(cwd).execute(
119 f"""{git_ident} && git tag -a {tag['name']} -m "{tag['commit']}" """
119 f"""{git_ident} && git tag -a {tag['name']} -m "{tag['commit']}" """
120 )
120 )
121 else:
121 else:
122 # lightweight tag
122 # lightweight tag
123 _stdout, _stderr = Command(cwd).execute(
123 _stdout, _stderr = Command(cwd).execute(
124 f"""{git_ident} && git tag {tag['name']}"""
124 f"""{git_ident} && git tag {tag['name']}"""
125 )
125 )
126
126
127
127
128 def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None,
128 def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None,
129 new_branch=False, **kwargs):
129 new_branch=False, **kwargs):
130 """
130 """
131 Generate some files, add it to DEST repo and push back
131 Generate some files, add it to DEST repo and push back
132 vcs is git or hg and defines what VCS we want to make those files for
132 vcs is git or hg and defines what VCS we want to make those files for
133 """
133 """
134 git_ident = "git config user.name Marcin KuΕΊminski && git config user.email me@email.com"
134 git_ident = "git config user.name Marcin KuΕΊminski && git config user.email me@email.com"
135 cwd = os.path.join(dest)
135 cwd = os.path.join(dest)
136
136
137 # commit some stuff into this repo
137 # commit some stuff into this repo
138 _add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs)
138 _add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs)
139
139
140 default_target_branch = {
140 default_target_branch = {
141 'git': 'master',
141 'git': 'master',
142 'hg': 'default'
142 'hg': 'default'
143 }.get(vcs)
143 }.get(vcs)
144
144
145 target_branch = target_branch or default_target_branch
145 target_branch = target_branch or default_target_branch
146
146
147 # PUSH it back
147 # PUSH it back
148 stdout = stderr = None
148 stdout = stderr = None
149 if vcs == 'hg':
149 if vcs == 'hg':
150 maybe_new_branch = ''
150 maybe_new_branch = ''
151 if new_branch:
151 if new_branch:
152 maybe_new_branch = '--new-branch'
152 maybe_new_branch = '--new-branch'
153 stdout, stderr = Command(cwd).execute(
153 stdout, stderr = Command(cwd).execute(
154 f'hg push --traceback --verbose {maybe_new_branch} -r {target_branch} {clone_url}'
154 f'hg push --traceback --verbose {maybe_new_branch} -r {target_branch} {clone_url}'
155 )
155 )
156 elif vcs == 'git':
156 elif vcs == 'git':
157 stdout, stderr = Command(cwd).execute(
157 stdout, stderr = Command(cwd).execute(
158 f'{git_ident} && git push --verbose --tags {clone_url} {target_branch}'
158 f'{git_ident} && git push --verbose --tags {clone_url} {target_branch}'
159 )
159 )
160 elif vcs == 'svn':
160 elif vcs == 'svn':
161 username = kwargs.pop('username', '')
162 password = kwargs.pop('password', '')
163 auth = ''
164 if username and password:
165 auth = f'--username {username} --password {password}'
166
161 stdout, stderr = Command(cwd).execute(
167 stdout, stderr = Command(cwd).execute(
162 f'svn ci -m "pushing to {target_branch}"'
168 f'svn commit --no-auth-cache --non-interactive {auth} -m "pushing to {target_branch}"'
163 )
169 )
164
170
165 return stdout, stderr
171 return stdout, stderr
166
172
167
173
168 def _check_proper_git_push(
174 def _check_proper_git_push(
169 stdout, stderr, branch='master', should_set_default_branch=False):
175 stdout, stderr, branch='master', should_set_default_branch=False):
170 # Note: Git is writing most information to stderr intentionally
176 # Note: Git is writing most information to stderr intentionally
171 assert 'fatal' not in stderr
177 assert 'fatal' not in stderr
172 assert 'rejected' not in stderr
178 assert 'rejected' not in stderr
173 assert 'Pushing to' in stderr
179 assert 'Pushing to' in stderr
174 assert '%s -> %s' % (branch, branch) in stderr
180 assert '%s -> %s' % (branch, branch) in stderr
175
181
176 if should_set_default_branch:
182 if should_set_default_branch:
177 assert "Setting default branch to %s" % branch in stderr
183 assert "Setting default branch to %s" % branch in stderr
178 else:
184 else:
179 assert "Setting default branch" not in stderr
185 assert "Setting default branch" not in stderr
180
186
181
187
182 def _check_proper_hg_push(stdout, stderr, branch='default'):
188 def _check_proper_hg_push(stdout, stderr, branch='default'):
183 assert 'pushing to' in stdout
189 assert 'pushing to' in stdout
184 assert 'searching for changes' in stdout
190 assert 'searching for changes' in stdout
185
191
186 assert 'abort:' not in stderr
192 assert 'abort:' not in stderr
187
193
188
194
189 def _check_proper_svn_push(stdout, stderr):
195 def _check_proper_svn_push(stdout, stderr):
190 assert 'pushing to' in stdout
196 assert 'pushing to' in stdout
191 assert 'searching for changes' in stdout
197 assert 'searching for changes' in stdout
192
198
193 assert 'abort:' not in stderr
199 assert 'abort:' not in stderr
194
200
195
201
196 def _check_proper_clone(stdout, stderr, vcs):
202 def _check_proper_clone(stdout, stderr, vcs):
197 if vcs == 'hg':
203 if vcs == 'hg':
198 assert 'requesting all changes' in stdout
204 assert 'requesting all changes' in stdout
199 assert 'adding changesets' in stdout
205 assert 'adding changesets' in stdout
200 assert 'adding manifests' in stdout
206 assert 'adding manifests' in stdout
201 assert 'adding file changes' in stdout
207 assert 'adding file changes' in stdout
202
208
203 assert stderr == ''
209 assert stderr == ''
204
210
205 if vcs == 'git':
211 if vcs == 'git':
206 assert '' == stdout
212 assert '' == stdout
207 assert 'Cloning into' in stderr
213 assert 'Cloning into' in stderr
208 assert 'abort:' not in stderr
214 assert 'abort:' not in stderr
209 assert 'fatal:' not in stderr
215 assert 'fatal:' not in stderr
210
216
211 if vcs == 'svn':
217 if vcs == 'svn':
212 assert 'dupa' in stdout
218 assert 'dupa' in stdout
213
219
214
220
@@ -1,197 +1,224 b''
1 # Copyright (C) 2010-2023 RhodeCode GmbH
1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 """
19 """
20 Test suite for making push/pull operations, on specially modified INI files
20 Test suite for making push/pull operations, on specially modified INI files
21
21
22 .. important::
22 .. important::
23
23
24 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
24 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
25 to redirect things to stderr instead of stdout.
25 to redirect things to stderr instead of stdout.
26 """
26 """
27
27
28
28
29 import time
29 import time
30 import pytest
30 import pytest
31
31
32 from rhodecode.model.db import Repository, UserIpMap
32 from rhodecode.model.db import Repository, UserIpMap
33 from rhodecode.model.meta import Session
33 from rhodecode.model.meta import Session
34 from rhodecode.model.repo import RepoModel
34 from rhodecode.model.repo import RepoModel
35 from rhodecode.model.user import UserModel
35 from rhodecode.model.user import UserModel
36 from rhodecode.tests import (SVN_REPO, TEST_USER_ADMIN_LOGIN)
36 from rhodecode.tests import (SVN_REPO, TEST_USER_ADMIN_LOGIN)
37
37
38
38
39 from rhodecode.tests.vcs_operations import (
39 from rhodecode.tests.vcs_operations import (
40 Command, _check_proper_clone, _check_proper_svn_push,
40 Command, _check_proper_clone, _check_proper_svn_push,
41 _add_files_and_push, SVN_REPO_WITH_GROUP)
41 _add_files_and_push, SVN_REPO_WITH_GROUP)
42
42
43
43
44 def get_cli_flags(username, password):
45 flags = '--no-auth-cache --non-interactive'
46 auth = ''
47 if username and password:
48 auth = f'--username {username} --password {password}'
49 return flags, auth
50
51
44 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
52 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
45 class TestVCSOperations(object):
53 class TestVCSOperations(object):
46
54
47 def test_clone_svn_repo_by_admin(self, rc_web_server, tmpdir):
55 def test_clone_svn_repo_by_admin(self, rc_web_server, tmpdir):
48 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
56 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
49 username, password = rc_web_server.repo_clone_credentials()
57 username, password = rc_web_server.repo_clone_credentials()
50
58
51 cmd = Command('/tmp')
59 cmd = Command('/tmp')
52
60
53 auth = f'--non-interactive --username={username} --password={password}'
61 flags, auth = get_cli_flags(username, password)
54 stdout, stderr = cmd.execute(f'svn checkout {auth}', clone_url, tmpdir.strpath)
62
63 stdout, stderr = Command('/tmp').execute(
64 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
65
55 _check_proper_clone(stdout, stderr, 'svn')
66 _check_proper_clone(stdout, stderr, 'svn')
56 cmd.assert_returncode_success()
67 cmd.assert_returncode_success()
57
68
58 def test_clone_svn_repo_by_id_by_admin(self, rc_web_server, tmpdir):
69 def test_clone_svn_repo_by_id_by_admin(self, rc_web_server, tmpdir):
59 repo_id = Repository.get_by_repo_name(SVN_REPO).repo_id
70 repo_id = Repository.get_by_repo_name(SVN_REPO).repo_id
60 username, password = rc_web_server.repo_clone_credentials()
71 username, password = rc_web_server.repo_clone_credentials()
61
72
62 clone_url = rc_web_server.repo_clone_url('_%s' % repo_id)
73 clone_url = rc_web_server.repo_clone_url('_%s' % repo_id)
63 cmd = Command('/tmp')
74 cmd = Command('/tmp')
64 auth = f'--non-interactive --username={username} --password={password}'
75
65 stdout, stderr = cmd.execute(f'svn checkout {auth}', clone_url, tmpdir.strpath)
76 flags, auth = get_cli_flags(username, password)
77
78 stdout, stderr = Command('/tmp').execute(
79 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
80
66 _check_proper_clone(stdout, stderr, 'svn')
81 _check_proper_clone(stdout, stderr, 'svn')
67 cmd.assert_returncode_success()
82 cmd.assert_returncode_success()
68
83
69 def test_clone_svn_repo_with_group_by_admin(self, rc_web_server, tmpdir):
84 def test_clone_svn_repo_with_group_by_admin(self, rc_web_server, tmpdir):
70 clone_url = rc_web_server.repo_clone_url(SVN_REPO_WITH_GROUP)
85 clone_url = rc_web_server.repo_clone_url(SVN_REPO_WITH_GROUP)
71 username, password = rc_web_server.repo_clone_credentials()
86 username, password = rc_web_server.repo_clone_credentials()
72
87
73 cmd = Command('/tmp')
88 flags, auth = get_cli_flags(username, password)
74 auth = f'--non-interactive --username={username} --password={password}'
89
75 stdout, stderr = cmd.execute(f'svn checkout {auth}', clone_url, tmpdir.strpath)
90 stdout, stderr = Command('/tmp').execute(
91 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
92
76 _check_proper_clone(stdout, stderr, 'svn')
93 _check_proper_clone(stdout, stderr, 'svn')
77 cmd.assert_returncode_success()
94 cmd.assert_returncode_success()
78
95
79 def test_clone_wrong_credentials_svn(self, rc_web_server, tmpdir):
96 def test_clone_wrong_credentials_svn(self, rc_web_server, tmpdir):
80 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
97 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
81 username, password = rc_web_server.repo_clone_credentials()
98 username, password = rc_web_server.repo_clone_credentials()
82 password = 'bad-password'
99 password = 'bad-password'
83
100
84 auth = f'--non-interactive --username={username} --password={password}'
101 flags, auth = get_cli_flags(username, password)
102
85 stdout, stderr = Command('/tmp').execute(
103 stdout, stderr = Command('/tmp').execute(
86 f'svn checkout {auth}', clone_url, tmpdir.strpath)
104 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
87 assert 'fatal: Authentication failed' in stderr
105 assert 'fatal: Authentication failed' in stderr
88
106
89 def test_clone_svn_with_slashes(self, rc_web_server, tmpdir):
107 def test_clone_svn_with_slashes(self, rc_web_server, tmpdir):
90 clone_url = rc_web_server.repo_clone_url('//' + SVN_REPO)
108 clone_url = rc_web_server.repo_clone_url('//' + SVN_REPO)
91 stdout, stderr = Command('/tmp').execute('svn checkout', clone_url)
109 username, password = '', ''
110 flags, auth = get_cli_flags(username, password)
111
112 stdout, stderr = Command('/tmp').execute(
113 f'svn checkout {flags} {auth}', clone_url)
114
92 assert 'not found' in stderr
115 assert 'not found' in stderr
93
116
94 def test_clone_existing_path_svn_not_in_database(
117 def test_clone_existing_path_svn_not_in_database(
95 self, rc_web_server, tmpdir, fs_repo_only):
118 self, rc_web_server, tmpdir, fs_repo_only):
96 db_name = fs_repo_only('not-in-db-git', repo_type='git')
119 db_name = fs_repo_only('not-in-db-git', repo_type='git')
97 clone_url = rc_web_server.repo_clone_url(db_name)
120 clone_url = rc_web_server.repo_clone_url(db_name)
98 username, password = '', ''
121 username, password = '', ''
99 auth = f'--non-interactive --username={username} --password={password}'
122 flags, auth = get_cli_flags(username, password)
100
123
101 stdout, stderr = Command('/tmp').execute(
124 stdout, stderr = Command('/tmp').execute(
102 f'svn checkout {auth}', clone_url, tmpdir.strpath)
125 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
103 assert 'not found' in stderr
126 assert 'not found' in stderr
104
127
105 def test_clone_existing_path_svn_not_in_database_different_scm(
128 def test_clone_existing_path_svn_not_in_database_different_scm(
106 self, rc_web_server, tmpdir, fs_repo_only):
129 self, rc_web_server, tmpdir, fs_repo_only):
107 db_name = fs_repo_only('not-in-db-hg', repo_type='hg')
130 db_name = fs_repo_only('not-in-db-hg', repo_type='hg')
108 clone_url = rc_web_server.repo_clone_url(db_name)
131 clone_url = rc_web_server.repo_clone_url(db_name)
109
132
110 username, password = '', ''
133 username, password = '', ''
111 auth = f'--non-interactive --username={username} --password={password}'
134 flags, auth = get_cli_flags(username, password)
135
112 stdout, stderr = Command('/tmp').execute(
136 stdout, stderr = Command('/tmp').execute(
113 f'svn checkout {auth}', clone_url, tmpdir.strpath)
137 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
114 assert 'not found' in stderr
138 assert 'not found' in stderr
115
139
116 def test_clone_non_existing_store_path_svn(self, rc_web_server, tmpdir, user_util):
140 def test_clone_non_existing_store_path_svn(self, rc_web_server, tmpdir, user_util):
117 repo = user_util.create_repo(repo_type='git')
141 repo = user_util.create_repo(repo_type='git')
118 clone_url = rc_web_server.repo_clone_url(repo.repo_name)
142 clone_url = rc_web_server.repo_clone_url(repo.repo_name)
119
143
120 # Damage repo by removing it's folder
144 # Damage repo by removing it's folder
121 RepoModel()._delete_filesystem_repo(repo)
145 RepoModel()._delete_filesystem_repo(repo)
122
146
123 username, password = '', ''
147 username, password = '', ''
124 auth = f'--non-interactive --username={username} --password={password}'
148 flags, auth = get_cli_flags(username, password)
149
125 stdout, stderr = Command('/tmp').execute(
150 stdout, stderr = Command('/tmp').execute(
126 f'svn checkout {auth}', clone_url, tmpdir.strpath)
151 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
127 assert 'not found' in stderr
152 assert 'not found' in stderr
128
153
129 def test_push_new_file_svn(self, rc_web_server, tmpdir):
154 def test_push_new_file_svn(self, rc_web_server, tmpdir):
130 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
155 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
131 username, password = '', ''
156 username, password = '', ''
132 auth = f'--non-interactive --username={username} --password={password}'
157 flags, auth = get_cli_flags(username, password)
133
158
134 stdout, stderr = Command('/tmp').execute(
159 stdout, stderr = Command('/tmp').execute(
135 f'svn checkout {auth}', clone_url, tmpdir.strpath)
160 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
136
161
137 # commit some stuff into this repo
162 # commit some stuff into this repo
138 stdout, stderr = _add_files_and_push(
163 stdout, stderr = _add_files_and_push(
139 'svn', tmpdir.strpath, clone_url=clone_url)
164 'svn', tmpdir.strpath, clone_url=clone_url, username=username, password=password)
140
165
141 _check_proper_svn_push(stdout, stderr)
166 _check_proper_svn_push(stdout, stderr)
142
167
143 def test_push_wrong_credentials_svn(self, rc_web_server, tmpdir):
168 def test_push_wrong_credentials_svn(self, rc_web_server, tmpdir):
144 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
169 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
145
170
146 username, password = '', ''
171 username, password = rc_web_server.repo_clone_credentials()
147 auth = f'--non-interactive --username={username} --password={password}'
172 flags, auth = get_cli_flags(username, password)
173
148 stdout, stderr = Command('/tmp').execute(
174 stdout, stderr = Command('/tmp').execute(
149 f'svn checkout {auth}', clone_url, tmpdir.strpath)
175 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
150
176
151 push_url = rc_web_server.repo_clone_url(
177 push_url = rc_web_server.repo_clone_url(
152 SVN_REPO, user='bad', passwd='name')
178 SVN_REPO, user='bad', passwd='name')
153 stdout, stderr = _add_files_and_push(
179 stdout, stderr = _add_files_and_push(
154 'svn', tmpdir.strpath, clone_url=push_url)
180 'svn', tmpdir.strpath, clone_url=push_url, username=username, password=password)
155
181
156 assert 'fatal: Authentication failed' in stderr
182 assert 'fatal: Authentication failed' in stderr
157
183
158 def test_push_back_to_wrong_url_svn(self, rc_web_server, tmpdir):
184 def test_push_back_to_wrong_url_svn(self, rc_web_server, tmpdir):
159 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
185 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
160 username, password = '', ''
186 username, password = '', ''
161 auth = f'--non-interactive --username={username} --password={password}'
187 flags, auth = get_cli_flags(username, password)
162 Command('/tmp').execute(
188
163 f'svn checkout {auth}', clone_url, tmpdir.strpath)
189 stdout, stderr = Command('/tmp').execute(
190 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
164
191
165 stdout, stderr = _add_files_and_push(
192 stdout, stderr = _add_files_and_push(
166 'svn', tmpdir.strpath,
193 'svn', tmpdir.strpath,
167 clone_url=rc_web_server.repo_clone_url('not-existing'))
194 clone_url=rc_web_server.repo_clone_url('not-existing'), username=username, password=password)
168
195
169 assert 'not found' in stderr
196 assert 'not found' in stderr
170
197
171 def test_ip_restriction_svn(self, rc_web_server, tmpdir):
198 def test_ip_restriction_svn(self, rc_web_server, tmpdir):
172 user_model = UserModel()
199 user_model = UserModel()
173 username, password = '', ''
200 username, password = '', ''
174 auth = f'--non-interactive --username={username} --password={password}'
201 flags, auth = get_cli_flags(username, password)
175
202
176 try:
203 try:
177 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
204 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
178 Session().commit()
205 Session().commit()
179 time.sleep(2)
206 time.sleep(2)
180 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
207 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
181
208
182 stdout, stderr = Command('/tmp').execute(
209 stdout, stderr = Command('/tmp').execute(
183 f'svn checkout {auth}', clone_url, tmpdir.strpath)
210 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
184 msg = "The requested URL returned error: 403"
211 msg = "The requested URL returned error: 403"
185 assert msg in stderr
212 assert msg in stderr
186 finally:
213 finally:
187 # release IP restrictions
214 # release IP restrictions
188 for ip in UserIpMap.getAll():
215 for ip in UserIpMap.getAll():
189 UserIpMap.delete(ip.ip_id)
216 UserIpMap.delete(ip.ip_id)
190 Session().commit()
217 Session().commit()
191
218
192 time.sleep(2)
219 time.sleep(2)
193
220
194 cmd = Command('/tmp')
221 cmd = Command('/tmp')
195 stdout, stderr = cmd.execute(f'svn checkout {auth}', clone_url, tmpdir.strpath)
222 stdout, stderr = cmd.execute(f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
196 cmd.assert_returncode_success()
223 cmd.assert_returncode_success()
197 _check_proper_clone(stdout, stderr, 'svn')
224 _check_proper_clone(stdout, stderr, 'svn')
General Comments 0
You need to be logged in to leave comments. Login now