##// END OF EJS Templates
fix(tests): fixed svn tests cases when interactive prompt was displayed
super-admin -
r5469:e7062d28 default
parent child Browse files
Show More
@@ -1,214 +1,220 b''
1 1
2 2 # Copyright (C) 2010-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software: you can redistribute it and/or modify
5 5 # it under the terms of the GNU Affero General Public License, version 3
6 6 # (only), as published by the Free Software Foundation.
7 7 #
8 8 # This program is distributed in the hope that it will be useful,
9 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 11 # GNU General Public License for more details.
12 12 #
13 13 # You should have received a copy of the GNU Affero General Public License
14 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 15 #
16 16 # This program is dual-licensed. If you wish to learn more about the
17 17 # RhodeCode Enterprise Edition, including its added features, Support services,
18 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 19
20 20 """
21 21 Base for test suite for making push/pull operations.
22 22
23 23 .. important::
24 24
25 25 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
26 26 to redirect things to stderr instead of stdout.
27 27 """
28 28
29 29
30 30 import logging
31 31 import os
32 32 import tempfile
33 33 import subprocess
34 34
35 35 from rhodecode.lib.str_utils import safe_str
36 36 from rhodecode.tests import GIT_REPO, HG_REPO, SVN_REPO
37 37
38 38 DEBUG = True
39 39 RC_LOG = os.path.join(tempfile.gettempdir(), 'rc.log')
40 40 REPO_GROUP = 'a_repo_group'
41 41 HG_REPO_WITH_GROUP = f'{REPO_GROUP}/{HG_REPO}'
42 42 GIT_REPO_WITH_GROUP = f'{REPO_GROUP}/{GIT_REPO}'
43 43 SVN_REPO_WITH_GROUP = f'{REPO_GROUP}/{SVN_REPO}'
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 class Command(object):
49 49
50 50 def __init__(self, cwd):
51 51 self.cwd = cwd
52 52 self.process = None
53 53
54 54 def execute(self, cmd, *args):
55 55 """
56 56 Runs command on the system with given ``args``.
57 57 """
58 58
59 59 command = cmd + ' ' + ' '.join(args)
60 60 if DEBUG:
61 61 log.debug('*** CMD %s ***', command)
62 62
63 63 env = dict(os.environ)
64 64 # Delete coverage variables, as they make the test fail for Mercurial
65 65 for key in env.keys():
66 66 if key.startswith('COV_CORE_'):
67 67 del env[key]
68 68
69 69 self.process = subprocess.Popen(
70 70 command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
71 71 cwd=self.cwd, env=env)
72 72 stdout, stderr = self.process.communicate()
73 73
74 74 stdout = safe_str(stdout)
75 75 stderr = safe_str(stderr)
76 76
77 77 if DEBUG:
78 78 log.debug('STDOUT:%s', stdout)
79 79 log.debug('STDERR:%s', stderr)
80 80 return stdout, stderr
81 81
82 82 def assert_returncode_success(self):
83 83 assert self.process.returncode == 0
84 84
85 85
86 86 def _add_files(vcs, dest, clone_url=None, tags=None, target_branch=None, new_branch=False, **kwargs):
87 87 full_name = 'Marcin KuΕΊminski'
88 88 email = 'me@email.com'
89 89 git_ident = f"git config user.name {full_name} && git config user.email {email}"
90 90 cwd = path = os.path.join(dest)
91 91
92 92 tags = tags or []
93 93 name_sequence = next(tempfile._RandomNameSequence())
94 94 added_file = os.path.join(path, f'{name_sequence}_setup.py')
95 95
96 96 Command(cwd).execute(f'touch {added_file}')
97 97 Command(cwd).execute(f'{vcs} add {added_file}')
98 98 author_str = 'Marcin KuΕΊminski <me@email.com>'
99 99
100 100 for i in range(kwargs.get('files_no', 3)):
101 101 cmd = f"""echo 'added_line{i}' >> {added_file}"""
102 102 Command(cwd).execute(cmd)
103 103
104 104 if vcs == 'hg':
105 105 cmd = f"""hg commit -m 'committed new {i}' -u '{author_str}' {added_file} """
106 106 elif vcs == 'git':
107 107 cmd = f"""{git_ident} && git commit -m 'committed new {i}' {added_file}"""
108 108 Command(cwd).execute(cmd)
109 109
110 110 for tag in tags:
111 111 if vcs == 'hg':
112 112 Command(cwd).execute(
113 113 f"""hg tag -m "{tag['commit']}" -u "{author_str}" """,
114 114 tag['name'])
115 115 elif vcs == 'git':
116 116 if tag['commit']:
117 117 # annotated tag
118 118 _stdout, _stderr = Command(cwd).execute(
119 119 f"""{git_ident} && git tag -a {tag['name']} -m "{tag['commit']}" """
120 120 )
121 121 else:
122 122 # lightweight tag
123 123 _stdout, _stderr = Command(cwd).execute(
124 124 f"""{git_ident} && git tag {tag['name']}"""
125 125 )
126 126
127 127
128 128 def _add_files_and_push(vcs, dest, clone_url=None, tags=None, target_branch=None,
129 129 new_branch=False, **kwargs):
130 130 """
131 131 Generate some files, add it to DEST repo and push back
132 132 vcs is git or hg and defines what VCS we want to make those files for
133 133 """
134 134 git_ident = "git config user.name Marcin KuΕΊminski && git config user.email me@email.com"
135 135 cwd = os.path.join(dest)
136 136
137 137 # commit some stuff into this repo
138 138 _add_files(vcs, dest, clone_url, tags, target_branch, new_branch, **kwargs)
139 139
140 140 default_target_branch = {
141 141 'git': 'master',
142 142 'hg': 'default'
143 143 }.get(vcs)
144 144
145 145 target_branch = target_branch or default_target_branch
146 146
147 147 # PUSH it back
148 148 stdout = stderr = None
149 149 if vcs == 'hg':
150 150 maybe_new_branch = ''
151 151 if new_branch:
152 152 maybe_new_branch = '--new-branch'
153 153 stdout, stderr = Command(cwd).execute(
154 154 f'hg push --traceback --verbose {maybe_new_branch} -r {target_branch} {clone_url}'
155 155 )
156 156 elif vcs == 'git':
157 157 stdout, stderr = Command(cwd).execute(
158 158 f'{git_ident} && git push --verbose --tags {clone_url} {target_branch}'
159 159 )
160 160 elif vcs == 'svn':
161 username = kwargs.pop('username', '')
162 password = kwargs.pop('password', '')
163 auth = ''
164 if username and password:
165 auth = f'--username {username} --password {password}'
166
161 167 stdout, stderr = Command(cwd).execute(
162 f'svn ci -m "pushing to {target_branch}"'
168 f'svn commit --no-auth-cache --non-interactive {auth} -m "pushing to {target_branch}"'
163 169 )
164 170
165 171 return stdout, stderr
166 172
167 173
168 174 def _check_proper_git_push(
169 175 stdout, stderr, branch='master', should_set_default_branch=False):
170 176 # Note: Git is writing most information to stderr intentionally
171 177 assert 'fatal' not in stderr
172 178 assert 'rejected' not in stderr
173 179 assert 'Pushing to' in stderr
174 180 assert '%s -> %s' % (branch, branch) in stderr
175 181
176 182 if should_set_default_branch:
177 183 assert "Setting default branch to %s" % branch in stderr
178 184 else:
179 185 assert "Setting default branch" not in stderr
180 186
181 187
182 188 def _check_proper_hg_push(stdout, stderr, branch='default'):
183 189 assert 'pushing to' in stdout
184 190 assert 'searching for changes' in stdout
185 191
186 192 assert 'abort:' not in stderr
187 193
188 194
189 195 def _check_proper_svn_push(stdout, stderr):
190 196 assert 'pushing to' in stdout
191 197 assert 'searching for changes' in stdout
192 198
193 199 assert 'abort:' not in stderr
194 200
195 201
196 202 def _check_proper_clone(stdout, stderr, vcs):
197 203 if vcs == 'hg':
198 204 assert 'requesting all changes' in stdout
199 205 assert 'adding changesets' in stdout
200 206 assert 'adding manifests' in stdout
201 207 assert 'adding file changes' in stdout
202 208
203 209 assert stderr == ''
204 210
205 211 if vcs == 'git':
206 212 assert '' == stdout
207 213 assert 'Cloning into' in stderr
208 214 assert 'abort:' not in stderr
209 215 assert 'fatal:' not in stderr
210 216
211 217 if vcs == 'svn':
212 218 assert 'dupa' in stdout
213 219
214 220
@@ -1,197 +1,224 b''
1 1 # Copyright (C) 2010-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 """
20 20 Test suite for making push/pull operations, on specially modified INI files
21 21
22 22 .. important::
23 23
24 24 You must have git >= 1.8.5 for tests to work fine. With 68b939b git started
25 25 to redirect things to stderr instead of stdout.
26 26 """
27 27
28 28
29 29 import time
30 30 import pytest
31 31
32 32 from rhodecode.model.db import Repository, UserIpMap
33 33 from rhodecode.model.meta import Session
34 34 from rhodecode.model.repo import RepoModel
35 35 from rhodecode.model.user import UserModel
36 36 from rhodecode.tests import (SVN_REPO, TEST_USER_ADMIN_LOGIN)
37 37
38 38
39 39 from rhodecode.tests.vcs_operations import (
40 40 Command, _check_proper_clone, _check_proper_svn_push,
41 41 _add_files_and_push, SVN_REPO_WITH_GROUP)
42 42
43 43
44 def get_cli_flags(username, password):
45 flags = '--no-auth-cache --non-interactive'
46 auth = ''
47 if username and password:
48 auth = f'--username {username} --password {password}'
49 return flags, auth
50
51
44 52 @pytest.mark.usefixtures("disable_locking", "disable_anonymous_user")
45 53 class TestVCSOperations(object):
46 54
47 55 def test_clone_svn_repo_by_admin(self, rc_web_server, tmpdir):
48 56 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
49 57 username, password = rc_web_server.repo_clone_credentials()
50 58
51 59 cmd = Command('/tmp')
52 60
53 auth = f'--non-interactive --username={username} --password={password}'
54 stdout, stderr = cmd.execute(f'svn checkout {auth}', clone_url, tmpdir.strpath)
61 flags, auth = get_cli_flags(username, password)
62
63 stdout, stderr = Command('/tmp').execute(
64 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
65
55 66 _check_proper_clone(stdout, stderr, 'svn')
56 67 cmd.assert_returncode_success()
57 68
58 69 def test_clone_svn_repo_by_id_by_admin(self, rc_web_server, tmpdir):
59 70 repo_id = Repository.get_by_repo_name(SVN_REPO).repo_id
60 71 username, password = rc_web_server.repo_clone_credentials()
61 72
62 73 clone_url = rc_web_server.repo_clone_url('_%s' % repo_id)
63 74 cmd = Command('/tmp')
64 auth = f'--non-interactive --username={username} --password={password}'
65 stdout, stderr = cmd.execute(f'svn checkout {auth}', clone_url, tmpdir.strpath)
75
76 flags, auth = get_cli_flags(username, password)
77
78 stdout, stderr = Command('/tmp').execute(
79 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
80
66 81 _check_proper_clone(stdout, stderr, 'svn')
67 82 cmd.assert_returncode_success()
68 83
69 84 def test_clone_svn_repo_with_group_by_admin(self, rc_web_server, tmpdir):
70 85 clone_url = rc_web_server.repo_clone_url(SVN_REPO_WITH_GROUP)
71 86 username, password = rc_web_server.repo_clone_credentials()
72 87
73 cmd = Command('/tmp')
74 auth = f'--non-interactive --username={username} --password={password}'
75 stdout, stderr = cmd.execute(f'svn checkout {auth}', clone_url, tmpdir.strpath)
88 flags, auth = get_cli_flags(username, password)
89
90 stdout, stderr = Command('/tmp').execute(
91 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
92
76 93 _check_proper_clone(stdout, stderr, 'svn')
77 94 cmd.assert_returncode_success()
78 95
79 96 def test_clone_wrong_credentials_svn(self, rc_web_server, tmpdir):
80 97 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
81 98 username, password = rc_web_server.repo_clone_credentials()
82 99 password = 'bad-password'
83 100
84 auth = f'--non-interactive --username={username} --password={password}'
101 flags, auth = get_cli_flags(username, password)
102
85 103 stdout, stderr = Command('/tmp').execute(
86 f'svn checkout {auth}', clone_url, tmpdir.strpath)
104 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
87 105 assert 'fatal: Authentication failed' in stderr
88 106
89 107 def test_clone_svn_with_slashes(self, rc_web_server, tmpdir):
90 108 clone_url = rc_web_server.repo_clone_url('//' + SVN_REPO)
91 stdout, stderr = Command('/tmp').execute('svn checkout', clone_url)
109 username, password = '', ''
110 flags, auth = get_cli_flags(username, password)
111
112 stdout, stderr = Command('/tmp').execute(
113 f'svn checkout {flags} {auth}', clone_url)
114
92 115 assert 'not found' in stderr
93 116
94 117 def test_clone_existing_path_svn_not_in_database(
95 118 self, rc_web_server, tmpdir, fs_repo_only):
96 119 db_name = fs_repo_only('not-in-db-git', repo_type='git')
97 120 clone_url = rc_web_server.repo_clone_url(db_name)
98 121 username, password = '', ''
99 auth = f'--non-interactive --username={username} --password={password}'
122 flags, auth = get_cli_flags(username, password)
100 123
101 124 stdout, stderr = Command('/tmp').execute(
102 f'svn checkout {auth}', clone_url, tmpdir.strpath)
125 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
103 126 assert 'not found' in stderr
104 127
105 128 def test_clone_existing_path_svn_not_in_database_different_scm(
106 129 self, rc_web_server, tmpdir, fs_repo_only):
107 130 db_name = fs_repo_only('not-in-db-hg', repo_type='hg')
108 131 clone_url = rc_web_server.repo_clone_url(db_name)
109 132
110 133 username, password = '', ''
111 auth = f'--non-interactive --username={username} --password={password}'
134 flags, auth = get_cli_flags(username, password)
135
112 136 stdout, stderr = Command('/tmp').execute(
113 f'svn checkout {auth}', clone_url, tmpdir.strpath)
137 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
114 138 assert 'not found' in stderr
115 139
116 140 def test_clone_non_existing_store_path_svn(self, rc_web_server, tmpdir, user_util):
117 141 repo = user_util.create_repo(repo_type='git')
118 142 clone_url = rc_web_server.repo_clone_url(repo.repo_name)
119 143
120 144 # Damage repo by removing it's folder
121 145 RepoModel()._delete_filesystem_repo(repo)
122 146
123 147 username, password = '', ''
124 auth = f'--non-interactive --username={username} --password={password}'
148 flags, auth = get_cli_flags(username, password)
149
125 150 stdout, stderr = Command('/tmp').execute(
126 f'svn checkout {auth}', clone_url, tmpdir.strpath)
151 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
127 152 assert 'not found' in stderr
128 153
129 154 def test_push_new_file_svn(self, rc_web_server, tmpdir):
130 155 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
131 156 username, password = '', ''
132 auth = f'--non-interactive --username={username} --password={password}'
157 flags, auth = get_cli_flags(username, password)
133 158
134 159 stdout, stderr = Command('/tmp').execute(
135 f'svn checkout {auth}', clone_url, tmpdir.strpath)
160 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
136 161
137 162 # commit some stuff into this repo
138 163 stdout, stderr = _add_files_and_push(
139 'svn', tmpdir.strpath, clone_url=clone_url)
164 'svn', tmpdir.strpath, clone_url=clone_url, username=username, password=password)
140 165
141 166 _check_proper_svn_push(stdout, stderr)
142 167
143 168 def test_push_wrong_credentials_svn(self, rc_web_server, tmpdir):
144 169 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
145 170
146 username, password = '', ''
147 auth = f'--non-interactive --username={username} --password={password}'
171 username, password = rc_web_server.repo_clone_credentials()
172 flags, auth = get_cli_flags(username, password)
173
148 174 stdout, stderr = Command('/tmp').execute(
149 f'svn checkout {auth}', clone_url, tmpdir.strpath)
175 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
150 176
151 177 push_url = rc_web_server.repo_clone_url(
152 178 SVN_REPO, user='bad', passwd='name')
153 179 stdout, stderr = _add_files_and_push(
154 'svn', tmpdir.strpath, clone_url=push_url)
180 'svn', tmpdir.strpath, clone_url=push_url, username=username, password=password)
155 181
156 182 assert 'fatal: Authentication failed' in stderr
157 183
158 184 def test_push_back_to_wrong_url_svn(self, rc_web_server, tmpdir):
159 185 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
160 186 username, password = '', ''
161 auth = f'--non-interactive --username={username} --password={password}'
162 Command('/tmp').execute(
163 f'svn checkout {auth}', clone_url, tmpdir.strpath)
187 flags, auth = get_cli_flags(username, password)
188
189 stdout, stderr = Command('/tmp').execute(
190 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
164 191
165 192 stdout, stderr = _add_files_and_push(
166 193 'svn', tmpdir.strpath,
167 clone_url=rc_web_server.repo_clone_url('not-existing'))
194 clone_url=rc_web_server.repo_clone_url('not-existing'), username=username, password=password)
168 195
169 196 assert 'not found' in stderr
170 197
171 198 def test_ip_restriction_svn(self, rc_web_server, tmpdir):
172 199 user_model = UserModel()
173 200 username, password = '', ''
174 auth = f'--non-interactive --username={username} --password={password}'
201 flags, auth = get_cli_flags(username, password)
175 202
176 203 try:
177 204 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
178 205 Session().commit()
179 206 time.sleep(2)
180 207 clone_url = rc_web_server.repo_clone_url(SVN_REPO)
181 208
182 209 stdout, stderr = Command('/tmp').execute(
183 f'svn checkout {auth}', clone_url, tmpdir.strpath)
210 f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
184 211 msg = "The requested URL returned error: 403"
185 212 assert msg in stderr
186 213 finally:
187 214 # release IP restrictions
188 215 for ip in UserIpMap.getAll():
189 216 UserIpMap.delete(ip.ip_id)
190 217 Session().commit()
191 218
192 219 time.sleep(2)
193 220
194 221 cmd = Command('/tmp')
195 stdout, stderr = cmd.execute(f'svn checkout {auth}', clone_url, tmpdir.strpath)
222 stdout, stderr = cmd.execute(f'svn checkout {flags} {auth}', clone_url, tmpdir.strpath)
196 223 cmd.assert_returncode_success()
197 224 _check_proper_clone(stdout, stderr, 'svn')
General Comments 0
You need to be logged in to leave comments. Login now