##// END OF EJS Templates
unified flash msg tests
marcink -
r3640:4f80df0d beta
parent child Browse files
Show More
@@ -1,373 +1,371 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 import os
4 4 import urllib
5 5
6 6 from rhodecode.lib import vcs
7 7 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
8 8 Permission
9 9 from rhodecode.tests import *
10 10 from rhodecode.model.repos_group import ReposGroupModel
11 11 from rhodecode.model.repo import RepoModel
12 12 from rhodecode.model.meta import Session
13 13
14 14
15 15 def _get_permission_for_user(user, repo):
16 16 perm = UserRepoToPerm.query()\
17 17 .filter(UserRepoToPerm.repository ==
18 18 Repository.get_by_repo_name(repo))\
19 19 .filter(UserRepoToPerm.user == User.get_by_username(user))\
20 20 .all()
21 21 return perm
22 22
23 23
24 24 class TestAdminReposController(TestController):
25 25
26 26 def __make_repo(self):
27 27 pass
28 28
29 29 def test_index(self):
30 30 self.log_user()
31 31 response = self.app.get(url('repos'))
32 32 # Test response...
33 33
34 34 def test_index_as_xml(self):
35 35 response = self.app.get(url('formatted_repos', format='xml'))
36 36
37 37 def test_create_hg(self):
38 38 self.log_user()
39 39 repo_name = NEW_HG_REPO
40 40 description = 'description for newly created repo'
41 41 response = self.app.post(url('repos'),
42 42 _get_repo_create_params(repo_private=False,
43 43 repo_name=repo_name,
44 44 repo_description=description))
45 45 self.checkSessionFlash(response,
46 46 'Created repository <a href="/%s">%s</a>'
47 47 % (repo_name, repo_name))
48 48
49 49 #test if the repo was created in the database
50 50 new_repo = self.Session().query(Repository)\
51 51 .filter(Repository.repo_name == repo_name).one()
52 52
53 53 self.assertEqual(new_repo.repo_name, repo_name)
54 54 self.assertEqual(new_repo.description, description)
55 55
56 56 #test if repository is visible in the list ?
57 57 response = response.follow()
58 58
59 59 response.mustcontain(repo_name)
60 60
61 61 #test if repository was created on filesystem
62 62 try:
63 63 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
64 64 except Exception:
65 65 self.fail('no repo %s in filesystem' % repo_name)
66 66
67 67 def test_create_hg_non_ascii(self):
68 68 self.log_user()
69 69 non_ascii = "Δ…Δ™Ε‚"
70 70 repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
71 71 repo_name_unicode = repo_name.decode('utf8')
72 72 description = 'description for newly created repo' + non_ascii
73 73 description_unicode = description.decode('utf8')
74 74 private = False
75 75 response = self.app.post(url('repos'),
76 76 _get_repo_create_params(repo_private=False,
77 77 repo_name=repo_name,
78 78 repo_description=description))
79 79 self.checkSessionFlash(response,
80 80 u'Created repository <a href="/%s">%s</a>'
81 81 % (urllib.quote(repo_name), repo_name_unicode))
82 82 #test if the repo was created in the database
83 83 new_repo = self.Session().query(Repository)\
84 84 .filter(Repository.repo_name == repo_name_unicode).one()
85 85
86 86 self.assertEqual(new_repo.repo_name, repo_name_unicode)
87 87 self.assertEqual(new_repo.description, description_unicode)
88 88
89 89 #test if repository is visible in the list ?
90 90 response = response.follow()
91 91
92 92 response.mustcontain(repo_name)
93 93
94 94 #test if repository was created on filesystem
95 95 try:
96 96 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
97 97 except Exception:
98 98 self.fail('no repo %s in filesystem' % repo_name)
99 99
100 100 def test_create_hg_in_group(self):
101 101 self.log_user()
102 102
103 103 ## create GROUP
104 104 group_name = 'sometest'
105 105 gr = ReposGroupModel().create(group_name=group_name,
106 106 group_description='test',
107 107 owner=TEST_USER_ADMIN_LOGIN)
108 108 self.Session().commit()
109 109
110 110 repo_name = 'ingroup'
111 111 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
112 112 description = 'description for newly created repo'
113 113 response = self.app.post(url('repos'),
114 114 _get_repo_create_params(repo_private=False,
115 115 repo_name=repo_name,
116 116 repo_description=description,
117 117 repo_group=gr.group_id,))
118 118
119 119 self.checkSessionFlash(response,
120 120 'Created repository <a href="/%s">%s</a>'
121 121 % (repo_name, repo_name))
122 122 #test if the repo was created in the database
123 123 new_repo = self.Session().query(Repository)\
124 124 .filter(Repository.repo_name == repo_name_full).one()
125 125
126 126 self.assertEqual(new_repo.repo_name, repo_name_full)
127 127 self.assertEqual(new_repo.description, description)
128 128
129 129 #test if repository is visible in the list ?
130 130 response = response.follow()
131 131
132 132 response.mustcontain(repo_name_full)
133 133
134 134 #test if repository was created on filesystem
135 135 try:
136 136 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
137 137 except Exception:
138 138 ReposGroupModel().delete(group_name)
139 139 self.Session().commit()
140 140 self.fail('no repo %s in filesystem' % repo_name)
141 141
142 142 RepoModel().delete(repo_name_full)
143 143 ReposGroupModel().delete(group_name)
144 144 self.Session().commit()
145 145
146 146 def test_create_git(self):
147 147 self.log_user()
148 148 repo_name = NEW_GIT_REPO
149 149 description = 'description for newly created repo'
150 150
151 151 response = self.app.post(url('repos'),
152 152 _get_repo_create_params(repo_private=False,
153 153 repo_type='git',
154 154 repo_name=repo_name,
155 155 repo_description=description))
156 156 self.checkSessionFlash(response,
157 157 'Created repository <a href="/%s">%s</a>'
158 158 % (repo_name, repo_name))
159 159
160 160 #test if the repo was created in the database
161 161 new_repo = self.Session().query(Repository)\
162 162 .filter(Repository.repo_name == repo_name).one()
163 163
164 164 self.assertEqual(new_repo.repo_name, repo_name)
165 165 self.assertEqual(new_repo.description, description)
166 166
167 167 #test if repository is visible in the list ?
168 168 response = response.follow()
169 169
170 170 response.mustcontain(repo_name)
171 171
172 172 #test if repository was created on filesystem
173 173 try:
174 174 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
175 175 except Exception:
176 176 self.fail('no repo %s in filesystem' % repo_name)
177 177
178 178 def test_create_git_non_ascii(self):
179 179 self.log_user()
180 180 non_ascii = "Δ…Δ™Ε‚"
181 181 repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
182 182 repo_name_unicode = repo_name.decode('utf8')
183 183 description = 'description for newly created repo' + non_ascii
184 184 description_unicode = description.decode('utf8')
185 185 private = False
186 186 response = self.app.post(url('repos'),
187 187 _get_repo_create_params(repo_private=False,
188 188 repo_type='git',
189 189 repo_name=repo_name,
190 190 repo_description=description))
191 191
192 192 self.checkSessionFlash(response,
193 193 u'Created repository <a href="/%s">%s</a>'
194 194 % (urllib.quote(repo_name), repo_name_unicode))
195 195
196 196 #test if the repo was created in the database
197 197 new_repo = self.Session().query(Repository)\
198 198 .filter(Repository.repo_name == repo_name_unicode).one()
199 199
200 200 self.assertEqual(new_repo.repo_name, repo_name_unicode)
201 201 self.assertEqual(new_repo.description, description_unicode)
202 202
203 203 #test if repository is visible in the list ?
204 204 response = response.follow()
205 205
206 206 response.mustcontain(repo_name)
207 207
208 208 #test if repository was created on filesystem
209 209 try:
210 210 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
211 211 except Exception:
212 212 self.fail('no repo %s in filesystem' % repo_name)
213 213
214 214 def test_update(self):
215 215 response = self.app.put(url('repo', repo_name=HG_REPO))
216 216
217 217 def test_update_browser_fakeout(self):
218 218 response = self.app.post(url('repo', repo_name=HG_REPO),
219 219 params=dict(_method='put'))
220 220
221 221 def test_delete_hg(self):
222 222 self.log_user()
223 223 repo_name = 'vcs_test_new_to_delete'
224 224 description = 'description for newly created repo'
225 225 response = self.app.post(url('repos'),
226 226 _get_repo_create_params(repo_private=False,
227 227 repo_type='hg',
228 228 repo_name=repo_name,
229 229 repo_description=description))
230 230
231 231 self.checkSessionFlash(response,
232 232 'Created repository <a href="/%s">%s</a>'
233 233 % (repo_name, repo_name))
234 234 #test if the repo was created in the database
235 235 new_repo = self.Session().query(Repository)\
236 236 .filter(Repository.repo_name == repo_name).one()
237 237
238 238 self.assertEqual(new_repo.repo_name, repo_name)
239 239 self.assertEqual(new_repo.description, description)
240 240
241 241 #test if repository is visible in the list ?
242 242 response = response.follow()
243 243
244 244 response.mustcontain(repo_name)
245 245
246 246 #test if repository was created on filesystem
247 247 try:
248 248 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
249 249 except Exception:
250 250 self.fail('no repo %s in filesystem' % repo_name)
251 251
252 252 response = self.app.delete(url('repo', repo_name=repo_name))
253 253
254 self.assertTrue('''Deleted repository %s''' % (repo_name) in
255 response.session['flash'][0])
254 self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
256 255
257 256 response.follow()
258 257
259 258 #check if repo was deleted from db
260 259 deleted_repo = self.Session().query(Repository)\
261 260 .filter(Repository.repo_name == repo_name).scalar()
262 261
263 262 self.assertEqual(deleted_repo, None)
264 263
265 264 self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
266 265 False)
267 266
268 267 def test_delete_git(self):
269 268 self.log_user()
270 269 repo_name = 'vcs_test_new_to_delete'
271 270 description = 'description for newly created repo'
272 271 private = False
273 272 response = self.app.post(url('repos'),
274 273 _get_repo_create_params(repo_private=False,
275 274 repo_type='git',
276 275 repo_name=repo_name,
277 276 repo_description=description))
278 277
279 278 self.checkSessionFlash(response,
280 279 'Created repository <a href="/%s">%s</a>'
281 280 % (repo_name, repo_name))
282 281 #test if the repo was created in the database
283 282 new_repo = self.Session().query(Repository)\
284 283 .filter(Repository.repo_name == repo_name).one()
285 284
286 285 self.assertEqual(new_repo.repo_name, repo_name)
287 286 self.assertEqual(new_repo.description, description)
288 287
289 288 #test if repository is visible in the list ?
290 289 response = response.follow()
291 290
292 291 response.mustcontain(repo_name)
293 292
294 293 #test if repository was created on filesystem
295 294 try:
296 295 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
297 296 except Exception:
298 297 self.fail('no repo %s in filesystem' % repo_name)
299 298
300 299 response = self.app.delete(url('repo', repo_name=repo_name))
301 300
302 self.assertTrue('''Deleted repository %s''' % (repo_name) in
303 response.session['flash'][0])
301 self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
304 302
305 303 response.follow()
306 304
307 305 #check if repo was deleted from db
308 306 deleted_repo = self.Session().query(Repository)\
309 307 .filter(Repository.repo_name == repo_name).scalar()
310 308
311 309 self.assertEqual(deleted_repo, None)
312 310
313 311 self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
314 312 False)
315 313
316 314 def test_delete_repo_with_group(self):
317 315 #TODO:
318 316 pass
319 317
320 318 def test_delete_browser_fakeout(self):
321 319 response = self.app.post(url('repo', repo_name=HG_REPO),
322 320 params=dict(_method='delete'))
323 321
324 322 def test_show_hg(self):
325 323 self.log_user()
326 324 response = self.app.get(url('repo', repo_name=HG_REPO))
327 325
328 326 def test_show_git(self):
329 327 self.log_user()
330 328 response = self.app.get(url('repo', repo_name=GIT_REPO))
331 329
332 330
333 331 def test_edit(self):
334 332 response = self.app.get(url('edit_repo', repo_name=HG_REPO))
335 333
336 334 def test_set_private_flag_sets_default_to_none(self):
337 335 self.log_user()
338 336 #initially repository perm should be read
339 337 perm = _get_permission_for_user(user='default', repo=HG_REPO)
340 338 self.assertTrue(len(perm), 1)
341 339 self.assertEqual(perm[0].permission.permission_name, 'repository.read')
342 340 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
343 341
344 342 response = self.app.put(url('repo', repo_name=HG_REPO),
345 343 _get_repo_create_params(repo_private=1,
346 344 repo_name=HG_REPO,
347 345 user=TEST_USER_ADMIN_LOGIN))
348 346 self.checkSessionFlash(response,
349 347 msg='Repository %s updated successfully' % (HG_REPO))
350 348 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, True)
351 349
352 350 #now the repo default permission should be None
353 351 perm = _get_permission_for_user(user='default', repo=HG_REPO)
354 352 self.assertTrue(len(perm), 1)
355 353 self.assertEqual(perm[0].permission.permission_name, 'repository.none')
356 354
357 355 response = self.app.put(url('repo', repo_name=HG_REPO),
358 356 _get_repo_create_params(repo_private=False,
359 357 repo_name=HG_REPO,
360 358 user=TEST_USER_ADMIN_LOGIN))
361 359 self.checkSessionFlash(response,
362 360 msg='Repository %s updated successfully' % (HG_REPO))
363 361 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
364 362
365 363 #we turn off private now the repo default permission should stay None
366 364 perm = _get_permission_for_user(user='default', repo=HG_REPO)
367 365 self.assertTrue(len(perm), 1)
368 366 self.assertEqual(perm[0].permission.permission_name, 'repository.none')
369 367
370 368 #update this permission back
371 369 perm[0].permission = Permission.get_by_key('repository.read')
372 370 Session().add(perm[0])
373 371 Session().commit()
@@ -1,266 +1,265 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 from rhodecode.lib.auth import get_crypt_password, check_password
4 4 from rhodecode.model.db import User, RhodeCodeSetting, Repository
5 5 from rhodecode.tests import *
6 6 from rhodecode.lib import helpers as h
7 7 from rhodecode.model.user import UserModel
8 8 from rhodecode.model.scm import ScmModel
9 9
10 10
11 11 class TestAdminSettingsController(TestController):
12 12
13 13 def test_index(self):
14 14 response = self.app.get(url('admin_settings'))
15 15 # Test response...
16 16
17 17 def test_index_as_xml(self):
18 18 response = self.app.get(url('formatted_admin_settings', format='xml'))
19 19
20 20 def test_create(self):
21 21 response = self.app.post(url('admin_settings'))
22 22
23 23 def test_new(self):
24 24 response = self.app.get(url('admin_new_setting'))
25 25
26 26 def test_new_as_xml(self):
27 27 response = self.app.get(url('formatted_admin_new_setting', format='xml'))
28 28
29 29 def test_update(self):
30 30 response = self.app.put(url('admin_setting', setting_id=1))
31 31
32 32 def test_update_browser_fakeout(self):
33 33 response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='put'))
34 34
35 35 def test_delete(self):
36 36 response = self.app.delete(url('admin_setting', setting_id=1))
37 37
38 38 def test_delete_browser_fakeout(self):
39 39 response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='delete'))
40 40
41 41 def test_show(self):
42 42 response = self.app.get(url('admin_setting', setting_id=1))
43 43
44 44 def test_show_as_xml(self):
45 45 response = self.app.get(url('formatted_admin_setting', setting_id=1, format='xml'))
46 46
47 47 def test_edit(self):
48 48 response = self.app.get(url('admin_edit_setting', setting_id=1))
49 49
50 50 def test_edit_as_xml(self):
51 51 response = self.app.get(url('formatted_admin_edit_setting',
52 52 setting_id=1, format='xml'))
53 53
54 54 def test_ga_code_active(self):
55 55 self.log_user()
56 56 old_title = 'RhodeCode'
57 57 old_realm = 'RhodeCode authentication'
58 58 new_ga_code = 'ga-test-123456789'
59 59 response = self.app.post(url('admin_setting', setting_id='global'),
60 60 params=dict(
61 61 _method='put',
62 62 rhodecode_title=old_title,
63 63 rhodecode_realm=old_realm,
64 64 rhodecode_ga_code=new_ga_code
65 65 ))
66 66
67 67 self.checkSessionFlash(response, 'Updated application settings')
68 68
69 69 self.assertEqual(RhodeCodeSetting
70 70 .get_app_settings()['rhodecode_ga_code'], new_ga_code)
71 71
72 72 response = response.follow()
73 73 response.mustcontain("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code)
74 74
75 75 def test_ga_code_inactive(self):
76 76 self.log_user()
77 77 old_title = 'RhodeCode'
78 78 old_realm = 'RhodeCode authentication'
79 79 new_ga_code = ''
80 80 response = self.app.post(url('admin_setting', setting_id='global'),
81 81 params=dict(
82 82 _method='put',
83 83 rhodecode_title=old_title,
84 84 rhodecode_realm=old_realm,
85 85 rhodecode_ga_code=new_ga_code
86 86 ))
87 87
88 self.assertTrue('Updated application settings' in
89 response.session['flash'][0][1])
88 self.checkSessionFlash(response, 'Updated application settings')
90 89 self.assertEqual(RhodeCodeSetting
91 90 .get_app_settings()['rhodecode_ga_code'], new_ga_code)
92 91
93 92 response = response.follow()
94 93 self.assertFalse("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code
95 94 in response.body)
96 95
97 96 def test_title_change(self):
98 97 self.log_user()
99 98 old_title = 'RhodeCode'
100 99 new_title = old_title + '_changed'
101 100 old_realm = 'RhodeCode authentication'
102 101
103 102 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
104 103 response = self.app.post(url('admin_setting', setting_id='global'),
105 104 params=dict(
106 105 _method='put',
107 106 rhodecode_title=new_title,
108 107 rhodecode_realm=old_realm,
109 108 rhodecode_ga_code=''
110 109 ))
111 110
112 111 self.checkSessionFlash(response, 'Updated application settings')
113 112 self.assertEqual(RhodeCodeSetting
114 113 .get_app_settings()['rhodecode_title'],
115 114 new_title.decode('utf-8'))
116 115
117 116 response = response.follow()
118 117 response.mustcontain("""<h1><a href="/">%s</a></h1>""" % new_title)
119 118
120 119 def test_my_account(self):
121 120 self.log_user()
122 121 response = self.app.get(url('admin_settings_my_account'))
123 122
124 123 self.assertTrue('value="test_admin' in response.body)
125 124
126 125 @parameterized.expand([('firstname', 'new_username'),
127 126 ('lastname', 'new_username'),
128 127 ('admin', True),
129 128 ('admin', False),
130 129 ('ldap_dn', 'test'),
131 130 ('ldap_dn', None),
132 131 ('active', False),
133 132 ('active', True),
134 133 ('email', 'some@email.com'),
135 134 ])
136 135 def test_my_account_update(self, name, expected):
137 136 uname = 'testme'
138 137 usr = UserModel().create_or_update(username=uname, password='qweqwe',
139 138 email='testme@rhodecod.org')
140 139 self.Session().commit()
141 140 params = usr.get_api_data()
142 141 user_id = usr.user_id
143 142 self.log_user(username=uname, password='qweqwe')
144 143 params.update({name: expected})
145 144 params.update({'password_confirmation': ''})
146 145 params.update({'new_password': ''})
147 146
148 147 try:
149 148 response = self.app.put(url('admin_settings_my_account_update',
150 149 id=user_id), params)
151 150
152 151 self.checkSessionFlash(response,
153 152 'Your account was updated successfully')
154 153
155 154 updated_user = User.get_by_username(uname)
156 155 updated_params = updated_user.get_api_data()
157 156 updated_params.update({'password_confirmation': ''})
158 157 updated_params.update({'new_password': ''})
159 158
160 159 params['last_login'] = updated_params['last_login']
161 160 if name == 'email':
162 161 params['emails'] = [expected]
163 162 if name == 'ldap_dn':
164 163 #cannot update this via form
165 164 params['ldap_dn'] = None
166 165 if name == 'active':
167 166 #my account cannot deactivate account
168 167 params['active'] = True
169 168 if name == 'admin':
170 169 #my account cannot make you an admin !
171 170 params['admin'] = False
172 171
173 172 self.assertEqual(params, updated_params)
174 173
175 174 finally:
176 175 UserModel().delete('testme')
177 176
178 177 def test_my_account_update_err_email_exists(self):
179 178 self.log_user()
180 179
181 180 new_email = 'test_regular@mail.com' # already exisitn email
182 181 response = self.app.put(url('admin_settings_my_account_update'),
183 182 params=dict(
184 183 username='test_admin',
185 184 new_password='test12',
186 185 password_confirmation='test122',
187 186 firstname='NewName',
188 187 lastname='NewLastname',
189 188 email=new_email,)
190 189 )
191 190
192 191 response.mustcontain('This e-mail address is already taken')
193 192
194 193 def test_my_account_update_err(self):
195 194 self.log_user('test_regular2', 'test12')
196 195
197 196 new_email = 'newmail.pl'
198 197 response = self.app.post(url('admin_settings_my_account_update'),
199 198 params=dict(
200 199 _method='put',
201 200 username='test_admin',
202 201 new_password='test12',
203 202 password_confirmation='test122',
204 203 firstname='NewName',
205 204 lastname='NewLastname',
206 205 email=new_email,)
207 206 )
208 207
209 208 response.mustcontain('An email address must contain a single @')
210 209 from rhodecode.model import validators
211 210 msg = validators.ValidUsername(edit=False,
212 211 old_data={})._messages['username_exists']
213 212 msg = h.html_escape(msg % {'username': 'test_admin'})
214 213 response.mustcontain(u"%s" % msg)
215 214
216 215 def test_set_repo_fork_has_no_self_id(self):
217 216 self.log_user()
218 217 repo = Repository.get_by_repo_name(HG_REPO)
219 218 response = self.app.get(url('edit_repo', repo_name=HG_REPO))
220 219 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
221 220 assert opt not in response.body
222 221
223 222 def test_set_fork_of_repo(self):
224 223 self.log_user()
225 224 repo = Repository.get_by_repo_name(HG_REPO)
226 225 repo2 = Repository.get_by_repo_name(GIT_REPO)
227 226 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
228 227 params=dict(
229 228 id_fork_of=repo2.repo_id
230 229 ))
231 230 repo = Repository.get_by_repo_name(HG_REPO)
232 231 repo2 = Repository.get_by_repo_name(GIT_REPO)
233 232 self.checkSessionFlash(response,
234 233 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
235 234
236 235 assert repo.fork == repo2
237 236 response = response.follow()
238 237 # check if given repo is selected
239 238
240 239 opt = """<option value="%s" selected="selected">%s</option>""" % (
241 240 repo2.repo_id, repo2.repo_name)
242 241 response.mustcontain(opt)
243 242
244 243 # clean session flash
245 244 #response = self.app.get(url('edit_repo', repo_name=HG_REPO))
246 245
247 246 ## mark it as None
248 247 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
249 248 params=dict(
250 249 id_fork_of=None
251 250 ))
252 251 repo = Repository.get_by_repo_name(HG_REPO)
253 252 repo2 = Repository.get_by_repo_name(GIT_REPO)
254 253 self.checkSessionFlash(response,
255 254 'Marked repo %s as fork of %s' % (repo.repo_name, "Nothing"))
256 255 assert repo.fork == None
257 256
258 257 def test_set_fork_of_same_repo(self):
259 258 self.log_user()
260 259 repo = Repository.get_by_repo_name(HG_REPO)
261 260 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
262 261 params=dict(
263 262 id_fork_of=repo.repo_id
264 263 ))
265 264 self.checkSessionFlash(response,
266 265 'An error occurred during this operation')
@@ -1,290 +1,289 b''
1 1 from sqlalchemy.orm.exc import NoResultFound
2 2
3 3 from rhodecode.tests import *
4 4 from rhodecode.model.db import User, Permission
5 5 from rhodecode.lib.auth import check_password
6 6 from rhodecode.model.user import UserModel
7 7 from rhodecode.model import validators
8 8 from rhodecode.lib import helpers as h
9 9 from rhodecode.model.meta import Session
10 10
11 11
12 12 class TestAdminUsersController(TestController):
13 13
14 14 def test_index(self):
15 15 self.log_user()
16 16 response = self.app.get(url('users'))
17 17 # Test response...
18 18
19 19 def test_index_as_xml(self):
20 20 response = self.app.get(url('formatted_users', format='xml'))
21 21
22 22 def test_create(self):
23 23 self.log_user()
24 24 username = 'newtestuser'
25 25 password = 'test12'
26 26 password_confirmation = password
27 27 name = 'name'
28 28 lastname = 'lastname'
29 29 email = 'mail@mail.com'
30 30
31 31 response = self.app.post(url('users'),
32 32 {'username': username,
33 33 'password': password,
34 34 'password_confirmation': password_confirmation,
35 35 'firstname': name,
36 36 'active': True,
37 37 'lastname': lastname,
38 38 'email': email})
39 39
40 40 self.checkSessionFlash(response, '''Created user %s''' % (username))
41 41
42 42 new_user = self.Session.query(User).\
43 43 filter(User.username == username).one()
44 44
45 45 self.assertEqual(new_user.username, username)
46 46 self.assertEqual(check_password(password, new_user.password), True)
47 47 self.assertEqual(new_user.name, name)
48 48 self.assertEqual(new_user.lastname, lastname)
49 49 self.assertEqual(new_user.email, email)
50 50
51 51 response.follow()
52 52 response = response.follow()
53 53 response.mustcontain("""newtestuser""")
54 54
55 55 def test_create_err(self):
56 56 self.log_user()
57 57 username = 'new_user'
58 58 password = ''
59 59 name = 'name'
60 60 lastname = 'lastname'
61 61 email = 'errmail.com'
62 62
63 63 response = self.app.post(url('users'), {'username': username,
64 64 'password': password,
65 65 'name': name,
66 66 'active': False,
67 67 'lastname': lastname,
68 68 'email': email})
69 69
70 70 msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
71 71 msg = h.html_escape(msg % {'username': 'new_user'})
72 72 response.mustcontain("""<span class="error-message">%s</span>""" % msg)
73 73 response.mustcontain("""<span class="error-message">Please enter a value</span>""")
74 74 response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
75 75
76 76 def get_user():
77 77 self.Session.query(User).filter(User.username == username).one()
78 78
79 79 self.assertRaises(NoResultFound, get_user), 'found user in database'
80 80
81 81 def test_new(self):
82 82 self.log_user()
83 83 response = self.app.get(url('new_user'))
84 84
85 85 def test_new_as_xml(self):
86 86 response = self.app.get(url('formatted_new_user', format='xml'))
87 87
88 88 @parameterized.expand([('firstname', 'new_username'),
89 89 ('lastname', 'new_username'),
90 90 ('admin', True),
91 91 ('admin', False),
92 92 ('ldap_dn', 'test'),
93 93 ('ldap_dn', None),
94 94 ('active', False),
95 95 ('active', True),
96 96 ('email', 'some@email.com'),
97 97 ])
98 98 def test_update(self, name, expected):
99 99 self.log_user()
100 100 uname = 'testme'
101 101 usr = UserModel().create_or_update(username=uname, password='qweqwe',
102 102 email='testme@rhodecod.org')
103 103 self.Session().commit()
104 104 params = usr.get_api_data()
105 105 params.update({name: expected})
106 106 params.update({'password_confirmation': ''})
107 107 params.update({'new_password': ''})
108 108 if name == 'email':
109 109 params['emails'] = [expected]
110 110 if name == 'ldap_dn':
111 111 #cannot update this via form
112 112 params['ldap_dn'] = None
113 113 try:
114 114 response = self.app.put(url('user', id=usr.user_id), params)
115 115
116 116 self.checkSessionFlash(response, '''User updated successfully''')
117 117
118 118 updated_user = User.get_by_username(uname)
119 119 updated_params = updated_user.get_api_data()
120 120 updated_params.update({'password_confirmation': ''})
121 121 updated_params.update({'new_password': ''})
122 122
123 123 self.assertEqual(params, updated_params)
124 124
125 125 finally:
126 126 UserModel().delete('testme')
127 127
128 128 def test_update_browser_fakeout(self):
129 129 response = self.app.post(url('user', id=1), params=dict(_method='put'))
130 130
131 131 def test_delete(self):
132 132 self.log_user()
133 133 username = 'newtestuserdeleteme'
134 134 password = 'test12'
135 135 name = 'name'
136 136 lastname = 'lastname'
137 137 email = 'todeletemail@mail.com'
138 138
139 139 response = self.app.post(url('users'), {'username': username,
140 140 'password': password,
141 141 'password_confirmation': password,
142 142 'firstname': name,
143 143 'active': True,
144 144 'lastname': lastname,
145 145 'email': email})
146 146
147 147 response = response.follow()
148 148
149 149 new_user = self.Session.query(User)\
150 150 .filter(User.username == username).one()
151 151 response = self.app.delete(url('user', id=new_user.user_id))
152 152
153 self.assertTrue("""Successfully deleted user""" in
154 response.session['flash'][0])
153 self.checkSessionFlash(response, 'Successfully deleted user')
155 154
156 155 def test_delete_browser_fakeout(self):
157 156 response = self.app.post(url('user', id=1),
158 157 params=dict(_method='delete'))
159 158
160 159 def test_show(self):
161 160 response = self.app.get(url('user', id=1))
162 161
163 162 def test_show_as_xml(self):
164 163 response = self.app.get(url('formatted_user', id=1, format='xml'))
165 164
166 165 def test_edit(self):
167 166 self.log_user()
168 167 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
169 168 response = self.app.get(url('edit_user', id=user.user_id))
170 169
171 170 def test_add_perm_create_repo(self):
172 171 self.log_user()
173 172 perm_none = Permission.get_by_key('hg.create.none')
174 173 perm_create = Permission.get_by_key('hg.create.repository')
175 174
176 175 user = UserModel().create_or_update(username='dummy', password='qwe',
177 176 email='dummy', firstname='a',
178 177 lastname='b')
179 178 Session().commit()
180 179 uid = user.user_id
181 180
182 181 try:
183 182 #User should have None permission on creation repository
184 183 self.assertEqual(UserModel().has_perm(user, perm_none), False)
185 184 self.assertEqual(UserModel().has_perm(user, perm_create), False)
186 185
187 186 response = self.app.post(url('user_perm', id=uid),
188 187 params=dict(_method='put',
189 188 create_repo_perm=True))
190 189
191 190 perm_none = Permission.get_by_key('hg.create.none')
192 191 perm_create = Permission.get_by_key('hg.create.repository')
193 192
194 193 #User should have None permission on creation repository
195 194 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
196 195 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
197 196 finally:
198 197 UserModel().delete(uid)
199 198 Session().commit()
200 199
201 200 def test_revoke_perm_create_repo(self):
202 201 self.log_user()
203 202 perm_none = Permission.get_by_key('hg.create.none')
204 203 perm_create = Permission.get_by_key('hg.create.repository')
205 204
206 205 user = UserModel().create_or_update(username='dummy', password='qwe',
207 206 email='dummy', firstname='a',
208 207 lastname='b')
209 208 Session().commit()
210 209 uid = user.user_id
211 210
212 211 try:
213 212 #User should have None permission on creation repository
214 213 self.assertEqual(UserModel().has_perm(user, perm_none), False)
215 214 self.assertEqual(UserModel().has_perm(user, perm_create), False)
216 215
217 216 response = self.app.post(url('user_perm', id=uid),
218 217 params=dict(_method='put'))
219 218
220 219 perm_none = Permission.get_by_key('hg.create.none')
221 220 perm_create = Permission.get_by_key('hg.create.repository')
222 221
223 222 #User should have None permission on creation repository
224 223 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
225 224 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
226 225 finally:
227 226 UserModel().delete(uid)
228 227 Session().commit()
229 228
230 229 def test_add_perm_fork_repo(self):
231 230 self.log_user()
232 231 perm_none = Permission.get_by_key('hg.fork.none')
233 232 perm_fork = Permission.get_by_key('hg.fork.repository')
234 233
235 234 user = UserModel().create_or_update(username='dummy', password='qwe',
236 235 email='dummy', firstname='a',
237 236 lastname='b')
238 237 Session().commit()
239 238 uid = user.user_id
240 239
241 240 try:
242 241 #User should have None permission on creation repository
243 242 self.assertEqual(UserModel().has_perm(user, perm_none), False)
244 243 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
245 244
246 245 response = self.app.post(url('user_perm', id=uid),
247 246 params=dict(_method='put',
248 247 create_repo_perm=True))
249 248
250 249 perm_none = Permission.get_by_key('hg.create.none')
251 250 perm_create = Permission.get_by_key('hg.create.repository')
252 251
253 252 #User should have None permission on creation repository
254 253 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
255 254 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
256 255 finally:
257 256 UserModel().delete(uid)
258 257 Session().commit()
259 258
260 259 def test_revoke_perm_fork_repo(self):
261 260 self.log_user()
262 261 perm_none = Permission.get_by_key('hg.fork.none')
263 262 perm_fork = Permission.get_by_key('hg.fork.repository')
264 263
265 264 user = UserModel().create_or_update(username='dummy', password='qwe',
266 265 email='dummy', firstname='a',
267 266 lastname='b')
268 267 Session().commit()
269 268 uid = user.user_id
270 269
271 270 try:
272 271 #User should have None permission on creation repository
273 272 self.assertEqual(UserModel().has_perm(user, perm_none), False)
274 273 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
275 274
276 275 response = self.app.post(url('user_perm', id=uid),
277 276 params=dict(_method='put'))
278 277
279 278 perm_none = Permission.get_by_key('hg.create.none')
280 279 perm_create = Permission.get_by_key('hg.create.repository')
281 280
282 281 #User should have None permission on creation repository
283 282 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
284 283 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
285 284 finally:
286 285 UserModel().delete(uid)
287 286 Session().commit()
288 287
289 288 def test_edit_as_xml(self):
290 289 response = self.app.get(url('formatted_edit_user', id=1, format='xml'))
General Comments 0
You need to be logged in to leave comments. Login now