##// END OF EJS Templates
unified flash msg tests
marcink -
r3640:4f80df0d beta
parent child Browse files
Show More
@@ -1,373 +1,371 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 import os
3 import os
4 import urllib
4 import urllib
5
5
6 from rhodecode.lib import vcs
6 from rhodecode.lib import vcs
7 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
7 from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
8 Permission
8 Permission
9 from rhodecode.tests import *
9 from rhodecode.tests import *
10 from rhodecode.model.repos_group import ReposGroupModel
10 from rhodecode.model.repos_group import ReposGroupModel
11 from rhodecode.model.repo import RepoModel
11 from rhodecode.model.repo import RepoModel
12 from rhodecode.model.meta import Session
12 from rhodecode.model.meta import Session
13
13
14
14
15 def _get_permission_for_user(user, repo):
15 def _get_permission_for_user(user, repo):
16 perm = UserRepoToPerm.query()\
16 perm = UserRepoToPerm.query()\
17 .filter(UserRepoToPerm.repository ==
17 .filter(UserRepoToPerm.repository ==
18 Repository.get_by_repo_name(repo))\
18 Repository.get_by_repo_name(repo))\
19 .filter(UserRepoToPerm.user == User.get_by_username(user))\
19 .filter(UserRepoToPerm.user == User.get_by_username(user))\
20 .all()
20 .all()
21 return perm
21 return perm
22
22
23
23
24 class TestAdminReposController(TestController):
24 class TestAdminReposController(TestController):
25
25
26 def __make_repo(self):
26 def __make_repo(self):
27 pass
27 pass
28
28
29 def test_index(self):
29 def test_index(self):
30 self.log_user()
30 self.log_user()
31 response = self.app.get(url('repos'))
31 response = self.app.get(url('repos'))
32 # Test response...
32 # Test response...
33
33
34 def test_index_as_xml(self):
34 def test_index_as_xml(self):
35 response = self.app.get(url('formatted_repos', format='xml'))
35 response = self.app.get(url('formatted_repos', format='xml'))
36
36
37 def test_create_hg(self):
37 def test_create_hg(self):
38 self.log_user()
38 self.log_user()
39 repo_name = NEW_HG_REPO
39 repo_name = NEW_HG_REPO
40 description = 'description for newly created repo'
40 description = 'description for newly created repo'
41 response = self.app.post(url('repos'),
41 response = self.app.post(url('repos'),
42 _get_repo_create_params(repo_private=False,
42 _get_repo_create_params(repo_private=False,
43 repo_name=repo_name,
43 repo_name=repo_name,
44 repo_description=description))
44 repo_description=description))
45 self.checkSessionFlash(response,
45 self.checkSessionFlash(response,
46 'Created repository <a href="/%s">%s</a>'
46 'Created repository <a href="/%s">%s</a>'
47 % (repo_name, repo_name))
47 % (repo_name, repo_name))
48
48
49 #test if the repo was created in the database
49 #test if the repo was created in the database
50 new_repo = self.Session().query(Repository)\
50 new_repo = self.Session().query(Repository)\
51 .filter(Repository.repo_name == repo_name).one()
51 .filter(Repository.repo_name == repo_name).one()
52
52
53 self.assertEqual(new_repo.repo_name, repo_name)
53 self.assertEqual(new_repo.repo_name, repo_name)
54 self.assertEqual(new_repo.description, description)
54 self.assertEqual(new_repo.description, description)
55
55
56 #test if repository is visible in the list ?
56 #test if repository is visible in the list ?
57 response = response.follow()
57 response = response.follow()
58
58
59 response.mustcontain(repo_name)
59 response.mustcontain(repo_name)
60
60
61 #test if repository was created on filesystem
61 #test if repository was created on filesystem
62 try:
62 try:
63 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
63 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
64 except Exception:
64 except Exception:
65 self.fail('no repo %s in filesystem' % repo_name)
65 self.fail('no repo %s in filesystem' % repo_name)
66
66
67 def test_create_hg_non_ascii(self):
67 def test_create_hg_non_ascii(self):
68 self.log_user()
68 self.log_user()
69 non_ascii = "Δ…Δ™Ε‚"
69 non_ascii = "Δ…Δ™Ε‚"
70 repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
70 repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
71 repo_name_unicode = repo_name.decode('utf8')
71 repo_name_unicode = repo_name.decode('utf8')
72 description = 'description for newly created repo' + non_ascii
72 description = 'description for newly created repo' + non_ascii
73 description_unicode = description.decode('utf8')
73 description_unicode = description.decode('utf8')
74 private = False
74 private = False
75 response = self.app.post(url('repos'),
75 response = self.app.post(url('repos'),
76 _get_repo_create_params(repo_private=False,
76 _get_repo_create_params(repo_private=False,
77 repo_name=repo_name,
77 repo_name=repo_name,
78 repo_description=description))
78 repo_description=description))
79 self.checkSessionFlash(response,
79 self.checkSessionFlash(response,
80 u'Created repository <a href="/%s">%s</a>'
80 u'Created repository <a href="/%s">%s</a>'
81 % (urllib.quote(repo_name), repo_name_unicode))
81 % (urllib.quote(repo_name), repo_name_unicode))
82 #test if the repo was created in the database
82 #test if the repo was created in the database
83 new_repo = self.Session().query(Repository)\
83 new_repo = self.Session().query(Repository)\
84 .filter(Repository.repo_name == repo_name_unicode).one()
84 .filter(Repository.repo_name == repo_name_unicode).one()
85
85
86 self.assertEqual(new_repo.repo_name, repo_name_unicode)
86 self.assertEqual(new_repo.repo_name, repo_name_unicode)
87 self.assertEqual(new_repo.description, description_unicode)
87 self.assertEqual(new_repo.description, description_unicode)
88
88
89 #test if repository is visible in the list ?
89 #test if repository is visible in the list ?
90 response = response.follow()
90 response = response.follow()
91
91
92 response.mustcontain(repo_name)
92 response.mustcontain(repo_name)
93
93
94 #test if repository was created on filesystem
94 #test if repository was created on filesystem
95 try:
95 try:
96 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
96 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
97 except Exception:
97 except Exception:
98 self.fail('no repo %s in filesystem' % repo_name)
98 self.fail('no repo %s in filesystem' % repo_name)
99
99
100 def test_create_hg_in_group(self):
100 def test_create_hg_in_group(self):
101 self.log_user()
101 self.log_user()
102
102
103 ## create GROUP
103 ## create GROUP
104 group_name = 'sometest'
104 group_name = 'sometest'
105 gr = ReposGroupModel().create(group_name=group_name,
105 gr = ReposGroupModel().create(group_name=group_name,
106 group_description='test',
106 group_description='test',
107 owner=TEST_USER_ADMIN_LOGIN)
107 owner=TEST_USER_ADMIN_LOGIN)
108 self.Session().commit()
108 self.Session().commit()
109
109
110 repo_name = 'ingroup'
110 repo_name = 'ingroup'
111 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
111 repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
112 description = 'description for newly created repo'
112 description = 'description for newly created repo'
113 response = self.app.post(url('repos'),
113 response = self.app.post(url('repos'),
114 _get_repo_create_params(repo_private=False,
114 _get_repo_create_params(repo_private=False,
115 repo_name=repo_name,
115 repo_name=repo_name,
116 repo_description=description,
116 repo_description=description,
117 repo_group=gr.group_id,))
117 repo_group=gr.group_id,))
118
118
119 self.checkSessionFlash(response,
119 self.checkSessionFlash(response,
120 'Created repository <a href="/%s">%s</a>'
120 'Created repository <a href="/%s">%s</a>'
121 % (repo_name, repo_name))
121 % (repo_name, repo_name))
122 #test if the repo was created in the database
122 #test if the repo was created in the database
123 new_repo = self.Session().query(Repository)\
123 new_repo = self.Session().query(Repository)\
124 .filter(Repository.repo_name == repo_name_full).one()
124 .filter(Repository.repo_name == repo_name_full).one()
125
125
126 self.assertEqual(new_repo.repo_name, repo_name_full)
126 self.assertEqual(new_repo.repo_name, repo_name_full)
127 self.assertEqual(new_repo.description, description)
127 self.assertEqual(new_repo.description, description)
128
128
129 #test if repository is visible in the list ?
129 #test if repository is visible in the list ?
130 response = response.follow()
130 response = response.follow()
131
131
132 response.mustcontain(repo_name_full)
132 response.mustcontain(repo_name_full)
133
133
134 #test if repository was created on filesystem
134 #test if repository was created on filesystem
135 try:
135 try:
136 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
136 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
137 except Exception:
137 except Exception:
138 ReposGroupModel().delete(group_name)
138 ReposGroupModel().delete(group_name)
139 self.Session().commit()
139 self.Session().commit()
140 self.fail('no repo %s in filesystem' % repo_name)
140 self.fail('no repo %s in filesystem' % repo_name)
141
141
142 RepoModel().delete(repo_name_full)
142 RepoModel().delete(repo_name_full)
143 ReposGroupModel().delete(group_name)
143 ReposGroupModel().delete(group_name)
144 self.Session().commit()
144 self.Session().commit()
145
145
146 def test_create_git(self):
146 def test_create_git(self):
147 self.log_user()
147 self.log_user()
148 repo_name = NEW_GIT_REPO
148 repo_name = NEW_GIT_REPO
149 description = 'description for newly created repo'
149 description = 'description for newly created repo'
150
150
151 response = self.app.post(url('repos'),
151 response = self.app.post(url('repos'),
152 _get_repo_create_params(repo_private=False,
152 _get_repo_create_params(repo_private=False,
153 repo_type='git',
153 repo_type='git',
154 repo_name=repo_name,
154 repo_name=repo_name,
155 repo_description=description))
155 repo_description=description))
156 self.checkSessionFlash(response,
156 self.checkSessionFlash(response,
157 'Created repository <a href="/%s">%s</a>'
157 'Created repository <a href="/%s">%s</a>'
158 % (repo_name, repo_name))
158 % (repo_name, repo_name))
159
159
160 #test if the repo was created in the database
160 #test if the repo was created in the database
161 new_repo = self.Session().query(Repository)\
161 new_repo = self.Session().query(Repository)\
162 .filter(Repository.repo_name == repo_name).one()
162 .filter(Repository.repo_name == repo_name).one()
163
163
164 self.assertEqual(new_repo.repo_name, repo_name)
164 self.assertEqual(new_repo.repo_name, repo_name)
165 self.assertEqual(new_repo.description, description)
165 self.assertEqual(new_repo.description, description)
166
166
167 #test if repository is visible in the list ?
167 #test if repository is visible in the list ?
168 response = response.follow()
168 response = response.follow()
169
169
170 response.mustcontain(repo_name)
170 response.mustcontain(repo_name)
171
171
172 #test if repository was created on filesystem
172 #test if repository was created on filesystem
173 try:
173 try:
174 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
174 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
175 except Exception:
175 except Exception:
176 self.fail('no repo %s in filesystem' % repo_name)
176 self.fail('no repo %s in filesystem' % repo_name)
177
177
178 def test_create_git_non_ascii(self):
178 def test_create_git_non_ascii(self):
179 self.log_user()
179 self.log_user()
180 non_ascii = "Δ…Δ™Ε‚"
180 non_ascii = "Δ…Δ™Ε‚"
181 repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
181 repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
182 repo_name_unicode = repo_name.decode('utf8')
182 repo_name_unicode = repo_name.decode('utf8')
183 description = 'description for newly created repo' + non_ascii
183 description = 'description for newly created repo' + non_ascii
184 description_unicode = description.decode('utf8')
184 description_unicode = description.decode('utf8')
185 private = False
185 private = False
186 response = self.app.post(url('repos'),
186 response = self.app.post(url('repos'),
187 _get_repo_create_params(repo_private=False,
187 _get_repo_create_params(repo_private=False,
188 repo_type='git',
188 repo_type='git',
189 repo_name=repo_name,
189 repo_name=repo_name,
190 repo_description=description))
190 repo_description=description))
191
191
192 self.checkSessionFlash(response,
192 self.checkSessionFlash(response,
193 u'Created repository <a href="/%s">%s</a>'
193 u'Created repository <a href="/%s">%s</a>'
194 % (urllib.quote(repo_name), repo_name_unicode))
194 % (urllib.quote(repo_name), repo_name_unicode))
195
195
196 #test if the repo was created in the database
196 #test if the repo was created in the database
197 new_repo = self.Session().query(Repository)\
197 new_repo = self.Session().query(Repository)\
198 .filter(Repository.repo_name == repo_name_unicode).one()
198 .filter(Repository.repo_name == repo_name_unicode).one()
199
199
200 self.assertEqual(new_repo.repo_name, repo_name_unicode)
200 self.assertEqual(new_repo.repo_name, repo_name_unicode)
201 self.assertEqual(new_repo.description, description_unicode)
201 self.assertEqual(new_repo.description, description_unicode)
202
202
203 #test if repository is visible in the list ?
203 #test if repository is visible in the list ?
204 response = response.follow()
204 response = response.follow()
205
205
206 response.mustcontain(repo_name)
206 response.mustcontain(repo_name)
207
207
208 #test if repository was created on filesystem
208 #test if repository was created on filesystem
209 try:
209 try:
210 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
210 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
211 except Exception:
211 except Exception:
212 self.fail('no repo %s in filesystem' % repo_name)
212 self.fail('no repo %s in filesystem' % repo_name)
213
213
214 def test_update(self):
214 def test_update(self):
215 response = self.app.put(url('repo', repo_name=HG_REPO))
215 response = self.app.put(url('repo', repo_name=HG_REPO))
216
216
217 def test_update_browser_fakeout(self):
217 def test_update_browser_fakeout(self):
218 response = self.app.post(url('repo', repo_name=HG_REPO),
218 response = self.app.post(url('repo', repo_name=HG_REPO),
219 params=dict(_method='put'))
219 params=dict(_method='put'))
220
220
221 def test_delete_hg(self):
221 def test_delete_hg(self):
222 self.log_user()
222 self.log_user()
223 repo_name = 'vcs_test_new_to_delete'
223 repo_name = 'vcs_test_new_to_delete'
224 description = 'description for newly created repo'
224 description = 'description for newly created repo'
225 response = self.app.post(url('repos'),
225 response = self.app.post(url('repos'),
226 _get_repo_create_params(repo_private=False,
226 _get_repo_create_params(repo_private=False,
227 repo_type='hg',
227 repo_type='hg',
228 repo_name=repo_name,
228 repo_name=repo_name,
229 repo_description=description))
229 repo_description=description))
230
230
231 self.checkSessionFlash(response,
231 self.checkSessionFlash(response,
232 'Created repository <a href="/%s">%s</a>'
232 'Created repository <a href="/%s">%s</a>'
233 % (repo_name, repo_name))
233 % (repo_name, repo_name))
234 #test if the repo was created in the database
234 #test if the repo was created in the database
235 new_repo = self.Session().query(Repository)\
235 new_repo = self.Session().query(Repository)\
236 .filter(Repository.repo_name == repo_name).one()
236 .filter(Repository.repo_name == repo_name).one()
237
237
238 self.assertEqual(new_repo.repo_name, repo_name)
238 self.assertEqual(new_repo.repo_name, repo_name)
239 self.assertEqual(new_repo.description, description)
239 self.assertEqual(new_repo.description, description)
240
240
241 #test if repository is visible in the list ?
241 #test if repository is visible in the list ?
242 response = response.follow()
242 response = response.follow()
243
243
244 response.mustcontain(repo_name)
244 response.mustcontain(repo_name)
245
245
246 #test if repository was created on filesystem
246 #test if repository was created on filesystem
247 try:
247 try:
248 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
248 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
249 except Exception:
249 except Exception:
250 self.fail('no repo %s in filesystem' % repo_name)
250 self.fail('no repo %s in filesystem' % repo_name)
251
251
252 response = self.app.delete(url('repo', repo_name=repo_name))
252 response = self.app.delete(url('repo', repo_name=repo_name))
253
253
254 self.assertTrue('''Deleted repository %s''' % (repo_name) in
254 self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
255 response.session['flash'][0])
256
255
257 response.follow()
256 response.follow()
258
257
259 #check if repo was deleted from db
258 #check if repo was deleted from db
260 deleted_repo = self.Session().query(Repository)\
259 deleted_repo = self.Session().query(Repository)\
261 .filter(Repository.repo_name == repo_name).scalar()
260 .filter(Repository.repo_name == repo_name).scalar()
262
261
263 self.assertEqual(deleted_repo, None)
262 self.assertEqual(deleted_repo, None)
264
263
265 self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
264 self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
266 False)
265 False)
267
266
268 def test_delete_git(self):
267 def test_delete_git(self):
269 self.log_user()
268 self.log_user()
270 repo_name = 'vcs_test_new_to_delete'
269 repo_name = 'vcs_test_new_to_delete'
271 description = 'description for newly created repo'
270 description = 'description for newly created repo'
272 private = False
271 private = False
273 response = self.app.post(url('repos'),
272 response = self.app.post(url('repos'),
274 _get_repo_create_params(repo_private=False,
273 _get_repo_create_params(repo_private=False,
275 repo_type='git',
274 repo_type='git',
276 repo_name=repo_name,
275 repo_name=repo_name,
277 repo_description=description))
276 repo_description=description))
278
277
279 self.checkSessionFlash(response,
278 self.checkSessionFlash(response,
280 'Created repository <a href="/%s">%s</a>'
279 'Created repository <a href="/%s">%s</a>'
281 % (repo_name, repo_name))
280 % (repo_name, repo_name))
282 #test if the repo was created in the database
281 #test if the repo was created in the database
283 new_repo = self.Session().query(Repository)\
282 new_repo = self.Session().query(Repository)\
284 .filter(Repository.repo_name == repo_name).one()
283 .filter(Repository.repo_name == repo_name).one()
285
284
286 self.assertEqual(new_repo.repo_name, repo_name)
285 self.assertEqual(new_repo.repo_name, repo_name)
287 self.assertEqual(new_repo.description, description)
286 self.assertEqual(new_repo.description, description)
288
287
289 #test if repository is visible in the list ?
288 #test if repository is visible in the list ?
290 response = response.follow()
289 response = response.follow()
291
290
292 response.mustcontain(repo_name)
291 response.mustcontain(repo_name)
293
292
294 #test if repository was created on filesystem
293 #test if repository was created on filesystem
295 try:
294 try:
296 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
295 vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
297 except Exception:
296 except Exception:
298 self.fail('no repo %s in filesystem' % repo_name)
297 self.fail('no repo %s in filesystem' % repo_name)
299
298
300 response = self.app.delete(url('repo', repo_name=repo_name))
299 response = self.app.delete(url('repo', repo_name=repo_name))
301
300
302 self.assertTrue('''Deleted repository %s''' % (repo_name) in
301 self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
303 response.session['flash'][0])
304
302
305 response.follow()
303 response.follow()
306
304
307 #check if repo was deleted from db
305 #check if repo was deleted from db
308 deleted_repo = self.Session().query(Repository)\
306 deleted_repo = self.Session().query(Repository)\
309 .filter(Repository.repo_name == repo_name).scalar()
307 .filter(Repository.repo_name == repo_name).scalar()
310
308
311 self.assertEqual(deleted_repo, None)
309 self.assertEqual(deleted_repo, None)
312
310
313 self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
311 self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
314 False)
312 False)
315
313
316 def test_delete_repo_with_group(self):
314 def test_delete_repo_with_group(self):
317 #TODO:
315 #TODO:
318 pass
316 pass
319
317
320 def test_delete_browser_fakeout(self):
318 def test_delete_browser_fakeout(self):
321 response = self.app.post(url('repo', repo_name=HG_REPO),
319 response = self.app.post(url('repo', repo_name=HG_REPO),
322 params=dict(_method='delete'))
320 params=dict(_method='delete'))
323
321
324 def test_show_hg(self):
322 def test_show_hg(self):
325 self.log_user()
323 self.log_user()
326 response = self.app.get(url('repo', repo_name=HG_REPO))
324 response = self.app.get(url('repo', repo_name=HG_REPO))
327
325
328 def test_show_git(self):
326 def test_show_git(self):
329 self.log_user()
327 self.log_user()
330 response = self.app.get(url('repo', repo_name=GIT_REPO))
328 response = self.app.get(url('repo', repo_name=GIT_REPO))
331
329
332
330
333 def test_edit(self):
331 def test_edit(self):
334 response = self.app.get(url('edit_repo', repo_name=HG_REPO))
332 response = self.app.get(url('edit_repo', repo_name=HG_REPO))
335
333
336 def test_set_private_flag_sets_default_to_none(self):
334 def test_set_private_flag_sets_default_to_none(self):
337 self.log_user()
335 self.log_user()
338 #initially repository perm should be read
336 #initially repository perm should be read
339 perm = _get_permission_for_user(user='default', repo=HG_REPO)
337 perm = _get_permission_for_user(user='default', repo=HG_REPO)
340 self.assertTrue(len(perm), 1)
338 self.assertTrue(len(perm), 1)
341 self.assertEqual(perm[0].permission.permission_name, 'repository.read')
339 self.assertEqual(perm[0].permission.permission_name, 'repository.read')
342 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
340 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
343
341
344 response = self.app.put(url('repo', repo_name=HG_REPO),
342 response = self.app.put(url('repo', repo_name=HG_REPO),
345 _get_repo_create_params(repo_private=1,
343 _get_repo_create_params(repo_private=1,
346 repo_name=HG_REPO,
344 repo_name=HG_REPO,
347 user=TEST_USER_ADMIN_LOGIN))
345 user=TEST_USER_ADMIN_LOGIN))
348 self.checkSessionFlash(response,
346 self.checkSessionFlash(response,
349 msg='Repository %s updated successfully' % (HG_REPO))
347 msg='Repository %s updated successfully' % (HG_REPO))
350 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, True)
348 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, True)
351
349
352 #now the repo default permission should be None
350 #now the repo default permission should be None
353 perm = _get_permission_for_user(user='default', repo=HG_REPO)
351 perm = _get_permission_for_user(user='default', repo=HG_REPO)
354 self.assertTrue(len(perm), 1)
352 self.assertTrue(len(perm), 1)
355 self.assertEqual(perm[0].permission.permission_name, 'repository.none')
353 self.assertEqual(perm[0].permission.permission_name, 'repository.none')
356
354
357 response = self.app.put(url('repo', repo_name=HG_REPO),
355 response = self.app.put(url('repo', repo_name=HG_REPO),
358 _get_repo_create_params(repo_private=False,
356 _get_repo_create_params(repo_private=False,
359 repo_name=HG_REPO,
357 repo_name=HG_REPO,
360 user=TEST_USER_ADMIN_LOGIN))
358 user=TEST_USER_ADMIN_LOGIN))
361 self.checkSessionFlash(response,
359 self.checkSessionFlash(response,
362 msg='Repository %s updated successfully' % (HG_REPO))
360 msg='Repository %s updated successfully' % (HG_REPO))
363 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
361 self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
364
362
365 #we turn off private now the repo default permission should stay None
363 #we turn off private now the repo default permission should stay None
366 perm = _get_permission_for_user(user='default', repo=HG_REPO)
364 perm = _get_permission_for_user(user='default', repo=HG_REPO)
367 self.assertTrue(len(perm), 1)
365 self.assertTrue(len(perm), 1)
368 self.assertEqual(perm[0].permission.permission_name, 'repository.none')
366 self.assertEqual(perm[0].permission.permission_name, 'repository.none')
369
367
370 #update this permission back
368 #update this permission back
371 perm[0].permission = Permission.get_by_key('repository.read')
369 perm[0].permission = Permission.get_by_key('repository.read')
372 Session().add(perm[0])
370 Session().add(perm[0])
373 Session().commit()
371 Session().commit()
@@ -1,266 +1,265 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 from rhodecode.lib.auth import get_crypt_password, check_password
3 from rhodecode.lib.auth import get_crypt_password, check_password
4 from rhodecode.model.db import User, RhodeCodeSetting, Repository
4 from rhodecode.model.db import User, RhodeCodeSetting, Repository
5 from rhodecode.tests import *
5 from rhodecode.tests import *
6 from rhodecode.lib import helpers as h
6 from rhodecode.lib import helpers as h
7 from rhodecode.model.user import UserModel
7 from rhodecode.model.user import UserModel
8 from rhodecode.model.scm import ScmModel
8 from rhodecode.model.scm import ScmModel
9
9
10
10
11 class TestAdminSettingsController(TestController):
11 class TestAdminSettingsController(TestController):
12
12
13 def test_index(self):
13 def test_index(self):
14 response = self.app.get(url('admin_settings'))
14 response = self.app.get(url('admin_settings'))
15 # Test response...
15 # Test response...
16
16
17 def test_index_as_xml(self):
17 def test_index_as_xml(self):
18 response = self.app.get(url('formatted_admin_settings', format='xml'))
18 response = self.app.get(url('formatted_admin_settings', format='xml'))
19
19
20 def test_create(self):
20 def test_create(self):
21 response = self.app.post(url('admin_settings'))
21 response = self.app.post(url('admin_settings'))
22
22
23 def test_new(self):
23 def test_new(self):
24 response = self.app.get(url('admin_new_setting'))
24 response = self.app.get(url('admin_new_setting'))
25
25
26 def test_new_as_xml(self):
26 def test_new_as_xml(self):
27 response = self.app.get(url('formatted_admin_new_setting', format='xml'))
27 response = self.app.get(url('formatted_admin_new_setting', format='xml'))
28
28
29 def test_update(self):
29 def test_update(self):
30 response = self.app.put(url('admin_setting', setting_id=1))
30 response = self.app.put(url('admin_setting', setting_id=1))
31
31
32 def test_update_browser_fakeout(self):
32 def test_update_browser_fakeout(self):
33 response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='put'))
33 response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='put'))
34
34
35 def test_delete(self):
35 def test_delete(self):
36 response = self.app.delete(url('admin_setting', setting_id=1))
36 response = self.app.delete(url('admin_setting', setting_id=1))
37
37
38 def test_delete_browser_fakeout(self):
38 def test_delete_browser_fakeout(self):
39 response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='delete'))
39 response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='delete'))
40
40
41 def test_show(self):
41 def test_show(self):
42 response = self.app.get(url('admin_setting', setting_id=1))
42 response = self.app.get(url('admin_setting', setting_id=1))
43
43
44 def test_show_as_xml(self):
44 def test_show_as_xml(self):
45 response = self.app.get(url('formatted_admin_setting', setting_id=1, format='xml'))
45 response = self.app.get(url('formatted_admin_setting', setting_id=1, format='xml'))
46
46
47 def test_edit(self):
47 def test_edit(self):
48 response = self.app.get(url('admin_edit_setting', setting_id=1))
48 response = self.app.get(url('admin_edit_setting', setting_id=1))
49
49
50 def test_edit_as_xml(self):
50 def test_edit_as_xml(self):
51 response = self.app.get(url('formatted_admin_edit_setting',
51 response = self.app.get(url('formatted_admin_edit_setting',
52 setting_id=1, format='xml'))
52 setting_id=1, format='xml'))
53
53
54 def test_ga_code_active(self):
54 def test_ga_code_active(self):
55 self.log_user()
55 self.log_user()
56 old_title = 'RhodeCode'
56 old_title = 'RhodeCode'
57 old_realm = 'RhodeCode authentication'
57 old_realm = 'RhodeCode authentication'
58 new_ga_code = 'ga-test-123456789'
58 new_ga_code = 'ga-test-123456789'
59 response = self.app.post(url('admin_setting', setting_id='global'),
59 response = self.app.post(url('admin_setting', setting_id='global'),
60 params=dict(
60 params=dict(
61 _method='put',
61 _method='put',
62 rhodecode_title=old_title,
62 rhodecode_title=old_title,
63 rhodecode_realm=old_realm,
63 rhodecode_realm=old_realm,
64 rhodecode_ga_code=new_ga_code
64 rhodecode_ga_code=new_ga_code
65 ))
65 ))
66
66
67 self.checkSessionFlash(response, 'Updated application settings')
67 self.checkSessionFlash(response, 'Updated application settings')
68
68
69 self.assertEqual(RhodeCodeSetting
69 self.assertEqual(RhodeCodeSetting
70 .get_app_settings()['rhodecode_ga_code'], new_ga_code)
70 .get_app_settings()['rhodecode_ga_code'], new_ga_code)
71
71
72 response = response.follow()
72 response = response.follow()
73 response.mustcontain("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code)
73 response.mustcontain("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code)
74
74
75 def test_ga_code_inactive(self):
75 def test_ga_code_inactive(self):
76 self.log_user()
76 self.log_user()
77 old_title = 'RhodeCode'
77 old_title = 'RhodeCode'
78 old_realm = 'RhodeCode authentication'
78 old_realm = 'RhodeCode authentication'
79 new_ga_code = ''
79 new_ga_code = ''
80 response = self.app.post(url('admin_setting', setting_id='global'),
80 response = self.app.post(url('admin_setting', setting_id='global'),
81 params=dict(
81 params=dict(
82 _method='put',
82 _method='put',
83 rhodecode_title=old_title,
83 rhodecode_title=old_title,
84 rhodecode_realm=old_realm,
84 rhodecode_realm=old_realm,
85 rhodecode_ga_code=new_ga_code
85 rhodecode_ga_code=new_ga_code
86 ))
86 ))
87
87
88 self.assertTrue('Updated application settings' in
88 self.checkSessionFlash(response, 'Updated application settings')
89 response.session['flash'][0][1])
90 self.assertEqual(RhodeCodeSetting
89 self.assertEqual(RhodeCodeSetting
91 .get_app_settings()['rhodecode_ga_code'], new_ga_code)
90 .get_app_settings()['rhodecode_ga_code'], new_ga_code)
92
91
93 response = response.follow()
92 response = response.follow()
94 self.assertFalse("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code
93 self.assertFalse("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code
95 in response.body)
94 in response.body)
96
95
97 def test_title_change(self):
96 def test_title_change(self):
98 self.log_user()
97 self.log_user()
99 old_title = 'RhodeCode'
98 old_title = 'RhodeCode'
100 new_title = old_title + '_changed'
99 new_title = old_title + '_changed'
101 old_realm = 'RhodeCode authentication'
100 old_realm = 'RhodeCode authentication'
102
101
103 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
102 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
104 response = self.app.post(url('admin_setting', setting_id='global'),
103 response = self.app.post(url('admin_setting', setting_id='global'),
105 params=dict(
104 params=dict(
106 _method='put',
105 _method='put',
107 rhodecode_title=new_title,
106 rhodecode_title=new_title,
108 rhodecode_realm=old_realm,
107 rhodecode_realm=old_realm,
109 rhodecode_ga_code=''
108 rhodecode_ga_code=''
110 ))
109 ))
111
110
112 self.checkSessionFlash(response, 'Updated application settings')
111 self.checkSessionFlash(response, 'Updated application settings')
113 self.assertEqual(RhodeCodeSetting
112 self.assertEqual(RhodeCodeSetting
114 .get_app_settings()['rhodecode_title'],
113 .get_app_settings()['rhodecode_title'],
115 new_title.decode('utf-8'))
114 new_title.decode('utf-8'))
116
115
117 response = response.follow()
116 response = response.follow()
118 response.mustcontain("""<h1><a href="/">%s</a></h1>""" % new_title)
117 response.mustcontain("""<h1><a href="/">%s</a></h1>""" % new_title)
119
118
120 def test_my_account(self):
119 def test_my_account(self):
121 self.log_user()
120 self.log_user()
122 response = self.app.get(url('admin_settings_my_account'))
121 response = self.app.get(url('admin_settings_my_account'))
123
122
124 self.assertTrue('value="test_admin' in response.body)
123 self.assertTrue('value="test_admin' in response.body)
125
124
126 @parameterized.expand([('firstname', 'new_username'),
125 @parameterized.expand([('firstname', 'new_username'),
127 ('lastname', 'new_username'),
126 ('lastname', 'new_username'),
128 ('admin', True),
127 ('admin', True),
129 ('admin', False),
128 ('admin', False),
130 ('ldap_dn', 'test'),
129 ('ldap_dn', 'test'),
131 ('ldap_dn', None),
130 ('ldap_dn', None),
132 ('active', False),
131 ('active', False),
133 ('active', True),
132 ('active', True),
134 ('email', 'some@email.com'),
133 ('email', 'some@email.com'),
135 ])
134 ])
136 def test_my_account_update(self, name, expected):
135 def test_my_account_update(self, name, expected):
137 uname = 'testme'
136 uname = 'testme'
138 usr = UserModel().create_or_update(username=uname, password='qweqwe',
137 usr = UserModel().create_or_update(username=uname, password='qweqwe',
139 email='testme@rhodecod.org')
138 email='testme@rhodecod.org')
140 self.Session().commit()
139 self.Session().commit()
141 params = usr.get_api_data()
140 params = usr.get_api_data()
142 user_id = usr.user_id
141 user_id = usr.user_id
143 self.log_user(username=uname, password='qweqwe')
142 self.log_user(username=uname, password='qweqwe')
144 params.update({name: expected})
143 params.update({name: expected})
145 params.update({'password_confirmation': ''})
144 params.update({'password_confirmation': ''})
146 params.update({'new_password': ''})
145 params.update({'new_password': ''})
147
146
148 try:
147 try:
149 response = self.app.put(url('admin_settings_my_account_update',
148 response = self.app.put(url('admin_settings_my_account_update',
150 id=user_id), params)
149 id=user_id), params)
151
150
152 self.checkSessionFlash(response,
151 self.checkSessionFlash(response,
153 'Your account was updated successfully')
152 'Your account was updated successfully')
154
153
155 updated_user = User.get_by_username(uname)
154 updated_user = User.get_by_username(uname)
156 updated_params = updated_user.get_api_data()
155 updated_params = updated_user.get_api_data()
157 updated_params.update({'password_confirmation': ''})
156 updated_params.update({'password_confirmation': ''})
158 updated_params.update({'new_password': ''})
157 updated_params.update({'new_password': ''})
159
158
160 params['last_login'] = updated_params['last_login']
159 params['last_login'] = updated_params['last_login']
161 if name == 'email':
160 if name == 'email':
162 params['emails'] = [expected]
161 params['emails'] = [expected]
163 if name == 'ldap_dn':
162 if name == 'ldap_dn':
164 #cannot update this via form
163 #cannot update this via form
165 params['ldap_dn'] = None
164 params['ldap_dn'] = None
166 if name == 'active':
165 if name == 'active':
167 #my account cannot deactivate account
166 #my account cannot deactivate account
168 params['active'] = True
167 params['active'] = True
169 if name == 'admin':
168 if name == 'admin':
170 #my account cannot make you an admin !
169 #my account cannot make you an admin !
171 params['admin'] = False
170 params['admin'] = False
172
171
173 self.assertEqual(params, updated_params)
172 self.assertEqual(params, updated_params)
174
173
175 finally:
174 finally:
176 UserModel().delete('testme')
175 UserModel().delete('testme')
177
176
178 def test_my_account_update_err_email_exists(self):
177 def test_my_account_update_err_email_exists(self):
179 self.log_user()
178 self.log_user()
180
179
181 new_email = 'test_regular@mail.com' # already exisitn email
180 new_email = 'test_regular@mail.com' # already exisitn email
182 response = self.app.put(url('admin_settings_my_account_update'),
181 response = self.app.put(url('admin_settings_my_account_update'),
183 params=dict(
182 params=dict(
184 username='test_admin',
183 username='test_admin',
185 new_password='test12',
184 new_password='test12',
186 password_confirmation='test122',
185 password_confirmation='test122',
187 firstname='NewName',
186 firstname='NewName',
188 lastname='NewLastname',
187 lastname='NewLastname',
189 email=new_email,)
188 email=new_email,)
190 )
189 )
191
190
192 response.mustcontain('This e-mail address is already taken')
191 response.mustcontain('This e-mail address is already taken')
193
192
194 def test_my_account_update_err(self):
193 def test_my_account_update_err(self):
195 self.log_user('test_regular2', 'test12')
194 self.log_user('test_regular2', 'test12')
196
195
197 new_email = 'newmail.pl'
196 new_email = 'newmail.pl'
198 response = self.app.post(url('admin_settings_my_account_update'),
197 response = self.app.post(url('admin_settings_my_account_update'),
199 params=dict(
198 params=dict(
200 _method='put',
199 _method='put',
201 username='test_admin',
200 username='test_admin',
202 new_password='test12',
201 new_password='test12',
203 password_confirmation='test122',
202 password_confirmation='test122',
204 firstname='NewName',
203 firstname='NewName',
205 lastname='NewLastname',
204 lastname='NewLastname',
206 email=new_email,)
205 email=new_email,)
207 )
206 )
208
207
209 response.mustcontain('An email address must contain a single @')
208 response.mustcontain('An email address must contain a single @')
210 from rhodecode.model import validators
209 from rhodecode.model import validators
211 msg = validators.ValidUsername(edit=False,
210 msg = validators.ValidUsername(edit=False,
212 old_data={})._messages['username_exists']
211 old_data={})._messages['username_exists']
213 msg = h.html_escape(msg % {'username': 'test_admin'})
212 msg = h.html_escape(msg % {'username': 'test_admin'})
214 response.mustcontain(u"%s" % msg)
213 response.mustcontain(u"%s" % msg)
215
214
216 def test_set_repo_fork_has_no_self_id(self):
215 def test_set_repo_fork_has_no_self_id(self):
217 self.log_user()
216 self.log_user()
218 repo = Repository.get_by_repo_name(HG_REPO)
217 repo = Repository.get_by_repo_name(HG_REPO)
219 response = self.app.get(url('edit_repo', repo_name=HG_REPO))
218 response = self.app.get(url('edit_repo', repo_name=HG_REPO))
220 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
219 opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
221 assert opt not in response.body
220 assert opt not in response.body
222
221
223 def test_set_fork_of_repo(self):
222 def test_set_fork_of_repo(self):
224 self.log_user()
223 self.log_user()
225 repo = Repository.get_by_repo_name(HG_REPO)
224 repo = Repository.get_by_repo_name(HG_REPO)
226 repo2 = Repository.get_by_repo_name(GIT_REPO)
225 repo2 = Repository.get_by_repo_name(GIT_REPO)
227 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
226 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
228 params=dict(
227 params=dict(
229 id_fork_of=repo2.repo_id
228 id_fork_of=repo2.repo_id
230 ))
229 ))
231 repo = Repository.get_by_repo_name(HG_REPO)
230 repo = Repository.get_by_repo_name(HG_REPO)
232 repo2 = Repository.get_by_repo_name(GIT_REPO)
231 repo2 = Repository.get_by_repo_name(GIT_REPO)
233 self.checkSessionFlash(response,
232 self.checkSessionFlash(response,
234 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
233 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
235
234
236 assert repo.fork == repo2
235 assert repo.fork == repo2
237 response = response.follow()
236 response = response.follow()
238 # check if given repo is selected
237 # check if given repo is selected
239
238
240 opt = """<option value="%s" selected="selected">%s</option>""" % (
239 opt = """<option value="%s" selected="selected">%s</option>""" % (
241 repo2.repo_id, repo2.repo_name)
240 repo2.repo_id, repo2.repo_name)
242 response.mustcontain(opt)
241 response.mustcontain(opt)
243
242
244 # clean session flash
243 # clean session flash
245 #response = self.app.get(url('edit_repo', repo_name=HG_REPO))
244 #response = self.app.get(url('edit_repo', repo_name=HG_REPO))
246
245
247 ## mark it as None
246 ## mark it as None
248 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
247 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
249 params=dict(
248 params=dict(
250 id_fork_of=None
249 id_fork_of=None
251 ))
250 ))
252 repo = Repository.get_by_repo_name(HG_REPO)
251 repo = Repository.get_by_repo_name(HG_REPO)
253 repo2 = Repository.get_by_repo_name(GIT_REPO)
252 repo2 = Repository.get_by_repo_name(GIT_REPO)
254 self.checkSessionFlash(response,
253 self.checkSessionFlash(response,
255 'Marked repo %s as fork of %s' % (repo.repo_name, "Nothing"))
254 'Marked repo %s as fork of %s' % (repo.repo_name, "Nothing"))
256 assert repo.fork == None
255 assert repo.fork == None
257
256
258 def test_set_fork_of_same_repo(self):
257 def test_set_fork_of_same_repo(self):
259 self.log_user()
258 self.log_user()
260 repo = Repository.get_by_repo_name(HG_REPO)
259 repo = Repository.get_by_repo_name(HG_REPO)
261 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
260 response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
262 params=dict(
261 params=dict(
263 id_fork_of=repo.repo_id
262 id_fork_of=repo.repo_id
264 ))
263 ))
265 self.checkSessionFlash(response,
264 self.checkSessionFlash(response,
266 'An error occurred during this operation')
265 'An error occurred during this operation')
@@ -1,290 +1,289 b''
1 from sqlalchemy.orm.exc import NoResultFound
1 from sqlalchemy.orm.exc import NoResultFound
2
2
3 from rhodecode.tests import *
3 from rhodecode.tests import *
4 from rhodecode.model.db import User, Permission
4 from rhodecode.model.db import User, Permission
5 from rhodecode.lib.auth import check_password
5 from rhodecode.lib.auth import check_password
6 from rhodecode.model.user import UserModel
6 from rhodecode.model.user import UserModel
7 from rhodecode.model import validators
7 from rhodecode.model import validators
8 from rhodecode.lib import helpers as h
8 from rhodecode.lib import helpers as h
9 from rhodecode.model.meta import Session
9 from rhodecode.model.meta import Session
10
10
11
11
12 class TestAdminUsersController(TestController):
12 class TestAdminUsersController(TestController):
13
13
14 def test_index(self):
14 def test_index(self):
15 self.log_user()
15 self.log_user()
16 response = self.app.get(url('users'))
16 response = self.app.get(url('users'))
17 # Test response...
17 # Test response...
18
18
19 def test_index_as_xml(self):
19 def test_index_as_xml(self):
20 response = self.app.get(url('formatted_users', format='xml'))
20 response = self.app.get(url('formatted_users', format='xml'))
21
21
22 def test_create(self):
22 def test_create(self):
23 self.log_user()
23 self.log_user()
24 username = 'newtestuser'
24 username = 'newtestuser'
25 password = 'test12'
25 password = 'test12'
26 password_confirmation = password
26 password_confirmation = password
27 name = 'name'
27 name = 'name'
28 lastname = 'lastname'
28 lastname = 'lastname'
29 email = 'mail@mail.com'
29 email = 'mail@mail.com'
30
30
31 response = self.app.post(url('users'),
31 response = self.app.post(url('users'),
32 {'username': username,
32 {'username': username,
33 'password': password,
33 'password': password,
34 'password_confirmation': password_confirmation,
34 'password_confirmation': password_confirmation,
35 'firstname': name,
35 'firstname': name,
36 'active': True,
36 'active': True,
37 'lastname': lastname,
37 'lastname': lastname,
38 'email': email})
38 'email': email})
39
39
40 self.checkSessionFlash(response, '''Created user %s''' % (username))
40 self.checkSessionFlash(response, '''Created user %s''' % (username))
41
41
42 new_user = self.Session.query(User).\
42 new_user = self.Session.query(User).\
43 filter(User.username == username).one()
43 filter(User.username == username).one()
44
44
45 self.assertEqual(new_user.username, username)
45 self.assertEqual(new_user.username, username)
46 self.assertEqual(check_password(password, new_user.password), True)
46 self.assertEqual(check_password(password, new_user.password), True)
47 self.assertEqual(new_user.name, name)
47 self.assertEqual(new_user.name, name)
48 self.assertEqual(new_user.lastname, lastname)
48 self.assertEqual(new_user.lastname, lastname)
49 self.assertEqual(new_user.email, email)
49 self.assertEqual(new_user.email, email)
50
50
51 response.follow()
51 response.follow()
52 response = response.follow()
52 response = response.follow()
53 response.mustcontain("""newtestuser""")
53 response.mustcontain("""newtestuser""")
54
54
55 def test_create_err(self):
55 def test_create_err(self):
56 self.log_user()
56 self.log_user()
57 username = 'new_user'
57 username = 'new_user'
58 password = ''
58 password = ''
59 name = 'name'
59 name = 'name'
60 lastname = 'lastname'
60 lastname = 'lastname'
61 email = 'errmail.com'
61 email = 'errmail.com'
62
62
63 response = self.app.post(url('users'), {'username': username,
63 response = self.app.post(url('users'), {'username': username,
64 'password': password,
64 'password': password,
65 'name': name,
65 'name': name,
66 'active': False,
66 'active': False,
67 'lastname': lastname,
67 'lastname': lastname,
68 'email': email})
68 'email': email})
69
69
70 msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
70 msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
71 msg = h.html_escape(msg % {'username': 'new_user'})
71 msg = h.html_escape(msg % {'username': 'new_user'})
72 response.mustcontain("""<span class="error-message">%s</span>""" % msg)
72 response.mustcontain("""<span class="error-message">%s</span>""" % msg)
73 response.mustcontain("""<span class="error-message">Please enter a value</span>""")
73 response.mustcontain("""<span class="error-message">Please enter a value</span>""")
74 response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
74 response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
75
75
76 def get_user():
76 def get_user():
77 self.Session.query(User).filter(User.username == username).one()
77 self.Session.query(User).filter(User.username == username).one()
78
78
79 self.assertRaises(NoResultFound, get_user), 'found user in database'
79 self.assertRaises(NoResultFound, get_user), 'found user in database'
80
80
81 def test_new(self):
81 def test_new(self):
82 self.log_user()
82 self.log_user()
83 response = self.app.get(url('new_user'))
83 response = self.app.get(url('new_user'))
84
84
85 def test_new_as_xml(self):
85 def test_new_as_xml(self):
86 response = self.app.get(url('formatted_new_user', format='xml'))
86 response = self.app.get(url('formatted_new_user', format='xml'))
87
87
88 @parameterized.expand([('firstname', 'new_username'),
88 @parameterized.expand([('firstname', 'new_username'),
89 ('lastname', 'new_username'),
89 ('lastname', 'new_username'),
90 ('admin', True),
90 ('admin', True),
91 ('admin', False),
91 ('admin', False),
92 ('ldap_dn', 'test'),
92 ('ldap_dn', 'test'),
93 ('ldap_dn', None),
93 ('ldap_dn', None),
94 ('active', False),
94 ('active', False),
95 ('active', True),
95 ('active', True),
96 ('email', 'some@email.com'),
96 ('email', 'some@email.com'),
97 ])
97 ])
98 def test_update(self, name, expected):
98 def test_update(self, name, expected):
99 self.log_user()
99 self.log_user()
100 uname = 'testme'
100 uname = 'testme'
101 usr = UserModel().create_or_update(username=uname, password='qweqwe',
101 usr = UserModel().create_or_update(username=uname, password='qweqwe',
102 email='testme@rhodecod.org')
102 email='testme@rhodecod.org')
103 self.Session().commit()
103 self.Session().commit()
104 params = usr.get_api_data()
104 params = usr.get_api_data()
105 params.update({name: expected})
105 params.update({name: expected})
106 params.update({'password_confirmation': ''})
106 params.update({'password_confirmation': ''})
107 params.update({'new_password': ''})
107 params.update({'new_password': ''})
108 if name == 'email':
108 if name == 'email':
109 params['emails'] = [expected]
109 params['emails'] = [expected]
110 if name == 'ldap_dn':
110 if name == 'ldap_dn':
111 #cannot update this via form
111 #cannot update this via form
112 params['ldap_dn'] = None
112 params['ldap_dn'] = None
113 try:
113 try:
114 response = self.app.put(url('user', id=usr.user_id), params)
114 response = self.app.put(url('user', id=usr.user_id), params)
115
115
116 self.checkSessionFlash(response, '''User updated successfully''')
116 self.checkSessionFlash(response, '''User updated successfully''')
117
117
118 updated_user = User.get_by_username(uname)
118 updated_user = User.get_by_username(uname)
119 updated_params = updated_user.get_api_data()
119 updated_params = updated_user.get_api_data()
120 updated_params.update({'password_confirmation': ''})
120 updated_params.update({'password_confirmation': ''})
121 updated_params.update({'new_password': ''})
121 updated_params.update({'new_password': ''})
122
122
123 self.assertEqual(params, updated_params)
123 self.assertEqual(params, updated_params)
124
124
125 finally:
125 finally:
126 UserModel().delete('testme')
126 UserModel().delete('testme')
127
127
128 def test_update_browser_fakeout(self):
128 def test_update_browser_fakeout(self):
129 response = self.app.post(url('user', id=1), params=dict(_method='put'))
129 response = self.app.post(url('user', id=1), params=dict(_method='put'))
130
130
131 def test_delete(self):
131 def test_delete(self):
132 self.log_user()
132 self.log_user()
133 username = 'newtestuserdeleteme'
133 username = 'newtestuserdeleteme'
134 password = 'test12'
134 password = 'test12'
135 name = 'name'
135 name = 'name'
136 lastname = 'lastname'
136 lastname = 'lastname'
137 email = 'todeletemail@mail.com'
137 email = 'todeletemail@mail.com'
138
138
139 response = self.app.post(url('users'), {'username': username,
139 response = self.app.post(url('users'), {'username': username,
140 'password': password,
140 'password': password,
141 'password_confirmation': password,
141 'password_confirmation': password,
142 'firstname': name,
142 'firstname': name,
143 'active': True,
143 'active': True,
144 'lastname': lastname,
144 'lastname': lastname,
145 'email': email})
145 'email': email})
146
146
147 response = response.follow()
147 response = response.follow()
148
148
149 new_user = self.Session.query(User)\
149 new_user = self.Session.query(User)\
150 .filter(User.username == username).one()
150 .filter(User.username == username).one()
151 response = self.app.delete(url('user', id=new_user.user_id))
151 response = self.app.delete(url('user', id=new_user.user_id))
152
152
153 self.assertTrue("""Successfully deleted user""" in
153 self.checkSessionFlash(response, 'Successfully deleted user')
154 response.session['flash'][0])
155
154
156 def test_delete_browser_fakeout(self):
155 def test_delete_browser_fakeout(self):
157 response = self.app.post(url('user', id=1),
156 response = self.app.post(url('user', id=1),
158 params=dict(_method='delete'))
157 params=dict(_method='delete'))
159
158
160 def test_show(self):
159 def test_show(self):
161 response = self.app.get(url('user', id=1))
160 response = self.app.get(url('user', id=1))
162
161
163 def test_show_as_xml(self):
162 def test_show_as_xml(self):
164 response = self.app.get(url('formatted_user', id=1, format='xml'))
163 response = self.app.get(url('formatted_user', id=1, format='xml'))
165
164
166 def test_edit(self):
165 def test_edit(self):
167 self.log_user()
166 self.log_user()
168 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
167 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
169 response = self.app.get(url('edit_user', id=user.user_id))
168 response = self.app.get(url('edit_user', id=user.user_id))
170
169
171 def test_add_perm_create_repo(self):
170 def test_add_perm_create_repo(self):
172 self.log_user()
171 self.log_user()
173 perm_none = Permission.get_by_key('hg.create.none')
172 perm_none = Permission.get_by_key('hg.create.none')
174 perm_create = Permission.get_by_key('hg.create.repository')
173 perm_create = Permission.get_by_key('hg.create.repository')
175
174
176 user = UserModel().create_or_update(username='dummy', password='qwe',
175 user = UserModel().create_or_update(username='dummy', password='qwe',
177 email='dummy', firstname='a',
176 email='dummy', firstname='a',
178 lastname='b')
177 lastname='b')
179 Session().commit()
178 Session().commit()
180 uid = user.user_id
179 uid = user.user_id
181
180
182 try:
181 try:
183 #User should have None permission on creation repository
182 #User should have None permission on creation repository
184 self.assertEqual(UserModel().has_perm(user, perm_none), False)
183 self.assertEqual(UserModel().has_perm(user, perm_none), False)
185 self.assertEqual(UserModel().has_perm(user, perm_create), False)
184 self.assertEqual(UserModel().has_perm(user, perm_create), False)
186
185
187 response = self.app.post(url('user_perm', id=uid),
186 response = self.app.post(url('user_perm', id=uid),
188 params=dict(_method='put',
187 params=dict(_method='put',
189 create_repo_perm=True))
188 create_repo_perm=True))
190
189
191 perm_none = Permission.get_by_key('hg.create.none')
190 perm_none = Permission.get_by_key('hg.create.none')
192 perm_create = Permission.get_by_key('hg.create.repository')
191 perm_create = Permission.get_by_key('hg.create.repository')
193
192
194 #User should have None permission on creation repository
193 #User should have None permission on creation repository
195 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
194 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
196 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
195 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
197 finally:
196 finally:
198 UserModel().delete(uid)
197 UserModel().delete(uid)
199 Session().commit()
198 Session().commit()
200
199
201 def test_revoke_perm_create_repo(self):
200 def test_revoke_perm_create_repo(self):
202 self.log_user()
201 self.log_user()
203 perm_none = Permission.get_by_key('hg.create.none')
202 perm_none = Permission.get_by_key('hg.create.none')
204 perm_create = Permission.get_by_key('hg.create.repository')
203 perm_create = Permission.get_by_key('hg.create.repository')
205
204
206 user = UserModel().create_or_update(username='dummy', password='qwe',
205 user = UserModel().create_or_update(username='dummy', password='qwe',
207 email='dummy', firstname='a',
206 email='dummy', firstname='a',
208 lastname='b')
207 lastname='b')
209 Session().commit()
208 Session().commit()
210 uid = user.user_id
209 uid = user.user_id
211
210
212 try:
211 try:
213 #User should have None permission on creation repository
212 #User should have None permission on creation repository
214 self.assertEqual(UserModel().has_perm(user, perm_none), False)
213 self.assertEqual(UserModel().has_perm(user, perm_none), False)
215 self.assertEqual(UserModel().has_perm(user, perm_create), False)
214 self.assertEqual(UserModel().has_perm(user, perm_create), False)
216
215
217 response = self.app.post(url('user_perm', id=uid),
216 response = self.app.post(url('user_perm', id=uid),
218 params=dict(_method='put'))
217 params=dict(_method='put'))
219
218
220 perm_none = Permission.get_by_key('hg.create.none')
219 perm_none = Permission.get_by_key('hg.create.none')
221 perm_create = Permission.get_by_key('hg.create.repository')
220 perm_create = Permission.get_by_key('hg.create.repository')
222
221
223 #User should have None permission on creation repository
222 #User should have None permission on creation repository
224 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
223 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
225 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
224 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
226 finally:
225 finally:
227 UserModel().delete(uid)
226 UserModel().delete(uid)
228 Session().commit()
227 Session().commit()
229
228
230 def test_add_perm_fork_repo(self):
229 def test_add_perm_fork_repo(self):
231 self.log_user()
230 self.log_user()
232 perm_none = Permission.get_by_key('hg.fork.none')
231 perm_none = Permission.get_by_key('hg.fork.none')
233 perm_fork = Permission.get_by_key('hg.fork.repository')
232 perm_fork = Permission.get_by_key('hg.fork.repository')
234
233
235 user = UserModel().create_or_update(username='dummy', password='qwe',
234 user = UserModel().create_or_update(username='dummy', password='qwe',
236 email='dummy', firstname='a',
235 email='dummy', firstname='a',
237 lastname='b')
236 lastname='b')
238 Session().commit()
237 Session().commit()
239 uid = user.user_id
238 uid = user.user_id
240
239
241 try:
240 try:
242 #User should have None permission on creation repository
241 #User should have None permission on creation repository
243 self.assertEqual(UserModel().has_perm(user, perm_none), False)
242 self.assertEqual(UserModel().has_perm(user, perm_none), False)
244 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
243 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
245
244
246 response = self.app.post(url('user_perm', id=uid),
245 response = self.app.post(url('user_perm', id=uid),
247 params=dict(_method='put',
246 params=dict(_method='put',
248 create_repo_perm=True))
247 create_repo_perm=True))
249
248
250 perm_none = Permission.get_by_key('hg.create.none')
249 perm_none = Permission.get_by_key('hg.create.none')
251 perm_create = Permission.get_by_key('hg.create.repository')
250 perm_create = Permission.get_by_key('hg.create.repository')
252
251
253 #User should have None permission on creation repository
252 #User should have None permission on creation repository
254 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
253 self.assertEqual(UserModel().has_perm(uid, perm_none), False)
255 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
254 self.assertEqual(UserModel().has_perm(uid, perm_create), True)
256 finally:
255 finally:
257 UserModel().delete(uid)
256 UserModel().delete(uid)
258 Session().commit()
257 Session().commit()
259
258
260 def test_revoke_perm_fork_repo(self):
259 def test_revoke_perm_fork_repo(self):
261 self.log_user()
260 self.log_user()
262 perm_none = Permission.get_by_key('hg.fork.none')
261 perm_none = Permission.get_by_key('hg.fork.none')
263 perm_fork = Permission.get_by_key('hg.fork.repository')
262 perm_fork = Permission.get_by_key('hg.fork.repository')
264
263
265 user = UserModel().create_or_update(username='dummy', password='qwe',
264 user = UserModel().create_or_update(username='dummy', password='qwe',
266 email='dummy', firstname='a',
265 email='dummy', firstname='a',
267 lastname='b')
266 lastname='b')
268 Session().commit()
267 Session().commit()
269 uid = user.user_id
268 uid = user.user_id
270
269
271 try:
270 try:
272 #User should have None permission on creation repository
271 #User should have None permission on creation repository
273 self.assertEqual(UserModel().has_perm(user, perm_none), False)
272 self.assertEqual(UserModel().has_perm(user, perm_none), False)
274 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
273 self.assertEqual(UserModel().has_perm(user, perm_fork), False)
275
274
276 response = self.app.post(url('user_perm', id=uid),
275 response = self.app.post(url('user_perm', id=uid),
277 params=dict(_method='put'))
276 params=dict(_method='put'))
278
277
279 perm_none = Permission.get_by_key('hg.create.none')
278 perm_none = Permission.get_by_key('hg.create.none')
280 perm_create = Permission.get_by_key('hg.create.repository')
279 perm_create = Permission.get_by_key('hg.create.repository')
281
280
282 #User should have None permission on creation repository
281 #User should have None permission on creation repository
283 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
282 self.assertEqual(UserModel().has_perm(uid, perm_none), True)
284 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
283 self.assertEqual(UserModel().has_perm(uid, perm_create), False)
285 finally:
284 finally:
286 UserModel().delete(uid)
285 UserModel().delete(uid)
287 Session().commit()
286 Session().commit()
288
287
289 def test_edit_as_xml(self):
288 def test_edit_as_xml(self):
290 response = self.app.get(url('formatted_edit_user', id=1, format='xml'))
289 response = self.app.get(url('formatted_edit_user', id=1, format='xml'))
General Comments 0
You need to be logged in to leave comments. Login now