##// END OF EJS Templates
added integration tests for IP restriction option
marcink -
r3131:21307b01 beta
parent child Browse files
Show More
@@ -1,422 +1,461 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.tests.test_scm_operations
3 rhodecode.tests.test_scm_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 Test suite for making push/pull operations.
6 Test suite for making push/pull operations.
7 Run using::
7 Run using::
8
8
9 RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
9 RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
10
10
11 :created_on: Dec 30, 2010
11 :created_on: Dec 30, 2010
12 :author: marcink
12 :author: marcink
13 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
13 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
14 :license: GPLv3, see COPYING for more details.
14 :license: GPLv3, see COPYING for more details.
15 """
15 """
16 # This program is free software: you can redistribute it and/or modify
16 # This program is free software: you can redistribute it and/or modify
17 # it under the terms of the GNU General Public License as published by
17 # it under the terms of the GNU General Public License as published by
18 # the Free Software Foundation, either version 3 of the License, or
18 # the Free Software Foundation, either version 3 of the License, or
19 # (at your option) any later version.
19 # (at your option) any later version.
20 #
20 #
21 # This program is distributed in the hope that it will be useful,
21 # This program is distributed in the hope that it will be useful,
22 # but WITHOUT ANY WARRANTY; without even the implied warranty of
22 # but WITHOUT ANY WARRANTY; without even the implied warranty of
23 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24 # GNU General Public License for more details.
24 # GNU General Public License for more details.
25 #
25 #
26 # You should have received a copy of the GNU General Public License
26 # You should have received a copy of the GNU General Public License
27 # along with this program. If not, see <http://www.gnu.org/licenses/>.
27 # along with this program. If not, see <http://www.gnu.org/licenses/>.
28
28
29 import os
29 import os
30 import tempfile
30 import tempfile
31 import unittest
31 import unittest
32 from os.path import join as jn
32 from os.path import join as jn
33 from os.path import dirname as dn
33 from os.path import dirname as dn
34
34
35 from tempfile import _RandomNameSequence
35 from tempfile import _RandomNameSequence
36 from subprocess import Popen, PIPE
36 from subprocess import Popen, PIPE
37
37
38 from rhodecode.tests import *
38 from rhodecode.tests import *
39 from rhodecode.model.db import User, Repository, UserLog
39 from rhodecode.model.db import User, Repository, UserLog
40 from rhodecode.model.meta import Session
40 from rhodecode.model.meta import Session
41 from rhodecode.model.repo import RepoModel
41 from rhodecode.model.repo import RepoModel
42 from rhodecode.model.user import UserModel
42
43
43 DEBUG = True
44 DEBUG = True
44 HOST = '127.0.0.1:5000' # test host
45 HOST = '127.0.0.1:5000' # test host
45
46
46
47
47 class Command(object):
48 class Command(object):
48
49
49 def __init__(self, cwd):
50 def __init__(self, cwd):
50 self.cwd = cwd
51 self.cwd = cwd
51
52
52 def execute(self, cmd, *args):
53 def execute(self, cmd, *args):
53 """
54 """
54 Runs command on the system with given ``args``.
55 Runs command on the system with given ``args``.
55 """
56 """
56
57
57 command = cmd + ' ' + ' '.join(args)
58 command = cmd + ' ' + ' '.join(args)
58 if DEBUG:
59 if DEBUG:
59 print '*** CMD %s ***' % command
60 print '*** CMD %s ***' % command
60 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
61 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
61 stdout, stderr = p.communicate()
62 stdout, stderr = p.communicate()
62 if DEBUG:
63 if DEBUG:
63 print stdout, stderr
64 print stdout, stderr
64 return stdout, stderr
65 return stdout, stderr
65
66
66
67
67 def _get_tmp_dir():
68 def _get_tmp_dir():
68 return tempfile.mkdtemp(prefix='rc_integration_test')
69 return tempfile.mkdtemp(prefix='rc_integration_test')
69
70
70
71
71 def _construct_url(repo, dest=None, **kwargs):
72 def _construct_url(repo, dest=None, **kwargs):
72 if dest is None:
73 if dest is None:
73 #make temp clone
74 #make temp clone
74 dest = _get_tmp_dir()
75 dest = _get_tmp_dir()
75 params = {
76 params = {
76 'user': TEST_USER_ADMIN_LOGIN,
77 'user': TEST_USER_ADMIN_LOGIN,
77 'passwd': TEST_USER_ADMIN_PASS,
78 'passwd': TEST_USER_ADMIN_PASS,
78 'host': HOST,
79 'host': HOST,
79 'cloned_repo': repo,
80 'cloned_repo': repo,
80 'dest': dest
81 'dest': dest
81 }
82 }
82 params.update(**kwargs)
83 params.update(**kwargs)
83 if params['user'] and params['passwd']:
84 if params['user'] and params['passwd']:
84 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
85 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
85 else:
86 else:
86 _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
87 _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
87 return _url
88 return _url
88
89
89
90
90 def _add_files_and_push(vcs, DEST, **kwargs):
91 def _add_files_and_push(vcs, DEST, **kwargs):
91 """
92 """
92 Generate some files, add it to DEST repo and push back
93 Generate some files, add it to DEST repo and push back
93 vcs is git or hg and defines what VCS we want to make those files for
94 vcs is git or hg and defines what VCS we want to make those files for
94
95
95 :param vcs:
96 :param vcs:
96 :param DEST:
97 :param DEST:
97 """
98 """
98 # commit some stuff into this repo
99 # commit some stuff into this repo
99 cwd = path = jn(DEST)
100 cwd = path = jn(DEST)
100 #added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
101 #added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
101 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
102 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
102 Command(cwd).execute('touch %s' % added_file)
103 Command(cwd).execute('touch %s' % added_file)
103 Command(cwd).execute('%s add %s' % (vcs, added_file))
104 Command(cwd).execute('%s add %s' % (vcs, added_file))
104
105
105 for i in xrange(3):
106 for i in xrange(3):
106 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
107 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
107 Command(cwd).execute(cmd)
108 Command(cwd).execute(cmd)
108 if vcs == 'hg':
109 if vcs == 'hg':
109 cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
110 cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
110 i, 'Marcin KuΕΊminski <marcin@python-blog.com>', added_file
111 i, 'Marcin KuΕΊminski <marcin@python-blog.com>', added_file
111 )
112 )
112 elif vcs == 'git':
113 elif vcs == 'git':
113 cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
114 cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
114 i, 'Marcin KuΕΊminski <marcin@python-blog.com>', added_file
115 i, 'Marcin KuΕΊminski <marcin@python-blog.com>', added_file
115 )
116 )
116 Command(cwd).execute(cmd)
117 Command(cwd).execute(cmd)
117 # PUSH it back
118 # PUSH it back
118 if vcs == 'hg':
119 if vcs == 'hg':
119 _REPO = HG_REPO
120 _REPO = HG_REPO
120 elif vcs == 'git':
121 elif vcs == 'git':
121 _REPO = GIT_REPO
122 _REPO = GIT_REPO
122
123
123 kwargs['dest'] = ''
124 kwargs['dest'] = ''
124 clone_url = _construct_url(_REPO, **kwargs)
125 clone_url = _construct_url(_REPO, **kwargs)
125 if 'clone_url' in kwargs:
126 if 'clone_url' in kwargs:
126 clone_url = kwargs['clone_url']
127 clone_url = kwargs['clone_url']
127 if vcs == 'hg':
128 if vcs == 'hg':
128 stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
129 stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
129 elif vcs == 'git':
130 elif vcs == 'git':
130 stdout, stderr = Command(cwd).execute('git push', clone_url + " master")
131 stdout, stderr = Command(cwd).execute('git push', clone_url + " master")
131
132
132 return stdout, stderr
133 return stdout, stderr
133
134
134
135
135 def set_anonymous_access(enable=True):
136 def set_anonymous_access(enable=True):
136 user = User.get_by_username(User.DEFAULT_USER)
137 user = User.get_by_username(User.DEFAULT_USER)
137 user.active = enable
138 user.active = enable
138 Session().add(user)
139 Session().add(user)
139 Session().commit()
140 Session().commit()
140 print '\tanonymous access is now:', enable
141 print '\tanonymous access is now:', enable
141 if enable != User.get_by_username(User.DEFAULT_USER).active:
142 if enable != User.get_by_username(User.DEFAULT_USER).active:
142 raise Exception('Cannot set anonymous access')
143 raise Exception('Cannot set anonymous access')
143
144
144
145
145 #==============================================================================
146 #==============================================================================
146 # TESTS
147 # TESTS
147 #==============================================================================
148 #==============================================================================
148
149
149 class TestVCSOperations(unittest.TestCase):
150 class TestVCSOperations(unittest.TestCase):
150
151
151 @classmethod
152 @classmethod
152 def setup_class(cls):
153 def setup_class(cls):
153 #DISABLE ANONYMOUS ACCESS
154 #DISABLE ANONYMOUS ACCESS
154 set_anonymous_access(False)
155 set_anonymous_access(False)
155
156
156 def setUp(self):
157 def setUp(self):
157 r = Repository.get_by_repo_name(GIT_REPO)
158 r = Repository.get_by_repo_name(GIT_REPO)
158 Repository.unlock(r)
159 Repository.unlock(r)
159 r.enable_locking = False
160 r.enable_locking = False
160 Session().add(r)
161 Session().add(r)
161 Session().commit()
162 Session().commit()
162
163
163 r = Repository.get_by_repo_name(HG_REPO)
164 r = Repository.get_by_repo_name(HG_REPO)
164 Repository.unlock(r)
165 Repository.unlock(r)
165 r.enable_locking = False
166 r.enable_locking = False
166 Session().add(r)
167 Session().add(r)
167 Session().commit()
168 Session().commit()
168
169
169 def test_clone_hg_repo_by_admin(self):
170 def test_clone_hg_repo_by_admin(self):
170 clone_url = _construct_url(HG_REPO)
171 clone_url = _construct_url(HG_REPO)
171 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
172 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
172
173
173 assert 'requesting all changes' in stdout
174 assert 'requesting all changes' in stdout
174 assert 'adding changesets' in stdout
175 assert 'adding changesets' in stdout
175 assert 'adding manifests' in stdout
176 assert 'adding manifests' in stdout
176 assert 'adding file changes' in stdout
177 assert 'adding file changes' in stdout
177
178
178 assert stderr == ''
179 assert stderr == ''
179
180
180 def test_clone_git_repo_by_admin(self):
181 def test_clone_git_repo_by_admin(self):
181 clone_url = _construct_url(GIT_REPO)
182 clone_url = _construct_url(GIT_REPO)
182 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
183 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
183
184
184 assert 'Cloning into' in stdout
185 assert 'Cloning into' in stdout
185 assert stderr == ''
186 assert stderr == ''
186
187
187 def test_clone_wrong_credentials_hg(self):
188 def test_clone_wrong_credentials_hg(self):
188 clone_url = _construct_url(HG_REPO, passwd='bad!')
189 clone_url = _construct_url(HG_REPO, passwd='bad!')
189 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
190 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
190 assert 'abort: authorization failed' in stderr
191 assert 'abort: authorization failed' in stderr
191
192
192 def test_clone_wrong_credentials_git(self):
193 def test_clone_wrong_credentials_git(self):
193 clone_url = _construct_url(GIT_REPO, passwd='bad!')
194 clone_url = _construct_url(GIT_REPO, passwd='bad!')
194 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
195 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
195 assert 'fatal: Authentication failed' in stderr
196 assert 'fatal: Authentication failed' in stderr
196
197
197 def test_clone_git_dir_as_hg(self):
198 def test_clone_git_dir_as_hg(self):
198 clone_url = _construct_url(GIT_REPO)
199 clone_url = _construct_url(GIT_REPO)
199 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
200 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
200 assert 'HTTP Error 404: Not Found' in stderr
201 assert 'HTTP Error 404: Not Found' in stderr
201
202
202 def test_clone_hg_repo_as_git(self):
203 def test_clone_hg_repo_as_git(self):
203 clone_url = _construct_url(HG_REPO)
204 clone_url = _construct_url(HG_REPO)
204 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
205 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
205 assert 'not found:' in stderr
206 assert 'not found:' in stderr
206
207
207 def test_clone_non_existing_path_hg(self):
208 def test_clone_non_existing_path_hg(self):
208 clone_url = _construct_url('trololo')
209 clone_url = _construct_url('trololo')
209 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
210 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
210 assert 'HTTP Error 404: Not Found' in stderr
211 assert 'HTTP Error 404: Not Found' in stderr
211
212
212 def test_clone_non_existing_path_git(self):
213 def test_clone_non_existing_path_git(self):
213 clone_url = _construct_url('trololo')
214 clone_url = _construct_url('trololo')
214 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
215 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
215 assert 'not found:' in stderr
216 assert 'not found:' in stderr
216
217
217 def test_push_new_file_hg(self):
218 def test_push_new_file_hg(self):
218 DEST = _get_tmp_dir()
219 DEST = _get_tmp_dir()
219 clone_url = _construct_url(HG_REPO, dest=DEST)
220 clone_url = _construct_url(HG_REPO, dest=DEST)
220 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
221 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
221
222
222 stdout, stderr = _add_files_and_push('hg', DEST)
223 stdout, stderr = _add_files_and_push('hg', DEST)
223
224
224 assert 'pushing to' in stdout
225 assert 'pushing to' in stdout
225 assert 'Repository size' in stdout
226 assert 'Repository size' in stdout
226 assert 'Last revision is now' in stdout
227 assert 'Last revision is now' in stdout
227
228
228 def test_push_new_file_git(self):
229 def test_push_new_file_git(self):
229 DEST = _get_tmp_dir()
230 DEST = _get_tmp_dir()
230 clone_url = _construct_url(GIT_REPO, dest=DEST)
231 clone_url = _construct_url(GIT_REPO, dest=DEST)
231 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
232 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
232
233
233 # commit some stuff into this repo
234 # commit some stuff into this repo
234 stdout, stderr = _add_files_and_push('git', DEST)
235 stdout, stderr = _add_files_and_push('git', DEST)
235
236
236 #WTF git stderr ?!
237 #WTF git stderr ?!
237 assert 'master -> master' in stderr
238 assert 'master -> master' in stderr
238
239
239 def test_push_wrong_credentials_hg(self):
240 def test_push_wrong_credentials_hg(self):
240 DEST = _get_tmp_dir()
241 DEST = _get_tmp_dir()
241 clone_url = _construct_url(HG_REPO, dest=DEST)
242 clone_url = _construct_url(HG_REPO, dest=DEST)
242 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
243 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
243
244
244 stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
245 stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
245 passwd='name')
246 passwd='name')
246
247
247 assert 'abort: authorization failed' in stderr
248 assert 'abort: authorization failed' in stderr
248
249
249 def test_push_wrong_credentials_git(self):
250 def test_push_wrong_credentials_git(self):
250 DEST = _get_tmp_dir()
251 DEST = _get_tmp_dir()
251 clone_url = _construct_url(GIT_REPO, dest=DEST)
252 clone_url = _construct_url(GIT_REPO, dest=DEST)
252 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
253 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
253
254
254 stdout, stderr = _add_files_and_push('git', DEST, user='bad',
255 stdout, stderr = _add_files_and_push('git', DEST, user='bad',
255 passwd='name')
256 passwd='name')
256
257
257 assert 'fatal: Authentication failed' in stderr
258 assert 'fatal: Authentication failed' in stderr
258
259
259 def test_push_back_to_wrong_url_hg(self):
260 def test_push_back_to_wrong_url_hg(self):
260 DEST = _get_tmp_dir()
261 DEST = _get_tmp_dir()
261 clone_url = _construct_url(HG_REPO, dest=DEST)
262 clone_url = _construct_url(HG_REPO, dest=DEST)
262 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
263 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
263
264
264 stdout, stderr = _add_files_and_push('hg', DEST,
265 stdout, stderr = _add_files_and_push('hg', DEST,
265 clone_url='http://127.0.0.1:5000/tmp',)
266 clone_url='http://127.0.0.1:5000/tmp',)
266
267
267 assert 'HTTP Error 404: Not Found' in stderr
268 assert 'HTTP Error 404: Not Found' in stderr
268
269
269 def test_push_back_to_wrong_url_git(self):
270 def test_push_back_to_wrong_url_git(self):
270 DEST = _get_tmp_dir()
271 DEST = _get_tmp_dir()
271 clone_url = _construct_url(GIT_REPO, dest=DEST)
272 clone_url = _construct_url(GIT_REPO, dest=DEST)
272 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
273 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
273
274
274 stdout, stderr = _add_files_and_push('git', DEST,
275 stdout, stderr = _add_files_and_push('git', DEST,
275 clone_url='http://127.0.0.1:5000/tmp',)
276 clone_url='http://127.0.0.1:5000/tmp',)
276
277
277 assert 'not found:' in stderr
278 assert 'not found:' in stderr
278
279
279 def test_clone_and_create_lock_hg(self):
280 def test_clone_and_create_lock_hg(self):
280 # enable locking
281 # enable locking
281 r = Repository.get_by_repo_name(HG_REPO)
282 r = Repository.get_by_repo_name(HG_REPO)
282 r.enable_locking = True
283 r.enable_locking = True
283 Session().add(r)
284 Session().add(r)
284 Session().commit()
285 Session().commit()
285 # clone
286 # clone
286 clone_url = _construct_url(HG_REPO)
287 clone_url = _construct_url(HG_REPO)
287 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
288 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
288
289
289 #check if lock was made
290 #check if lock was made
290 r = Repository.get_by_repo_name(HG_REPO)
291 r = Repository.get_by_repo_name(HG_REPO)
291 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
292 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
292
293
293 def test_clone_and_create_lock_git(self):
294 def test_clone_and_create_lock_git(self):
294 # enable locking
295 # enable locking
295 r = Repository.get_by_repo_name(GIT_REPO)
296 r = Repository.get_by_repo_name(GIT_REPO)
296 r.enable_locking = True
297 r.enable_locking = True
297 Session().add(r)
298 Session().add(r)
298 Session().commit()
299 Session().commit()
299 # clone
300 # clone
300 clone_url = _construct_url(GIT_REPO)
301 clone_url = _construct_url(GIT_REPO)
301 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
302 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
302
303
303 #check if lock was made
304 #check if lock was made
304 r = Repository.get_by_repo_name(GIT_REPO)
305 r = Repository.get_by_repo_name(GIT_REPO)
305 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
306 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
306
307
307 def test_clone_after_repo_was_locked_hg(self):
308 def test_clone_after_repo_was_locked_hg(self):
308 #lock repo
309 #lock repo
309 r = Repository.get_by_repo_name(HG_REPO)
310 r = Repository.get_by_repo_name(HG_REPO)
310 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
311 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
311 #pull fails since repo is locked
312 #pull fails since repo is locked
312 clone_url = _construct_url(HG_REPO)
313 clone_url = _construct_url(HG_REPO)
313 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
314 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
314 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
315 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
315 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
316 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
316 assert msg in stderr
317 assert msg in stderr
317
318
318 def test_clone_after_repo_was_locked_git(self):
319 def test_clone_after_repo_was_locked_git(self):
319 #lock repo
320 #lock repo
320 r = Repository.get_by_repo_name(GIT_REPO)
321 r = Repository.get_by_repo_name(GIT_REPO)
321 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
322 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
322 #pull fails since repo is locked
323 #pull fails since repo is locked
323 clone_url = _construct_url(GIT_REPO)
324 clone_url = _construct_url(GIT_REPO)
324 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
325 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
325 msg = ("""423 Repository `%s` locked by user `%s`"""
326 msg = ("""423 Repository `%s` locked by user `%s`"""
326 % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
327 % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
327 assert msg in stderr
328 assert msg in stderr
328
329
329 def test_push_on_locked_repo_by_other_user_hg(self):
330 def test_push_on_locked_repo_by_other_user_hg(self):
330 #clone some temp
331 #clone some temp
331 DEST = _get_tmp_dir()
332 DEST = _get_tmp_dir()
332 clone_url = _construct_url(HG_REPO, dest=DEST)
333 clone_url = _construct_url(HG_REPO, dest=DEST)
333 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
334 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
334
335
335 #lock repo
336 #lock repo
336 r = Repository.get_by_repo_name(HG_REPO)
337 r = Repository.get_by_repo_name(HG_REPO)
337 # let this user actually push !
338 # let this user actually push !
338 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
339 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
339 perm='repository.write')
340 perm='repository.write')
340 Session().commit()
341 Session().commit()
341 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
342 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
342
343
343 #push fails repo is locked by other user !
344 #push fails repo is locked by other user !
344 stdout, stderr = _add_files_and_push('hg', DEST,
345 stdout, stderr = _add_files_and_push('hg', DEST,
345 user=TEST_USER_REGULAR_LOGIN,
346 user=TEST_USER_REGULAR_LOGIN,
346 passwd=TEST_USER_REGULAR_PASS)
347 passwd=TEST_USER_REGULAR_PASS)
347 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
348 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
348 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
349 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
349 assert msg in stderr
350 assert msg in stderr
350
351
351 #TODO: fix me ! somehow during tests hooks don't get called on GIT
352 #TODO: fix me ! somehow during tests hooks don't get called on GIT
352 # def test_push_on_locked_repo_by_other_user_git(self):
353 # def test_push_on_locked_repo_by_other_user_git(self):
353 # #clone some temp
354 # #clone some temp
354 # DEST = _get_tmp_dir()
355 # DEST = _get_tmp_dir()
355 # clone_url = _construct_url(GIT_REPO, dest=DEST)
356 # clone_url = _construct_url(GIT_REPO, dest=DEST)
356 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
357 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
357 #
358 #
358 # #lock repo
359 # #lock repo
359 # r = Repository.get_by_repo_name(GIT_REPO)
360 # r = Repository.get_by_repo_name(GIT_REPO)
360 # # let this user actually push !
361 # # let this user actually push !
361 # RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
362 # RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
362 # perm='repository.write')
363 # perm='repository.write')
363 # Session().commit()
364 # Session().commit()
364 # Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
365 # Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
365 #
366 #
366 # #push fails repo is locked by other user !
367 # #push fails repo is locked by other user !
367 # stdout, stderr = _add_files_and_push('git', DEST,
368 # stdout, stderr = _add_files_and_push('git', DEST,
368 # user=TEST_USER_REGULAR_LOGIN,
369 # user=TEST_USER_REGULAR_LOGIN,
369 # passwd=TEST_USER_REGULAR_PASS)
370 # passwd=TEST_USER_REGULAR_PASS)
370 # msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
371 # msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
371 # % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
372 # % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
372 # #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
373 # #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
373 # # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
374 # # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
374 # msg = "405 Method Not Allowed"
375 # msg = "405 Method Not Allowed"
375 # assert msg in stderr
376 # assert msg in stderr
376
377
377 def test_push_unlocks_repository_hg(self):
378 def test_push_unlocks_repository_hg(self):
378 # enable locking
379 # enable locking
379 r = Repository.get_by_repo_name(HG_REPO)
380 r = Repository.get_by_repo_name(HG_REPO)
380 r.enable_locking = True
381 r.enable_locking = True
381 Session().add(r)
382 Session().add(r)
382 Session().commit()
383 Session().commit()
383 #clone some temp
384 #clone some temp
384 DEST = _get_tmp_dir()
385 DEST = _get_tmp_dir()
385 clone_url = _construct_url(HG_REPO, dest=DEST)
386 clone_url = _construct_url(HG_REPO, dest=DEST)
386 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
387 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
387
388
388 #check for lock repo after clone
389 #check for lock repo after clone
389 r = Repository.get_by_repo_name(HG_REPO)
390 r = Repository.get_by_repo_name(HG_REPO)
390 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
391 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
391
392
392 #push is ok and repo is now unlocked
393 #push is ok and repo is now unlocked
393 stdout, stderr = _add_files_and_push('hg', DEST)
394 stdout, stderr = _add_files_and_push('hg', DEST)
394 assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
395 assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
395 #we need to cleanup the Session Here !
396 #we need to cleanup the Session Here !
396 Session.remove()
397 Session.remove()
397 r = Repository.get_by_repo_name(HG_REPO)
398 r = Repository.get_by_repo_name(HG_REPO)
398 assert r.locked == [None, None]
399 assert r.locked == [None, None]
399
400
400 #TODO: fix me ! somehow during tests hooks don't get called on GIT
401 #TODO: fix me ! somehow during tests hooks don't get called on GIT
401 # def test_push_unlocks_repository_git(self):
402 # def test_push_unlocks_repository_git(self):
402 # # enable locking
403 # # enable locking
403 # r = Repository.get_by_repo_name(GIT_REPO)
404 # r = Repository.get_by_repo_name(GIT_REPO)
404 # r.enable_locking = True
405 # r.enable_locking = True
405 # Session().add(r)
406 # Session().add(r)
406 # Session().commit()
407 # Session().commit()
407 # #clone some temp
408 # #clone some temp
408 # DEST = _get_tmp_dir()
409 # DEST = _get_tmp_dir()
409 # clone_url = _construct_url(GIT_REPO, dest=DEST)
410 # clone_url = _construct_url(GIT_REPO, dest=DEST)
410 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
411 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
411 #
412 #
412 # #check for lock repo after clone
413 # #check for lock repo after clone
413 # r = Repository.get_by_repo_name(GIT_REPO)
414 # r = Repository.get_by_repo_name(GIT_REPO)
414 # assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
415 # assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
415 #
416 #
416 # #push is ok and repo is now unlocked
417 # #push is ok and repo is now unlocked
417 # stdout, stderr = _add_files_and_push('git', DEST)
418 # stdout, stderr = _add_files_and_push('git', DEST)
418 # #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
419 # #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
419 # #we need to cleanup the Session Here !
420 # #we need to cleanup the Session Here !
420 # Session.remove()
421 # Session.remove()
421 # r = Repository.get_by_repo_name(GIT_REPO)
422 # r = Repository.get_by_repo_name(GIT_REPO)
422 # assert r.locked == [None, None]
423 # assert r.locked == [None, None]
424
425 def test_ip_restriction_hg(self):
426 user_model = UserModel()
427 new_ip = user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
428 Session().commit()
429 clone_url = _construct_url(HG_REPO)
430 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
431 assert 'abort: HTTP Error 403: Forbidden' in stderr
432
433 #release IP restrictions
434 clone_url = _construct_url(HG_REPO)
435 user_model.delete_extra_ip(TEST_USER_ADMIN_LOGIN, new_ip.ip_id)
436 Session().commit()
437 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
438
439 assert 'requesting all changes' in stdout
440 assert 'adding changesets' in stdout
441 assert 'adding manifests' in stdout
442 assert 'adding file changes' in stdout
443
444 assert stderr == ''
445
446 def test_ip_restriction_git(self):
447 user_model = UserModel()
448 new_ip = user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
449 Session().commit()
450 clone_url = _construct_url(GIT_REPO)
451 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
452 assert 'error: The requested URL returned error: 403 Forbidden' in stderr
453
454 #release IP restrictions
455 clone_url = _construct_url(GIT_REPO)
456 user_model.delete_extra_ip(TEST_USER_ADMIN_LOGIN, new_ip.ip_id)
457 Session().commit()
458 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
459
460 assert 'Cloning into' in stdout
461 assert stderr == ''
General Comments 0
You need to be logged in to leave comments. Login now