##// END OF EJS Templates
more usage of fixture tools...
marcink -
r3647:8a86836f beta
parent child Browse files
Show More
@@ -1,210 +1,175 b''
1 1 """Pylons application test package
2 2
3 3 This package assumes the Pylons environment is already loaded, such as
4 4 when this script is imported from the `nosetests --with-pylons=test.ini`
5 5 command.
6 6
7 7 This module initializes the application via ``websetup`` (`paster
8 8 setup-app`) and provides the base testing objects.
9 9
10 10 nosetests -x - fail on first error
11 11 nosetests rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
12 12 nosetests --pdb --pdb-failures
13 13 nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
14 14
15 15 optional FLAGS:
16 16 RC_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
17 17 RC_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
18 18
19 19 """
20 20 import os
21 21 import time
22 22 import logging
23 23 import datetime
24 24 import hashlib
25 25 import tempfile
26 26 from os.path import join as jn
27 27
28 28 from unittest import TestCase
29 29 from tempfile import _RandomNameSequence
30 30
31 31 from paste.deploy import loadapp
32 32 from paste.script.appinstall import SetupCommand
33 33 from pylons import config, url
34 34 from routes.util import URLGenerator
35 35 from webtest import TestApp
36 36 from nose.plugins.skip import SkipTest
37 37
38 38 from rhodecode import is_windows
39 39 from rhodecode.model.meta import Session
40 40 from rhodecode.model.db import User
41 41 from rhodecode.tests.nose_parametrized import parameterized
42 42
43 43 import pylons.test
44 44 from rhodecode.lib.utils2 import safe_unicode, safe_str
45 45
46 46
47 47 os.environ['TZ'] = 'UTC'
48 48 if not is_windows:
49 49 time.tzset()
50 50
51 51 log = logging.getLogger(__name__)
52 52
53 53 __all__ = [
54 54 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
55 55 'SkipTest',
56 56 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
57 57 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
58 58 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
59 59 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
60 60 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
61 61 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
62 62 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
63 'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params',
64 '_get_group_create_params'
63 'GIT_REMOTE_REPO', 'SCM_TESTS',
65 64 ]
66 65
67 66 # Invoke websetup with the current config file
68 67 # SetupCommand('setup-app').run([config_file])
69 68
70 69 environ = {}
71 70
72 71 #SOME GLOBALS FOR TESTS
73 72
74 73 TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
75 74 TEST_USER_ADMIN_LOGIN = 'test_admin'
76 75 TEST_USER_ADMIN_PASS = 'test12'
77 76 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
78 77
79 78 TEST_USER_REGULAR_LOGIN = 'test_regular'
80 79 TEST_USER_REGULAR_PASS = 'test12'
81 80 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
82 81
83 82 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
84 83 TEST_USER_REGULAR2_PASS = 'test12'
85 84 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
86 85
87 86 HG_REPO = 'vcs_test_hg'
88 87 GIT_REPO = 'vcs_test_git'
89 88
90 89 NEW_HG_REPO = 'vcs_test_hg_new'
91 90 NEW_GIT_REPO = 'vcs_test_git_new'
92 91
93 92 HG_FORK = 'vcs_test_hg_fork'
94 93 GIT_FORK = 'vcs_test_git_fork'
95 94
96 95 ## VCS
97 96 SCM_TESTS = ['hg', 'git']
98 97 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
99 98
100 99 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
101 100
102 101 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
103 102 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
104 103 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
105 104
106 105
107 106 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
108 107
109 108 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
110 109 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
111 110 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
112 111
113 112 TEST_DIR = tempfile.gettempdir()
114 113 TEST_REPO_PREFIX = 'vcs-test'
115 114
116 115 # cached repos if any !
117 116 # comment out to get some other repos from bb or github
118 117 GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
119 118 HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
120 119
121 120
122 121 def get_new_dir(title):
123 122 """
124 123 Returns always new directory path.
125 124 """
126 125 from rhodecode.tests.vcs.utils import get_normalized_path
127 126 name = TEST_REPO_PREFIX
128 127 if title:
129 128 name = '-'.join((name, title))
130 129 hex = hashlib.sha1(str(time.time())).hexdigest()
131 130 name = '-'.join((name, hex))
132 131 path = os.path.join(TEST_DIR, name)
133 132 return get_normalized_path(path)
134 133
135 134
136 135 class TestController(TestCase):
137 136
138 137 def __init__(self, *args, **kwargs):
139 138 wsgiapp = pylons.test.pylonsapp
140 139 config = wsgiapp.config
141 140
142 141 self.app = TestApp(wsgiapp)
143 142 url._push_object(URLGenerator(config['routes.map'], environ))
144 143 self.Session = Session
145 144 self.index_location = config['app_conf']['index_dir']
146 145 TestCase.__init__(self, *args, **kwargs)
147 146
148 147 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
149 148 password=TEST_USER_ADMIN_PASS):
150 149 self._logged_username = username
151 150 response = self.app.post(url(controller='login', action='index'),
152 151 {'username': username,
153 152 'password': password})
154 153
155 154 if 'invalid user name' in response.body:
156 155 self.fail('could not login using %s %s' % (username, password))
157 156
158 157 self.assertEqual(response.status, '302 Found')
159 158 ses = response.session['rhodecode_user']
160 159 self.assertEqual(ses.get('username'), username)
161 160 response = response.follow()
162 161 self.assertEqual(ses.get('is_authenticated'), True)
163 162
164 163 return response.session['rhodecode_user']
165 164
166 165 def _get_logged_user(self):
167 166 return User.get_by_username(self._logged_username)
168 167
169 168 def checkSessionFlash(self, response, msg):
170 169 self.assertTrue('flash' in response.session,
171 170 msg='Response session:%r have no flash'
172 171 % response.session)
173 172 if not msg in response.session['flash'][0][1]:
174 173 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
175 174 msg, response.session['flash'][0][1])
176 175 self.fail(safe_str(msg))
177
178
179 ## HELPERS ##
180
181 def _get_repo_create_params(**custom):
182 defs = {
183 'repo_name': None,
184 'repo_type': 'hg',
185 'clone_uri': '',
186 'repo_group': '',
187 'repo_description': 'DESC',
188 'repo_private': False,
189 'repo_landing_rev': 'tip'
190 }
191 defs.update(custom)
192 if 'repo_name_full' not in custom:
193 defs.update({'repo_name_full': defs['repo_name']})
194
195 return defs
196
197
198 def _get_group_create_params(**custom):
199 defs = dict(
200 group_name=None,
201 group_description='DESC',
202 group_parent_id=None,
203 perms_updates=[],
204 perms_new=[],
205 enable_locking=False,
206 recursive=False
207 )
208 defs.update(custom)
209
210 return defs
@@ -1,1331 +1,1311 b''
1 1 from __future__ import with_statement
2 2 import random
3 3 import mock
4 4
5 5 from rhodecode.tests import *
6 from rhodecode.tests.fixture import Fixture
6 7 from rhodecode.lib.compat import json
7 8 from rhodecode.lib.auth import AuthUser
8 9 from rhodecode.model.user import UserModel
9 10 from rhodecode.model.users_group import UserGroupModel
10 11 from rhodecode.model.repo import RepoModel
11 12 from rhodecode.model.meta import Session
12 13 from rhodecode.model.scm import ScmModel
13 14 from rhodecode.model.db import Repository
14 15
16
15 17 API_URL = '/_admin/api'
18 TEST_USER_GROUP = 'test_users_group'
19
20 fixture = Fixture()
16 21
17 22
18 23 def _build_data(apikey, method, **kw):
19 24 """
20 25 Builds API data with given random ID
21 26
22 27 :param random_id:
23 28 :type random_id:
24 29 """
25 30 random_id = random.randrange(1, 9999)
26 31 return random_id, json.dumps({
27 32 "id": random_id,
28 33 "api_key": apikey,
29 34 "method": method,
30 35 "args": kw
31 36 })
32 37
33 38 jsonify = lambda obj: json.loads(json.dumps(obj))
34 39
35 40
36 41 def crash(*args, **kwargs):
37 42 raise Exception('Total Crash !')
38 43
39 44
40 45 def api_call(test_obj, params):
41 46 response = test_obj.app.post(API_URL, content_type='application/json',
42 47 params=params)
43 48 return response
44 49
45 50
46 TEST_USER_GROUP = 'test_users_group'
47
48
51 ## helpers
49 52 def make_users_group(name=TEST_USER_GROUP):
50 53 gr = UserGroupModel().create(name=name)
51 54 UserGroupModel().add_user_to_group(users_group=gr,
52 55 user=TEST_USER_ADMIN_LOGIN)
53 56 Session().commit()
54 57 return gr
55 58
56 59
57 60 def destroy_users_group(name=TEST_USER_GROUP):
58 61 UserGroupModel().delete(users_group=name, force=True)
59 62 Session().commit()
60 63
61 64
62 def create_repo(repo_name, repo_type, owner=None):
63 # create new repo
64 form_data = _get_repo_create_params(
65 repo_name_full=repo_name,
66 repo_description='description %s' % repo_name,
67 )
68 cur_user = UserModel().get_by_username(owner or TEST_USER_ADMIN_LOGIN)
69 r = RepoModel().create(form_data, cur_user)
70 Session().commit()
71 return r
72
73
74 def create_fork(fork_name, fork_type, fork_of):
75 fork = RepoModel(Session())._get_repo(fork_of)
76 r = create_repo(fork_name, fork_type)
77 r.fork = fork
78 Session().add(r)
79 Session().commit()
80 return r
81
82
83 def destroy_repo(repo_name):
84 RepoModel().delete(repo_name)
85 Session().commit()
86
87
88 65 class BaseTestApi(object):
89 66 REPO = None
90 67 REPO_TYPE = None
91 68
92 69 @classmethod
93 70 def setUpClass(self):
94 71 self.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
95 72 self.apikey = self.usr.api_key
96 73 self.test_user = UserModel().create_or_update(
97 74 username='test-api',
98 75 password='test',
99 76 email='test@api.rhodecode.org',
100 77 firstname='first',
101 78 lastname='last'
102 79 )
103 80 Session().commit()
104 81 self.TEST_USER_LOGIN = self.test_user.username
105 82 self.apikey_regular = self.test_user.api_key
106 83
107 84 @classmethod
108 85 def teardownClass(self):
109 86 pass
110 87
111 88 def setUp(self):
112 89 self.maxDiff = None
113 90 make_users_group()
114 91
115 92 def tearDown(self):
116 93 destroy_users_group()
117 94
118 95 def _compare_ok(self, id_, expected, given):
119 96 expected = jsonify({
120 97 'id': id_,
121 98 'error': None,
122 99 'result': expected
123 100 })
124 101 given = json.loads(given)
125 102 self.assertEqual(expected, given)
126 103
127 104 def _compare_error(self, id_, expected, given):
128 105 expected = jsonify({
129 106 'id': id_,
130 107 'error': expected,
131 108 'result': None
132 109 })
133 110 given = json.loads(given)
134 111 self.assertEqual(expected, given)
135 112
136 113 # def test_Optional(self):
137 114 # from rhodecode.controllers.api.api import Optional
138 115 # option1 = Optional(None)
139 116 # self.assertEqual('<Optional:%s>' % None, repr(option1))
140 117 #
141 118 # self.assertEqual(1, Optional.extract(Optional(1)))
142 119 # self.assertEqual('trololo', Optional.extract('trololo'))
143 120
144 121 def test_api_wrong_key(self):
145 122 id_, params = _build_data('trololo', 'get_user')
146 123 response = api_call(self, params)
147 124
148 125 expected = 'Invalid API KEY'
149 126 self._compare_error(id_, expected, given=response.body)
150 127
151 128 def test_api_missing_non_optional_param(self):
152 129 id_, params = _build_data(self.apikey, 'get_repo')
153 130 response = api_call(self, params)
154 131
155 132 expected = 'Missing non optional `repoid` arg in JSON DATA'
156 133 self._compare_error(id_, expected, given=response.body)
157 134
158 135 def test_api_missing_non_optional_param_args_null(self):
159 136 id_, params = _build_data(self.apikey, 'get_repo')
160 137 params = params.replace('"args": {}', '"args": null')
161 138 response = api_call(self, params)
162 139
163 140 expected = 'Missing non optional `repoid` arg in JSON DATA'
164 141 self._compare_error(id_, expected, given=response.body)
165 142
166 143 def test_api_missing_non_optional_param_args_bad(self):
167 144 id_, params = _build_data(self.apikey, 'get_repo')
168 145 params = params.replace('"args": {}', '"args": 1')
169 146 response = api_call(self, params)
170 147
171 148 expected = 'Missing non optional `repoid` arg in JSON DATA'
172 149 self._compare_error(id_, expected, given=response.body)
173 150
174 151 def test_api_args_is_null(self):
175 152 id_, params = _build_data(self.apikey, 'get_users',)
176 153 params = params.replace('"args": {}', '"args": null')
177 154 response = api_call(self, params)
178 155 self.assertEqual(response.status, '200 OK')
179 156
180 157 def test_api_args_is_bad(self):
181 158 id_, params = _build_data(self.apikey, 'get_users',)
182 159 params = params.replace('"args": {}', '"args": 1')
183 160 response = api_call(self, params)
184 161 self.assertEqual(response.status, '200 OK')
185 162
186 163 def test_api_get_users(self):
187 164 id_, params = _build_data(self.apikey, 'get_users',)
188 165 response = api_call(self, params)
189 166 ret_all = []
190 167 for usr in UserModel().get_all():
191 168 ret = usr.get_api_data()
192 169 ret_all.append(jsonify(ret))
193 170 expected = ret_all
194 171 self._compare_ok(id_, expected, given=response.body)
195 172
196 173 def test_api_get_user(self):
197 174 id_, params = _build_data(self.apikey, 'get_user',
198 175 userid=TEST_USER_ADMIN_LOGIN)
199 176 response = api_call(self, params)
200 177
201 178 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
202 179 ret = usr.get_api_data()
203 180 ret['permissions'] = AuthUser(usr.user_id).permissions
204 181
205 182 expected = ret
206 183 self._compare_ok(id_, expected, given=response.body)
207 184
208 185 def test_api_get_user_that_does_not_exist(self):
209 186 id_, params = _build_data(self.apikey, 'get_user',
210 187 userid='trololo')
211 188 response = api_call(self, params)
212 189
213 190 expected = "user `%s` does not exist" % 'trololo'
214 191 self._compare_error(id_, expected, given=response.body)
215 192
216 193 def test_api_get_user_without_giving_userid(self):
217 194 id_, params = _build_data(self.apikey, 'get_user')
218 195 response = api_call(self, params)
219 196
220 197 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
221 198 ret = usr.get_api_data()
222 199 ret['permissions'] = AuthUser(usr.user_id).permissions
223 200
224 201 expected = ret
225 202 self._compare_ok(id_, expected, given=response.body)
226 203
227 204 def test_api_get_user_without_giving_userid_non_admin(self):
228 205 id_, params = _build_data(self.apikey_regular, 'get_user')
229 206 response = api_call(self, params)
230 207
231 208 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
232 209 ret = usr.get_api_data()
233 210 ret['permissions'] = AuthUser(usr.user_id).permissions
234 211
235 212 expected = ret
236 213 self._compare_ok(id_, expected, given=response.body)
237 214
238 215 def test_api_get_user_with_giving_userid_non_admin(self):
239 216 id_, params = _build_data(self.apikey_regular, 'get_user',
240 217 userid=self.TEST_USER_LOGIN)
241 218 response = api_call(self, params)
242 219
243 220 expected = 'userid is not the same as your user'
244 221 self._compare_error(id_, expected, given=response.body)
245 222
246 223 def test_api_pull(self):
247 224 #TODO: issues with rhodecode_extras here.. not sure why !
248 225 pass
249 226
250 227 # repo_name = 'test_pull'
251 # r = create_repo(repo_name, self.REPO_TYPE)
228 # r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
252 229 # r.clone_uri = TEST_self.REPO
253 230 # Session.add(r)
254 231 # Session.commit()
255 232 #
256 233 # id_, params = _build_data(self.apikey, 'pull',
257 234 # repoid=repo_name,)
258 235 # response = self.app.post(API_URL, content_type='application/json',
259 236 # params=params)
260 237 #
261 238 # expected = 'Pulled from `%s`' % repo_name
262 239 # self._compare_ok(id_, expected, given=response.body)
263 240 #
264 # destroy_repo(repo_name)
241 # fixture.destroy_repo(repo_name)
265 242
266 243 def test_api_pull_error(self):
267 244 id_, params = _build_data(self.apikey, 'pull',
268 245 repoid=self.REPO,)
269 246 response = api_call(self, params)
270 247
271 248 expected = 'Unable to pull changes from `%s`' % self.REPO
272 249 self._compare_error(id_, expected, given=response.body)
273 250
274 251 def test_api_rescan_repos(self):
275 252 id_, params = _build_data(self.apikey, 'rescan_repos')
276 253 response = api_call(self, params)
277 254
278 255 expected = {'added': [], 'removed': []}
279 256 self._compare_ok(id_, expected, given=response.body)
280 257
281 258 @mock.patch.object(ScmModel, 'repo_scan', crash)
282 259 def test_api_rescann_error(self):
283 260 id_, params = _build_data(self.apikey, 'rescan_repos',)
284 261 response = api_call(self, params)
285 262
286 263 expected = 'Error occurred during rescan repositories action'
287 264 self._compare_error(id_, expected, given=response.body)
288 265
289 266 def test_api_invalidate_cache(self):
290 267 id_, params = _build_data(self.apikey, 'invalidate_cache',
291 268 repoid=self.REPO)
292 269 response = api_call(self, params)
293 270
294 271 expected = ("Cache for repository `%s` was invalidated: "
295 272 "invalidated cache keys: %s" % (self.REPO,
296 273 [unicode(self.REPO)]))
297 274 self._compare_ok(id_, expected, given=response.body)
298 275
299 276 @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
300 277 def test_api_invalidate_cache_error(self):
301 278 id_, params = _build_data(self.apikey, 'invalidate_cache',
302 279 repoid=self.REPO)
303 280 response = api_call(self, params)
304 281
305 282 expected = 'Error occurred during cache invalidation action'
306 283 self._compare_error(id_, expected, given=response.body)
307 284
308 285 def test_api_lock_repo_lock_aquire(self):
309 286 id_, params = _build_data(self.apikey, 'lock',
310 287 userid=TEST_USER_ADMIN_LOGIN,
311 288 repoid=self.REPO,
312 289 locked=True)
313 290 response = api_call(self, params)
314 291 expected = ('User `%s` set lock state for repo `%s` to `%s`'
315 292 % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
316 293 self._compare_ok(id_, expected, given=response.body)
317 294
318 295 def test_api_lock_repo_lock_aquire_by_non_admin(self):
319 296 repo_name = 'api_delete_me'
320 create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
297 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
298 cur_user=self.TEST_USER_LOGIN)
321 299 try:
322 300 id_, params = _build_data(self.apikey_regular, 'lock',
323 301 repoid=repo_name,
324 302 locked=True)
325 303 response = api_call(self, params)
326 304 expected = ('User `%s` set lock state for repo `%s` to `%s`'
327 305 % (self.TEST_USER_LOGIN, repo_name, True))
328 306 self._compare_ok(id_, expected, given=response.body)
329 307 finally:
330 destroy_repo(repo_name)
308 fixture.destroy_repo(repo_name)
331 309
332 310 def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
333 311 repo_name = 'api_delete_me'
334 create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
312 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
313 cur_user=self.TEST_USER_LOGIN)
335 314 try:
336 315 id_, params = _build_data(self.apikey_regular, 'lock',
337 316 userid=TEST_USER_ADMIN_LOGIN,
338 317 repoid=repo_name,
339 318 locked=True)
340 319 response = api_call(self, params)
341 320 expected = 'userid is not the same as your user'
342 321 self._compare_error(id_, expected, given=response.body)
343 322 finally:
344 destroy_repo(repo_name)
323 fixture.destroy_repo(repo_name)
345 324
346 325 def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
347 326 id_, params = _build_data(self.apikey_regular, 'lock',
348 327 repoid=self.REPO,
349 328 locked=True)
350 329 response = api_call(self, params)
351 330 expected = 'repository `%s` does not exist' % (self.REPO)
352 331 self._compare_error(id_, expected, given=response.body)
353 332
354 333 def test_api_lock_repo_lock_release(self):
355 334 id_, params = _build_data(self.apikey, 'lock',
356 335 userid=TEST_USER_ADMIN_LOGIN,
357 336 repoid=self.REPO,
358 337 locked=False)
359 338 response = api_call(self, params)
360 339 expected = ('User `%s` set lock state for repo `%s` to `%s`'
361 340 % (TEST_USER_ADMIN_LOGIN, self.REPO, False))
362 341 self._compare_ok(id_, expected, given=response.body)
363 342
364 343 def test_api_lock_repo_lock_aquire_optional_userid(self):
365 344 id_, params = _build_data(self.apikey, 'lock',
366 345 repoid=self.REPO,
367 346 locked=True)
368 347 response = api_call(self, params)
369 348 expected = ('User `%s` set lock state for repo `%s` to `%s`'
370 349 % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
371 350 self._compare_ok(id_, expected, given=response.body)
372 351
373 352 def test_api_lock_repo_lock_optional_locked(self):
374 353 from rhodecode.lib.utils2 import time_to_datetime
375 354 _locked_since = json.dumps(time_to_datetime(Repository\
376 355 .get_by_repo_name(self.REPO).locked[1]))
377 356 id_, params = _build_data(self.apikey, 'lock',
378 357 repoid=self.REPO)
379 358 response = api_call(self, params)
380 359 expected = ('Repo `%s` locked by `%s`. Locked=`True`. Locked since: `%s`'
381 360 % (self.REPO, TEST_USER_ADMIN_LOGIN, _locked_since))
382 361 self._compare_ok(id_, expected, given=response.body)
383 362
384 363 @mock.patch.object(Repository, 'lock', crash)
385 364 def test_api_lock_error(self):
386 365 id_, params = _build_data(self.apikey, 'lock',
387 366 userid=TEST_USER_ADMIN_LOGIN,
388 367 repoid=self.REPO,
389 368 locked=True)
390 369 response = api_call(self, params)
391 370
392 371 expected = 'Error occurred locking repository `%s`' % self.REPO
393 372 self._compare_error(id_, expected, given=response.body)
394 373
395 374 def test_api_get_locks_regular_user(self):
396 375 id_, params = _build_data(self.apikey_regular, 'get_locks')
397 376 response = api_call(self, params)
398 377 expected = []
399 378 self._compare_ok(id_, expected, given=response.body)
400 379
401 380 def test_api_get_locks_with_userid_regular_user(self):
402 381 id_, params = _build_data(self.apikey_regular, 'get_locks',
403 382 userid=TEST_USER_ADMIN_LOGIN)
404 383 response = api_call(self, params)
405 384 expected = 'userid is not the same as your user'
406 385 self._compare_error(id_, expected, given=response.body)
407 386
408 387 def test_api_get_locks(self):
409 388 id_, params = _build_data(self.apikey, 'get_locks')
410 389 response = api_call(self, params)
411 390 expected = []
412 391 self._compare_ok(id_, expected, given=response.body)
413 392
414 393 def test_api_get_locks_with_userid(self):
415 394 id_, params = _build_data(self.apikey, 'get_locks',
416 395 userid=TEST_USER_REGULAR_LOGIN)
417 396 response = api_call(self, params)
418 397 expected = []
419 398 self._compare_ok(id_, expected, given=response.body)
420 399
421 400 def test_api_create_existing_user(self):
422 401 id_, params = _build_data(self.apikey, 'create_user',
423 402 username=TEST_USER_ADMIN_LOGIN,
424 403 email='test@foo.com',
425 404 password='trololo')
426 405 response = api_call(self, params)
427 406
428 407 expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
429 408 self._compare_error(id_, expected, given=response.body)
430 409
431 410 def test_api_create_user_with_existing_email(self):
432 411 id_, params = _build_data(self.apikey, 'create_user',
433 412 username=TEST_USER_ADMIN_LOGIN + 'new',
434 413 email=TEST_USER_REGULAR_EMAIL,
435 414 password='trololo')
436 415 response = api_call(self, params)
437 416
438 417 expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
439 418 self._compare_error(id_, expected, given=response.body)
440 419
441 420 def test_api_create_user(self):
442 421 username = 'test_new_api_user'
443 422 email = username + "@foo.com"
444 423
445 424 id_, params = _build_data(self.apikey, 'create_user',
446 425 username=username,
447 426 email=email,
448 427 password='trololo')
449 428 response = api_call(self, params)
450 429
451 430 usr = UserModel().get_by_username(username)
452 431 ret = dict(
453 432 msg='created new user `%s`' % username,
454 433 user=jsonify(usr.get_api_data())
455 434 )
456 435
457 436 expected = ret
458 437 self._compare_ok(id_, expected, given=response.body)
459 438
460 439 UserModel().delete(usr.user_id)
461 440 Session().commit()
462 441
463 442 @mock.patch.object(UserModel, 'create_or_update', crash)
464 443 def test_api_create_user_when_exception_happened(self):
465 444
466 445 username = 'test_new_api_user'
467 446 email = username + "@foo.com"
468 447
469 448 id_, params = _build_data(self.apikey, 'create_user',
470 449 username=username,
471 450 email=email,
472 451 password='trololo')
473 452 response = api_call(self, params)
474 453 expected = 'failed to create user `%s`' % username
475 454 self._compare_error(id_, expected, given=response.body)
476 455
477 456 def test_api_delete_user(self):
478 457 usr = UserModel().create_or_update(username=u'test_user',
479 458 password=u'qweqwe',
480 459 email=u'u232@rhodecode.org',
481 460 firstname=u'u1', lastname=u'u1')
482 461 Session().commit()
483 462 username = usr.username
484 463 email = usr.email
485 464 usr_id = usr.user_id
486 465 ## DELETE THIS USER NOW
487 466
488 467 id_, params = _build_data(self.apikey, 'delete_user',
489 468 userid=username,)
490 469 response = api_call(self, params)
491 470
492 471 ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
493 472 'user': None}
494 473 expected = ret
495 474 self._compare_ok(id_, expected, given=response.body)
496 475
497 476 @mock.patch.object(UserModel, 'delete', crash)
498 477 def test_api_delete_user_when_exception_happened(self):
499 478 usr = UserModel().create_or_update(username=u'test_user',
500 479 password=u'qweqwe',
501 480 email=u'u232@rhodecode.org',
502 481 firstname=u'u1', lastname=u'u1')
503 482 Session().commit()
504 483 username = usr.username
505 484
506 485 id_, params = _build_data(self.apikey, 'delete_user',
507 486 userid=username,)
508 487 response = api_call(self, params)
509 488 ret = 'failed to delete ID:%s %s' % (usr.user_id,
510 489 usr.username)
511 490 expected = ret
512 491 self._compare_error(id_, expected, given=response.body)
513 492
514 493 @parameterized.expand([('firstname', 'new_username'),
515 494 ('lastname', 'new_username'),
516 495 ('email', 'new_username'),
517 496 ('admin', True),
518 497 ('admin', False),
519 498 ('ldap_dn', 'test'),
520 499 ('ldap_dn', None),
521 500 ('active', False),
522 501 ('active', True),
523 502 ('password', 'newpass')
524 503 ])
525 504 def test_api_update_user(self, name, expected):
526 505 usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
527 506 kw = {name: expected,
528 507 'userid': usr.user_id}
529 508 id_, params = _build_data(self.apikey, 'update_user', **kw)
530 509 response = api_call(self, params)
531 510
532 511 ret = {
533 512 'msg': 'updated user ID:%s %s' % (usr.user_id, self.TEST_USER_LOGIN),
534 513 'user': jsonify(UserModel()\
535 514 .get_by_username(self.TEST_USER_LOGIN)\
536 515 .get_api_data())
537 516 }
538 517
539 518 expected = ret
540 519 self._compare_ok(id_, expected, given=response.body)
541 520
542 521 def test_api_update_user_no_changed_params(self):
543 522 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
544 523 ret = jsonify(usr.get_api_data())
545 524 id_, params = _build_data(self.apikey, 'update_user',
546 525 userid=TEST_USER_ADMIN_LOGIN)
547 526
548 527 response = api_call(self, params)
549 528 ret = {
550 529 'msg': 'updated user ID:%s %s' % (usr.user_id, TEST_USER_ADMIN_LOGIN),
551 530 'user': ret
552 531 }
553 532 expected = ret
554 533 self._compare_ok(id_, expected, given=response.body)
555 534
556 535 def test_api_update_user_by_user_id(self):
557 536 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
558 537 ret = jsonify(usr.get_api_data())
559 538 id_, params = _build_data(self.apikey, 'update_user',
560 539 userid=usr.user_id)
561 540
562 541 response = api_call(self, params)
563 542 ret = {
564 543 'msg': 'updated user ID:%s %s' % (usr.user_id, TEST_USER_ADMIN_LOGIN),
565 544 'user': ret
566 545 }
567 546 expected = ret
568 547 self._compare_ok(id_, expected, given=response.body)
569 548
570 549 @mock.patch.object(UserModel, 'update_user', crash)
571 550 def test_api_update_user_when_exception_happens(self):
572 551 usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
573 552 ret = jsonify(usr.get_api_data())
574 553 id_, params = _build_data(self.apikey, 'update_user',
575 554 userid=usr.user_id)
576 555
577 556 response = api_call(self, params)
578 557 ret = 'failed to update user `%s`' % usr.user_id
579 558
580 559 expected = ret
581 560 self._compare_error(id_, expected, given=response.body)
582 561
583 562 def test_api_get_repo(self):
584 563 new_group = 'some_new_group'
585 564 make_users_group(new_group)
586 565 RepoModel().grant_users_group_permission(repo=self.REPO,
587 566 group_name=new_group,
588 567 perm='repository.read')
589 568 Session().commit()
590 569 id_, params = _build_data(self.apikey, 'get_repo',
591 570 repoid=self.REPO)
592 571 response = api_call(self, params)
593 572
594 573 repo = RepoModel().get_by_repo_name(self.REPO)
595 574 ret = repo.get_api_data()
596 575
597 576 members = []
598 577 followers = []
599 578 for user in repo.repo_to_perm:
600 579 perm = user.permission.permission_name
601 580 user = user.user
602 581 user_data = user.get_api_data()
603 582 user_data['type'] = "user"
604 583 user_data['permission'] = perm
605 584 members.append(user_data)
606 585
607 586 for users_group in repo.users_group_to_perm:
608 587 perm = users_group.permission.permission_name
609 588 users_group = users_group.users_group
610 589 users_group_data = users_group.get_api_data()
611 590 users_group_data['type'] = "users_group"
612 591 users_group_data['permission'] = perm
613 592 members.append(users_group_data)
614 593
615 594 for user in repo.followers:
616 595 followers.append(user.user.get_api_data())
617 596
618 597 ret['members'] = members
619 598 ret['followers'] = followers
620 599
621 600 expected = ret
622 601 self._compare_ok(id_, expected, given=response.body)
623 602 destroy_users_group(new_group)
624 603
625 604 def test_api_get_repo_by_non_admin(self):
626 605 id_, params = _build_data(self.apikey, 'get_repo',
627 606 repoid=self.REPO)
628 607 response = api_call(self, params)
629 608
630 609 repo = RepoModel().get_by_repo_name(self.REPO)
631 610 ret = repo.get_api_data()
632 611
633 612 members = []
634 613 followers = []
635 614 for user in repo.repo_to_perm:
636 615 perm = user.permission.permission_name
637 616 user = user.user
638 617 user_data = user.get_api_data()
639 618 user_data['type'] = "user"
640 619 user_data['permission'] = perm
641 620 members.append(user_data)
642 621
643 622 for users_group in repo.users_group_to_perm:
644 623 perm = users_group.permission.permission_name
645 624 users_group = users_group.users_group
646 625 users_group_data = users_group.get_api_data()
647 626 users_group_data['type'] = "users_group"
648 627 users_group_data['permission'] = perm
649 628 members.append(users_group_data)
650 629
651 630 for user in repo.followers:
652 631 followers.append(user.user.get_api_data())
653 632
654 633 ret['members'] = members
655 634 ret['followers'] = followers
656 635
657 636 expected = ret
658 637 self._compare_ok(id_, expected, given=response.body)
659 638
660 639 def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
661 640 RepoModel().grant_user_permission(repo=self.REPO,
662 641 user=self.TEST_USER_LOGIN,
663 642 perm='repository.none')
664 643
665 644 id_, params = _build_data(self.apikey_regular, 'get_repo',
666 645 repoid=self.REPO)
667 646 response = api_call(self, params)
668 647
669 648 expected = 'repository `%s` does not exist' % (self.REPO)
670 649 self._compare_error(id_, expected, given=response.body)
671 650
672 651 def test_api_get_repo_that_doesn_not_exist(self):
673 652 id_, params = _build_data(self.apikey, 'get_repo',
674 653 repoid='no-such-repo')
675 654 response = api_call(self, params)
676 655
677 656 ret = 'repository `%s` does not exist' % 'no-such-repo'
678 657 expected = ret
679 658 self._compare_error(id_, expected, given=response.body)
680 659
681 660 def test_api_get_repos(self):
682 661 id_, params = _build_data(self.apikey, 'get_repos')
683 662 response = api_call(self, params)
684 663
685 664 result = []
686 665 for repo in RepoModel().get_all():
687 666 result.append(repo.get_api_data())
688 667 ret = jsonify(result)
689 668
690 669 expected = ret
691 670 self._compare_ok(id_, expected, given=response.body)
692 671
693 672 def test_api_get_repos_non_admin(self):
694 673 id_, params = _build_data(self.apikey_regular, 'get_repos')
695 674 response = api_call(self, params)
696 675
697 676 result = []
698 677 for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN):
699 678 result.append(repo.get_api_data())
700 679 ret = jsonify(result)
701 680
702 681 expected = ret
703 682 self._compare_ok(id_, expected, given=response.body)
704 683
705 684 @parameterized.expand([('all', 'all'),
706 685 ('dirs', 'dirs'),
707 686 ('files', 'files'), ])
708 687 def test_api_get_repo_nodes(self, name, ret_type):
709 688 rev = 'tip'
710 689 path = '/'
711 690 id_, params = _build_data(self.apikey, 'get_repo_nodes',
712 691 repoid=self.REPO, revision=rev,
713 692 root_path=path,
714 693 ret_type=ret_type)
715 694 response = api_call(self, params)
716 695
717 696 # we don't the actual return types here since it's tested somewhere
718 697 # else
719 698 expected = json.loads(response.body)['result']
720 699 self._compare_ok(id_, expected, given=response.body)
721 700
722 701 def test_api_get_repo_nodes_bad_revisions(self):
723 702 rev = 'i-dont-exist'
724 703 path = '/'
725 704 id_, params = _build_data(self.apikey, 'get_repo_nodes',
726 705 repoid=self.REPO, revision=rev,
727 706 root_path=path,)
728 707 response = api_call(self, params)
729 708
730 709 expected = 'failed to get repo: `%s` nodes' % self.REPO
731 710 self._compare_error(id_, expected, given=response.body)
732 711
733 712 def test_api_get_repo_nodes_bad_path(self):
734 713 rev = 'tip'
735 714 path = '/idontexits'
736 715 id_, params = _build_data(self.apikey, 'get_repo_nodes',
737 716 repoid=self.REPO, revision=rev,
738 717 root_path=path,)
739 718 response = api_call(self, params)
740 719
741 720 expected = 'failed to get repo: `%s` nodes' % self.REPO
742 721 self._compare_error(id_, expected, given=response.body)
743 722
744 723 def test_api_get_repo_nodes_bad_ret_type(self):
745 724 rev = 'tip'
746 725 path = '/'
747 726 ret_type = 'error'
748 727 id_, params = _build_data(self.apikey, 'get_repo_nodes',
749 728 repoid=self.REPO, revision=rev,
750 729 root_path=path,
751 730 ret_type=ret_type)
752 731 response = api_call(self, params)
753 732
754 733 expected = 'ret_type must be one of %s' % (['files', 'dirs', 'all'])
755 734 self._compare_error(id_, expected, given=response.body)
756 735
757 736 def test_api_create_repo(self):
758 737 repo_name = 'api-repo'
759 738 id_, params = _build_data(self.apikey, 'create_repo',
760 739 repo_name=repo_name,
761 740 owner=TEST_USER_ADMIN_LOGIN,
762 741 repo_type='hg',
763 742 )
764 743 response = api_call(self, params)
765 744
766 745 repo = RepoModel().get_by_repo_name(repo_name)
767 746 ret = {
768 747 'msg': 'Created new repository `%s`' % repo_name,
769 748 'repo': jsonify(repo.get_api_data())
770 749 }
771 750 expected = ret
772 751 self._compare_ok(id_, expected, given=response.body)
773 destroy_repo(repo_name)
752 fixture.destroy_repo(repo_name)
774 753
775 754 def test_api_create_repo_unknown_owner(self):
776 755 repo_name = 'api-repo'
777 756 owner = 'i-dont-exist'
778 757 id_, params = _build_data(self.apikey, 'create_repo',
779 758 repo_name=repo_name,
780 759 owner=owner,
781 760 repo_type='hg',
782 761 )
783 762 response = api_call(self, params)
784 763 expected = 'user `%s` does not exist' % owner
785 764 self._compare_error(id_, expected, given=response.body)
786 765
787 766 def test_api_create_repo_dont_specify_owner(self):
788 767 repo_name = 'api-repo'
789 768 owner = 'i-dont-exist'
790 769 id_, params = _build_data(self.apikey, 'create_repo',
791 770 repo_name=repo_name,
792 771 repo_type='hg',
793 772 )
794 773 response = api_call(self, params)
795 774
796 775 repo = RepoModel().get_by_repo_name(repo_name)
797 776 ret = {
798 777 'msg': 'Created new repository `%s`' % repo_name,
799 778 'repo': jsonify(repo.get_api_data())
800 779 }
801 780 expected = ret
802 781 self._compare_ok(id_, expected, given=response.body)
803 destroy_repo(repo_name)
782 fixture.destroy_repo(repo_name)
804 783
805 784 def test_api_create_repo_by_non_admin(self):
806 785 repo_name = 'api-repo'
807 786 owner = 'i-dont-exist'
808 787 id_, params = _build_data(self.apikey_regular, 'create_repo',
809 788 repo_name=repo_name,
810 789 repo_type='hg',
811 790 )
812 791 response = api_call(self, params)
813 792
814 793 repo = RepoModel().get_by_repo_name(repo_name)
815 794 ret = {
816 795 'msg': 'Created new repository `%s`' % repo_name,
817 796 'repo': jsonify(repo.get_api_data())
818 797 }
819 798 expected = ret
820 799 self._compare_ok(id_, expected, given=response.body)
821 destroy_repo(repo_name)
800 fixture.destroy_repo(repo_name)
822 801
823 802 def test_api_create_repo_by_non_admin_specify_owner(self):
824 803 repo_name = 'api-repo'
825 804 owner = 'i-dont-exist'
826 805 id_, params = _build_data(self.apikey_regular, 'create_repo',
827 806 repo_name=repo_name,
828 807 repo_type='hg',
829 808 owner=owner
830 809 )
831 810 response = api_call(self, params)
832 811
833 812 expected = 'Only RhodeCode admin can specify `owner` param'
834 813 self._compare_error(id_, expected, given=response.body)
835 destroy_repo(repo_name)
814 fixture.destroy_repo(repo_name)
836 815
837 816 def test_api_create_repo_exists(self):
838 817 repo_name = self.REPO
839 818 id_, params = _build_data(self.apikey, 'create_repo',
840 819 repo_name=repo_name,
841 820 owner=TEST_USER_ADMIN_LOGIN,
842 821 repo_type='hg',
843 822 )
844 823 response = api_call(self, params)
845 824 expected = "repo `%s` already exist" % repo_name
846 825 self._compare_error(id_, expected, given=response.body)
847 826
848 827 @mock.patch.object(RepoModel, 'create_repo', crash)
849 828 def test_api_create_repo_exception_occurred(self):
850 829 repo_name = 'api-repo'
851 830 id_, params = _build_data(self.apikey, 'create_repo',
852 831 repo_name=repo_name,
853 832 owner=TEST_USER_ADMIN_LOGIN,
854 833 repo_type='hg',
855 834 )
856 835 response = api_call(self, params)
857 836 expected = 'failed to create repository `%s`' % repo_name
858 837 self._compare_error(id_, expected, given=response.body)
859 838
860 839 def test_api_delete_repo(self):
861 840 repo_name = 'api_delete_me'
862 create_repo(repo_name, self.REPO_TYPE)
841 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
863 842
864 843 id_, params = _build_data(self.apikey, 'delete_repo',
865 844 repoid=repo_name,)
866 845 response = api_call(self, params)
867 846
868 847 ret = {
869 848 'msg': 'Deleted repository `%s`' % repo_name,
870 849 'success': True
871 850 }
872 851 expected = ret
873 852 self._compare_ok(id_, expected, given=response.body)
874 853
875 854 def test_api_delete_repo_by_non_admin(self):
876 855 repo_name = 'api_delete_me'
877 create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
856 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
857 cur_user=self.TEST_USER_LOGIN)
878 858 try:
879 859 id_, params = _build_data(self.apikey_regular, 'delete_repo',
880 860 repoid=repo_name,)
881 861 response = api_call(self, params)
882 862
883 863 ret = {
884 864 'msg': 'Deleted repository `%s`' % repo_name,
885 865 'success': True
886 866 }
887 867 expected = ret
888 868 self._compare_ok(id_, expected, given=response.body)
889 869 finally:
890 destroy_repo(repo_name)
870 fixture.destroy_repo(repo_name)
891 871
892 872 def test_api_delete_repo_by_non_admin_no_permission(self):
893 873 repo_name = 'api_delete_me'
894 create_repo(repo_name, self.REPO_TYPE)
874 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
895 875 try:
896 876 id_, params = _build_data(self.apikey_regular, 'delete_repo',
897 877 repoid=repo_name,)
898 878 response = api_call(self, params)
899 879 expected = 'repository `%s` does not exist' % (repo_name)
900 880 self._compare_error(id_, expected, given=response.body)
901 881 finally:
902 destroy_repo(repo_name)
882 fixture.destroy_repo(repo_name)
903 883
904 884 def test_api_delete_repo_exception_occurred(self):
905 885 repo_name = 'api_delete_me'
906 create_repo(repo_name, self.REPO_TYPE)
886 fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
907 887 try:
908 888 with mock.patch.object(RepoModel, 'delete', crash):
909 889 id_, params = _build_data(self.apikey, 'delete_repo',
910 890 repoid=repo_name,)
911 891 response = api_call(self, params)
912 892
913 893 expected = 'failed to delete repository `%s`' % repo_name
914 894 self._compare_error(id_, expected, given=response.body)
915 895 finally:
916 destroy_repo(repo_name)
896 fixture.destroy_repo(repo_name)
917 897
918 898 def test_api_fork_repo(self):
919 899 fork_name = 'api-repo-fork'
920 900 id_, params = _build_data(self.apikey, 'fork_repo',
921 901 repoid=self.REPO,
922 902 fork_name=fork_name,
923 903 owner=TEST_USER_ADMIN_LOGIN,
924 904 )
925 905 response = api_call(self, params)
926 906
927 907 ret = {
928 908 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
929 909 fork_name),
930 910 'success': True
931 911 }
932 912 expected = ret
933 913 self._compare_ok(id_, expected, given=response.body)
934 destroy_repo(fork_name)
914 fixture.destroy_repo(fork_name)
935 915
936 916 def test_api_fork_repo_non_admin(self):
937 917 fork_name = 'api-repo-fork'
938 918 id_, params = _build_data(self.apikey_regular, 'fork_repo',
939 919 repoid=self.REPO,
940 920 fork_name=fork_name,
941 921 )
942 922 response = api_call(self, params)
943 923
944 924 ret = {
945 925 'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
946 926 fork_name),
947 927 'success': True
948 928 }
949 929 expected = ret
950 930 self._compare_ok(id_, expected, given=response.body)
951 destroy_repo(fork_name)
931 fixture.destroy_repo(fork_name)
952 932
953 933 def test_api_fork_repo_non_admin_specify_owner(self):
954 934 fork_name = 'api-repo-fork'
955 935 id_, params = _build_data(self.apikey_regular, 'fork_repo',
956 936 repoid=self.REPO,
957 937 fork_name=fork_name,
958 938 owner=TEST_USER_ADMIN_LOGIN,
959 939 )
960 940 response = api_call(self, params)
961 941 expected = 'Only RhodeCode admin can specify `owner` param'
962 942 self._compare_error(id_, expected, given=response.body)
963 destroy_repo(fork_name)
943 fixture.destroy_repo(fork_name)
964 944
965 945 def test_api_fork_repo_non_admin_no_permission_to_fork(self):
966 946 RepoModel().grant_user_permission(repo=self.REPO,
967 947 user=self.TEST_USER_LOGIN,
968 948 perm='repository.none')
969 949 fork_name = 'api-repo-fork'
970 950 id_, params = _build_data(self.apikey_regular, 'fork_repo',
971 951 repoid=self.REPO,
972 952 fork_name=fork_name,
973 953 )
974 954 response = api_call(self, params)
975 955 expected = 'repository `%s` does not exist' % (self.REPO)
976 956 self._compare_error(id_, expected, given=response.body)
977 destroy_repo(fork_name)
957 fixture.destroy_repo(fork_name)
978 958
979 959 def test_api_fork_repo_unknown_owner(self):
980 960 fork_name = 'api-repo-fork'
981 961 owner = 'i-dont-exist'
982 962 id_, params = _build_data(self.apikey, 'fork_repo',
983 963 repoid=self.REPO,
984 964 fork_name=fork_name,
985 965 owner=owner,
986 966 )
987 967 response = api_call(self, params)
988 968 expected = 'user `%s` does not exist' % owner
989 969 self._compare_error(id_, expected, given=response.body)
990 970
991 971 def test_api_fork_repo_fork_exists(self):
992 972 fork_name = 'api-repo-fork'
993 create_fork(fork_name, self.REPO_TYPE, self.REPO)
973 fixture.create_fork(self.REPO, fork_name)
994 974
995 975 try:
996 976 fork_name = 'api-repo-fork'
997 977
998 978 id_, params = _build_data(self.apikey, 'fork_repo',
999 979 repoid=self.REPO,
1000 980 fork_name=fork_name,
1001 981 owner=TEST_USER_ADMIN_LOGIN,
1002 982 )
1003 983 response = api_call(self, params)
1004 984
1005 985 expected = "fork `%s` already exist" % fork_name
1006 986 self._compare_error(id_, expected, given=response.body)
1007 987 finally:
1008 destroy_repo(fork_name)
988 fixture.destroy_repo(fork_name)
1009 989
1010 990 def test_api_fork_repo_repo_exists(self):
1011 991 fork_name = self.REPO
1012 992
1013 993 id_, params = _build_data(self.apikey, 'fork_repo',
1014 994 repoid=self.REPO,
1015 995 fork_name=fork_name,
1016 996 owner=TEST_USER_ADMIN_LOGIN,
1017 997 )
1018 998 response = api_call(self, params)
1019 999
1020 1000 expected = "repo `%s` already exist" % fork_name
1021 1001 self._compare_error(id_, expected, given=response.body)
1022 1002
1023 1003 @mock.patch.object(RepoModel, 'create_fork', crash)
1024 1004 def test_api_fork_repo_exception_occurred(self):
1025 1005 fork_name = 'api-repo-fork'
1026 1006 id_, params = _build_data(self.apikey, 'fork_repo',
1027 1007 repoid=self.REPO,
1028 1008 fork_name=fork_name,
1029 1009 owner=TEST_USER_ADMIN_LOGIN,
1030 1010 )
1031 1011 response = api_call(self, params)
1032 1012
1033 1013 expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
1034 1014 fork_name)
1035 1015 self._compare_error(id_, expected, given=response.body)
1036 1016
1037 1017 def test_api_get_users_group(self):
1038 1018 id_, params = _build_data(self.apikey, 'get_users_group',
1039 1019 usersgroupid=TEST_USER_GROUP)
1040 1020 response = api_call(self, params)
1041 1021
1042 1022 users_group = UserGroupModel().get_group(TEST_USER_GROUP)
1043 1023 members = []
1044 1024 for user in users_group.members:
1045 1025 user = user.user
1046 1026 members.append(user.get_api_data())
1047 1027
1048 1028 ret = users_group.get_api_data()
1049 1029 ret['members'] = members
1050 1030 expected = ret
1051 1031 self._compare_ok(id_, expected, given=response.body)
1052 1032
1053 1033 def test_api_get_users_groups(self):
1054 1034
1055 1035 make_users_group('test_users_group2')
1056 1036
1057 1037 id_, params = _build_data(self.apikey, 'get_users_groups',)
1058 1038 response = api_call(self, params)
1059 1039
1060 1040 expected = []
1061 1041 for gr_name in [TEST_USER_GROUP, 'test_users_group2']:
1062 1042 users_group = UserGroupModel().get_group(gr_name)
1063 1043 ret = users_group.get_api_data()
1064 1044 expected.append(ret)
1065 1045 self._compare_ok(id_, expected, given=response.body)
1066 1046
1067 1047 UserGroupModel().delete(users_group='test_users_group2')
1068 1048 Session().commit()
1069 1049
1070 1050 def test_api_create_users_group(self):
1071 1051 group_name = 'some_new_group'
1072 1052 id_, params = _build_data(self.apikey, 'create_users_group',
1073 1053 group_name=group_name)
1074 1054 response = api_call(self, params)
1075 1055
1076 1056 ret = {
1077 1057 'msg': 'created new user group `%s`' % group_name,
1078 1058 'users_group': jsonify(UserGroupModel()\
1079 1059 .get_by_name(group_name)\
1080 1060 .get_api_data())
1081 1061 }
1082 1062 expected = ret
1083 1063 self._compare_ok(id_, expected, given=response.body)
1084 1064
1085 1065 destroy_users_group(group_name)
1086 1066
1087 1067 def test_api_get_users_group_that_exist(self):
1088 1068 id_, params = _build_data(self.apikey, 'create_users_group',
1089 1069 group_name=TEST_USER_GROUP)
1090 1070 response = api_call(self, params)
1091 1071
1092 1072 expected = "user group `%s` already exist" % TEST_USER_GROUP
1093 1073 self._compare_error(id_, expected, given=response.body)
1094 1074
1095 1075 @mock.patch.object(UserGroupModel, 'create', crash)
1096 1076 def test_api_get_users_group_exception_occurred(self):
1097 1077 group_name = 'exception_happens'
1098 1078 id_, params = _build_data(self.apikey, 'create_users_group',
1099 1079 group_name=group_name)
1100 1080 response = api_call(self, params)
1101 1081
1102 1082 expected = 'failed to create group `%s`' % group_name
1103 1083 self._compare_error(id_, expected, given=response.body)
1104 1084
1105 1085 def test_api_add_user_to_users_group(self):
1106 1086 gr_name = 'test_group'
1107 1087 UserGroupModel().create(gr_name)
1108 1088 Session().commit()
1109 1089 id_, params = _build_data(self.apikey, 'add_user_to_users_group',
1110 1090 usersgroupid=gr_name,
1111 1091 userid=TEST_USER_ADMIN_LOGIN)
1112 1092 response = api_call(self, params)
1113 1093
1114 1094 expected = {
1115 1095 'msg': 'added member `%s` to user group `%s`' % (
1116 1096 TEST_USER_ADMIN_LOGIN, gr_name
1117 1097 ),
1118 1098 'success': True}
1119 1099 self._compare_ok(id_, expected, given=response.body)
1120 1100
1121 1101 UserGroupModel().delete(users_group=gr_name)
1122 1102 Session().commit()
1123 1103
1124 1104 def test_api_add_user_to_users_group_that_doesnt_exist(self):
1125 1105 id_, params = _build_data(self.apikey, 'add_user_to_users_group',
1126 1106 usersgroupid='false-group',
1127 1107 userid=TEST_USER_ADMIN_LOGIN)
1128 1108 response = api_call(self, params)
1129 1109
1130 1110 expected = 'user group `%s` does not exist' % 'false-group'
1131 1111 self._compare_error(id_, expected, given=response.body)
1132 1112
1133 1113 @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
1134 1114 def test_api_add_user_to_users_group_exception_occurred(self):
1135 1115 gr_name = 'test_group'
1136 1116 UserGroupModel().create(gr_name)
1137 1117 Session().commit()
1138 1118 id_, params = _build_data(self.apikey, 'add_user_to_users_group',
1139 1119 usersgroupid=gr_name,
1140 1120 userid=TEST_USER_ADMIN_LOGIN)
1141 1121 response = api_call(self, params)
1142 1122
1143 1123 expected = 'failed to add member to user group `%s`' % gr_name
1144 1124 self._compare_error(id_, expected, given=response.body)
1145 1125
1146 1126 UserGroupModel().delete(users_group=gr_name)
1147 1127 Session().commit()
1148 1128
1149 1129 def test_api_remove_user_from_users_group(self):
1150 1130 gr_name = 'test_group_3'
1151 1131 gr = UserGroupModel().create(gr_name)
1152 1132 UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
1153 1133 Session().commit()
1154 1134 id_, params = _build_data(self.apikey, 'remove_user_from_users_group',
1155 1135 usersgroupid=gr_name,
1156 1136 userid=TEST_USER_ADMIN_LOGIN)
1157 1137 response = api_call(self, params)
1158 1138
1159 1139 expected = {
1160 1140 'msg': 'removed member `%s` from user group `%s`' % (
1161 1141 TEST_USER_ADMIN_LOGIN, gr_name
1162 1142 ),
1163 1143 'success': True}
1164 1144 self._compare_ok(id_, expected, given=response.body)
1165 1145
1166 1146 UserGroupModel().delete(users_group=gr_name)
1167 1147 Session().commit()
1168 1148
1169 1149 @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
1170 1150 def test_api_remove_user_from_users_group_exception_occurred(self):
1171 1151 gr_name = 'test_group_3'
1172 1152 gr = UserGroupModel().create(gr_name)
1173 1153 UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
1174 1154 Session().commit()
1175 1155 id_, params = _build_data(self.apikey, 'remove_user_from_users_group',
1176 1156 usersgroupid=gr_name,
1177 1157 userid=TEST_USER_ADMIN_LOGIN)
1178 1158 response = api_call(self, params)
1179 1159
1180 1160 expected = 'failed to remove member from user group `%s`' % gr_name
1181 1161 self._compare_error(id_, expected, given=response.body)
1182 1162
1183 1163 UserGroupModel().delete(users_group=gr_name)
1184 1164 Session().commit()
1185 1165
1186 1166 @parameterized.expand([('none', 'repository.none'),
1187 1167 ('read', 'repository.read'),
1188 1168 ('write', 'repository.write'),
1189 1169 ('admin', 'repository.admin')])
1190 1170 def test_api_grant_user_permission(self, name, perm):
1191 1171 id_, params = _build_data(self.apikey, 'grant_user_permission',
1192 1172 repoid=self.REPO,
1193 1173 userid=TEST_USER_ADMIN_LOGIN,
1194 1174 perm=perm)
1195 1175 response = api_call(self, params)
1196 1176
1197 1177 ret = {
1198 1178 'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
1199 1179 perm, TEST_USER_ADMIN_LOGIN, self.REPO
1200 1180 ),
1201 1181 'success': True
1202 1182 }
1203 1183 expected = ret
1204 1184 self._compare_ok(id_, expected, given=response.body)
1205 1185
1206 1186 def test_api_grant_user_permission_wrong_permission(self):
1207 1187 perm = 'haha.no.permission'
1208 1188 id_, params = _build_data(self.apikey, 'grant_user_permission',
1209 1189 repoid=self.REPO,
1210 1190 userid=TEST_USER_ADMIN_LOGIN,
1211 1191 perm=perm)
1212 1192 response = api_call(self, params)
1213 1193
1214 1194 expected = 'permission `%s` does not exist' % perm
1215 1195 self._compare_error(id_, expected, given=response.body)
1216 1196
1217 1197 @mock.patch.object(RepoModel, 'grant_user_permission', crash)
1218 1198 def test_api_grant_user_permission_exception_when_adding(self):
1219 1199 perm = 'repository.read'
1220 1200 id_, params = _build_data(self.apikey, 'grant_user_permission',
1221 1201 repoid=self.REPO,
1222 1202 userid=TEST_USER_ADMIN_LOGIN,
1223 1203 perm=perm)
1224 1204 response = api_call(self, params)
1225 1205
1226 1206 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1227 1207 TEST_USER_ADMIN_LOGIN, self.REPO
1228 1208 )
1229 1209 self._compare_error(id_, expected, given=response.body)
1230 1210
1231 1211 def test_api_revoke_user_permission(self):
1232 1212 id_, params = _build_data(self.apikey, 'revoke_user_permission',
1233 1213 repoid=self.REPO,
1234 1214 userid=TEST_USER_ADMIN_LOGIN,)
1235 1215 response = api_call(self, params)
1236 1216
1237 1217 expected = {
1238 1218 'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
1239 1219 TEST_USER_ADMIN_LOGIN, self.REPO
1240 1220 ),
1241 1221 'success': True
1242 1222 }
1243 1223 self._compare_ok(id_, expected, given=response.body)
1244 1224
1245 1225 @mock.patch.object(RepoModel, 'revoke_user_permission', crash)
1246 1226 def test_api_revoke_user_permission_exception_when_adding(self):
1247 1227 id_, params = _build_data(self.apikey, 'revoke_user_permission',
1248 1228 repoid=self.REPO,
1249 1229 userid=TEST_USER_ADMIN_LOGIN,)
1250 1230 response = api_call(self, params)
1251 1231
1252 1232 expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
1253 1233 TEST_USER_ADMIN_LOGIN, self.REPO
1254 1234 )
1255 1235 self._compare_error(id_, expected, given=response.body)
1256 1236
1257 1237 @parameterized.expand([('none', 'repository.none'),
1258 1238 ('read', 'repository.read'),
1259 1239 ('write', 'repository.write'),
1260 1240 ('admin', 'repository.admin')])
1261 1241 def test_api_grant_users_group_permission(self, name, perm):
1262 1242 id_, params = _build_data(self.apikey, 'grant_users_group_permission',
1263 1243 repoid=self.REPO,
1264 1244 usersgroupid=TEST_USER_GROUP,
1265 1245 perm=perm)
1266 1246 response = api_call(self, params)
1267 1247
1268 1248 ret = {
1269 1249 'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % (
1270 1250 perm, TEST_USER_GROUP, self.REPO
1271 1251 ),
1272 1252 'success': True
1273 1253 }
1274 1254 expected = ret
1275 1255 self._compare_ok(id_, expected, given=response.body)
1276 1256
1277 1257 def test_api_grant_users_group_permission_wrong_permission(self):
1278 1258 perm = 'haha.no.permission'
1279 1259 id_, params = _build_data(self.apikey, 'grant_users_group_permission',
1280 1260 repoid=self.REPO,
1281 1261 usersgroupid=TEST_USER_GROUP,
1282 1262 perm=perm)
1283 1263 response = api_call(self, params)
1284 1264
1285 1265 expected = 'permission `%s` does not exist' % perm
1286 1266 self._compare_error(id_, expected, given=response.body)
1287 1267
1288 1268 @mock.patch.object(RepoModel, 'grant_users_group_permission', crash)
1289 1269 def test_api_grant_users_group_permission_exception_when_adding(self):
1290 1270 perm = 'repository.read'
1291 1271 id_, params = _build_data(self.apikey, 'grant_users_group_permission',
1292 1272 repoid=self.REPO,
1293 1273 usersgroupid=TEST_USER_GROUP,
1294 1274 perm=perm)
1295 1275 response = api_call(self, params)
1296 1276
1297 1277 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1298 1278 TEST_USER_GROUP, self.REPO
1299 1279 )
1300 1280 self._compare_error(id_, expected, given=response.body)
1301 1281
1302 1282 def test_api_revoke_users_group_permission(self):
1303 1283 RepoModel().grant_users_group_permission(repo=self.REPO,
1304 1284 group_name=TEST_USER_GROUP,
1305 1285 perm='repository.read')
1306 1286 Session().commit()
1307 1287 id_, params = _build_data(self.apikey, 'revoke_users_group_permission',
1308 1288 repoid=self.REPO,
1309 1289 usersgroupid=TEST_USER_GROUP,)
1310 1290 response = api_call(self, params)
1311 1291
1312 1292 expected = {
1313 1293 'msg': 'Revoked perm for user group: `%s` in repo: `%s`' % (
1314 1294 TEST_USER_GROUP, self.REPO
1315 1295 ),
1316 1296 'success': True
1317 1297 }
1318 1298 self._compare_ok(id_, expected, given=response.body)
1319 1299
1320 1300 @mock.patch.object(RepoModel, 'revoke_users_group_permission', crash)
1321 1301 def test_api_revoke_users_group_permission_exception_when_adding(self):
1322 1302
1323 1303 id_, params = _build_data(self.apikey, 'revoke_users_group_permission',
1324 1304 repoid=self.REPO,
1325 1305 usersgroupid=TEST_USER_GROUP,)
1326 1306 response = api_call(self, params)
1327 1307
1328 1308 expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
1329 1309 TEST_USER_GROUP, self.REPO
1330 1310 )
1331 1311 self._compare_error(id_, expected, given=response.body)
@@ -1,43 +1,102 b''
1 1 """
2 2 Helpers for fixture generation
3 3 """
4 import os
5 import unittest
6 4 from rhodecode.tests import *
7 from rhodecode.model.db import Repository, User
5 from rhodecode.model.db import Repository, User, RepoGroup
8 6 from rhodecode.model.meta import Session
9 7 from rhodecode.model.repo import RepoModel
8 from rhodecode.model.repos_group import ReposGroupModel
10 9
11 10
12 11 class Fixture(object):
13 12
14 13 def __init__(self):
15 14 pass
16 15
16 def _get_repo_create_params(self, **custom):
17 defs = dict(
18 repo_name=None,
19 repo_type='hg',
20 clone_uri='',
21 repo_group='',
22 repo_description='DESC',
23 repo_private=False,
24 repo_landing_rev='tip'
25 )
26 defs.update(custom)
27 if 'repo_name_full' not in custom:
28 defs.update({'repo_name_full': defs['repo_name']})
29
30 return defs
31
32 def _get_group_create_params(self, **custom):
33 defs = dict(
34 group_name=None,
35 group_description='DESC',
36 group_parent_id=None,
37 perms_updates=[],
38 perms_new=[],
39 enable_locking=False,
40 recursive=False
41 )
42 defs.update(custom)
43
44 return defs
45
17 46 def create_repo(self, name, **kwargs):
18 form_data = _get_repo_create_params(repo_name=name, **kwargs)
19 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
47 if 'skip_if_exists' in kwargs:
48 del kwargs['skip_if_exists']
49 r = Repository.get_by_repo_name(name)
50 if r:
51 return r
52
53 if isinstance(kwargs.get('repos_group'), RepoGroup):
54 #TODO: rename the repos_group !
55 kwargs['repo_group'] = kwargs['repos_group'].group_id
56 del kwargs['repos_group']
57
58 form_data = self._get_repo_create_params(repo_name=name, **kwargs)
59 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
20 60 RepoModel().create(form_data, cur_user)
61 Session().commit()
21 62 return Repository.get_by_repo_name(name)
22 63
23 64 def create_fork(self, repo_to_fork, fork_name, **kwargs):
24 65 repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
25 vcs_type = repo_to_fork.repo_type
66
67 form_data = self._get_repo_create_params(repo_name=fork_name,
68 fork_parent_id=repo_to_fork,
69 repo_type=repo_to_fork.repo_type,
70 **kwargs)
71 form_data['update_after_clone'] = False
72
73 #TODO: fix it !!
74 form_data['description'] = form_data['repo_description']
75 form_data['private'] = form_data['repo_private']
76 form_data['landing_rev'] = form_data['repo_landing_rev']
77
78 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
79 RepoModel().create_fork(form_data, cur_user=owner)
80 Session().commit()
81 r = Repository.get_by_repo_name(fork_name)
82 assert r
83 return r
26 84
27 form_data = dict(
28 repo_name=fork_name,
29 repo_name_full=fork_name,
30 repo_group=None,
31 repo_type=vcs_type,
32 description='',
33 private=False,
34 copy_permissions=False,
35 landing_rev='tip',
36 update_after_clone=False,
37 fork_parent_id=repo_to_fork,
38 )
39 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
40 RepoModel().create_fork(form_data, cur_user=cur_user)
85 def destroy_repo(self, repo_name):
86 RepoModel().delete(repo_name)
87 Session().commit()
41 88
89 def create_group(self, name, **kwargs):
90 if 'skip_if_exists' in kwargs:
91 del kwargs['skip_if_exists']
92 gr = RepoGroup.get_by_group_name(group_name=name)
93 if gr:
94 return gr
95 form_data = self._get_group_create_params(group_name=name, **kwargs)
96 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
97 gr = ReposGroupModel().create(group_name=form_data['group_name'],
98 group_description=form_data['group_name'],
99 owner=owner, parent=form_data['group_parent_id'])
42 100 Session().commit()
43 return Repository.get_by_repo_name(fork_name)
101 gr = RepoGroup.get_by_group_name(gr.group_name)
102 return gr
@@ -1,368 +1,371 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 import os
4 4 import urllib
5 5
6 6 from rhodecode.lib import vcs
7 7 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
8 8 Permission
9 9 from rhodecode.tests import *
10 10 from rhodecode.model.repos_group import ReposGroupModel
11 11 from rhodecode.model.repo import RepoModel
12 12 from rhodecode.model.meta import Session
13 from rhodecode.tests.fixture import Fixture
14
15 fixture = Fixture()
13 16
14 17
15 18 def _get_permission_for_user(user, repo):
16 19 perm = UserRepoToPerm.query()\
17 20 .filter(UserRepoToPerm.repository ==
18 21 Repository.get_by_repo_name(repo))\
19 22 .filter(UserRepoToPerm.user == User.get_by_username(user))\
20 23 .all()
21 24 return perm
22 25
23 26
24 27 class TestAdminReposController(TestController):
25 28
26 29 def test_index(self):
27 30 self.log_user()
28 31 response = self.app.get(url('repos'))
29 32 # Test response...
30 33
31 34 def test_index_as_xml(self):
32 35 response = self.app.get(url('formatted_repos', format='xml'))
33 36
34 37 def test_create_hg(self):
35 38 self.log_user()
36 39 repo_name = NEW_HG_REPO
37 40 description = 'description for newly created repo'
38 41 response = self.app.post(url('repos'),
39 _get_repo_create_params(repo_private=False,
42 fixture._get_repo_create_params(repo_private=False,
40 43 repo_name=repo_name,
41 44 repo_description=description))
42 45 self.checkSessionFlash(response,
43 46 'Created repository <a href="/%s">%s</a>'
44 47 % (repo_name, repo_name))
45 48
46 49 #test if the repo was created in the database
47 50 new_repo = self.Session().query(Repository)\
48 51 .filter(Repository.repo_name == repo_name).one()
49 52
50 53 self.assertEqual(new_repo.repo_name, repo_name)
51 54 self.assertEqual(new_repo.description, description)
52 55
53 56 #test if repository is visible in the list ?
54 57 response = response.follow()
55 58
56 59 response.mustcontain(repo_name)
57 60
58 61 #test if repository was created on filesystem
59 62 try:
60 63 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
61 64 except Exception:
62 65 self.fail('no repo %s in filesystem' % repo_name)
63 66
64 67 def test_create_hg_non_ascii(self):
65 68 self.log_user()
66 69 non_ascii = "Δ…Δ™Ε‚"
67 70 repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
68 71 repo_name_unicode = repo_name.decode('utf8')
69 72 description = 'description for newly created repo' + non_ascii
70 73 description_unicode = description.decode('utf8')
71 74 private = False
72 75 response = self.app.post(url('repos'),
73 _get_repo_create_params(repo_private=False,
76 fixture._get_repo_create_params(repo_private=False,
74 77 repo_name=repo_name,
75 78 repo_description=description))
76 79 self.checkSessionFlash(response,
77 80 u'Created repository <a href="/%s">%s</a>'
78 81 % (urllib.quote(repo_name), repo_name_unicode))
79 82 #test if the repo was created in the database
80 83 new_repo = self.Session().query(Repository)\
81 84 .filter(Repository.repo_name == repo_name_unicode).one()
82 85
83 86 self.assertEqual(new_repo.repo_name, repo_name_unicode)
84 87 self.assertEqual(new_repo.description, description_unicode)
85 88
86 89 #test if repository is visible in the list ?
87 90 response = response.follow()
88 91
89 92 response.mustcontain(repo_name)
90 93
91 94 #test if repository was created on filesystem
92 95 try:
93 96 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
94 97 except Exception:
95 98 self.fail('no repo %s in filesystem' % repo_name)
96 99
97 100 def test_create_hg_in_group(self):
98 101 self.log_user()
99 102
100 103 ## create GROUP
101 104 group_name = 'sometest'
102 105 gr = ReposGroupModel().create(group_name=group_name,
103 106 group_description='test',
104 107 owner=TEST_USER_ADMIN_LOGIN)
105 108 self.Session().commit()
106 109
107 110 repo_name = 'ingroup'
108 111 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
109 112 description = 'description for newly created repo'
110 113 response = self.app.post(url('repos'),
111 _get_repo_create_params(repo_private=False,
114 fixture._get_repo_create_params(repo_private=False,
112 115 repo_name=repo_name,
113 116 repo_description=description,
114 117 repo_group=gr.group_id,))
115 118
116 119 self.checkSessionFlash(response,
117 120 'Created repository <a href="/%s">%s</a>'
118 121 % (repo_name, repo_name))
119 122 #test if the repo was created in the database
120 123 new_repo = self.Session().query(Repository)\
121 124 .filter(Repository.repo_name == repo_name_full).one()
122 125
123 126 self.assertEqual(new_repo.repo_name, repo_name_full)
124 127 self.assertEqual(new_repo.description, description)
125 128
126 129 #test if repository is visible in the list ?
127 130 response = response.follow()
128 131
129 132 response.mustcontain(repo_name_full)
130 133
131 134 #test if repository was created on filesystem
132 135 try:
133 136 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
134 137 except Exception:
135 138 ReposGroupModel().delete(group_name)
136 139 self.Session().commit()
137 140 self.fail('no repo %s in filesystem' % repo_name)
138 141
139 142 RepoModel().delete(repo_name_full)
140 143 ReposGroupModel().delete(group_name)
141 144 self.Session().commit()
142 145
143 146 def test_create_git(self):
144 147 self.log_user()
145 148 repo_name = NEW_GIT_REPO
146 149 description = 'description for newly created repo'
147 150
148 151 response = self.app.post(url('repos'),
149 _get_repo_create_params(repo_private=False,
152 fixture._get_repo_create_params(repo_private=False,
150 153 repo_type='git',
151 154 repo_name=repo_name,
152 155 repo_description=description))
153 156 self.checkSessionFlash(response,
154 157 'Created repository <a href="/%s">%s</a>'
155 158 % (repo_name, repo_name))
156 159
157 160 #test if the repo was created in the database
158 161 new_repo = self.Session().query(Repository)\
159 162 .filter(Repository.repo_name == repo_name).one()
160 163
161 164 self.assertEqual(new_repo.repo_name, repo_name)
162 165 self.assertEqual(new_repo.description, description)
163 166
164 167 #test if repository is visible in the list ?
165 168 response = response.follow()
166 169
167 170 response.mustcontain(repo_name)
168 171
169 172 #test if repository was created on filesystem
170 173 try:
171 174 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
172 175 except Exception:
173 176 self.fail('no repo %s in filesystem' % repo_name)
174 177
175 178 def test_create_git_non_ascii(self):
176 179 self.log_user()
177 180 non_ascii = "Δ…Δ™Ε‚"
178 181 repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
179 182 repo_name_unicode = repo_name.decode('utf8')
180 183 description = 'description for newly created repo' + non_ascii
181 184 description_unicode = description.decode('utf8')
182 185 private = False
183 186 response = self.app.post(url('repos'),
184 _get_repo_create_params(repo_private=False,
187 fixture._get_repo_create_params(repo_private=False,
185 188 repo_type='git',
186 189 repo_name=repo_name,
187 190 repo_description=description))
188 191
189 192 self.checkSessionFlash(response,
190 193 u'Created repository <a href="/%s">%s</a>'
191 194 % (urllib.quote(repo_name), repo_name_unicode))
192 195
193 196 #test if the repo was created in the database
194 197 new_repo = self.Session().query(Repository)\
195 198 .filter(Repository.repo_name == repo_name_unicode).one()
196 199
197 200 self.assertEqual(new_repo.repo_name, repo_name_unicode)
198 201 self.assertEqual(new_repo.description, description_unicode)
199 202
200 203 #test if repository is visible in the list ?
201 204 response = response.follow()
202 205
203 206 response.mustcontain(repo_name)
204 207
205 208 #test if repository was created on filesystem
206 209 try:
207 210 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
208 211 except Exception:
209 212 self.fail('no repo %s in filesystem' % repo_name)
210 213
211 214 def test_update(self):
212 215 response = self.app.put(url('repo', repo_name=HG_REPO))
213 216
214 217 def test_update_browser_fakeout(self):
215 218 response = self.app.post(url('repo', repo_name=HG_REPO),
216 219 params=dict(_method='put'))
217 220
218 221 def test_delete_hg(self):
219 222 self.log_user()
220 223 repo_name = 'vcs_test_new_to_delete'
221 224 description = 'description for newly created repo'
222 225 response = self.app.post(url('repos'),
223 _get_repo_create_params(repo_private=False,
226 fixture._get_repo_create_params(repo_private=False,
224 227 repo_type='hg',
225 228 repo_name=repo_name,
226 229 repo_description=description))
227 230
228 231 self.checkSessionFlash(response,
229 232 'Created repository <a href="/%s">%s</a>'
230 233 % (repo_name, repo_name))
231 234 #test if the repo was created in the database
232 235 new_repo = self.Session().query(Repository)\
233 236 .filter(Repository.repo_name == repo_name).one()
234 237
235 238 self.assertEqual(new_repo.repo_name, repo_name)
236 239 self.assertEqual(new_repo.description, description)
237 240
238 241 #test if repository is visible in the list ?
239 242 response = response.follow()
240 243
241 244 response.mustcontain(repo_name)
242 245
243 246 #test if repository was created on filesystem
244 247 try:
245 248 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
246 249 except Exception:
247 250 self.fail('no repo %s in filesystem' % repo_name)
248 251
249 252 response = self.app.delete(url('repo', repo_name=repo_name))
250 253
251 254 self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
252 255
253 256 response.follow()
254 257
255 258 #check if repo was deleted from db
256 259 deleted_repo = self.Session().query(Repository)\
257 260 .filter(Repository.repo_name == repo_name).scalar()
258 261
259 262 self.assertEqual(deleted_repo, None)
260 263
261 264 self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
262 265 False)
263 266
264 267 def test_delete_git(self):
265 268 self.log_user()
266 269 repo_name = 'vcs_test_new_to_delete'
267 270 description = 'description for newly created repo'
268 271 private = False
269 272 response = self.app.post(url('repos'),
270 _get_repo_create_params(repo_private=False,
273 fixture._get_repo_create_params(repo_private=False,
271 274 repo_type='git',
272 275 repo_name=repo_name,
273 276 repo_description=description))
274 277
275 278 self.checkSessionFlash(response,
276 279 'Created repository <a href="/%s">%s</a>'
277 280 % (repo_name, repo_name))
278 281 #test if the repo was created in the database
279 282 new_repo = self.Session().query(Repository)\
280 283 .filter(Repository.repo_name == repo_name).one()
281 284
282 285 self.assertEqual(new_repo.repo_name, repo_name)
283 286 self.assertEqual(new_repo.description, description)
284 287
285 288 #test if repository is visible in the list ?
286 289 response = response.follow()
287 290
288 291 response.mustcontain(repo_name)
289 292
290 293 #test if repository was created on filesystem
291 294 try:
292 295 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
293 296 except Exception:
294 297 self.fail('no repo %s in filesystem' % repo_name)
295 298
296 299 response = self.app.delete(url('repo', repo_name=repo_name))
297 300
298 301 self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
299 302
300 303 response.follow()
301 304
302 305 #check if repo was deleted from db
303 306 deleted_repo = self.Session().query(Repository)\
304 307 .filter(Repository.repo_name == repo_name).scalar()
305 308
306 309 self.assertEqual(deleted_repo, None)
307 310
308 311 self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
309 312 False)
310 313
311 314 def test_delete_repo_with_group(self):
312 315 #TODO:
313 316 pass
314 317
315 318 def test_delete_browser_fakeout(self):
316 319 response = self.app.post(url('repo', repo_name=HG_REPO),
317 320 params=dict(_method='delete'))
318 321
319 322 def test_show_hg(self):
320 323 self.log_user()
321 324 response = self.app.get(url('repo', repo_name=HG_REPO))
322 325
323 326 def test_show_git(self):
324 327 self.log_user()
325 328 response = self.app.get(url('repo', repo_name=GIT_REPO))
326 329
327 330
328 331 def test_edit(self):
329 332 response = self.app.get(url('edit_repo', repo_name=HG_REPO))
330 333
331 334 def test_set_private_flag_sets_default_to_none(self):
332 335 self.log_user()
333 336 #initially repository perm should be read
334 337 perm = _get_permission_for_user(user='default', repo=HG_REPO)
335 338 self.assertTrue(len(perm), 1)
336 339 self.assertEqual(perm[0].permission.permission_name, 'repository.read')
337 340 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
338 341
339 342 response = self.app.put(url('repo', repo_name=HG_REPO),
340 _get_repo_create_params(repo_private=1,
343 fixture._get_repo_create_params(repo_private=1,
341 344 repo_name=HG_REPO,
342 345 user=TEST_USER_ADMIN_LOGIN))
343 346 self.checkSessionFlash(response,
344 347 msg='Repository %s updated successfully' % (HG_REPO))
345 348 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, True)
346 349
347 350 #now the repo default permission should be None
348 351 perm = _get_permission_for_user(user='default', repo=HG_REPO)
349 352 self.assertTrue(len(perm), 1)
350 353 self.assertEqual(perm[0].permission.permission_name, 'repository.none')
351 354
352 355 response = self.app.put(url('repo', repo_name=HG_REPO),
353 _get_repo_create_params(repo_private=False,
356 fixture._get_repo_create_params(repo_private=False,
354 357 repo_name=HG_REPO,
355 358 user=TEST_USER_ADMIN_LOGIN))
356 359 self.checkSessionFlash(response,
357 360 msg='Repository %s updated successfully' % (HG_REPO))
358 361 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
359 362
360 363 #we turn off private now the repo default permission should stay None
361 364 perm = _get_permission_for_user(user='default', repo=HG_REPO)
362 365 self.assertTrue(len(perm), 1)
363 366 self.assertEqual(perm[0].permission.permission_name, 'repository.none')
364 367
365 368 #update this permission back
366 369 perm[0].permission = Permission.get_by_key('repository.read')
367 370 Session().add(perm[0])
368 371 Session().commit()
@@ -1,417 +1,411 b''
1 1 from rhodecode.tests import *
2 2 from rhodecode.model.repo import RepoModel
3 3 from rhodecode.model.meta import Session
4 4 from rhodecode.model.db import Repository
5 5 from rhodecode.model.scm import ScmModel
6 6 from rhodecode.lib.vcs.backends.base import EmptyChangeset
7 7 from rhodecode.tests.fixture import Fixture
8 8
9 9 fixture = Fixture()
10 10
11 11
12 12 def _commit_change(repo, filename, content, message, vcs_type, parent=None, newfile=False):
13 13 repo = Repository.get_by_repo_name(repo)
14 14 _cs = parent
15 15 if not parent:
16 16 _cs = EmptyChangeset(alias=vcs_type)
17 17
18 18 if newfile:
19 19 cs = ScmModel().create_node(
20 20 repo=repo.scm_instance, repo_name=repo.repo_name,
21 21 cs=_cs, user=TEST_USER_ADMIN_LOGIN,
22 22 author=TEST_USER_ADMIN_LOGIN,
23 23 message=message,
24 24 content=content,
25 25 f_path=filename
26 26 )
27 27 else:
28 28 cs = ScmModel().commit_change(
29 29 repo=repo.scm_instance, repo_name=repo.repo_name,
30 30 cs=parent, user=TEST_USER_ADMIN_LOGIN,
31 31 author=TEST_USER_ADMIN_LOGIN,
32 32 message=message,
33 33 content=content,
34 34 f_path=filename
35 35 )
36 36 return cs
37 37
38 38
39 39 class TestCompareController(TestController):
40 40
41 41 def setUp(self):
42 42 self.r1_id = None
43 43 self.r2_id = None
44 44
45 45 def tearDown(self):
46 46 if self.r2_id:
47 47 RepoModel().delete(self.r2_id)
48 48 if self.r1_id:
49 49 RepoModel().delete(self.r1_id)
50 50 Session().commit()
51 51 Session.remove()
52 52
53 53 def test_compare_forks_on_branch_extra_commits_hg(self):
54 54 self.log_user()
55 repo1 = RepoModel().create_repo(repo_name='one', repo_type='hg',
56 description='diff-test',
57 owner=TEST_USER_ADMIN_LOGIN)
58 Session().commit()
55 repo1 = fixture.create_repo('one', repo_type='hg',
56 repo_description='diff-test',
57 cur_user=TEST_USER_ADMIN_LOGIN)
59 58 self.r1_id = repo1.repo_id
60 59 #commit something !
61 60 cs0 = _commit_change(repo1.repo_name, filename='file1', content='line1\n',
62 61 message='commit1', vcs_type='hg', parent=None, newfile=True)
63 62
64 63 #fork this repo
65 64 repo2 = fixture.create_fork('one', 'one-fork')
66 65 self.r2_id = repo2.repo_id
67 66
68 67 #add two extra commit into fork
69 68 cs1 = _commit_change(repo2.repo_name, filename='file1', content='line1\nline2\n',
70 69 message='commit2', vcs_type='hg', parent=cs0)
71 70
72 71 cs2 = _commit_change(repo2.repo_name, filename='file1', content='line1\nline2\nline3\n',
73 72 message='commit3', vcs_type='hg', parent=cs1)
74 73
75 74 rev1 = 'default'
76 75 rev2 = 'default'
77 76
78 77 response = self.app.get(url(controller='compare', action='index',
79 78 repo_name=repo1.repo_name,
80 79 org_ref_type="branch",
81 80 org_ref=rev2,
82 81 other_repo=repo2.repo_name,
83 82 other_ref_type="branch",
84 83 other_ref=rev1,
85 84 merge='1',
86 85 ))
87 86
88 87 response.mustcontain('%s@%s -&gt; %s@%s' % (repo1.repo_name, rev2, repo2.repo_name, rev1))
89 88 response.mustcontain("""Showing 2 commits""")
90 89 response.mustcontain("""1 file changed with 2 insertions and 0 deletions""")
91 90
92 91 response.mustcontain("""<div class="message tooltip" title="commit2" style="white-space:normal">commit2</div>""")
93 92 response.mustcontain("""<div class="message tooltip" title="commit3" style="white-space:normal">commit3</div>""")
94 93
95 94 response.mustcontain("""<a href="/%s/changeset/%s">r1:%s</a>""" % (repo2.repo_name, cs1.raw_id, cs1.short_id))
96 95 response.mustcontain("""<a href="/%s/changeset/%s">r2:%s</a>""" % (repo2.repo_name, cs2.raw_id, cs2.short_id))
97 96 ## files
98 97 response.mustcontain("""<a href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=1#C--826e8142e6ba">file1</a>""" % (repo1.repo_name, rev2, rev1, repo2.repo_name))
99 98 #swap
100 99 response.mustcontain("""<a href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=True">[swap]</a>""" % (repo2.repo_name, rev1, rev2, repo1.repo_name))
101 100
102 101 def test_compare_forks_on_branch_extra_commits_origin_has_incomming_hg(self):
103 102 self.log_user()
104 103
105 repo1 = RepoModel().create_repo(repo_name='one', repo_type='hg',
106 description='diff-test',
107 owner=TEST_USER_ADMIN_LOGIN)
108 Session().commit()
104 repo1 = fixture.create_repo('one', repo_type='hg',
105 repo_description='diff-test',
106 cur_user=TEST_USER_ADMIN_LOGIN)
107
109 108 self.r1_id = repo1.repo_id
110 109
111 110 #commit something !
112 111 cs0 = _commit_change(repo1.repo_name, filename='file1', content='line1\n',
113 112 message='commit1', vcs_type='hg', parent=None, newfile=True)
114 113
115 114 #fork this repo
116 115 repo2 = fixture.create_fork('one', 'one-fork')
117 116 self.r2_id = repo2.repo_id
118 117
119 118 #now commit something to origin repo
120 119 cs1_prim = _commit_change(repo1.repo_name, filename='file2', content='line1file2\n',
121 120 message='commit2', vcs_type='hg', parent=cs0, newfile=True)
122 121
123 122 #add two extra commit into fork
124 123 cs1 = _commit_change(repo2.repo_name, filename='file1', content='line1\nline2\n',
125 124 message='commit2', vcs_type='hg', parent=cs0)
126 125
127 126 cs2 = _commit_change(repo2.repo_name, filename='file1', content='line1\nline2\nline3\n',
128 127 message='commit3', vcs_type='hg', parent=cs1)
129 128
130 129 rev1 = 'default'
131 130 rev2 = 'default'
132 131
133 132 response = self.app.get(url(controller='compare', action='index',
134 133 repo_name=repo1.repo_name,
135 134 org_ref_type="branch",
136 135 org_ref=rev2,
137 136 other_repo=repo2.repo_name,
138 137 other_ref_type="branch",
139 138 other_ref=rev1,
140 139 merge='x',
141 140 ))
142 141 response.mustcontain('%s@%s -&gt; %s@%s' % (repo1.repo_name, rev2, repo2.repo_name, rev1))
143 142 response.mustcontain("""Showing 2 commits""")
144 143 response.mustcontain("""1 file changed with 2 insertions and 0 deletions""")
145 144
146 145 response.mustcontain("""<div class="message tooltip" title="commit2" style="white-space:normal">commit2</div>""")
147 146 response.mustcontain("""<div class="message tooltip" title="commit3" style="white-space:normal">commit3</div>""")
148 147
149 148 response.mustcontain("""<a href="/%s/changeset/%s">r1:%s</a>""" % (repo2.repo_name, cs1.raw_id, cs1.short_id))
150 149 response.mustcontain("""<a href="/%s/changeset/%s">r2:%s</a>""" % (repo2.repo_name, cs2.raw_id, cs2.short_id))
151 150 ## files
152 151 response.mustcontain("""<a href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=x#C--826e8142e6ba">file1</a>""" % (repo1.repo_name, rev2, rev1, repo2.repo_name))
153 152 #swap
154 153 response.mustcontain("""<a href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=True">[swap]</a>""" % (repo2.repo_name, rev1, rev2, repo1.repo_name))
155 154
156 155 def test_compare_cherry_pick_changesets_from_bottom(self):
157 156
158 157 # repo1:
159 158 # cs0:
160 159 # cs1:
161 160 # repo1-fork- in which we will cherry pick bottom changesets
162 161 # cs0:
163 162 # cs1:
164 163 # cs2: x
165 164 # cs3: x
166 165 # cs4: x
167 166 # cs5:
168 167 #make repo1, and cs1+cs2
169 168 self.log_user()
170 169
171 repo1 = RepoModel().create_repo(repo_name='repo1', repo_type='hg',
172 description='diff-test',
173 owner=TEST_USER_ADMIN_LOGIN)
174 Session().commit()
170 repo1 = fixture.create_repo('repo1', repo_type='hg',
171 repo_description='diff-test',
172 cur_user=TEST_USER_ADMIN_LOGIN)
175 173 self.r1_id = repo1.repo_id
176 174
177 175 #commit something !
178 176 cs0 = _commit_change(repo1.repo_name, filename='file1', content='line1\n',
179 177 message='commit1', vcs_type='hg', parent=None,
180 178 newfile=True)
181 179 cs1 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\n',
182 180 message='commit2', vcs_type='hg', parent=cs0)
183 181 #fork this repo
184 182 repo2 = fixture.create_fork('repo1', 'repo1-fork')
185 183 self.r2_id = repo2.repo_id
186 184 #now make cs3-6
187 185 cs2 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\nline3\n',
188 186 message='commit3', vcs_type='hg', parent=cs1)
189 187 cs3 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\n',
190 188 message='commit4', vcs_type='hg', parent=cs2)
191 189 cs4 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\nline5\n',
192 190 message='commit5', vcs_type='hg', parent=cs3)
193 191 cs5 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\nline5\nline6\n',
194 192 message='commit6', vcs_type='hg', parent=cs4)
195 193
196 194 response = self.app.get(url(controller='compare', action='index',
197 195 repo_name=repo2.repo_name,
198 196 org_ref_type="rev",
199 197 org_ref=cs1.short_id, # parent of cs2, in repo2
200 198 other_repo=repo1.repo_name,
201 199 other_ref_type="rev",
202 200 other_ref=cs4.short_id,
203 201 merge='True',
204 202 ))
205 203 response.mustcontain('%s@%s -&gt; %s@%s' % (repo2.repo_name, cs1.short_id, repo1.repo_name, cs4.short_id))
206 204 response.mustcontain("""Showing 3 commits""")
207 205 response.mustcontain("""1 file changed with 3 insertions and 0 deletions""")
208 206
209 207 response.mustcontain("""<div class="message tooltip" title="commit3" style="white-space:normal">commit3</div>""")
210 208 response.mustcontain("""<div class="message tooltip" title="commit4" style="white-space:normal">commit4</div>""")
211 209 response.mustcontain("""<div class="message tooltip" title="commit5" style="white-space:normal">commit5</div>""")
212 210
213 211 response.mustcontain("""<a href="/%s/changeset/%s">r2:%s</a>""" % (repo1.repo_name, cs2.raw_id, cs2.short_id))
214 212 response.mustcontain("""<a href="/%s/changeset/%s">r3:%s</a>""" % (repo1.repo_name, cs3.raw_id, cs3.short_id))
215 213 response.mustcontain("""<a href="/%s/changeset/%s">r4:%s</a>""" % (repo1.repo_name, cs4.raw_id, cs4.short_id))
216 214 ## files
217 215 response.mustcontain("""#C--826e8142e6ba">file1</a>""")
218 216
219 217 def test_compare_cherry_pick_changesets_from_top(self):
220 218 # repo1:
221 219 # cs0:
222 220 # cs1:
223 221 # repo1-fork- in which we will cherry pick bottom changesets
224 222 # cs0:
225 223 # cs1:
226 224 # cs2:
227 225 # cs3: x
228 226 # cs4: x
229 227 # cs5: x
230 228 #
231 229 #make repo1, and cs1+cs2
232 230 self.log_user()
233 repo1 = RepoModel().create_repo(repo_name='repo1', repo_type='hg',
234 description='diff-test',
235 owner=TEST_USER_ADMIN_LOGIN)
236 Session().commit()
231 repo1 = fixture.create_repo('repo1', repo_type='hg',
232 repo_description='diff-test',
233 cur_user=TEST_USER_ADMIN_LOGIN)
237 234 self.r1_id = repo1.repo_id
238 235
239 236 #commit something !
240 237 cs0 = _commit_change(repo1.repo_name, filename='file1', content='line1\n',
241 238 message='commit1', vcs_type='hg', parent=None,
242 239 newfile=True)
243 240 cs1 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\n',
244 241 message='commit2', vcs_type='hg', parent=cs0)
245 242 #fork this repo
246 243 repo2 = fixture.create_fork('repo1', 'repo1-fork')
247 244 self.r2_id = repo2.repo_id
248 245 #now make cs3-6
249 246 cs2 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\nline3\n',
250 247 message='commit3', vcs_type='hg', parent=cs1)
251 248 cs3 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\n',
252 249 message='commit4', vcs_type='hg', parent=cs2)
253 250 cs4 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\nline5\n',
254 251 message='commit5', vcs_type='hg', parent=cs3)
255 252 cs5 = _commit_change(repo1.repo_name, filename='file1', content='line1\nline2\nline3\nline4\nline5\nline6\n',
256 253 message='commit6', vcs_type='hg', parent=cs4)
257 254 response = self.app.get(url(controller='compare', action='index',
258 255 repo_name=repo1.repo_name,
259 256 org_ref_type="rev",
260 257 org_ref=cs2.short_id, # parent of cs3, not in repo2
261 258 other_ref_type="rev",
262 259 other_ref=cs5.short_id,
263 260 merge='1',
264 261 ))
265 262
266 263 response.mustcontain('%s@%s -&gt; %s@%s' % (repo1.repo_name, cs2.short_id, repo1.repo_name, cs5.short_id))
267 264 response.mustcontain("""Showing 3 commits""")
268 265 response.mustcontain("""1 file changed with 3 insertions and 0 deletions""")
269 266
270 267 response.mustcontain("""<div class="message tooltip" title="commit4" style="white-space:normal">commit4</div>""")
271 268 response.mustcontain("""<div class="message tooltip" title="commit5" style="white-space:normal">commit5</div>""")
272 269 response.mustcontain("""<div class="message tooltip" title="commit6" style="white-space:normal">commit6</div>""")
273 270
274 271 response.mustcontain("""<a href="/%s/changeset/%s">r3:%s</a>""" % (repo1.repo_name, cs3.raw_id, cs3.short_id))
275 272 response.mustcontain("""<a href="/%s/changeset/%s">r4:%s</a>""" % (repo1.repo_name, cs4.raw_id, cs4.short_id))
276 273 response.mustcontain("""<a href="/%s/changeset/%s">r5:%s</a>""" % (repo1.repo_name, cs5.raw_id, cs5.short_id))
277 274 ## files
278 275 response.mustcontain("""#C--826e8142e6ba">file1</a>""")
279 276
280 277 def test_compare_cherry_pick_changeset_mixed_branches(self):
281 """
282
283 """
284 278 pass
285 279 #TODO write this tastecase
286 280
287 281 def test_compare_remote_branches_hg(self):
288 282 self.log_user()
289 283
290 284 repo2 = fixture.create_fork(HG_REPO, HG_FORK)
291 285 self.r2_id = repo2.repo_id
292 286 rev1 = '56349e29c2af'
293 287 rev2 = '7d4bc8ec6be5'
294 288
295 289 response = self.app.get(url(controller='compare', action='index',
296 290 repo_name=HG_REPO,
297 291 org_ref_type="rev",
298 292 org_ref=rev1,
299 293 other_ref_type="rev",
300 294 other_ref=rev2,
301 295 other_repo=HG_FORK,
302 296 merge='1',
303 297 ))
304 298 response.mustcontain('%s@%s -&gt; %s@%s' % (HG_REPO, rev1, HG_FORK, rev2))
305 299 ## outgoing changesets between those revisions
306 300
307 301 response.mustcontain("""<a href="/%s/changeset/2dda4e345facb0ccff1a191052dd1606dba6781d">r4:2dda4e345fac</a>""" % (HG_FORK))
308 302 response.mustcontain("""<a href="/%s/changeset/6fff84722075f1607a30f436523403845f84cd9e">r5:6fff84722075</a>""" % (HG_FORK))
309 303 response.mustcontain("""<a href="/%s/changeset/7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7">r6:%s</a>""" % (HG_FORK, rev2))
310 304
311 305 ## files
312 306 response.mustcontain("""<a href="/%s/compare/rev@%s...rev@%s?other_repo=%s&amp;merge=1#C--9c390eb52cd6">vcs/backends/hg.py</a>""" % (HG_REPO, rev1, rev2, HG_FORK))
313 307 response.mustcontain("""<a href="/%s/compare/rev@%s...rev@%s?other_repo=%s&amp;merge=1#C--41b41c1f2796">vcs/backends/__init__.py</a>""" % (HG_REPO, rev1, rev2, HG_FORK))
314 308 response.mustcontain("""<a href="/%s/compare/rev@%s...rev@%s?other_repo=%s&amp;merge=1#C--2f574d260608">vcs/backends/base.py</a>""" % (HG_REPO, rev1, rev2, HG_FORK))
315 309
316 310 def test_org_repo_new_commits_after_forking_simple_diff(self):
317 311 self.log_user()
318 312
319 repo1 = RepoModel().create_repo(repo_name='one', repo_type='hg',
320 description='diff-test',
321 owner=TEST_USER_ADMIN_LOGIN)
313 repo1 = fixture.create_repo('one', repo_type='hg',
314 repo_description='diff-test',
315 cur_user=TEST_USER_ADMIN_LOGIN)
322 316
323 Session().commit()
324 317 self.r1_id = repo1.repo_id
325 318 r1_name = repo1.repo_name
326 319
327 320 #commit something initially !
328 321 cs0 = ScmModel().create_node(
329 322 repo=repo1.scm_instance, repo_name=r1_name,
330 323 cs=EmptyChangeset(alias='hg'), user=TEST_USER_ADMIN_LOGIN,
331 324 author=TEST_USER_ADMIN_LOGIN,
332 325 message='commit1',
333 326 content='line1',
334 327 f_path='file1'
335 328 )
336 329 Session().commit()
337 330 self.assertEqual(repo1.scm_instance.revisions, [cs0.raw_id])
338 331 #fork the repo1
339 repo2 = RepoModel().create_repo(repo_name='one-fork', repo_type='hg',
340 description='compare-test',
332 repo2 = fixture.create_repo('one-fork', repo_type='hg',
333 repo_description='diff-test',
334 cur_user=TEST_USER_ADMIN_LOGIN,
341 335 clone_uri=repo1.repo_full_path,
342 owner=TEST_USER_ADMIN_LOGIN, fork_of='one')
336 fork_of='one')
343 337 Session().commit()
344 338 self.assertEqual(repo2.scm_instance.revisions, [cs0.raw_id])
345 339 self.r2_id = repo2.repo_id
346 340 r2_name = repo2.repo_name
347 341
348 342 #make 3 new commits in fork
349 343 cs1 = ScmModel().create_node(
350 344 repo=repo2.scm_instance, repo_name=r2_name,
351 345 cs=repo2.scm_instance[-1], user=TEST_USER_ADMIN_LOGIN,
352 346 author=TEST_USER_ADMIN_LOGIN,
353 347 message='commit1-fork',
354 348 content='file1-line1-from-fork',
355 349 f_path='file1-fork'
356 350 )
357 351 cs2 = ScmModel().create_node(
358 352 repo=repo2.scm_instance, repo_name=r2_name,
359 353 cs=cs1, user=TEST_USER_ADMIN_LOGIN,
360 354 author=TEST_USER_ADMIN_LOGIN,
361 355 message='commit2-fork',
362 356 content='file2-line1-from-fork',
363 357 f_path='file2-fork'
364 358 )
365 359 cs3 = ScmModel().create_node(
366 360 repo=repo2.scm_instance, repo_name=r2_name,
367 361 cs=cs2, user=TEST_USER_ADMIN_LOGIN,
368 362 author=TEST_USER_ADMIN_LOGIN,
369 363 message='commit3-fork',
370 364 content='file3-line1-from-fork',
371 365 f_path='file3-fork'
372 366 )
373 367
374 368 #compare !
375 369 rev1 = 'default'
376 370 rev2 = 'default'
377 371
378 372 response = self.app.get(url(controller='compare', action='index',
379 373 repo_name=r2_name,
380 374 org_ref_type="branch",
381 375 org_ref=rev1,
382 376 other_ref_type="branch",
383 377 other_ref=rev2,
384 378 other_repo=r1_name,
385 379 merge='1',
386 380 ))
387 381 response.mustcontain('%s@%s -&gt; %s@%s' % (r2_name, rev1, r1_name, rev2))
388 382 response.mustcontain('No files')
389 383 response.mustcontain('No changesets')
390 384
391 385 #add new commit into parent !
392 386 cs0 = ScmModel().create_node(
393 387 repo=repo1.scm_instance, repo_name=r1_name,
394 388 cs=EmptyChangeset(alias='hg'), user=TEST_USER_ADMIN_LOGIN,
395 389 author=TEST_USER_ADMIN_LOGIN,
396 390 message='commit2-parent',
397 391 content='line1-added-after-fork',
398 392 f_path='file2'
399 393 )
400 394 #compare !
401 395 rev1 = 'default'
402 396 rev2 = 'default'
403 397 response = self.app.get(url(controller='compare', action='index',
404 398 repo_name=r2_name,
405 399 org_ref_type="branch",
406 400 org_ref=rev1,
407 401 other_ref_type="branch",
408 402 other_ref=rev2,
409 403 other_repo=r1_name,
410 404 merge='1',
411 405 ))
412 406
413 407 response.mustcontain('%s@%s -&gt; %s@%s' % (r2_name, rev1, r1_name, rev2))
414 408
415 409 response.mustcontain("""commit2-parent""")
416 410 response.mustcontain("""1 file changed with 1 insertions and 0 deletions""")
417 411 response.mustcontain("""line1-added-after-fork""")
@@ -1,108 +1,111 b''
1 1 import time
2 2 from rhodecode.tests import *
3 from rhodecode.tests.fixture import Fixture
3 4 from rhodecode.model.meta import Session
4 from rhodecode.model.db import User, RhodeCodeSetting, Repository
5 from rhodecode.lib.utils import set_rhodecode_config
6 from rhodecode.tests.models.common import _make_repo, _make_group
5 from rhodecode.model.db import User, Repository
7 6 from rhodecode.model.repo import RepoModel
8 7 from rhodecode.model.repos_group import ReposGroupModel
9 8
10 9
10 fixture = Fixture()
11
12
11 13 class TestHomeController(TestController):
12 14
13 15 def test_index(self):
14 16 self.log_user()
15 17 response = self.app.get(url(controller='home', action='index'))
16 18 #if global permission is set
17 19 response.mustcontain('Add repository')
18 20 response.mustcontain('href="/%s"' % HG_REPO)
19 21
20 22 response.mustcontain("""<img class="icon" title="Mercurial repository" """
21 23 """alt="Mercurial repository" src="/images/icons/hg"""
22 24 """icon.png"/>""")
23 25 response.mustcontain("""<img class="icon" title="public repository" """
24 26 """alt="public repository" src="/images/icons/lock_"""
25 27 """open.png"/>""")
26 28
27 29 response.mustcontain(
28 30 """<a title="Marcin Kuzminski &amp;lt;marcin@python-works.com&amp;gt;:\n
29 31 merge" class="tooltip" href="/vcs_test_hg/changeset/27cd5cce30c96924232"""
30 32 """dffcd24178a07ffeb5dfc">r173:27cd5cce30c9</a>"""
31 33 )
32 34
33 35 def test_repo_summary_with_anonymous_access_disabled(self):
34 36 anon = User.get_by_username('default')
35 37 anon.active = False
36 38 Session().add(anon)
37 39 Session().commit()
38 40 time.sleep(1.5) # must sleep for cache (1s to expire)
39 41 try:
40 42 response = self.app.get(url(controller='summary',
41 43 action='index', repo_name=HG_REPO),
42 44 status=302)
43 45 assert 'login' in response.location
44 46
45 47 finally:
46 48 anon = User.get_by_username('default')
47 49 anon.active = True
48 50 Session().add(anon)
49 51 Session().commit()
50 52
51 53 def test_index_with_anonymous_access_disabled(self):
52 54 anon = User.get_by_username('default')
53 55 anon.active = False
54 56 Session().add(anon)
55 57 Session().commit()
56 58 time.sleep(1.5) # must sleep for cache (1s to expire)
57 59 try:
58 60 response = self.app.get(url(controller='home', action='index'),
59 61 status=302)
60 62 assert 'login' in response.location
61 63 finally:
62 64 anon = User.get_by_username('default')
63 65 anon.active = True
64 66 Session().add(anon)
65 67 Session().commit()
66 68
67 69 def _set_l_dash(self, set_to):
68 70 self.app.post(url('admin_setting', setting_id='visual'),
69 71 params=dict(_method='put',
70 72 rhodecode_lightweight_dashboard=set_to,))
71 73
72 74 def test_index_with_lightweight_dashboard(self):
73 75 self.log_user()
74 76 self._set_l_dash(True)
75 77
76 78 try:
77 79 response = self.app.get(url(controller='home', action='index'))
78 response.mustcontain("""var data = {"totalRecords": %s""" % len(Repository.getAll()))
80 response.mustcontain("""var data = {"totalRecords": %s"""
81 % len(Repository.getAll()))
79 82 finally:
80 83 self._set_l_dash(False)
81 84
82 85 def test_index_page_on_groups(self):
83 86 self.log_user()
84 _make_repo(name='gr1/repo_in_group', repos_group=_make_group('gr1'))
85 Session().commit()
87 gr = fixture.create_group('gr1')
88 fixture.create_repo(name='gr1/repo_in_group', repos_group=gr)
86 89 response = self.app.get(url('repos_group_home', group_name='gr1'))
87 90
88 91 try:
89 response.mustcontain("""gr1/repo_in_group""")
92 response.mustcontain("gr1/repo_in_group")
90 93 finally:
91 94 RepoModel().delete('gr1/repo_in_group')
92 95 ReposGroupModel().delete(repos_group='gr1', force_delete=True)
93 96 Session().commit()
94 97
95 98 def test_index_page_on_groups_with_lightweight_dashboard(self):
96 99 self.log_user()
97 100 self._set_l_dash(True)
98 _make_repo(name='gr1/repo_in_group', repos_group=_make_group('gr1'))
99 Session().commit()
101 fixture.create_repo(name='gr1/repo_in_group',
102 repos_group=fixture.create_group('gr1'))
100 103 response = self.app.get(url('repos_group_home', group_name='gr1'))
101 104
102 105 try:
103 106 response.mustcontain("""gr1/repo_in_group""")
104 107 finally:
105 108 self._set_l_dash(False)
106 109 RepoModel().delete('gr1/repo_in_group')
107 110 ReposGroupModel().delete(repos_group='gr1', force_delete=True)
108 111 Session().commit()
@@ -1,121 +1,122 b''
1 1 from rhodecode.tests import *
2 from rhodecode.tests.fixture import Fixture
2 3 from rhodecode.model.db import Repository
3 4 from rhodecode.lib.utils import invalidate_cache
4 5 from rhodecode.model.repo import RepoModel
5 from rhodecode.tests.models.common import _make_repo
6 6 from rhodecode.model.meta import Session
7 7
8 fixture = Fixture()
9
8 10
9 11 class TestSummaryController(TestController):
10 12
11 13 def test_index(self):
12 14 self.log_user()
13 15 ID = Repository.get_by_repo_name(HG_REPO).repo_id
14 16 response = self.app.get(url(controller='summary',
15 17 action='index',
16 18 repo_name=HG_REPO))
17 19
18 20 #repo type
19 21 response.mustcontain(
20 22 """<img style="margin-bottom:2px" class="icon" """
21 23 """title="Mercurial repository" alt="Mercurial repository" """
22 24 """src="/images/icons/hgicon.png"/>"""
23 25 )
24 26 response.mustcontain(
25 27 """<img style="margin-bottom:2px" class="icon" """
26 28 """title="public repository" alt="public """
27 29 """repository" src="/images/icons/lock_open.png"/>"""
28 30 )
29 31
30 32 #codes stats
31 33 self._enable_stats()
32 34
33 35 invalidate_cache('get_repo_cached_%s' % HG_REPO)
34 36 response = self.app.get(url(controller='summary', action='index',
35 37 repo_name=HG_REPO))
36 38 response.mustcontain(
37 39 """var data = [["py", {"count": 42, "desc": ["Python"]}], """
38 40 """["rst", {"count": 11, "desc": ["Rst"]}], """
39 41 """["sh", {"count": 2, "desc": ["Bash"]}], """
40 42 """["makefile", {"count": 1, "desc": ["Makefile", "Makefile"]}],"""
41 43 """ ["cfg", {"count": 1, "desc": ["Ini"]}], """
42 44 """["css", {"count": 1, "desc": ["Css"]}], """
43 45 """["bat", {"count": 1, "desc": ["Batch"]}]];"""
44 46 )
45 47
46 48 # clone url...
47 49 response.mustcontain('''id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"''' % HG_REPO)
48 50 response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"''' % ID)
49 51
50 52 def test_index_git(self):
51 53 self.log_user()
52 54 ID = Repository.get_by_repo_name(GIT_REPO).repo_id
53 55 response = self.app.get(url(controller='summary',
54 56 action='index',
55 57 repo_name=GIT_REPO))
56 58
57 59 #repo type
58 60 response.mustcontain(
59 61 """<img style="margin-bottom:2px" class="icon" """
60 62 """title="Git repository" alt="Git repository" """
61 63 """src="/images/icons/giticon.png"/>"""
62 64 )
63 65 response.mustcontain(
64 66 """<img style="margin-bottom:2px" class="icon" """
65 67 """title="public repository" alt="public """
66 68 """repository" src="/images/icons/lock_open.png"/>"""
67 69 )
68 70
69 71 # clone url...
70 72 response.mustcontain('''id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"''' % GIT_REPO)
71 73 response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"''' % ID)
72 74
73 75 def test_index_by_id_hg(self):
74 76 self.log_user()
75 77 ID = Repository.get_by_repo_name(HG_REPO).repo_id
76 78 response = self.app.get(url(controller='summary',
77 79 action='index',
78 80 repo_name='_%s' % ID))
79 81
80 82 #repo type
81 83 response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
82 84 """title="Mercurial repository" alt="Mercurial """
83 85 """repository" src="/images/icons/hgicon.png"/>""")
84 86 response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
85 87 """title="public repository" alt="public """
86 88 """repository" src="/images/icons/lock_open.png"/>""")
87 89
88 90 def test_index_by_repo_having_id_path_in_name_hg(self):
89 91 self.log_user()
90 _make_repo(name='repo_1')
91 Session().commit()
92 fixture.create_repo(name='repo_1')
92 93 response = self.app.get(url(controller='summary',
93 94 action='index',
94 95 repo_name='repo_1'))
95 96
96 97 try:
97 response.mustcontain("""repo_1""")
98 response.mustcontain("repo_1")
98 99 finally:
99 100 RepoModel().delete(Repository.get_by_repo_name('repo_1'))
100 101 Session().commit()
101 102
102 103 def test_index_by_id_git(self):
103 104 self.log_user()
104 105 ID = Repository.get_by_repo_name(GIT_REPO).repo_id
105 106 response = self.app.get(url(controller='summary',
106 107 action='index',
107 108 repo_name='_%s' % ID))
108 109
109 110 #repo type
110 111 response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
111 112 """title="Git repository" alt="Git """
112 113 """repository" src="/images/icons/giticon.png"/>""")
113 114 response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
114 115 """title="public repository" alt="public """
115 116 """repository" src="/images/icons/lock_open.png"/>""")
116 117
117 118 def _enable_stats(self):
118 119 r = Repository.get_by_repo_name(HG_REPO)
119 120 r.enable_statistics = True
120 121 self.Session.add(r)
121 122 self.Session.commit()
@@ -1,120 +1,104 b''
1 1 import os
2 2 import unittest
3 3 import functools
4 4 from rhodecode.tests import *
5
5 from rhodecode.tests.fixture import Fixture
6 6
7 7 from rhodecode.model.repos_group import ReposGroupModel
8 8 from rhodecode.model.repo import RepoModel
9 9 from rhodecode.model.db import RepoGroup, Repository, User
10 10 from rhodecode.model.user import UserModel
11 11
12 12 from rhodecode.lib.auth import AuthUser
13 13 from rhodecode.model.meta import Session
14 14
15 15
16 def _make_group(path, desc='desc', parent_id=None,
17 skip_if_exists=False):
18
19 gr = RepoGroup.get_by_group_name(path)
20 if gr and skip_if_exists:
21 return gr
22 if isinstance(parent_id, RepoGroup):
23 parent_id = parent_id.group_id
24 gr = ReposGroupModel().create(path, desc, TEST_USER_ADMIN_LOGIN, parent_id)
25 return gr
26
27
28 def _make_repo(name, repos_group=None, repo_type='hg', private=False):
29 return RepoModel().create_repo(name, repo_type, 'desc',
30 TEST_USER_ADMIN_LOGIN,
31 repos_group=repos_group,
32 private=private)
16 fixture = Fixture()
33 17
34 18
35 19 def _destroy_project_tree(test_u1_id):
36 20 Session.remove()
37 21 repos_group = RepoGroup.get_by_group_name(group_name='g0')
38 22 for el in reversed(repos_group.recursive_groups_and_repos()):
39 23 if isinstance(el, Repository):
40 24 RepoModel().delete(el)
41 25 elif isinstance(el, RepoGroup):
42 26 ReposGroupModel().delete(el, force_delete=True)
43 27
44 28 u = User.get(test_u1_id)
45 29 Session().delete(u)
46 30 Session().commit()
47 31
48 32
49 33 def _create_project_tree():
50 34 """
51 35 Creates a tree of groups and repositories to test permissions
52 36
53 37 structure
54 38 [g0] - group `g0` with 3 subgroups
55 39 |
56 40 |__[g0_1] group g0_1 with 2 groups 0 repos
57 41 | |
58 42 | |__[g0_1_1] group g0_1_1 with 1 group 2 repos
59 43 | | |__<g0/g0_1/g0_1_1/g0_1_1_r1>
60 44 | | |__<g0/g0_1/g0_1_1/g0_1_1_r2>
61 45 | |__<g0/g0_1/g0_1_r1>
62 46 |
63 47 |__[g0_2] 2 repos
64 48 | |
65 49 | |__<g0/g0_2/g0_2_r1>
66 50 | |__<g0/g0_2/g0_2_r2>
67 51 |
68 52 |__[g0_3] 1 repo
69 53 |
70 54 |_<g0/g0_3/g0_3_r1>
71 55 |_<g0/g0_3/g0_3_r2_private>
72 56
73 57 """
74 58 test_u1 = UserModel().create_or_update(
75 59 username=u'test_u1', password=u'qweqwe',
76 60 email=u'test_u1@rhodecode.org', firstname=u'test_u1', lastname=u'test_u1'
77 61 )
78 g0 = _make_group('g0')
79 g0_1 = _make_group('g0_1', parent_id=g0)
80 g0_1_1 = _make_group('g0_1_1', parent_id=g0_1)
81 g0_1_1_r1 = _make_repo('g0/g0_1/g0_1_1/g0_1_1_r1', repos_group=g0_1_1)
82 g0_1_1_r2 = _make_repo('g0/g0_1/g0_1_1/g0_1_1_r2', repos_group=g0_1_1)
83 g0_1_r1 = _make_repo('g0/g0_1/g0_1_r1', repos_group=g0_1)
84 g0_2 = _make_group('g0_2', parent_id=g0)
85 g0_2_r1 = _make_repo('g0/g0_2/g0_2_r1', repos_group=g0_2)
86 g0_2_r2 = _make_repo('g0/g0_2/g0_2_r2', repos_group=g0_2)
87 g0_3 = _make_group('g0_3', parent_id=g0)
88 g0_3_r1 = _make_repo('g0/g0_3/g0_3_r1', repos_group=g0_3)
89 g0_3_r2_private = _make_repo('g0/g0_3/g0_3_r1_private', repos_group=g0_3,
90 private=True)
62 g0 = fixture.create_group('g0')
63 g0_1 = fixture.create_group('g0_1', group_parent_id=g0)
64 g0_1_1 = fixture.create_group('g0_1_1', group_parent_id=g0_1)
65 g0_1_1_r1 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r1', repos_group=g0_1_1)
66 g0_1_1_r2 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r2', repos_group=g0_1_1)
67 g0_1_r1 = fixture.create_repo('g0/g0_1/g0_1_r1', repos_group=g0_1)
68 g0_2 = fixture.create_group('g0_2', group_parent_id=g0)
69 g0_2_r1 = fixture.create_repo('g0/g0_2/g0_2_r1', repos_group=g0_2)
70 g0_2_r2 = fixture.create_repo('g0/g0_2/g0_2_r2', repos_group=g0_2)
71 g0_3 = fixture.create_group('g0_3', group_parent_id=g0)
72 g0_3_r1 = fixture.create_repo('g0/g0_3/g0_3_r1', repos_group=g0_3)
73 g0_3_r2_private = fixture.create_repo('g0/g0_3/g0_3_r1_private',
74 repos_group=g0_3, repo_private=True)
91 75 return test_u1
92 76
93 77
94 78 def expected_count(group_name, objects=False):
95 79 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
96 80 objs = repos_group.recursive_groups_and_repos()
97 81 if objects:
98 82 return objs
99 83 return len(objs)
100 84
101 85
102 86 def _check_expected_count(items, repo_items, expected):
103 87 should_be = len(items + repo_items)
104 88 there_are = len(expected)
105 89 assert should_be == there_are, ('%s != %s' % ((items + repo_items), expected))
106 90
107 91
108 92 def check_tree_perms(obj_name, repo_perm, prefix, expected_perm):
109 93 assert repo_perm == expected_perm, ('obj:`%s` got perm:`%s` should:`%s`'
110 94 % (obj_name, repo_perm, expected_perm))
111 95
112 96
113 97 def _get_perms(filter_='', recursive=True, key=None, test_u1_id=None):
114 98 test_u1 = AuthUser(user_id=test_u1_id)
115 99 for k, v in test_u1.permissions[key].items():
116 100 if recursive and k.startswith(filter_):
117 101 yield k, v
118 102 elif not recursive:
119 103 if k == filter_:
120 104 yield k, v
@@ -1,472 +1,465 b''
1 1 import os
2 2 import unittest
3 3 from rhodecode.tests import *
4 from rhodecode.tests.models.common import _make_group
4 from rhodecode.tests.fixture import Fixture
5 5 from rhodecode.model.repos_group import ReposGroupModel
6 6 from rhodecode.model.repo import RepoModel
7 7 from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm
8 8 from rhodecode.model.user import UserModel
9 9
10 10 from rhodecode.model.meta import Session
11 11 from rhodecode.model.users_group import UserGroupModel
12 12 from rhodecode.lib.auth import AuthUser
13 from rhodecode.tests.api.api_base import create_repo
13
14
15 fixture = Fixture()
14 16
15 17
16 18 class TestPermissions(unittest.TestCase):
17 19 def __init__(self, methodName='runTest'):
18 20 super(TestPermissions, self).__init__(methodName=methodName)
19 21
20 22 def setUp(self):
21 23 self.u1 = UserModel().create_or_update(
22 24 username=u'u1', password=u'qweqwe',
23 25 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
24 26 )
25 27 self.u2 = UserModel().create_or_update(
26 28 username=u'u2', password=u'qweqwe',
27 29 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
28 30 )
29 31 self.u3 = UserModel().create_or_update(
30 32 username=u'u3', password=u'qweqwe',
31 33 email=u'u3@rhodecode.org', firstname=u'u3', lastname=u'u3'
32 34 )
33 35 self.anon = User.get_by_username('default')
34 36 self.a1 = UserModel().create_or_update(
35 37 username=u'a1', password=u'qweqwe',
36 38 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
37 39 )
38 40 Session().commit()
39 41
40 42 def tearDown(self):
41 43 if hasattr(self, 'test_repo'):
42 44 RepoModel().delete(repo=self.test_repo)
43 45
44 46 UserModel().delete(self.u1)
45 47 UserModel().delete(self.u2)
46 48 UserModel().delete(self.u3)
47 49 UserModel().delete(self.a1)
48 50 if hasattr(self, 'g1'):
49 51 ReposGroupModel().delete(self.g1.group_id)
50 52 if hasattr(self, 'g2'):
51 53 ReposGroupModel().delete(self.g2.group_id)
52 54
53 55 if hasattr(self, 'ug1'):
54 56 UserGroupModel().delete(self.ug1, force=True)
55 57
56 58 Session().commit()
57 59
58 60 def test_default_perms_set(self):
59 61 u1_auth = AuthUser(user_id=self.u1.user_id)
60 62 perms = {
61 63 'repositories_groups': {},
62 64 'global': set([u'hg.create.repository', u'repository.read',
63 65 u'hg.register.manual_activate']),
64 66 'repositories': {u'vcs_test_hg': u'repository.read'}
65 67 }
66 68 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
67 69 perms['repositories'][HG_REPO])
68 70 new_perm = 'repository.write'
69 71 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
70 72 perm=new_perm)
71 73 Session().commit()
72 74
73 75 u1_auth = AuthUser(user_id=self.u1.user_id)
74 76 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
75 77 new_perm)
76 78
77 79 def test_default_admin_perms_set(self):
78 80 a1_auth = AuthUser(user_id=self.a1.user_id)
79 81 perms = {
80 82 'repositories_groups': {},
81 83 'global': set([u'hg.admin']),
82 84 'repositories': {u'vcs_test_hg': u'repository.admin'}
83 85 }
84 86 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
85 87 perms['repositories'][HG_REPO])
86 88 new_perm = 'repository.write'
87 89 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
88 90 perm=new_perm)
89 91 Session().commit()
90 92 # cannot really downgrade admins permissions !? they still get's set as
91 93 # admin !
92 94 u1_auth = AuthUser(user_id=self.a1.user_id)
93 95 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
94 96 perms['repositories'][HG_REPO])
95 97
96 98 def test_default_group_perms(self):
97 self.g1 = _make_group('test1', skip_if_exists=True)
98 self.g2 = _make_group('test2', skip_if_exists=True)
99 self.g1 = fixture.create_group('test1', skip_if_exists=True)
100 self.g2 = fixture.create_group('test2', skip_if_exists=True)
99 101 u1_auth = AuthUser(user_id=self.u1.user_id)
100 102 perms = {
101 103 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
102 104 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
103 105 'repositories': {u'vcs_test_hg': u'repository.read'}
104 106 }
105 107 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
106 108 perms['repositories'][HG_REPO])
107 109 self.assertEqual(u1_auth.permissions['repositories_groups'],
108 110 perms['repositories_groups'])
109 111
110 112 def test_default_admin_group_perms(self):
111 self.g1 = _make_group('test1', skip_if_exists=True)
112 self.g2 = _make_group('test2', skip_if_exists=True)
113 self.g1 = fixture.create_group('test1', skip_if_exists=True)
114 self.g2 = fixture.create_group('test2', skip_if_exists=True)
113 115 a1_auth = AuthUser(user_id=self.a1.user_id)
114 116 perms = {
115 117 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
116 118 'global': set(['hg.admin']),
117 119 'repositories': {u'vcs_test_hg': 'repository.admin'}
118 120 }
119 121
120 122 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
121 123 perms['repositories'][HG_REPO])
122 124 self.assertEqual(a1_auth.permissions['repositories_groups'],
123 125 perms['repositories_groups'])
124 126
125 127 def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
126 128 # make group
127 129 self.ug1 = UserGroupModel().create('G1')
128 130 # add user to group
129 131
130 132 UserGroupModel().add_user_to_group(self.ug1, self.u1)
131 133
132 134 # set permission to lower
133 135 new_perm = 'repository.none'
134 136 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
135 137 Session().commit()
136 138 u1_auth = AuthUser(user_id=self.u1.user_id)
137 139 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
138 140 new_perm)
139 141
140 142 # grant perm for group this should not override permission from user
141 143 # since it has explicitly set
142 144 new_perm_gr = 'repository.write'
143 145 RepoModel().grant_users_group_permission(repo=HG_REPO,
144 146 group_name=self.ug1,
145 147 perm=new_perm_gr)
146 148 # check perms
147 149 u1_auth = AuthUser(user_id=self.u1.user_id)
148 150 perms = {
149 151 'repositories_groups': {},
150 152 'global': set([u'hg.create.repository', u'repository.read',
151 153 u'hg.register.manual_activate']),
152 154 'repositories': {u'vcs_test_hg': u'repository.read'}
153 155 }
154 156 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
155 157 new_perm)
156 158 self.assertEqual(u1_auth.permissions['repositories_groups'],
157 159 perms['repositories_groups'])
158 160
159 161 def test_propagated_permission_from_users_group(self):
160 162 # make group
161 163 self.ug1 = UserGroupModel().create('G1')
162 164 # add user to group
163 165
164 166 UserGroupModel().add_user_to_group(self.ug1, self.u3)
165 167
166 168 # grant perm for group this should override default permission from user
167 169 new_perm_gr = 'repository.write'
168 170 RepoModel().grant_users_group_permission(repo=HG_REPO,
169 171 group_name=self.ug1,
170 172 perm=new_perm_gr)
171 173 # check perms
172 174 u3_auth = AuthUser(user_id=self.u3.user_id)
173 175 perms = {
174 176 'repositories_groups': {},
175 177 'global': set([u'hg.create.repository', u'repository.read',
176 178 u'hg.register.manual_activate']),
177 179 'repositories': {u'vcs_test_hg': u'repository.read'}
178 180 }
179 181 self.assertEqual(u3_auth.permissions['repositories'][HG_REPO],
180 182 new_perm_gr)
181 183 self.assertEqual(u3_auth.permissions['repositories_groups'],
182 184 perms['repositories_groups'])
183 185
184 186 def test_propagated_permission_from_users_group_lower_weight(self):
185 187 # make group
186 188 self.ug1 = UserGroupModel().create('G1')
187 189 # add user to group
188 190 UserGroupModel().add_user_to_group(self.ug1, self.u1)
189 191
190 192 # set permission to lower
191 193 new_perm_h = 'repository.write'
192 194 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
193 195 perm=new_perm_h)
194 196 Session().commit()
195 197 u1_auth = AuthUser(user_id=self.u1.user_id)
196 198 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
197 199 new_perm_h)
198 200
199 201 # grant perm for group this should NOT override permission from user
200 202 # since it's lower than granted
201 203 new_perm_l = 'repository.read'
202 204 RepoModel().grant_users_group_permission(repo=HG_REPO,
203 205 group_name=self.ug1,
204 206 perm=new_perm_l)
205 207 # check perms
206 208 u1_auth = AuthUser(user_id=self.u1.user_id)
207 209 perms = {
208 210 'repositories_groups': {},
209 211 'global': set([u'hg.create.repository', u'repository.read',
210 212 u'hg.register.manual_activate']),
211 213 'repositories': {u'vcs_test_hg': u'repository.write'}
212 214 }
213 215 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
214 216 new_perm_h)
215 217 self.assertEqual(u1_auth.permissions['repositories_groups'],
216 218 perms['repositories_groups'])
217 219
218 220 def test_repo_in_group_permissions(self):
219 self.g1 = _make_group('group1', skip_if_exists=True)
220 self.g2 = _make_group('group2', skip_if_exists=True)
221 Session().commit()
221 self.g1 = fixture.create_group('group1', skip_if_exists=True)
222 self.g2 = fixture.create_group('group2', skip_if_exists=True)
222 223 # both perms should be read !
223 224 u1_auth = AuthUser(user_id=self.u1.user_id)
224 225 self.assertEqual(u1_auth.permissions['repositories_groups'],
225 226 {u'group1': u'group.read', u'group2': u'group.read'})
226 227
227 228 a1_auth = AuthUser(user_id=self.anon.user_id)
228 229 self.assertEqual(a1_auth.permissions['repositories_groups'],
229 230 {u'group1': u'group.read', u'group2': u'group.read'})
230 231
231 232 #Change perms to none for both groups
232 233 ReposGroupModel().grant_user_permission(repos_group=self.g1,
233 234 user=self.anon,
234 235 perm='group.none')
235 236 ReposGroupModel().grant_user_permission(repos_group=self.g2,
236 237 user=self.anon,
237 238 perm='group.none')
238 239
239 240 u1_auth = AuthUser(user_id=self.u1.user_id)
240 241 self.assertEqual(u1_auth.permissions['repositories_groups'],
241 242 {u'group1': u'group.none', u'group2': u'group.none'})
242 243
243 244 a1_auth = AuthUser(user_id=self.anon.user_id)
244 245 self.assertEqual(a1_auth.permissions['repositories_groups'],
245 246 {u'group1': u'group.none', u'group2': u'group.none'})
246 247
247 248 # add repo to group
248 249 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
249 self.test_repo = RepoModel().create_repo(
250 repo_name=name,
250 self.test_repo = fixture.create_repo(name=name,
251 251 repo_type='hg',
252 description='',
253 252 repos_group=self.g1,
254 owner=self.u1,
255 )
256 Session().commit()
253 cur_user=self.u1,)
257 254
258 255 u1_auth = AuthUser(user_id=self.u1.user_id)
259 256 self.assertEqual(u1_auth.permissions['repositories_groups'],
260 257 {u'group1': u'group.none', u'group2': u'group.none'})
261 258
262 259 a1_auth = AuthUser(user_id=self.anon.user_id)
263 260 self.assertEqual(a1_auth.permissions['repositories_groups'],
264 261 {u'group1': u'group.none', u'group2': u'group.none'})
265 262
266 263 #grant permission for u2 !
267 264 ReposGroupModel().grant_user_permission(repos_group=self.g1,
268 265 user=self.u2,
269 266 perm='group.read')
270 267 ReposGroupModel().grant_user_permission(repos_group=self.g2,
271 268 user=self.u2,
272 269 perm='group.read')
273 270 Session().commit()
274 271 self.assertNotEqual(self.u1, self.u2)
275 272 #u1 and anon should have not change perms while u2 should !
276 273 u1_auth = AuthUser(user_id=self.u1.user_id)
277 274 self.assertEqual(u1_auth.permissions['repositories_groups'],
278 275 {u'group1': u'group.none', u'group2': u'group.none'})
279 276
280 277 u2_auth = AuthUser(user_id=self.u2.user_id)
281 278 self.assertEqual(u2_auth.permissions['repositories_groups'],
282 279 {u'group1': u'group.read', u'group2': u'group.read'})
283 280
284 281 a1_auth = AuthUser(user_id=self.anon.user_id)
285 282 self.assertEqual(a1_auth.permissions['repositories_groups'],
286 283 {u'group1': u'group.none', u'group2': u'group.none'})
287 284
288 285 def test_repo_group_user_as_user_group_member(self):
289 286 # create Group1
290 self.g1 = _make_group('group1', skip_if_exists=True)
291 Session().commit()
287 self.g1 = fixture.create_group('group1', skip_if_exists=True)
292 288 a1_auth = AuthUser(user_id=self.anon.user_id)
293 289
294 290 self.assertEqual(a1_auth.permissions['repositories_groups'],
295 291 {u'group1': u'group.read'})
296 292
297 293 # set default permission to none
298 294 ReposGroupModel().grant_user_permission(repos_group=self.g1,
299 295 user=self.anon,
300 296 perm='group.none')
301 297 # make group
302 298 self.ug1 = UserGroupModel().create('G1')
303 299 # add user to group
304 300 UserGroupModel().add_user_to_group(self.ug1, self.u1)
305 301 Session().commit()
306 302
307 303 # check if user is in the group
308 304 membrs = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
309 305 self.assertEqual(membrs, [self.u1.user_id])
310 306 # add some user to that group
311 307
312 308 # check his permissions
313 309 a1_auth = AuthUser(user_id=self.anon.user_id)
314 310 self.assertEqual(a1_auth.permissions['repositories_groups'],
315 311 {u'group1': u'group.none'})
316 312
317 313 u1_auth = AuthUser(user_id=self.u1.user_id)
318 314 self.assertEqual(u1_auth.permissions['repositories_groups'],
319 315 {u'group1': u'group.none'})
320 316
321 317 # grant ug1 read permissions for
322 318 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
323 319 group_name=self.ug1,
324 320 perm='group.read')
325 321 Session().commit()
326 322 # check if the
327 323 obj = Session().query(UserGroupRepoGroupToPerm)\
328 324 .filter(UserGroupRepoGroupToPerm.group == self.g1)\
329 325 .filter(UserGroupRepoGroupToPerm.users_group == self.ug1)\
330 326 .scalar()
331 327 self.assertEqual(obj.permission.permission_name, 'group.read')
332 328
333 329 a1_auth = AuthUser(user_id=self.anon.user_id)
334 330
335 331 self.assertEqual(a1_auth.permissions['repositories_groups'],
336 332 {u'group1': u'group.none'})
337 333
338 334 u1_auth = AuthUser(user_id=self.u1.user_id)
339 335 self.assertEqual(u1_auth.permissions['repositories_groups'],
340 336 {u'group1': u'group.read'})
341 337
342 338 def test_inherited_permissions_from_default_on_user_enabled(self):
343 339 user_model = UserModel()
344 340 # enable fork and create on default user
345 341 usr = 'default'
346 342 user_model.revoke_perm(usr, 'hg.create.none')
347 343 user_model.grant_perm(usr, 'hg.create.repository')
348 344 user_model.revoke_perm(usr, 'hg.fork.none')
349 345 user_model.grant_perm(usr, 'hg.fork.repository')
350 346 # make sure inherit flag is turned on
351 347 self.u1.inherit_default_permissions = True
352 348 Session().commit()
353 349 u1_auth = AuthUser(user_id=self.u1.user_id)
354 350 # this user will have inherited permissions from default user
355 351 self.assertEqual(u1_auth.permissions['global'],
356 352 set(['hg.create.repository', 'hg.fork.repository',
357 353 'hg.register.manual_activate',
358 354 'repository.read', 'group.read']))
359 355
360 356 def test_inherited_permissions_from_default_on_user_disabled(self):
361 357 user_model = UserModel()
362 358 # disable fork and create on default user
363 359 usr = 'default'
364 360 user_model.revoke_perm(usr, 'hg.create.repository')
365 361 user_model.grant_perm(usr, 'hg.create.none')
366 362 user_model.revoke_perm(usr, 'hg.fork.repository')
367 363 user_model.grant_perm(usr, 'hg.fork.none')
368 364 # make sure inherit flag is turned on
369 365 self.u1.inherit_default_permissions = True
370 366 Session().commit()
371 367 u1_auth = AuthUser(user_id=self.u1.user_id)
372 368 # this user will have inherited permissions from default user
373 369 self.assertEqual(u1_auth.permissions['global'],
374 370 set(['hg.create.none', 'hg.fork.none',
375 371 'hg.register.manual_activate',
376 372 'repository.read', 'group.read']))
377 373
378 374 def test_non_inherited_permissions_from_default_on_user_enabled(self):
379 375 user_model = UserModel()
380 376 # enable fork and create on default user
381 377 usr = 'default'
382 378 user_model.revoke_perm(usr, 'hg.create.none')
383 379 user_model.grant_perm(usr, 'hg.create.repository')
384 380 user_model.revoke_perm(usr, 'hg.fork.none')
385 381 user_model.grant_perm(usr, 'hg.fork.repository')
386 382
387 383 #disable global perms on specific user
388 384 user_model.revoke_perm(self.u1, 'hg.create.repository')
389 385 user_model.grant_perm(self.u1, 'hg.create.none')
390 386 user_model.revoke_perm(self.u1, 'hg.fork.repository')
391 387 user_model.grant_perm(self.u1, 'hg.fork.none')
392 388
393 389 # make sure inherit flag is turned off
394 390 self.u1.inherit_default_permissions = False
395 391 Session().commit()
396 392 u1_auth = AuthUser(user_id=self.u1.user_id)
397 393 # this user will have non inherited permissions from he's
398 394 # explicitly set permissions
399 395 self.assertEqual(u1_auth.permissions['global'],
400 396 set(['hg.create.none', 'hg.fork.none',
401 397 'hg.register.manual_activate',
402 398 'repository.read', 'group.read']))
403 399
404 400 def test_non_inherited_permissions_from_default_on_user_disabled(self):
405 401 user_model = UserModel()
406 402 # disable fork and create on default user
407 403 usr = 'default'
408 404 user_model.revoke_perm(usr, 'hg.create.repository')
409 405 user_model.grant_perm(usr, 'hg.create.none')
410 406 user_model.revoke_perm(usr, 'hg.fork.repository')
411 407 user_model.grant_perm(usr, 'hg.fork.none')
412 408
413 409 #enable global perms on specific user
414 410 user_model.revoke_perm(self.u1, 'hg.create.none')
415 411 user_model.grant_perm(self.u1, 'hg.create.repository')
416 412 user_model.revoke_perm(self.u1, 'hg.fork.none')
417 413 user_model.grant_perm(self.u1, 'hg.fork.repository')
418 414
419 415 # make sure inherit flag is turned off
420 416 self.u1.inherit_default_permissions = False
421 417 Session().commit()
422 418 u1_auth = AuthUser(user_id=self.u1.user_id)
423 419 # this user will have non inherited permissions from he's
424 420 # explicitly set permissions
425 421 self.assertEqual(u1_auth.permissions['global'],
426 422 set(['hg.create.repository', 'hg.fork.repository',
427 423 'hg.register.manual_activate',
428 424 'repository.read', 'group.read']))
429 425
430 426 def test_owner_permissions_doesnot_get_overwritten_by_group(self):
431 427 #create repo as USER,
432 self.test_repo = repo = RepoModel().create_repo(repo_name='myownrepo',
428 self.test_repo = fixture.create_repo(name='myownrepo',
433 429 repo_type='hg',
434 description='desc',
435 owner=self.u1)
430 cur_user=self.u1)
436 431
437 Session().commit()
438 432 #he has permissions of admin as owner
439 433 u1_auth = AuthUser(user_id=self.u1.user_id)
440 434 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
441 435 'repository.admin')
442 436 #set his permission as user group, he should still be admin
443 437 self.ug1 = UserGroupModel().create('G1')
444 438 # add user to group
445 439 UserGroupModel().add_user_to_group(self.ug1, self.u1)
446 RepoModel().grant_users_group_permission(repo, group_name=self.ug1,
440 RepoModel().grant_users_group_permission(self.test_repo,
441 group_name=self.ug1,
447 442 perm='repository.none')
448 443
449 444 Session().commit()
450 445 u1_auth = AuthUser(user_id=self.u1.user_id)
451 446 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
452 447 'repository.admin')
453 448
454 449 def test_owner_permissions_doesnot_get_overwritten_by_others(self):
455 450 #create repo as USER,
456 self.test_repo = repo = RepoModel().create_repo(repo_name='myownrepo',
451 self.test_repo = fixture.create_repo(name='myownrepo',
457 452 repo_type='hg',
458 description='desc',
459 owner=self.u1)
453 cur_user=self.u1)
460 454
461 Session().commit()
462 455 #he has permissions of admin as owner
463 456 u1_auth = AuthUser(user_id=self.u1.user_id)
464 457 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
465 458 'repository.admin')
466 459 #set his permission as user, he should still be admin
467 RepoModel().grant_user_permission(repo, user=self.u1,
460 RepoModel().grant_user_permission(self.test_repo, user=self.u1,
468 461 perm='repository.none')
469 462 Session().commit()
470 463 u1_auth = AuthUser(user_id=self.u1.user_id)
471 464 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
472 465 'repository.admin')
@@ -1,242 +1,200 b''
1 1 import os
2 2 import unittest
3 from sqlalchemy.exc import IntegrityError
4
3 5 from rhodecode.tests import *
6 from rhodecode.tests.fixture import Fixture
4 7
5 8 from rhodecode.model.repos_group import ReposGroupModel
6 9 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User
10 from rhodecode.model.db import RepoGroup
8 11 from rhodecode.model.meta import Session
9 from sqlalchemy.exc import IntegrityError
10 12
11 13
12 def _make_group(path, desc='desc', parent_id=None,
13 skip_if_exists=False):
14
15 gr = RepoGroup.get_by_group_name(path)
16 if gr and skip_if_exists:
17 return gr
18 if isinstance(parent_id, RepoGroup):
19 parent_id = parent_id.group_id
20 gr = ReposGroupModel().create(path, desc, TEST_USER_ADMIN_LOGIN, parent_id)
21 return gr
14 fixture = Fixture()
22 15
23 16
24 17 def _update_group(id_, group_name, desc='desc', parent_id=None):
25 form_data = _get_group_create_params(group_name=group_name,
18 form_data = fixture._get_group_create_params(group_name=group_name,
26 19 group_desc=desc,
27 20 group_parent_id=parent_id)
28 21 gr = ReposGroupModel().update(id_, form_data)
29 22 return gr
30 23
31 24
32 def _make_repo(name, **kwargs):
33 form_data = _get_repo_create_params(repo_name=name, **kwargs)
34 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
35 r = RepoModel().create(form_data, cur_user)
36 return r
37
38
39 25 def _update_repo(name, **kwargs):
40 form_data = _get_repo_create_params(**kwargs)
26 form_data = fixture._get_repo_create_params(**kwargs)
41 27 if not 'repo_name' in kwargs:
42 28 form_data['repo_name'] = name
43 29 if not 'perms_new' in kwargs:
44 30 form_data['perms_new'] = []
45 31 if not 'perms_updates' in kwargs:
46 32 form_data['perms_updates'] = []
47 33 r = RepoModel().update(name, **form_data)
48 34 return r
49 35
50 36
51 37 class TestReposGroups(unittest.TestCase):
52 38
53 39 def setUp(self):
54 self.g1 = _make_group('test1', skip_if_exists=True)
55 Session().commit()
56 self.g2 = _make_group('test2', skip_if_exists=True)
57 Session().commit()
58 self.g3 = _make_group('test3', skip_if_exists=True)
59 Session().commit()
40 self.g1 = fixture.create_group('test1', skip_if_exists=True)
41 self.g2 = fixture.create_group('test2', skip_if_exists=True)
42 self.g3 = fixture.create_group('test3', skip_if_exists=True)
60 43
61 44 def tearDown(self):
62 45 Session.remove()
63 46
64 47 def __check_path(self, *path):
65 48 """
66 49 Checks the path for existance !
67 50 """
68 51 path = [TESTS_TMP_PATH] + list(path)
69 52 path = os.path.join(*path)
70 53 return os.path.isdir(path)
71 54
72 55 def _check_folders(self):
73 56 print os.listdir(TESTS_TMP_PATH)
74 57
75 58 def __delete_group(self, id_):
76 59 ReposGroupModel().delete(id_)
77 60
78 61 def test_create_group(self):
79 g = _make_group('newGroup')
62 g = fixture.create_group('newGroup')
80 63 Session().commit()
81 64 self.assertEqual(g.full_path, 'newGroup')
82 65
83 66 self.assertTrue(self.__check_path('newGroup'))
84 67
85 68 def test_create_same_name_group(self):
86 self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
69 self.assertRaises(IntegrityError, lambda: fixture.create_group('newGroup'))
87 70 Session().rollback()
88 71
89 72 def test_same_subgroup(self):
90 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
91 Session().commit()
73 sg1 = fixture.create_group('sub1', group_parent_id=self.g1.group_id)
92 74 self.assertEqual(sg1.parent_group, self.g1)
93 75 self.assertEqual(sg1.full_path, 'test1/sub1')
94 76 self.assertTrue(self.__check_path('test1', 'sub1'))
95 77
96 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
97 Session().commit()
78 ssg1 = fixture.create_group('subsub1', group_parent_id=sg1.group_id)
98 79 self.assertEqual(ssg1.parent_group, sg1)
99 80 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
100 81 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
101 82
102 83 def test_remove_group(self):
103 sg1 = _make_group('deleteme')
104 Session().commit()
84 sg1 = fixture.create_group('deleteme')
105 85 self.__delete_group(sg1.group_id)
106 86
107 87 self.assertEqual(RepoGroup.get(sg1.group_id), None)
108 88 self.assertFalse(self.__check_path('deteteme'))
109 89
110 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
111 Session().commit()
90 sg1 = fixture.create_group('deleteme', group_parent_id=self.g1.group_id)
112 91 self.__delete_group(sg1.group_id)
113 92
114 93 self.assertEqual(RepoGroup.get(sg1.group_id), None)
115 94 self.assertFalse(self.__check_path('test1', 'deteteme'))
116 95
117 96 def test_rename_single_group(self):
118 sg1 = _make_group('initial')
119 Session().commit()
97 sg1 = fixture.create_group('initial')
120 98
121 99 new_sg1 = _update_group(sg1.group_id, 'after')
122 100 self.assertTrue(self.__check_path('after'))
123 101 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
124 102
125 103 def test_update_group_parent(self):
126 104
127 sg1 = _make_group('initial', parent_id=self.g1.group_id)
128 Session().commit()
105 sg1 = fixture.create_group('initial', group_parent_id=self.g1.group_id)
129 106
130 107 new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
131 108 self.assertTrue(self.__check_path('test1', 'after'))
132 109 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
133 110
134 111 new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
135 112 self.assertTrue(self.__check_path('test3', 'after'))
136 113 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
137 114
138 115 new_sg1 = _update_group(sg1.group_id, 'hello')
139 116 self.assertTrue(self.__check_path('hello'))
140 117
141 118 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
142 119
143 120 def test_subgrouping_with_repo(self):
144 121
145 g1 = _make_group('g1')
146 g2 = _make_group('g2')
147 Session().commit()
122 g1 = fixture.create_group('g1')
123 g2 = fixture.create_group('g2')
148 124 # create new repo
149 r = _make_repo('john')
150 Session().commit()
125 r = fixture.create_repo('john')
126
151 127 self.assertEqual(r.repo_name, 'john')
152 128 # put repo into group
153 129 r = _update_repo('john', repo_group=g1.group_id)
154 130 Session().commit()
155 131 self.assertEqual(r.repo_name, 'g1/john')
156 132
157 133 _update_group(g1.group_id, 'g1', parent_id=g2.group_id)
158 134 self.assertTrue(self.__check_path('g2', 'g1'))
159 135
160 136 # test repo
161 137 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
162 138 r.just_name]))
163 139
164 140 def test_move_to_root(self):
165 g1 = _make_group('t11')
166 Session().commit()
167 g2 = _make_group('t22', parent_id=g1.group_id)
168 Session().commit()
141 g1 = fixture.create_group('t11')
142 g2 = fixture.create_group('t22', group_parent_id=g1.group_id)
169 143
170 144 self.assertEqual(g2.full_path, 't11/t22')
171 145 self.assertTrue(self.__check_path('t11', 't22'))
172 146
173 147 g2 = _update_group(g2.group_id, 'g22', parent_id=None)
174 148 Session().commit()
175 149
176 150 self.assertEqual(g2.group_name, 'g22')
177 151 # we moved out group from t1 to '' so it's full path should be 'g2'
178 152 self.assertEqual(g2.full_path, 'g22')
179 153 self.assertFalse(self.__check_path('t11', 't22'))
180 154 self.assertTrue(self.__check_path('g22'))
181 155
182 156 def test_rename_top_level_group_in_nested_setup(self):
183 g1 = _make_group('L1')
184 Session().commit()
185 g2 = _make_group('L2', parent_id=g1.group_id)
186 Session().commit()
187 g3 = _make_group('L3', parent_id=g2.group_id)
188 Session().commit()
157 g1 = fixture.create_group('L1')
158 g2 = fixture.create_group('L2', group_parent_id=g1.group_id)
159 g3 = fixture.create_group('L3', group_parent_id=g2.group_id)
189 160
190 r = _make_repo('L1/L2/L3/L3_REPO', repo_group=g3.group_id)
191 Session().commit()
161 r = fixture.create_repo('L1/L2/L3/L3_REPO', repo_group=g3.group_id)
192 162
193 163 ##rename L1 all groups should be now changed
194 164 _update_group(g1.group_id, 'L1_NEW')
195 165 Session().commit()
196 166 self.assertEqual(g1.full_path, 'L1_NEW')
197 167 self.assertEqual(g2.full_path, 'L1_NEW/L2')
198 168 self.assertEqual(g3.full_path, 'L1_NEW/L2/L3')
199 169 self.assertEqual(r.repo_name, 'L1_NEW/L2/L3/L3_REPO')
200 170
201 171 def test_change_parent_of_top_level_group_in_nested_setup(self):
202 g1 = _make_group('R1')
203 Session().commit()
204 g2 = _make_group('R2', parent_id=g1.group_id)
205 Session().commit()
206 g3 = _make_group('R3', parent_id=g2.group_id)
207 Session().commit()
172 g1 = fixture.create_group('R1')
173 g2 = fixture.create_group('R2', group_parent_id=g1.group_id)
174 g3 = fixture.create_group('R3', group_parent_id=g2.group_id)
175 g4 = fixture.create_group('R1_NEW')
208 176
209 g4 = _make_group('R1_NEW')
210 Session().commit()
211
212 r = _make_repo('R1/R2/R3/R3_REPO', repo_group=g3.group_id)
213 Session().commit()
177 r = fixture.create_repo('R1/R2/R3/R3_REPO', repo_group=g3.group_id)
214 178 ##rename L1 all groups should be now changed
215 179 _update_group(g1.group_id, 'R1', parent_id=g4.group_id)
216 180 Session().commit()
217 181 self.assertEqual(g1.full_path, 'R1_NEW/R1')
218 182 self.assertEqual(g2.full_path, 'R1_NEW/R1/R2')
219 183 self.assertEqual(g3.full_path, 'R1_NEW/R1/R2/R3')
220 184 self.assertEqual(r.repo_name, 'R1_NEW/R1/R2/R3/R3_REPO')
221 185
222 186 def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
223 g1 = _make_group('X1')
224 Session().commit()
225 g2 = _make_group('X2', parent_id=g1.group_id)
226 Session().commit()
227 g3 = _make_group('X3', parent_id=g2.group_id)
228 Session().commit()
187 g1 = fixture.create_group('X1')
188 g2 = fixture.create_group('X2', group_parent_id=g1.group_id)
189 g3 = fixture.create_group('X3', group_parent_id=g2.group_id)
190 g4 = fixture.create_group('X1_NEW')
229 191
230 g4 = _make_group('X1_NEW')
231 Session().commit()
232
233 r = _make_repo('X1/X2/X3/X3_REPO', repo_group=g3.group_id)
234 Session().commit()
192 r = fixture.create_repo('X1/X2/X3/X3_REPO', repo_group=g3.group_id)
235 193
236 194 ##rename L1 all groups should be now changed
237 195 _update_group(g1.group_id, 'X1_PRIM', parent_id=g4.group_id)
238 196 Session().commit()
239 197 self.assertEqual(g1.full_path, 'X1_NEW/X1_PRIM')
240 198 self.assertEqual(g2.full_path, 'X1_NEW/X1_PRIM/X2')
241 199 self.assertEqual(g3.full_path, 'X1_NEW/X1_PRIM/X2/X3')
242 200 self.assertEqual(r.repo_name, 'X1_NEW/X1_PRIM/X2/X3/X3_REPO')
@@ -1,124 +1,128 b''
1 1 import unittest
2 2 from rhodecode.tests import *
3 3
4 4 from rhodecode.model.db import User, UserGroup, UserGroupMember, UserEmailMap,\
5 5 Permission
6 6 from rhodecode.model.user import UserModel
7 7
8 8 from rhodecode.model.meta import Session
9 9 from rhodecode.model.users_group import UserGroupModel
10 10
11 11
12 12 class TestUser(unittest.TestCase):
13 13 def __init__(self, methodName='runTest'):
14 14 Session.remove()
15 15 super(TestUser, self).__init__(methodName=methodName)
16 16
17 def tearDown(self):
18 Session.remove()
19
17 20 def test_create_and_remove(self):
18 21 usr = UserModel().create_or_update(username=u'test_user',
19 22 password=u'qweqwe',
20 23 email=u'u232@rhodecode.org',
21 24 firstname=u'u1', lastname=u'u1')
22 25 Session().commit()
23 26 self.assertEqual(User.get_by_username(u'test_user'), usr)
24 27
25 28 # make user group
26 29 users_group = UserGroupModel().create('some_example_group')
27 30 Session().commit()
28 31
29 32 UserGroupModel().add_user_to_group(users_group, usr)
30 33 Session().commit()
31 34
32 35 self.assertEqual(UserGroup.get(users_group.users_group_id), users_group)
33 36 self.assertEqual(UserGroupMember.query().count(), 1)
34 37 UserModel().delete(usr.user_id)
35 38 Session().commit()
36 39
37 40 self.assertEqual(UserGroupMember.query().all(), [])
38 41
39 42 def test_additonal_email_as_main(self):
40 43 usr = UserModel().create_or_update(username=u'test_user',
41 44 password=u'qweqwe',
42 45 email=u'main_email@rhodecode.org',
43 46 firstname=u'u1', lastname=u'u1')
44 47 Session().commit()
45 48
46 49 def do():
47 50 m = UserEmailMap()
48 51 m.email = u'main_email@rhodecode.org'
49 52 m.user = usr
50 53 Session().add(m)
51 54 Session().commit()
52 55 self.assertRaises(AttributeError, do)
53 56
54 57 UserModel().delete(usr.user_id)
55 58 Session().commit()
56 59
57 60 def test_extra_email_map(self):
58 61 usr = UserModel().create_or_update(username=u'test_user',
59 62 password=u'qweqwe',
60 63 email=u'main_email@rhodecode.org',
61 64 firstname=u'u1', lastname=u'u1')
62 65 Session().commit()
63 66
64 67 m = UserEmailMap()
65 68 m.email = u'main_email2@rhodecode.org'
66 69 m.user = usr
67 70 Session().add(m)
68 71 Session().commit()
69 72
70 73 u = User.get_by_email(email='main_email@rhodecode.org')
71 74 self.assertEqual(usr.user_id, u.user_id)
72 75 self.assertEqual(usr.username, u.username)
73 76
74 77 u = User.get_by_email(email='main_email2@rhodecode.org')
75 78 self.assertEqual(usr.user_id, u.user_id)
76 79 self.assertEqual(usr.username, u.username)
77 80 u = User.get_by_email(email='main_email3@rhodecode.org')
78 81 self.assertEqual(None, u)
79 82
80 83 UserModel().delete(usr.user_id)
81 84 Session().commit()
82 85
83 86
84 87 class TestUsers(unittest.TestCase):
85 88
86 89 def __init__(self, methodName='runTest'):
87 90 super(TestUsers, self).__init__(methodName=methodName)
88 91
89 92 def setUp(self):
90 93 self.u1 = UserModel().create_or_update(username=u'u1',
91 94 password=u'qweqwe',
92 95 email=u'u1@rhodecode.org',
93 96 firstname=u'u1', lastname=u'u1')
94 97
95 98 def tearDown(self):
96 99 perm = Permission.query().all()
97 100 for p in perm:
98 101 UserModel().revoke_perm(self.u1, p)
99 102
100 103 UserModel().delete(self.u1)
101 104 Session().commit()
105 Session.remove()
102 106
103 107 def test_add_perm(self):
104 108 perm = Permission.query().all()[0]
105 109 UserModel().grant_perm(self.u1, perm)
106 110 Session().commit()
107 111 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
108 112
109 113 def test_has_perm(self):
110 114 perm = Permission.query().all()
111 115 for p in perm:
112 116 has_p = UserModel().has_perm(self.u1, p)
113 117 self.assertEqual(False, has_p)
114 118
115 119 def test_revoke_perm(self):
116 120 perm = Permission.query().all()[0]
117 121 UserModel().grant_perm(self.u1, perm)
118 122 Session().commit()
119 123 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
120 124
121 125 #revoke
122 126 UserModel().revoke_perm(self.u1, perm)
123 127 Session().commit()
124 128 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
@@ -1,249 +1,248 b''
1 1 # -*- coding: utf-8 -*-
2 2 import unittest
3 3 import formencode
4 4
5 5 from rhodecode.tests import *
6 6
7 7 from rhodecode.model import validators as v
8 8 from rhodecode.model.users_group import UserGroupModel
9 9
10 10 from rhodecode.model.meta import Session
11 11 from rhodecode.model.repos_group import ReposGroupModel
12 12 from rhodecode.config.routing import ADMIN_PREFIX
13 13 from rhodecode.model.db import ChangesetStatus, Repository
14 14 from rhodecode.model.changeset_status import ChangesetStatusModel
15 from rhodecode.model.comment import ChangesetCommentsModel
16 15
17 16
18 17 class TestReposGroups(unittest.TestCase):
19 18
20 19 def setUp(self):
21 20 pass
22 21
23 22 def tearDown(self):
24 pass
23 Session.remove()
25 24
26 25 def test_Message_extractor(self):
27 26 validator = v.ValidUsername()
28 27 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
29 28
30 29 class StateObj(object):
31 30 pass
32 31
33 32 self.assertRaises(formencode.Invalid,
34 33 validator.to_python, 'default', StateObj)
35 34
36 35 def test_ValidUsername(self):
37 36 validator = v.ValidUsername()
38 37
39 38 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
40 39 self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
41 40 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
42 41 self.assertRaises(formencode.Invalid, validator.to_python,
43 42 TEST_USER_ADMIN_LOGIN)
44 43 self.assertEqual('test', validator.to_python('test'))
45 44
46 45 validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
47 46
48 47 def test_ValidRepoUser(self):
49 48 validator = v.ValidRepoUser()
50 49 self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
51 50 self.assertEqual(TEST_USER_ADMIN_LOGIN,
52 51 validator.to_python(TEST_USER_ADMIN_LOGIN))
53 52
54 53 def test_ValidUserGroup(self):
55 54 validator = v.ValidUserGroup()
56 55 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
57 56 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
58 57
59 58 gr = UserGroupModel().create('test')
60 59 gr2 = UserGroupModel().create('tes2')
61 60 Session.commit()
62 61 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
63 62 assert gr.users_group_id != None
64 63 validator = v.ValidUserGroup(edit=True,
65 64 old_data={'users_group_id':
66 65 gr2.users_group_id})
67 66
68 67 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
69 68 self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
70 69 self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
71 70 UserGroupModel().delete(gr)
72 71 UserGroupModel().delete(gr2)
73 72 Session.commit()
74 73
75 74 def test_ValidReposGroup(self):
76 75 validator = v.ValidReposGroup()
77 76 model = ReposGroupModel()
78 77 self.assertRaises(formencode.Invalid, validator.to_python,
79 78 {'group_name': HG_REPO, })
80 79 gr = model.create(group_name='test_gr', group_description='desc',
81 80 parent=None,
82 81 just_db=True,
83 82 owner=TEST_USER_ADMIN_LOGIN)
84 83 self.assertRaises(formencode.Invalid,
85 84 validator.to_python, {'group_name': gr.group_name, })
86 85
87 86 validator = v.ValidReposGroup(edit=True,
88 87 old_data={'group_id': gr.group_id})
89 88 self.assertRaises(formencode.Invalid,
90 89 validator.to_python, {
91 90 'group_name': gr.group_name + 'n',
92 91 'group_parent_id': gr.group_id
93 92 })
94 93 model.delete(gr)
95 94
96 95 def test_ValidPassword(self):
97 96 validator = v.ValidPassword()
98 97 self.assertEqual('lol', validator.to_python('lol'))
99 98 self.assertEqual(None, validator.to_python(None))
100 99 self.assertRaises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
101 100
102 101 def test_ValidPasswordsMatch(self):
103 102 validator = v.ValidPasswordsMatch()
104 103 self.assertRaises(formencode.Invalid,
105 104 validator.to_python, {'password': 'pass',
106 105 'password_confirmation': 'pass2'})
107 106
108 107 self.assertRaises(formencode.Invalid,
109 108 validator.to_python, {'new_password': 'pass',
110 109 'password_confirmation': 'pass2'})
111 110
112 111 self.assertEqual({'new_password': 'pass',
113 112 'password_confirmation': 'pass'},
114 113 validator.to_python({'new_password': 'pass',
115 114 'password_confirmation': 'pass'}))
116 115
117 116 self.assertEqual({'password': 'pass',
118 117 'password_confirmation': 'pass'},
119 118 validator.to_python({'password': 'pass',
120 119 'password_confirmation': 'pass'}))
121 120
122 121 def test_ValidAuth(self):
123 122 validator = v.ValidAuth()
124 123 valid_creds = {
125 124 'username': TEST_USER_REGULAR2_LOGIN,
126 125 'password': TEST_USER_REGULAR2_PASS,
127 126 }
128 127 invalid_creds = {
129 128 'username': 'err',
130 129 'password': 'err',
131 130 }
132 131 self.assertEqual(valid_creds, validator.to_python(valid_creds))
133 132 self.assertRaises(formencode.Invalid,
134 133 validator.to_python, invalid_creds)
135 134
136 135 def test_ValidAuthToken(self):
137 136 validator = v.ValidAuthToken()
138 137 # this is untestable without a threadlocal
139 138 # self.assertRaises(formencode.Invalid,
140 139 # validator.to_python, 'BadToken')
141 140 validator
142 141
143 142 def test_ValidRepoName(self):
144 143 validator = v.ValidRepoName()
145 144
146 145 self.assertRaises(formencode.Invalid,
147 146 validator.to_python, {'repo_name': ''})
148 147
149 148 self.assertRaises(formencode.Invalid,
150 149 validator.to_python, {'repo_name': HG_REPO})
151 150
152 151 gr = ReposGroupModel().create(group_name='group_test',
153 152 group_description='desc',
154 153 parent=None,
155 154 owner=TEST_USER_ADMIN_LOGIN)
156 155 self.assertRaises(formencode.Invalid,
157 156 validator.to_python, {'repo_name': gr.group_name})
158 157
159 158 #TODO: write an error case for that ie. create a repo withinh a group
160 159 # self.assertRaises(formencode.Invalid,
161 160 # validator.to_python, {'repo_name': 'some',
162 161 # 'repo_group': gr.group_id})
163 162
164 163 def test_ValidForkName(self):
165 164 # this uses ValidRepoName validator
166 165 assert True
167 166
168 167 @parameterized.expand([
169 168 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
170 169 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
171 170 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
172 171 ('/]re po', 're-po')])
173 172 def test_SlugifyName(self, name, expected):
174 173 validator = v.SlugifyName()
175 174 self.assertEqual(expected, validator.to_python(name))
176 175
177 176 def test_ValidCloneUri(self):
178 177 #TODO: write this one
179 178 pass
180 179
181 180 def test_ValidForkType(self):
182 181 validator = v.ValidForkType(old_data={'repo_type': 'hg'})
183 182 self.assertEqual('hg', validator.to_python('hg'))
184 183 self.assertRaises(formencode.Invalid, validator.to_python, 'git')
185 184
186 185 def test_ValidPerms(self):
187 186 #TODO: write this one
188 187 pass
189 188
190 189 def test_ValidSettings(self):
191 190 validator = v.ValidSettings()
192 191 self.assertEqual({'pass': 'pass'},
193 192 validator.to_python(value={'user': 'test',
194 193 'pass': 'pass'}))
195 194
196 195 self.assertEqual({'user2': 'test', 'pass': 'pass'},
197 196 validator.to_python(value={'user2': 'test',
198 197 'pass': 'pass'}))
199 198
200 199 def test_ValidPath(self):
201 200 validator = v.ValidPath()
202 201 self.assertEqual(TESTS_TMP_PATH,
203 202 validator.to_python(TESTS_TMP_PATH))
204 203 self.assertRaises(formencode.Invalid, validator.to_python,
205 204 '/no_such_dir')
206 205
207 206 def test_UniqSystemEmail(self):
208 207 validator = v.UniqSystemEmail(old_data={})
209 208
210 209 self.assertEqual('mail@python.org',
211 210 validator.to_python('MaiL@Python.org'))
212 211
213 212 email = TEST_USER_REGULAR2_EMAIL
214 213 self.assertRaises(formencode.Invalid, validator.to_python, email)
215 214
216 215 def test_ValidSystemEmail(self):
217 216 validator = v.ValidSystemEmail()
218 217 email = TEST_USER_REGULAR2_EMAIL
219 218
220 219 self.assertEqual(email, validator.to_python(email))
221 220 self.assertRaises(formencode.Invalid, validator.to_python, 'err')
222 221
223 222 def test_LdapLibValidator(self):
224 223 validator = v.LdapLibValidator()
225 224 self.assertRaises(v.LdapImportError, validator.to_python, 'err')
226 225
227 226 def test_AttrLoginValidator(self):
228 227 validator = v.AttrLoginValidator()
229 228 self.assertRaises(formencode.Invalid, validator.to_python, 123)
230 229
231 230 def test_NotReviewedRevisions(self):
232 231 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
233 232 validator = v.NotReviewedRevisions(repo_id)
234 233 rev = '0' * 40
235 234 # add status for a rev, that should throw an error because it is already
236 235 # reviewed
237 236 new_status = ChangesetStatus()
238 237 new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
239 238 new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
240 239 new_status.status = ChangesetStatus.STATUS_APPROVED
241 240 new_status.comment = None
242 241 new_status.revision = rev
243 242 Session().add(new_status)
244 243 Session().commit()
245 244 try:
246 245 self.assertRaises(formencode.Invalid, validator.to_python, [rev])
247 246 finally:
248 247 Session().delete(new_status)
249 248 Session().commit()
General Comments 0
You need to be logged in to leave comments. Login now