Show More
1 | NO CONTENT: new file 100644 |
|
NO CONTENT: new file 100644 |
@@ -0,0 +1,316 | |||||
|
1 | import os | |||
|
2 | import unittest | |||
|
3 | from rhodecode.tests import * | |||
|
4 | ||||
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |||
|
6 | from rhodecode.model.repo import RepoModel | |||
|
7 | from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm | |||
|
8 | from rhodecode.model.user import UserModel | |||
|
9 | ||||
|
10 | from rhodecode.model.meta import Session | |||
|
11 | from rhodecode.model.users_group import UsersGroupModel | |||
|
12 | from rhodecode.lib.auth import AuthUser | |||
|
13 | ||||
|
14 | ||||
|
15 | def _make_group(path, desc='desc', parent_id=None, | |||
|
16 | skip_if_exists=False): | |||
|
17 | ||||
|
18 | gr = RepoGroup.get_by_group_name(path) | |||
|
19 | if gr and skip_if_exists: | |||
|
20 | return gr | |||
|
21 | ||||
|
22 | gr = ReposGroupModel().create(path, desc, parent_id) | |||
|
23 | return gr | |||
|
24 | ||||
|
25 | ||||
|
26 | class TestPermissions(unittest.TestCase): | |||
|
27 | def __init__(self, methodName='runTest'): | |||
|
28 | super(TestPermissions, self).__init__(methodName=methodName) | |||
|
29 | ||||
|
30 | def setUp(self): | |||
|
31 | self.u1 = UserModel().create_or_update( | |||
|
32 | username=u'u1', password=u'qweqwe', | |||
|
33 | email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1' | |||
|
34 | ) | |||
|
35 | self.u2 = UserModel().create_or_update( | |||
|
36 | username=u'u2', password=u'qweqwe', | |||
|
37 | email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2' | |||
|
38 | ) | |||
|
39 | self.anon = User.get_by_username('default') | |||
|
40 | self.a1 = UserModel().create_or_update( | |||
|
41 | username=u'a1', password=u'qweqwe', | |||
|
42 | email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True | |||
|
43 | ) | |||
|
44 | Session().commit() | |||
|
45 | ||||
|
46 | def tearDown(self): | |||
|
47 | if hasattr(self, 'test_repo'): | |||
|
48 | RepoModel().delete(repo=self.test_repo) | |||
|
49 | UserModel().delete(self.u1) | |||
|
50 | UserModel().delete(self.u2) | |||
|
51 | UserModel().delete(self.a1) | |||
|
52 | if hasattr(self, 'g1'): | |||
|
53 | ReposGroupModel().delete(self.g1.group_id) | |||
|
54 | if hasattr(self, 'g2'): | |||
|
55 | ReposGroupModel().delete(self.g2.group_id) | |||
|
56 | ||||
|
57 | if hasattr(self, 'ug1'): | |||
|
58 | UsersGroupModel().delete(self.ug1, force=True) | |||
|
59 | ||||
|
60 | Session().commit() | |||
|
61 | ||||
|
62 | def test_default_perms_set(self): | |||
|
63 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
64 | perms = { | |||
|
65 | 'repositories_groups': {}, | |||
|
66 | 'global': set([u'hg.create.repository', u'repository.read', | |||
|
67 | u'hg.register.manual_activate']), | |||
|
68 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |||
|
69 | } | |||
|
70 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
71 | perms['repositories'][HG_REPO]) | |||
|
72 | new_perm = 'repository.write' | |||
|
73 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |||
|
74 | perm=new_perm) | |||
|
75 | Session().commit() | |||
|
76 | ||||
|
77 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
78 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
79 | new_perm) | |||
|
80 | ||||
|
81 | def test_default_admin_perms_set(self): | |||
|
82 | a1_auth = AuthUser(user_id=self.a1.user_id) | |||
|
83 | perms = { | |||
|
84 | 'repositories_groups': {}, | |||
|
85 | 'global': set([u'hg.admin']), | |||
|
86 | 'repositories': {u'vcs_test_hg': u'repository.admin'} | |||
|
87 | } | |||
|
88 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |||
|
89 | perms['repositories'][HG_REPO]) | |||
|
90 | new_perm = 'repository.write' | |||
|
91 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, | |||
|
92 | perm=new_perm) | |||
|
93 | Session().commit() | |||
|
94 | # cannot really downgrade admins permissions !? they still get's set as | |||
|
95 | # admin ! | |||
|
96 | u1_auth = AuthUser(user_id=self.a1.user_id) | |||
|
97 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
98 | perms['repositories'][HG_REPO]) | |||
|
99 | ||||
|
100 | def test_default_group_perms(self): | |||
|
101 | self.g1 = _make_group('test1', skip_if_exists=True) | |||
|
102 | self.g2 = _make_group('test2', skip_if_exists=True) | |||
|
103 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
104 | perms = { | |||
|
105 | 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'}, | |||
|
106 | 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']), | |||
|
107 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |||
|
108 | } | |||
|
109 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
110 | perms['repositories'][HG_REPO]) | |||
|
111 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
112 | perms['repositories_groups']) | |||
|
113 | ||||
|
114 | def test_default_admin_group_perms(self): | |||
|
115 | self.g1 = _make_group('test1', skip_if_exists=True) | |||
|
116 | self.g2 = _make_group('test2', skip_if_exists=True) | |||
|
117 | a1_auth = AuthUser(user_id=self.a1.user_id) | |||
|
118 | perms = { | |||
|
119 | 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'}, | |||
|
120 | 'global': set(['hg.admin']), | |||
|
121 | 'repositories': {u'vcs_test_hg': 'repository.admin'} | |||
|
122 | } | |||
|
123 | ||||
|
124 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |||
|
125 | perms['repositories'][HG_REPO]) | |||
|
126 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
127 | perms['repositories_groups']) | |||
|
128 | ||||
|
129 | def test_propagated_permission_from_users_group(self): | |||
|
130 | # make group | |||
|
131 | self.ug1 = UsersGroupModel().create('G1') | |||
|
132 | # add user to group | |||
|
133 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |||
|
134 | ||||
|
135 | # set permission to lower | |||
|
136 | new_perm = 'repository.none' | |||
|
137 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) | |||
|
138 | Session().commit() | |||
|
139 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
140 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
141 | new_perm) | |||
|
142 | ||||
|
143 | # grant perm for group this should override permission from user | |||
|
144 | new_perm = 'repository.write' | |||
|
145 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |||
|
146 | group_name=self.ug1, | |||
|
147 | perm=new_perm) | |||
|
148 | # check perms | |||
|
149 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
150 | perms = { | |||
|
151 | 'repositories_groups': {}, | |||
|
152 | 'global': set([u'hg.create.repository', u'repository.read', | |||
|
153 | u'hg.register.manual_activate']), | |||
|
154 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |||
|
155 | } | |||
|
156 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
157 | new_perm) | |||
|
158 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
159 | perms['repositories_groups']) | |||
|
160 | ||||
|
161 | def test_propagated_permission_from_users_group_lower_weight(self): | |||
|
162 | # make group | |||
|
163 | self.ug1 = UsersGroupModel().create('G1') | |||
|
164 | # add user to group | |||
|
165 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |||
|
166 | ||||
|
167 | # set permission to lower | |||
|
168 | new_perm_h = 'repository.write' | |||
|
169 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |||
|
170 | perm=new_perm_h) | |||
|
171 | Session().commit() | |||
|
172 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
173 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
174 | new_perm_h) | |||
|
175 | ||||
|
176 | # grant perm for group this should NOT override permission from user | |||
|
177 | # since it's lower than granted | |||
|
178 | new_perm_l = 'repository.read' | |||
|
179 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |||
|
180 | group_name=self.ug1, | |||
|
181 | perm=new_perm_l) | |||
|
182 | # check perms | |||
|
183 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
184 | perms = { | |||
|
185 | 'repositories_groups': {}, | |||
|
186 | 'global': set([u'hg.create.repository', u'repository.read', | |||
|
187 | u'hg.register.manual_activate']), | |||
|
188 | 'repositories': {u'vcs_test_hg': u'repository.write'} | |||
|
189 | } | |||
|
190 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
191 | new_perm_h) | |||
|
192 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
193 | perms['repositories_groups']) | |||
|
194 | ||||
|
195 | def test_repo_in_group_permissions(self): | |||
|
196 | self.g1 = _make_group('group1', skip_if_exists=True) | |||
|
197 | self.g2 = _make_group('group2', skip_if_exists=True) | |||
|
198 | Session().commit() | |||
|
199 | # both perms should be read ! | |||
|
200 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
201 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
202 | {u'group1': u'group.read', u'group2': u'group.read'}) | |||
|
203 | ||||
|
204 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
205 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
206 | {u'group1': u'group.read', u'group2': u'group.read'}) | |||
|
207 | ||||
|
208 | #Change perms to none for both groups | |||
|
209 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |||
|
210 | user=self.anon, | |||
|
211 | perm='group.none') | |||
|
212 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |||
|
213 | user=self.anon, | |||
|
214 | perm='group.none') | |||
|
215 | ||||
|
216 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
217 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
218 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
219 | ||||
|
220 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
221 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
222 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
223 | ||||
|
224 | # add repo to group | |||
|
225 | name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm']) | |||
|
226 | self.test_repo = RepoModel().create_repo( | |||
|
227 | repo_name=name, | |||
|
228 | repo_type='hg', | |||
|
229 | description='', | |||
|
230 | owner=self.u1, | |||
|
231 | ) | |||
|
232 | Session().commit() | |||
|
233 | ||||
|
234 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
235 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
236 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
237 | ||||
|
238 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
239 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
240 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
241 | ||||
|
242 | #grant permission for u2 ! | |||
|
243 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |||
|
244 | user=self.u2, | |||
|
245 | perm='group.read') | |||
|
246 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |||
|
247 | user=self.u2, | |||
|
248 | perm='group.read') | |||
|
249 | Session().commit() | |||
|
250 | self.assertNotEqual(self.u1, self.u2) | |||
|
251 | #u1 and anon should have not change perms while u2 should ! | |||
|
252 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
253 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
254 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
255 | ||||
|
256 | u2_auth = AuthUser(user_id=self.u2.user_id) | |||
|
257 | self.assertEqual(u2_auth.permissions['repositories_groups'], | |||
|
258 | {u'group1': u'group.read', u'group2': u'group.read'}) | |||
|
259 | ||||
|
260 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
261 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
262 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
263 | ||||
|
264 | def test_repo_group_user_as_user_group_member(self): | |||
|
265 | # create Group1 | |||
|
266 | self.g1 = _make_group('group1', skip_if_exists=True) | |||
|
267 | Session().commit() | |||
|
268 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
269 | ||||
|
270 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
271 | {u'group1': u'group.read'}) | |||
|
272 | ||||
|
273 | # set default permission to none | |||
|
274 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |||
|
275 | user=self.anon, | |||
|
276 | perm='group.none') | |||
|
277 | # make group | |||
|
278 | self.ug1 = UsersGroupModel().create('G1') | |||
|
279 | # add user to group | |||
|
280 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |||
|
281 | Session().commit() | |||
|
282 | ||||
|
283 | # check if user is in the group | |||
|
284 | membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members] | |||
|
285 | self.assertEqual(membrs, [self.u1.user_id]) | |||
|
286 | # add some user to that group | |||
|
287 | ||||
|
288 | # check his permissions | |||
|
289 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
290 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
291 | {u'group1': u'group.none'}) | |||
|
292 | ||||
|
293 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
294 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
295 | {u'group1': u'group.none'}) | |||
|
296 | ||||
|
297 | # grant ug1 read permissions for | |||
|
298 | ReposGroupModel().grant_users_group_permission(repos_group=self.g1, | |||
|
299 | group_name=self.ug1, | |||
|
300 | perm='group.read') | |||
|
301 | Session().commit() | |||
|
302 | # check if the | |||
|
303 | obj = Session().query(UsersGroupRepoGroupToPerm)\ | |||
|
304 | .filter(UsersGroupRepoGroupToPerm.group == self.g1)\ | |||
|
305 | .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\ | |||
|
306 | .scalar() | |||
|
307 | self.assertEqual(obj.permission.permission_name, 'group.read') | |||
|
308 | ||||
|
309 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
310 | ||||
|
311 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
312 | {u'group1': u'group.none'}) | |||
|
313 | ||||
|
314 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
315 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
316 | {u'group1': u'group.read'}) |
@@ -0,0 +1,170 | |||||
|
1 | import os | |||
|
2 | import unittest | |||
|
3 | from rhodecode.tests import * | |||
|
4 | ||||
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |||
|
6 | from rhodecode.model.repo import RepoModel | |||
|
7 | from rhodecode.model.db import RepoGroup, User | |||
|
8 | from rhodecode.model.meta import Session | |||
|
9 | from sqlalchemy.exc import IntegrityError | |||
|
10 | ||||
|
11 | ||||
|
12 | def _make_group(path, desc='desc', parent_id=None, | |||
|
13 | skip_if_exists=False): | |||
|
14 | ||||
|
15 | gr = RepoGroup.get_by_group_name(path) | |||
|
16 | if gr and skip_if_exists: | |||
|
17 | return gr | |||
|
18 | ||||
|
19 | gr = ReposGroupModel().create(path, desc, parent_id) | |||
|
20 | return gr | |||
|
21 | ||||
|
22 | ||||
|
23 | class TestReposGroups(unittest.TestCase): | |||
|
24 | ||||
|
25 | def setUp(self): | |||
|
26 | self.g1 = _make_group('test1', skip_if_exists=True) | |||
|
27 | Session().commit() | |||
|
28 | self.g2 = _make_group('test2', skip_if_exists=True) | |||
|
29 | Session().commit() | |||
|
30 | self.g3 = _make_group('test3', skip_if_exists=True) | |||
|
31 | Session().commit() | |||
|
32 | ||||
|
33 | def tearDown(self): | |||
|
34 | print 'out' | |||
|
35 | ||||
|
36 | def __check_path(self, *path): | |||
|
37 | """ | |||
|
38 | Checks the path for existance ! | |||
|
39 | """ | |||
|
40 | path = [TESTS_TMP_PATH] + list(path) | |||
|
41 | path = os.path.join(*path) | |||
|
42 | return os.path.isdir(path) | |||
|
43 | ||||
|
44 | def _check_folders(self): | |||
|
45 | print os.listdir(TESTS_TMP_PATH) | |||
|
46 | ||||
|
47 | def __delete_group(self, id_): | |||
|
48 | ReposGroupModel().delete(id_) | |||
|
49 | ||||
|
50 | def __update_group(self, id_, path, desc='desc', parent_id=None): | |||
|
51 | form_data = dict( | |||
|
52 | group_name=path, | |||
|
53 | group_description=desc, | |||
|
54 | group_parent_id=parent_id, | |||
|
55 | perms_updates=[], | |||
|
56 | perms_new=[] | |||
|
57 | ) | |||
|
58 | gr = ReposGroupModel().update(id_, form_data) | |||
|
59 | return gr | |||
|
60 | ||||
|
61 | def test_create_group(self): | |||
|
62 | g = _make_group('newGroup') | |||
|
63 | self.assertEqual(g.full_path, 'newGroup') | |||
|
64 | ||||
|
65 | self.assertTrue(self.__check_path('newGroup')) | |||
|
66 | ||||
|
67 | def test_create_same_name_group(self): | |||
|
68 | self.assertRaises(IntegrityError, lambda: _make_group('newGroup')) | |||
|
69 | Session().rollback() | |||
|
70 | ||||
|
71 | def test_same_subgroup(self): | |||
|
72 | sg1 = _make_group('sub1', parent_id=self.g1.group_id) | |||
|
73 | self.assertEqual(sg1.parent_group, self.g1) | |||
|
74 | self.assertEqual(sg1.full_path, 'test1/sub1') | |||
|
75 | self.assertTrue(self.__check_path('test1', 'sub1')) | |||
|
76 | ||||
|
77 | ssg1 = _make_group('subsub1', parent_id=sg1.group_id) | |||
|
78 | self.assertEqual(ssg1.parent_group, sg1) | |||
|
79 | self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1') | |||
|
80 | self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1')) | |||
|
81 | ||||
|
82 | def test_remove_group(self): | |||
|
83 | sg1 = _make_group('deleteme') | |||
|
84 | self.__delete_group(sg1.group_id) | |||
|
85 | ||||
|
86 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |||
|
87 | self.assertFalse(self.__check_path('deteteme')) | |||
|
88 | ||||
|
89 | sg1 = _make_group('deleteme', parent_id=self.g1.group_id) | |||
|
90 | self.__delete_group(sg1.group_id) | |||
|
91 | ||||
|
92 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |||
|
93 | self.assertFalse(self.__check_path('test1', 'deteteme')) | |||
|
94 | ||||
|
95 | def test_rename_single_group(self): | |||
|
96 | sg1 = _make_group('initial') | |||
|
97 | ||||
|
98 | new_sg1 = self.__update_group(sg1.group_id, 'after') | |||
|
99 | self.assertTrue(self.__check_path('after')) | |||
|
100 | self.assertEqual(RepoGroup.get_by_group_name('initial'), None) | |||
|
101 | ||||
|
102 | def test_update_group_parent(self): | |||
|
103 | ||||
|
104 | sg1 = _make_group('initial', parent_id=self.g1.group_id) | |||
|
105 | ||||
|
106 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id) | |||
|
107 | self.assertTrue(self.__check_path('test1', 'after')) | |||
|
108 | self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None) | |||
|
109 | ||||
|
110 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id) | |||
|
111 | self.assertTrue(self.__check_path('test3', 'after')) | |||
|
112 | self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None) | |||
|
113 | ||||
|
114 | new_sg1 = self.__update_group(sg1.group_id, 'hello') | |||
|
115 | self.assertTrue(self.__check_path('hello')) | |||
|
116 | ||||
|
117 | self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1) | |||
|
118 | ||||
|
119 | def test_subgrouping_with_repo(self): | |||
|
120 | ||||
|
121 | g1 = _make_group('g1') | |||
|
122 | g2 = _make_group('g2') | |||
|
123 | ||||
|
124 | # create new repo | |||
|
125 | form_data = dict(repo_name='john', | |||
|
126 | repo_name_full='john', | |||
|
127 | fork_name=None, | |||
|
128 | description=None, | |||
|
129 | repo_group=None, | |||
|
130 | private=False, | |||
|
131 | repo_type='hg', | |||
|
132 | clone_uri=None, | |||
|
133 | landing_rev='tip') | |||
|
134 | cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) | |||
|
135 | r = RepoModel().create(form_data, cur_user) | |||
|
136 | ||||
|
137 | self.assertEqual(r.repo_name, 'john') | |||
|
138 | ||||
|
139 | # put repo into group | |||
|
140 | form_data = form_data | |||
|
141 | form_data['repo_group'] = g1.group_id | |||
|
142 | form_data['perms_new'] = [] | |||
|
143 | form_data['perms_updates'] = [] | |||
|
144 | RepoModel().update(r.repo_name, form_data) | |||
|
145 | self.assertEqual(r.repo_name, 'g1/john') | |||
|
146 | ||||
|
147 | self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id) | |||
|
148 | self.assertTrue(self.__check_path('g2', 'g1')) | |||
|
149 | ||||
|
150 | # test repo | |||
|
151 | self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', | |||
|
152 | r.just_name])) | |||
|
153 | ||||
|
154 | def test_move_to_root(self): | |||
|
155 | g1 = _make_group('t11') | |||
|
156 | Session().commit() | |||
|
157 | g2 = _make_group('t22', parent_id=g1.group_id) | |||
|
158 | Session().commit() | |||
|
159 | ||||
|
160 | self.assertEqual(g2.full_path, 't11/t22') | |||
|
161 | self.assertTrue(self.__check_path('t11', 't22')) | |||
|
162 | ||||
|
163 | g2 = self.__update_group(g2.group_id, 'g22', parent_id=None) | |||
|
164 | Session().commit() | |||
|
165 | ||||
|
166 | self.assertEqual(g2.group_name, 'g22') | |||
|
167 | # we moved out group from t1 to '' so it's full path should be 'g2' | |||
|
168 | self.assertEqual(g2.full_path, 'g22') | |||
|
169 | self.assertFalse(self.__check_path('t11', 't22')) | |||
|
170 | self.assertTrue(self.__check_path('g22')) No newline at end of file |
@@ -0,0 +1,124 | |||||
|
1 | import unittest | |||
|
2 | from rhodecode.tests import * | |||
|
3 | ||||
|
4 | from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\ | |||
|
5 | Permission | |||
|
6 | from rhodecode.model.user import UserModel | |||
|
7 | ||||
|
8 | from rhodecode.model.meta import Session | |||
|
9 | from rhodecode.model.users_group import UsersGroupModel | |||
|
10 | ||||
|
11 | ||||
|
12 | class TestUser(unittest.TestCase): | |||
|
13 | def __init__(self, methodName='runTest'): | |||
|
14 | Session.remove() | |||
|
15 | super(TestUser, self).__init__(methodName=methodName) | |||
|
16 | ||||
|
17 | def test_create_and_remove(self): | |||
|
18 | usr = UserModel().create_or_update(username=u'test_user', | |||
|
19 | password=u'qweqwe', | |||
|
20 | email=u'u232@rhodecode.org', | |||
|
21 | firstname=u'u1', lastname=u'u1') | |||
|
22 | Session().commit() | |||
|
23 | self.assertEqual(User.get_by_username(u'test_user'), usr) | |||
|
24 | ||||
|
25 | # make users group | |||
|
26 | users_group = UsersGroupModel().create('some_example_group') | |||
|
27 | Session().commit() | |||
|
28 | ||||
|
29 | UsersGroupModel().add_user_to_group(users_group, usr) | |||
|
30 | Session().commit() | |||
|
31 | ||||
|
32 | self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group) | |||
|
33 | self.assertEqual(UsersGroupMember.query().count(), 1) | |||
|
34 | UserModel().delete(usr.user_id) | |||
|
35 | Session().commit() | |||
|
36 | ||||
|
37 | self.assertEqual(UsersGroupMember.query().all(), []) | |||
|
38 | ||||
|
39 | def test_additonal_email_as_main(self): | |||
|
40 | usr = UserModel().create_or_update(username=u'test_user', | |||
|
41 | password=u'qweqwe', | |||
|
42 | email=u'main_email@rhodecode.org', | |||
|
43 | firstname=u'u1', lastname=u'u1') | |||
|
44 | Session().commit() | |||
|
45 | ||||
|
46 | def do(): | |||
|
47 | m = UserEmailMap() | |||
|
48 | m.email = u'main_email@rhodecode.org' | |||
|
49 | m.user = usr | |||
|
50 | Session().add(m) | |||
|
51 | Session().commit() | |||
|
52 | self.assertRaises(AttributeError, do) | |||
|
53 | ||||
|
54 | UserModel().delete(usr.user_id) | |||
|
55 | Session().commit() | |||
|
56 | ||||
|
57 | def test_extra_email_map(self): | |||
|
58 | usr = UserModel().create_or_update(username=u'test_user', | |||
|
59 | password=u'qweqwe', | |||
|
60 | email=u'main_email@rhodecode.org', | |||
|
61 | firstname=u'u1', lastname=u'u1') | |||
|
62 | Session().commit() | |||
|
63 | ||||
|
64 | m = UserEmailMap() | |||
|
65 | m.email = u'main_email2@rhodecode.org' | |||
|
66 | m.user = usr | |||
|
67 | Session().add(m) | |||
|
68 | Session().commit() | |||
|
69 | ||||
|
70 | u = User.get_by_email(email='main_email@rhodecode.org') | |||
|
71 | self.assertEqual(usr.user_id, u.user_id) | |||
|
72 | self.assertEqual(usr.username, u.username) | |||
|
73 | ||||
|
74 | u = User.get_by_email(email='main_email2@rhodecode.org') | |||
|
75 | self.assertEqual(usr.user_id, u.user_id) | |||
|
76 | self.assertEqual(usr.username, u.username) | |||
|
77 | u = User.get_by_email(email='main_email3@rhodecode.org') | |||
|
78 | self.assertEqual(None, u) | |||
|
79 | ||||
|
80 | UserModel().delete(usr.user_id) | |||
|
81 | Session().commit() | |||
|
82 | ||||
|
83 | ||||
|
84 | class TestUsers(unittest.TestCase): | |||
|
85 | ||||
|
86 | def __init__(self, methodName='runTest'): | |||
|
87 | super(TestUsers, self).__init__(methodName=methodName) | |||
|
88 | ||||
|
89 | def setUp(self): | |||
|
90 | self.u1 = UserModel().create_or_update(username=u'u1', | |||
|
91 | password=u'qweqwe', | |||
|
92 | email=u'u1@rhodecode.org', | |||
|
93 | firstname=u'u1', lastname=u'u1') | |||
|
94 | ||||
|
95 | def tearDown(self): | |||
|
96 | perm = Permission.query().all() | |||
|
97 | for p in perm: | |||
|
98 | UserModel().revoke_perm(self.u1, p) | |||
|
99 | ||||
|
100 | UserModel().delete(self.u1) | |||
|
101 | Session().commit() | |||
|
102 | ||||
|
103 | def test_add_perm(self): | |||
|
104 | perm = Permission.query().all()[0] | |||
|
105 | UserModel().grant_perm(self.u1, perm) | |||
|
106 | Session().commit() | |||
|
107 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |||
|
108 | ||||
|
109 | def test_has_perm(self): | |||
|
110 | perm = Permission.query().all() | |||
|
111 | for p in perm: | |||
|
112 | has_p = UserModel().has_perm(self.u1, p) | |||
|
113 | self.assertEqual(False, has_p) | |||
|
114 | ||||
|
115 | def test_revoke_perm(self): | |||
|
116 | perm = Permission.query().all()[0] | |||
|
117 | UserModel().grant_perm(self.u1, perm) | |||
|
118 | Session().commit() | |||
|
119 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |||
|
120 | ||||
|
121 | #revoke | |||
|
122 | UserModel().revoke_perm(self.u1, perm) | |||
|
123 | Session().commit() | |||
|
124 | self.assertEqual(UserModel().has_perm(self.u1, perm), False) |
This diff has been collapsed as it changes many lines, (604 lines changed) Show them Hide them | |||||
@@ -2,250 +2,11 import os | |||||
2 | import unittest |
|
2 | import unittest | |
3 | from rhodecode.tests import * |
|
3 | from rhodecode.tests import * | |
4 |
|
4 | |||
5 | from rhodecode.model.repos_group import ReposGroupModel |
|
5 | from rhodecode.model.db import User, Notification, UserNotification | |
6 | from rhodecode.model.repo import RepoModel |
|
|||
7 | from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \ |
|
|||
8 | UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\ |
|
|||
9 | Repository, UserEmailMap |
|
|||
10 | from sqlalchemy.exc import IntegrityError, DatabaseError |
|
|||
11 | from rhodecode.model.user import UserModel |
|
6 | from rhodecode.model.user import UserModel | |
12 |
|
7 | |||
13 | from rhodecode.model.meta import Session |
|
8 | from rhodecode.model.meta import Session | |
14 | from rhodecode.model.notification import NotificationModel |
|
9 | from rhodecode.model.notification import NotificationModel | |
15 | from rhodecode.model.users_group import UsersGroupModel |
|
|||
16 | from rhodecode.lib.auth import AuthUser |
|
|||
17 |
|
||||
18 |
|
||||
19 | def _make_group(path, desc='desc', parent_id=None, |
|
|||
20 | skip_if_exists=False): |
|
|||
21 |
|
||||
22 | gr = RepoGroup.get_by_group_name(path) |
|
|||
23 | if gr and skip_if_exists: |
|
|||
24 | return gr |
|
|||
25 |
|
||||
26 | gr = ReposGroupModel().create(path, desc, parent_id) |
|
|||
27 | return gr |
|
|||
28 |
|
||||
29 |
|
||||
30 | class TestReposGroups(unittest.TestCase): |
|
|||
31 |
|
||||
32 | def setUp(self): |
|
|||
33 | self.g1 = _make_group('test1', skip_if_exists=True) |
|
|||
34 | Session.commit() |
|
|||
35 | self.g2 = _make_group('test2', skip_if_exists=True) |
|
|||
36 | Session.commit() |
|
|||
37 | self.g3 = _make_group('test3', skip_if_exists=True) |
|
|||
38 | Session.commit() |
|
|||
39 |
|
||||
40 | def tearDown(self): |
|
|||
41 | print 'out' |
|
|||
42 |
|
||||
43 | def __check_path(self, *path): |
|
|||
44 | """ |
|
|||
45 | Checks the path for existance ! |
|
|||
46 | """ |
|
|||
47 | path = [TESTS_TMP_PATH] + list(path) |
|
|||
48 | path = os.path.join(*path) |
|
|||
49 | return os.path.isdir(path) |
|
|||
50 |
|
||||
51 | def _check_folders(self): |
|
|||
52 | print os.listdir(TESTS_TMP_PATH) |
|
|||
53 |
|
||||
54 | def __delete_group(self, id_): |
|
|||
55 | ReposGroupModel().delete(id_) |
|
|||
56 |
|
||||
57 | def __update_group(self, id_, path, desc='desc', parent_id=None): |
|
|||
58 | form_data = dict( |
|
|||
59 | group_name=path, |
|
|||
60 | group_description=desc, |
|
|||
61 | group_parent_id=parent_id, |
|
|||
62 | perms_updates=[], |
|
|||
63 | perms_new=[] |
|
|||
64 | ) |
|
|||
65 | gr = ReposGroupModel().update(id_, form_data) |
|
|||
66 | return gr |
|
|||
67 |
|
||||
68 | def test_create_group(self): |
|
|||
69 | g = _make_group('newGroup') |
|
|||
70 | self.assertEqual(g.full_path, 'newGroup') |
|
|||
71 |
|
||||
72 | self.assertTrue(self.__check_path('newGroup')) |
|
|||
73 |
|
||||
74 | def test_create_same_name_group(self): |
|
|||
75 | self.assertRaises(IntegrityError, lambda:_make_group('newGroup')) |
|
|||
76 | Session.rollback() |
|
|||
77 |
|
||||
78 | def test_same_subgroup(self): |
|
|||
79 | sg1 = _make_group('sub1', parent_id=self.g1.group_id) |
|
|||
80 | self.assertEqual(sg1.parent_group, self.g1) |
|
|||
81 | self.assertEqual(sg1.full_path, 'test1/sub1') |
|
|||
82 | self.assertTrue(self.__check_path('test1', 'sub1')) |
|
|||
83 |
|
||||
84 | ssg1 = _make_group('subsub1', parent_id=sg1.group_id) |
|
|||
85 | self.assertEqual(ssg1.parent_group, sg1) |
|
|||
86 | self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1') |
|
|||
87 | self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1')) |
|
|||
88 |
|
||||
89 | def test_remove_group(self): |
|
|||
90 | sg1 = _make_group('deleteme') |
|
|||
91 | self.__delete_group(sg1.group_id) |
|
|||
92 |
|
||||
93 | self.assertEqual(RepoGroup.get(sg1.group_id), None) |
|
|||
94 | self.assertFalse(self.__check_path('deteteme')) |
|
|||
95 |
|
||||
96 | sg1 = _make_group('deleteme', parent_id=self.g1.group_id) |
|
|||
97 | self.__delete_group(sg1.group_id) |
|
|||
98 |
|
||||
99 | self.assertEqual(RepoGroup.get(sg1.group_id), None) |
|
|||
100 | self.assertFalse(self.__check_path('test1', 'deteteme')) |
|
|||
101 |
|
||||
102 | def test_rename_single_group(self): |
|
|||
103 | sg1 = _make_group('initial') |
|
|||
104 |
|
||||
105 | new_sg1 = self.__update_group(sg1.group_id, 'after') |
|
|||
106 | self.assertTrue(self.__check_path('after')) |
|
|||
107 | self.assertEqual(RepoGroup.get_by_group_name('initial'), None) |
|
|||
108 |
|
||||
109 | def test_update_group_parent(self): |
|
|||
110 |
|
||||
111 | sg1 = _make_group('initial', parent_id=self.g1.group_id) |
|
|||
112 |
|
||||
113 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id) |
|
|||
114 | self.assertTrue(self.__check_path('test1', 'after')) |
|
|||
115 | self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None) |
|
|||
116 |
|
||||
117 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id) |
|
|||
118 | self.assertTrue(self.__check_path('test3', 'after')) |
|
|||
119 | self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None) |
|
|||
120 |
|
||||
121 | new_sg1 = self.__update_group(sg1.group_id, 'hello') |
|
|||
122 | self.assertTrue(self.__check_path('hello')) |
|
|||
123 |
|
||||
124 | self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1) |
|
|||
125 |
|
||||
126 | def test_subgrouping_with_repo(self): |
|
|||
127 |
|
||||
128 | g1 = _make_group('g1') |
|
|||
129 | g2 = _make_group('g2') |
|
|||
130 |
|
||||
131 | # create new repo |
|
|||
132 | form_data = dict(repo_name='john', |
|
|||
133 | repo_name_full='john', |
|
|||
134 | fork_name=None, |
|
|||
135 | description=None, |
|
|||
136 | repo_group=None, |
|
|||
137 | private=False, |
|
|||
138 | repo_type='hg', |
|
|||
139 | clone_uri=None, |
|
|||
140 | landing_rev='tip') |
|
|||
141 | cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) |
|
|||
142 | r = RepoModel().create(form_data, cur_user) |
|
|||
143 |
|
||||
144 | self.assertEqual(r.repo_name, 'john') |
|
|||
145 |
|
||||
146 | # put repo into group |
|
|||
147 | form_data = form_data |
|
|||
148 | form_data['repo_group'] = g1.group_id |
|
|||
149 | form_data['perms_new'] = [] |
|
|||
150 | form_data['perms_updates'] = [] |
|
|||
151 | RepoModel().update(r.repo_name, form_data) |
|
|||
152 | self.assertEqual(r.repo_name, 'g1/john') |
|
|||
153 |
|
||||
154 | self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id) |
|
|||
155 | self.assertTrue(self.__check_path('g2', 'g1')) |
|
|||
156 |
|
||||
157 | # test repo |
|
|||
158 | self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name])) |
|
|||
159 |
|
||||
160 | def test_move_to_root(self): |
|
|||
161 | g1 = _make_group('t11') |
|
|||
162 | Session.commit() |
|
|||
163 | g2 = _make_group('t22', parent_id=g1.group_id) |
|
|||
164 | Session.commit() |
|
|||
165 |
|
||||
166 | self.assertEqual(g2.full_path, 't11/t22') |
|
|||
167 | self.assertTrue(self.__check_path('t11', 't22')) |
|
|||
168 |
|
||||
169 | g2 = self.__update_group(g2.group_id, 'g22', parent_id=None) |
|
|||
170 | Session.commit() |
|
|||
171 |
|
||||
172 | self.assertEqual(g2.group_name, 'g22') |
|
|||
173 | # we moved out group from t1 to '' so it's full path should be 'g2' |
|
|||
174 | self.assertEqual(g2.full_path, 'g22') |
|
|||
175 | self.assertFalse(self.__check_path('t11', 't22')) |
|
|||
176 | self.assertTrue(self.__check_path('g22')) |
|
|||
177 |
|
||||
178 |
|
||||
179 | class TestUser(unittest.TestCase): |
|
|||
180 | def __init__(self, methodName='runTest'): |
|
|||
181 | Session.remove() |
|
|||
182 | super(TestUser, self).__init__(methodName=methodName) |
|
|||
183 |
|
||||
184 | def test_create_and_remove(self): |
|
|||
185 | usr = UserModel().create_or_update(username=u'test_user', |
|
|||
186 | password=u'qweqwe', |
|
|||
187 | email=u'u232@rhodecode.org', |
|
|||
188 | firstname=u'u1', lastname=u'u1') |
|
|||
189 | Session.commit() |
|
|||
190 | self.assertEqual(User.get_by_username(u'test_user'), usr) |
|
|||
191 |
|
||||
192 | # make users group |
|
|||
193 | users_group = UsersGroupModel().create('some_example_group') |
|
|||
194 | Session.commit() |
|
|||
195 |
|
||||
196 | UsersGroupModel().add_user_to_group(users_group, usr) |
|
|||
197 | Session.commit() |
|
|||
198 |
|
||||
199 | self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group) |
|
|||
200 | self.assertEqual(UsersGroupMember.query().count(), 1) |
|
|||
201 | UserModel().delete(usr.user_id) |
|
|||
202 | Session.commit() |
|
|||
203 |
|
||||
204 | self.assertEqual(UsersGroupMember.query().all(), []) |
|
|||
205 |
|
||||
206 | def test_additonal_email_as_main(self): |
|
|||
207 | usr = UserModel().create_or_update(username=u'test_user', |
|
|||
208 | password=u'qweqwe', |
|
|||
209 | email=u'main_email@rhodecode.org', |
|
|||
210 | firstname=u'u1', lastname=u'u1') |
|
|||
211 | Session.commit() |
|
|||
212 |
|
||||
213 | def do(): |
|
|||
214 | m = UserEmailMap() |
|
|||
215 | m.email = u'main_email@rhodecode.org' |
|
|||
216 | m.user = usr |
|
|||
217 | Session.add(m) |
|
|||
218 | Session.commit() |
|
|||
219 | self.assertRaises(AttributeError, do) |
|
|||
220 |
|
||||
221 | UserModel().delete(usr.user_id) |
|
|||
222 | Session.commit() |
|
|||
223 |
|
||||
224 | def test_extra_email_map(self): |
|
|||
225 | usr = UserModel().create_or_update(username=u'test_user', |
|
|||
226 | password=u'qweqwe', |
|
|||
227 | email=u'main_email@rhodecode.org', |
|
|||
228 | firstname=u'u1', lastname=u'u1') |
|
|||
229 | Session.commit() |
|
|||
230 |
|
||||
231 | m = UserEmailMap() |
|
|||
232 | m.email = u'main_email2@rhodecode.org' |
|
|||
233 | m.user = usr |
|
|||
234 | Session.add(m) |
|
|||
235 | Session.commit() |
|
|||
236 |
|
||||
237 | u = User.get_by_email(email='main_email@rhodecode.org') |
|
|||
238 | self.assertEqual(usr.user_id, u.user_id) |
|
|||
239 | self.assertEqual(usr.username, u.username) |
|
|||
240 |
|
||||
241 | u = User.get_by_email(email='main_email2@rhodecode.org') |
|
|||
242 | self.assertEqual(usr.user_id, u.user_id) |
|
|||
243 | self.assertEqual(usr.username, u.username) |
|
|||
244 | u = User.get_by_email(email='main_email3@rhodecode.org') |
|
|||
245 | self.assertEqual(None, u) |
|
|||
246 |
|
||||
247 | UserModel().delete(usr.user_id) |
|
|||
248 | Session.commit() |
|
|||
249 |
|
10 | |||
250 |
|
11 | |||
251 | class TestNotifications(unittest.TestCase): |
|
12 | class TestNotifications(unittest.TestCase): | |
@@ -256,30 +17,30 class TestNotifications(unittest.TestCas | |||||
256 | password=u'qweqwe', |
|
17 | password=u'qweqwe', | |
257 | email=u'u1@rhodecode.org', |
|
18 | email=u'u1@rhodecode.org', | |
258 | firstname=u'u1', lastname=u'u1') |
|
19 | firstname=u'u1', lastname=u'u1') | |
259 | Session.commit() |
|
20 | Session().commit() | |
260 | self.u1 = self.u1.user_id |
|
21 | self.u1 = self.u1.user_id | |
261 |
|
22 | |||
262 | self.u2 = UserModel().create_or_update(username=u'u2', |
|
23 | self.u2 = UserModel().create_or_update(username=u'u2', | |
263 | password=u'qweqwe', |
|
24 | password=u'qweqwe', | |
264 | email=u'u2@rhodecode.org', |
|
25 | email=u'u2@rhodecode.org', | |
265 | firstname=u'u2', lastname=u'u3') |
|
26 | firstname=u'u2', lastname=u'u3') | |
266 | Session.commit() |
|
27 | Session().commit() | |
267 | self.u2 = self.u2.user_id |
|
28 | self.u2 = self.u2.user_id | |
268 |
|
29 | |||
269 | self.u3 = UserModel().create_or_update(username=u'u3', |
|
30 | self.u3 = UserModel().create_or_update(username=u'u3', | |
270 | password=u'qweqwe', |
|
31 | password=u'qweqwe', | |
271 | email=u'u3@rhodecode.org', |
|
32 | email=u'u3@rhodecode.org', | |
272 | firstname=u'u3', lastname=u'u3') |
|
33 | firstname=u'u3', lastname=u'u3') | |
273 | Session.commit() |
|
34 | Session().commit() | |
274 | self.u3 = self.u3.user_id |
|
35 | self.u3 = self.u3.user_id | |
275 |
|
36 | |||
276 | super(TestNotifications, self).__init__(methodName=methodName) |
|
37 | super(TestNotifications, self).__init__(methodName=methodName) | |
277 |
|
38 | |||
278 | def _clean_notifications(self): |
|
39 | def _clean_notifications(self): | |
279 | for n in Notification.query().all(): |
|
40 | for n in Notification.query().all(): | |
280 | Session.delete(n) |
|
41 | Session().delete(n) | |
281 |
|
42 | |||
282 | Session.commit() |
|
43 | Session().commit() | |
283 | self.assertEqual(Notification.query().all(), []) |
|
44 | self.assertEqual(Notification.query().all(), []) | |
284 |
|
45 | |||
285 | def tearDown(self): |
|
46 | def tearDown(self): | |
@@ -293,7 +54,7 class TestNotifications(unittest.TestCas | |||||
293 | notification = NotificationModel().create(created_by=self.u1, |
|
54 | notification = NotificationModel().create(created_by=self.u1, | |
294 | subject=u'subj', body=u'hi there', |
|
55 | subject=u'subj', body=u'hi there', | |
295 | recipients=usrs) |
|
56 | recipients=usrs) | |
296 | Session.commit() |
|
57 | Session().commit() | |
297 | u1 = User.get(self.u1) |
|
58 | u1 = User.get(self.u1) | |
298 | u2 = User.get(self.u2) |
|
59 | u2 = User.get(self.u2) | |
299 | u3 = User.get(self.u3) |
|
60 | u3 = User.get(self.u3) | |
@@ -316,12 +77,12 class TestNotifications(unittest.TestCas | |||||
316 | notification1 = NotificationModel().create(created_by=self.u1, |
|
77 | notification1 = NotificationModel().create(created_by=self.u1, | |
317 | subject=u'subj', body=u'hi there1', |
|
78 | subject=u'subj', body=u'hi there1', | |
318 | recipients=[self.u3]) |
|
79 | recipients=[self.u3]) | |
319 | Session.commit() |
|
80 | Session().commit() | |
320 | notification2 = NotificationModel().create(created_by=self.u1, |
|
81 | notification2 = NotificationModel().create(created_by=self.u1, | |
321 | subject=u'subj', body=u'hi there2', |
|
82 | subject=u'subj', body=u'hi there2', | |
322 | recipients=[self.u3]) |
|
83 | recipients=[self.u3]) | |
323 | Session.commit() |
|
84 | Session().commit() | |
324 | u3 = Session.query(User).get(self.u3) |
|
85 | u3 = Session().query(User).get(self.u3) | |
325 |
|
86 | |||
326 | self.assertEqual(sorted([x.notification for x in u3.notifications]), |
|
87 | self.assertEqual(sorted([x.notification for x in u3.notifications]), | |
327 | sorted([notification2, notification1])) |
|
88 | sorted([notification2, notification1])) | |
@@ -333,12 +94,12 class TestNotifications(unittest.TestCas | |||||
333 | notification = NotificationModel().create(created_by=self.u1, |
|
94 | notification = NotificationModel().create(created_by=self.u1, | |
334 | subject=u'title', body=u'hi there3', |
|
95 | subject=u'title', body=u'hi there3', | |
335 | recipients=[self.u3, self.u1, self.u2]) |
|
96 | recipients=[self.u3, self.u1, self.u2]) | |
336 | Session.commit() |
|
97 | Session().commit() | |
337 | notifications = Notification.query().all() |
|
98 | notifications = Notification.query().all() | |
338 | self.assertTrue(notification in notifications) |
|
99 | self.assertTrue(notification in notifications) | |
339 |
|
100 | |||
340 | Notification.delete(notification.notification_id) |
|
101 | Notification.delete(notification.notification_id) | |
341 | Session.commit() |
|
102 | Session().commit() | |
342 |
|
103 | |||
343 | notifications = Notification.query().all() |
|
104 | notifications = Notification.query().all() | |
344 | self.assertFalse(notification in notifications) |
|
105 | self.assertFalse(notification in notifications) | |
@@ -355,7 +116,7 class TestNotifications(unittest.TestCas | |||||
355 | notification = NotificationModel().create(created_by=self.u1, |
|
116 | notification = NotificationModel().create(created_by=self.u1, | |
356 | subject=u'title', body=u'hi there3', |
|
117 | subject=u'title', body=u'hi there3', | |
357 | recipients=[self.u3, self.u1, self.u2]) |
|
118 | recipients=[self.u3, self.u1, self.u2]) | |
358 | Session.commit() |
|
119 | Session().commit() | |
359 |
|
120 | |||
360 | unotification = UserNotification.query()\ |
|
121 | unotification = UserNotification.query()\ | |
361 | .filter(UserNotification.notification == |
|
122 | .filter(UserNotification.notification == | |
@@ -367,7 +128,7 class TestNotifications(unittest.TestCas | |||||
367 |
|
128 | |||
368 | NotificationModel().delete(self.u3, |
|
129 | NotificationModel().delete(self.u3, | |
369 | notification.notification_id) |
|
130 | notification.notification_id) | |
370 | Session.commit() |
|
131 | Session().commit() | |
371 |
|
132 | |||
372 | u3notification = UserNotification.query()\ |
|
133 | u3notification = UserNotification.query()\ | |
373 | .filter(UserNotification.notification == |
|
134 | .filter(UserNotification.notification == | |
@@ -402,7 +163,7 class TestNotifications(unittest.TestCas | |||||
402 | NotificationModel().create(created_by=self.u1, |
|
163 | NotificationModel().create(created_by=self.u1, | |
403 | subject=u'title', body=u'hi there_delete', |
|
164 | subject=u'title', body=u'hi there_delete', | |
404 | recipients=[self.u3, self.u1]) |
|
165 | recipients=[self.u3, self.u1]) | |
405 | Session.commit() |
|
166 | Session().commit() | |
406 |
|
167 | |||
407 | self.assertEqual(NotificationModel() |
|
168 | self.assertEqual(NotificationModel() | |
408 | .get_unread_cnt_for_user(self.u1), 1) |
|
169 | .get_unread_cnt_for_user(self.u1), 1) | |
@@ -414,7 +175,7 class TestNotifications(unittest.TestCas | |||||
414 | notification = NotificationModel().create(created_by=self.u1, |
|
175 | notification = NotificationModel().create(created_by=self.u1, | |
415 | subject=u'title', body=u'hi there3', |
|
176 | subject=u'title', body=u'hi there3', | |
416 | recipients=[self.u3, self.u1, self.u2]) |
|
177 | recipients=[self.u3, self.u1, self.u2]) | |
417 | Session.commit() |
|
178 | Session().commit() | |
418 |
|
179 | |||
419 | self.assertEqual(NotificationModel() |
|
180 | self.assertEqual(NotificationModel() | |
420 | .get_unread_cnt_for_user(self.u1), 2) |
|
181 | .get_unread_cnt_for_user(self.u1), 2) | |
@@ -424,338 +185,5 class TestNotifications(unittest.TestCas | |||||
424 | .get_unread_cnt_for_user(self.u3), 2) |
|
185 | .get_unread_cnt_for_user(self.u3), 2) | |
425 |
|
186 | |||
426 |
|
187 | |||
427 | class TestUsers(unittest.TestCase): |
|
|||
428 |
|
||||
429 | def __init__(self, methodName='runTest'): |
|
|||
430 | super(TestUsers, self).__init__(methodName=methodName) |
|
|||
431 |
|
||||
432 | def setUp(self): |
|
|||
433 | self.u1 = UserModel().create_or_update(username=u'u1', |
|
|||
434 | password=u'qweqwe', |
|
|||
435 | email=u'u1@rhodecode.org', |
|
|||
436 | firstname=u'u1', lastname=u'u1') |
|
|||
437 |
|
||||
438 | def tearDown(self): |
|
|||
439 | perm = Permission.query().all() |
|
|||
440 | for p in perm: |
|
|||
441 | UserModel().revoke_perm(self.u1, p) |
|
|||
442 |
|
||||
443 | UserModel().delete(self.u1) |
|
|||
444 | Session.commit() |
|
|||
445 |
|
||||
446 | def test_add_perm(self): |
|
|||
447 | perm = Permission.query().all()[0] |
|
|||
448 | UserModel().grant_perm(self.u1, perm) |
|
|||
449 | Session.commit() |
|
|||
450 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) |
|
|||
451 |
|
||||
452 | def test_has_perm(self): |
|
|||
453 | perm = Permission.query().all() |
|
|||
454 | for p in perm: |
|
|||
455 | has_p = UserModel().has_perm(self.u1, p) |
|
|||
456 | self.assertEqual(False, has_p) |
|
|||
457 |
|
||||
458 | def test_revoke_perm(self): |
|
|||
459 | perm = Permission.query().all()[0] |
|
|||
460 | UserModel().grant_perm(self.u1, perm) |
|
|||
461 | Session.commit() |
|
|||
462 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) |
|
|||
463 |
|
||||
464 | #revoke |
|
|||
465 | UserModel().revoke_perm(self.u1, perm) |
|
|||
466 | Session.commit() |
|
|||
467 | self.assertEqual(UserModel().has_perm(self.u1, perm), False) |
|
|||
468 |
|
188 | |||
469 |
|
189 | |||
470 | class TestPermissions(unittest.TestCase): |
|
|||
471 | def __init__(self, methodName='runTest'): |
|
|||
472 | super(TestPermissions, self).__init__(methodName=methodName) |
|
|||
473 |
|
||||
474 | def setUp(self): |
|
|||
475 | self.u1 = UserModel().create_or_update( |
|
|||
476 | username=u'u1', password=u'qweqwe', |
|
|||
477 | email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1' |
|
|||
478 | ) |
|
|||
479 | self.u2 = UserModel().create_or_update( |
|
|||
480 | username=u'u2', password=u'qweqwe', |
|
|||
481 | email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2' |
|
|||
482 | ) |
|
|||
483 | self.anon = User.get_by_username('default') |
|
|||
484 | self.a1 = UserModel().create_or_update( |
|
|||
485 | username=u'a1', password=u'qweqwe', |
|
|||
486 | email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True |
|
|||
487 | ) |
|
|||
488 | Session.commit() |
|
|||
489 |
|
||||
490 | def tearDown(self): |
|
|||
491 | if hasattr(self, 'test_repo'): |
|
|||
492 | RepoModel().delete(repo=self.test_repo) |
|
|||
493 | UserModel().delete(self.u1) |
|
|||
494 | UserModel().delete(self.u2) |
|
|||
495 | UserModel().delete(self.a1) |
|
|||
496 | if hasattr(self, 'g1'): |
|
|||
497 | ReposGroupModel().delete(self.g1.group_id) |
|
|||
498 | if hasattr(self, 'g2'): |
|
|||
499 | ReposGroupModel().delete(self.g2.group_id) |
|
|||
500 |
|
||||
501 | if hasattr(self, 'ug1'): |
|
|||
502 | UsersGroupModel().delete(self.ug1, force=True) |
|
|||
503 |
|
||||
504 | Session.commit() |
|
|||
505 |
|
||||
506 | def test_default_perms_set(self): |
|
|||
507 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
508 | perms = { |
|
|||
509 | 'repositories_groups': {}, |
|
|||
510 | 'global': set([u'hg.create.repository', u'repository.read', |
|
|||
511 | u'hg.register.manual_activate']), |
|
|||
512 | 'repositories': {u'vcs_test_hg': u'repository.read'} |
|
|||
513 | } |
|
|||
514 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
515 | perms['repositories'][HG_REPO]) |
|
|||
516 | new_perm = 'repository.write' |
|
|||
517 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) |
|
|||
518 | Session.commit() |
|
|||
519 |
|
||||
520 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
521 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm) |
|
|||
522 |
|
||||
523 | def test_default_admin_perms_set(self): |
|
|||
524 | a1_auth = AuthUser(user_id=self.a1.user_id) |
|
|||
525 | perms = { |
|
|||
526 | 'repositories_groups': {}, |
|
|||
527 | 'global': set([u'hg.admin']), |
|
|||
528 | 'repositories': {u'vcs_test_hg': u'repository.admin'} |
|
|||
529 | } |
|
|||
530 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], |
|
|||
531 | perms['repositories'][HG_REPO]) |
|
|||
532 | new_perm = 'repository.write' |
|
|||
533 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm) |
|
|||
534 | Session.commit() |
|
|||
535 | # cannot really downgrade admins permissions !? they still get's set as |
|
|||
536 | # admin ! |
|
|||
537 | u1_auth = AuthUser(user_id=self.a1.user_id) |
|
|||
538 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
539 | perms['repositories'][HG_REPO]) |
|
|||
540 |
|
||||
541 | def test_default_group_perms(self): |
|
|||
542 | self.g1 = _make_group('test1', skip_if_exists=True) |
|
|||
543 | self.g2 = _make_group('test2', skip_if_exists=True) |
|
|||
544 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
545 | perms = { |
|
|||
546 | 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'}, |
|
|||
547 | 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']), |
|
|||
548 | 'repositories': {u'vcs_test_hg': u'repository.read'} |
|
|||
549 | } |
|
|||
550 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
551 | perms['repositories'][HG_REPO]) |
|
|||
552 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
553 | perms['repositories_groups']) |
|
|||
554 |
|
||||
555 | def test_default_admin_group_perms(self): |
|
|||
556 | self.g1 = _make_group('test1', skip_if_exists=True) |
|
|||
557 | self.g2 = _make_group('test2', skip_if_exists=True) |
|
|||
558 | a1_auth = AuthUser(user_id=self.a1.user_id) |
|
|||
559 | perms = { |
|
|||
560 | 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'}, |
|
|||
561 | 'global': set(['hg.admin']), |
|
|||
562 | 'repositories': {u'vcs_test_hg': 'repository.admin'} |
|
|||
563 | } |
|
|||
564 |
|
||||
565 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], |
|
|||
566 | perms['repositories'][HG_REPO]) |
|
|||
567 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
568 | perms['repositories_groups']) |
|
|||
569 |
|
||||
570 | def test_propagated_permission_from_users_group(self): |
|
|||
571 | # make group |
|
|||
572 | self.ug1 = UsersGroupModel().create('G1') |
|
|||
573 | # add user to group |
|
|||
574 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) |
|
|||
575 |
|
||||
576 | # set permission to lower |
|
|||
577 | new_perm = 'repository.none' |
|
|||
578 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) |
|
|||
579 | Session.commit() |
|
|||
580 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
581 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
582 | new_perm) |
|
|||
583 |
|
||||
584 | # grant perm for group this should override permission from user |
|
|||
585 | new_perm = 'repository.write' |
|
|||
586 | RepoModel().grant_users_group_permission(repo=HG_REPO, |
|
|||
587 | group_name=self.ug1, |
|
|||
588 | perm=new_perm) |
|
|||
589 | # check perms |
|
|||
590 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
591 | perms = { |
|
|||
592 | 'repositories_groups': {}, |
|
|||
593 | 'global': set([u'hg.create.repository', u'repository.read', |
|
|||
594 | u'hg.register.manual_activate']), |
|
|||
595 | 'repositories': {u'vcs_test_hg': u'repository.read'} |
|
|||
596 | } |
|
|||
597 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
598 | new_perm) |
|
|||
599 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
600 | perms['repositories_groups']) |
|
|||
601 |
|
||||
602 | def test_propagated_permission_from_users_group_lower_weight(self): |
|
|||
603 | # make group |
|
|||
604 | self.ug1 = UsersGroupModel().create('G1') |
|
|||
605 | # add user to group |
|
|||
606 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) |
|
|||
607 |
|
||||
608 | # set permission to lower |
|
|||
609 | new_perm_h = 'repository.write' |
|
|||
610 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, |
|
|||
611 | perm=new_perm_h) |
|
|||
612 | Session.commit() |
|
|||
613 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
614 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
615 | new_perm_h) |
|
|||
616 |
|
||||
617 | # grant perm for group this should NOT override permission from user |
|
|||
618 | # since it's lower than granted |
|
|||
619 | new_perm_l = 'repository.read' |
|
|||
620 | RepoModel().grant_users_group_permission(repo=HG_REPO, |
|
|||
621 | group_name=self.ug1, |
|
|||
622 | perm=new_perm_l) |
|
|||
623 | # check perms |
|
|||
624 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
625 | perms = { |
|
|||
626 | 'repositories_groups': {}, |
|
|||
627 | 'global': set([u'hg.create.repository', u'repository.read', |
|
|||
628 | u'hg.register.manual_activate']), |
|
|||
629 | 'repositories': {u'vcs_test_hg': u'repository.write'} |
|
|||
630 | } |
|
|||
631 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
632 | new_perm_h) |
|
|||
633 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
634 | perms['repositories_groups']) |
|
|||
635 |
|
||||
636 | def test_repo_in_group_permissions(self): |
|
|||
637 | self.g1 = _make_group('group1', skip_if_exists=True) |
|
|||
638 | self.g2 = _make_group('group2', skip_if_exists=True) |
|
|||
639 | Session.commit() |
|
|||
640 | # both perms should be read ! |
|
|||
641 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
642 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
643 | {u'group1': u'group.read', u'group2': u'group.read'}) |
|
|||
644 |
|
||||
645 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
646 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
647 | {u'group1': u'group.read', u'group2': u'group.read'}) |
|
|||
648 |
|
||||
649 | #Change perms to none for both groups |
|
|||
650 | ReposGroupModel().grant_user_permission(repos_group=self.g1, |
|
|||
651 | user=self.anon, |
|
|||
652 | perm='group.none') |
|
|||
653 | ReposGroupModel().grant_user_permission(repos_group=self.g2, |
|
|||
654 | user=self.anon, |
|
|||
655 | perm='group.none') |
|
|||
656 |
|
||||
657 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
658 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
659 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
660 |
|
||||
661 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
662 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
663 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
664 |
|
||||
665 | # add repo to group |
|
|||
666 | form_data = { |
|
|||
667 | 'repo_name': HG_REPO, |
|
|||
668 | 'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]), |
|
|||
669 | 'repo_type': 'hg', |
|
|||
670 | 'clone_uri': '', |
|
|||
671 | 'repo_group': self.g1.group_id, |
|
|||
672 | 'description': 'desc', |
|
|||
673 | 'private': False, |
|
|||
674 | 'landing_rev': 'tip' |
|
|||
675 | } |
|
|||
676 | self.test_repo = RepoModel().create(form_data, cur_user=self.u1) |
|
|||
677 | Session.commit() |
|
|||
678 |
|
||||
679 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
680 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
681 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
682 |
|
||||
683 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
684 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
685 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
686 |
|
||||
687 | #grant permission for u2 ! |
|
|||
688 | ReposGroupModel().grant_user_permission(repos_group=self.g1, |
|
|||
689 | user=self.u2, |
|
|||
690 | perm='group.read') |
|
|||
691 | ReposGroupModel().grant_user_permission(repos_group=self.g2, |
|
|||
692 | user=self.u2, |
|
|||
693 | perm='group.read') |
|
|||
694 | Session.commit() |
|
|||
695 | self.assertNotEqual(self.u1, self.u2) |
|
|||
696 | #u1 and anon should have not change perms while u2 should ! |
|
|||
697 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
698 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
699 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
700 |
|
||||
701 | u2_auth = AuthUser(user_id=self.u2.user_id) |
|
|||
702 | self.assertEqual(u2_auth.permissions['repositories_groups'], |
|
|||
703 | {u'group1': u'group.read', u'group2': u'group.read'}) |
|
|||
704 |
|
||||
705 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
706 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
707 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
708 |
|
||||
709 | def test_repo_group_user_as_user_group_member(self): |
|
|||
710 | # create Group1 |
|
|||
711 | self.g1 = _make_group('group1', skip_if_exists=True) |
|
|||
712 | Session.commit() |
|
|||
713 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
714 |
|
||||
715 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
716 | {u'group1': u'group.read'}) |
|
|||
717 |
|
||||
718 | # set default permission to none |
|
|||
719 | ReposGroupModel().grant_user_permission(repos_group=self.g1, |
|
|||
720 | user=self.anon, |
|
|||
721 | perm='group.none') |
|
|||
722 | # make group |
|
|||
723 | self.ug1 = UsersGroupModel().create('G1') |
|
|||
724 | # add user to group |
|
|||
725 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) |
|
|||
726 | Session.commit() |
|
|||
727 |
|
||||
728 | # check if user is in the group |
|
|||
729 | membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members] |
|
|||
730 | self.assertEqual(membrs, [self.u1.user_id]) |
|
|||
731 | # add some user to that group |
|
|||
732 |
|
||||
733 | # check his permissions |
|
|||
734 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
735 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
736 | {u'group1': u'group.none'}) |
|
|||
737 |
|
||||
738 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
739 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
740 | {u'group1': u'group.none'}) |
|
|||
741 |
|
||||
742 | # grant ug1 read permissions for |
|
|||
743 | ReposGroupModel().grant_users_group_permission(repos_group=self.g1, |
|
|||
744 | group_name=self.ug1, |
|
|||
745 | perm='group.read') |
|
|||
746 | Session.commit() |
|
|||
747 | # check if the |
|
|||
748 | obj = Session.query(UsersGroupRepoGroupToPerm)\ |
|
|||
749 | .filter(UsersGroupRepoGroupToPerm.group == self.g1)\ |
|
|||
750 | .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\ |
|
|||
751 | .scalar() |
|
|||
752 | self.assertEqual(obj.permission.permission_name, 'group.read') |
|
|||
753 |
|
||||
754 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
755 |
|
||||
756 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
757 | {u'group1': u'group.none'}) |
|
|||
758 |
|
||||
759 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
760 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
761 | {u'group1': u'group.read'}) |
|
1 | NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch |
|
NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch |
@@ -79,6 +79,7 class Command(object): | |||||
79 | print stdout, stderr |
|
79 | print stdout, stderr | |
80 | return stdout, stderr |
|
80 | return stdout, stderr | |
81 |
|
81 | |||
|
82 | ||||
82 | def get_session(): |
|
83 | def get_session(): | |
83 | engine = engine_from_config(conf, 'sqlalchemy.db1.') |
|
84 | engine = engine_from_config(conf, 'sqlalchemy.db1.') | |
84 | init_model(engine) |
|
85 | init_model(engine) | |
@@ -124,7 +125,6 def create_test_repo(force=True): | |||||
124 | if user is None: |
|
125 | if user is None: | |
125 | raise Exception('user not found') |
|
126 | raise Exception('user not found') | |
126 |
|
127 | |||
127 |
|
||||
128 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
|
128 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() | |
129 |
|
129 | |||
130 | if repo is None: |
|
130 | if repo is None: | |
@@ -140,6 +140,7 def create_test_repo(force=True): | |||||
140 |
|
140 | |||
141 | print 'done' |
|
141 | print 'done' | |
142 |
|
142 | |||
|
143 | ||||
143 | def set_anonymous_access(enable=True): |
|
144 | def set_anonymous_access(enable=True): | |
144 | sa = get_session() |
|
145 | sa = get_session() | |
145 | user = sa.query(User).filter(User.username == 'default').one() |
|
146 | user = sa.query(User).filter(User.username == 'default').one() | |
@@ -147,6 +148,7 def set_anonymous_access(enable=True): | |||||
147 | sa.add(user) |
|
148 | sa.add(user) | |
148 | sa.commit() |
|
149 | sa.commit() | |
149 |
|
150 | |||
|
151 | ||||
150 | def get_anonymous_access(): |
|
152 | def get_anonymous_access(): | |
151 | sa = get_session() |
|
153 | sa = get_session() | |
152 | return sa.query(User).filter(User.username == 'default').one().active |
|
154 | return sa.query(User).filter(User.username == 'default').one().active |
1 | NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py |
|
NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py |
@@ -64,7 +64,8 log = logging.getLogger(__name__) | |||||
64 |
|
64 | |||
65 | engine = engine_from_config(conf, 'sqlalchemy.db1.') |
|
65 | engine = engine_from_config(conf, 'sqlalchemy.db1.') | |
66 | init_model(engine) |
|
66 | init_model(engine) | |
67 | sa = meta.Session |
|
67 | sa = meta.Session() | |
|
68 | ||||
68 |
|
69 | |||
69 | class Command(object): |
|
70 | class Command(object): | |
70 |
|
71 | |||
@@ -137,7 +138,6 def create_test_repo(force=True): | |||||
137 | if user is None: |
|
138 | if user is None: | |
138 | raise Exception('user not found') |
|
139 | raise Exception('user not found') | |
139 |
|
140 | |||
140 |
|
||||
141 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
|
141 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() | |
142 |
|
142 | |||
143 | if repo is None: |
|
143 | if repo is None: | |
@@ -161,6 +161,7 def set_anonymous_access(enable=True): | |||||
161 | if enable != User.get_by_username('default').active: |
|
161 | if enable != User.get_by_username('default').active: | |
162 | raise Exception('Cannot set anonymous access') |
|
162 | raise Exception('Cannot set anonymous access') | |
163 |
|
163 | |||
|
164 | ||||
164 | def get_anonymous_access(): |
|
165 | def get_anonymous_access(): | |
165 | user = User.get_by_username('default') |
|
166 | user = User.get_by_username('default') | |
166 | return user.active |
|
167 | return user.active | |
@@ -235,6 +236,7 def test_clone_anonymous(): | |||||
235 | print '\tdisabling anonymous access' |
|
236 | print '\tdisabling anonymous access' | |
236 | set_anonymous_access(enable=False) |
|
237 | set_anonymous_access(enable=False) | |
237 |
|
238 | |||
|
239 | ||||
238 | @test_wrapp |
|
240 | @test_wrapp | |
239 | def test_clone_wrong_credentials(): |
|
241 | def test_clone_wrong_credentials(): | |
240 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
242 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
@@ -264,10 +266,12 def test_clone_wrong_credentials(): | |||||
264 | if not """abort: authorization failed""" in stderr: |
|
266 | if not """abort: authorization failed""" in stderr: | |
265 | raise Exception('Failure') |
|
267 | raise Exception('Failure') | |
266 |
|
268 | |||
|
269 | ||||
267 | @test_wrapp |
|
270 | @test_wrapp | |
268 | def test_pull(): |
|
271 | def test_pull(): | |
269 | pass |
|
272 | pass | |
270 |
|
273 | |||
|
274 | ||||
271 | @test_wrapp |
|
275 | @test_wrapp | |
272 | def test_push_modify_file(f_name='setup.py'): |
|
276 | def test_push_modify_file(f_name='setup.py'): | |
273 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
277 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
@@ -281,6 +285,7 def test_push_modify_file(f_name='setup. | |||||
281 |
|
285 | |||
282 | Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) |
|
286 | Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) | |
283 |
|
287 | |||
|
288 | ||||
284 | @test_wrapp |
|
289 | @test_wrapp | |
285 | def test_push_new_file(commits=15, with_clone=True): |
|
290 | def test_push_new_file(commits=15, with_clone=True): | |
286 |
|
291 | |||
@@ -312,6 +317,7 def test_push_new_file(commits=15, with_ | |||||
312 |
|
317 | |||
313 | Command(cwd).execute('hg push --verbose --debug %s' % push_url) |
|
318 | Command(cwd).execute('hg push --verbose --debug %s' % push_url) | |
314 |
|
319 | |||
|
320 | ||||
315 | @test_wrapp |
|
321 | @test_wrapp | |
316 | def test_push_wrong_credentials(): |
|
322 | def test_push_wrong_credentials(): | |
317 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
323 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
@@ -332,6 +338,7 def test_push_wrong_credentials(): | |||||
332 |
|
338 | |||
333 | Command(cwd).execute('hg push %s' % clone_url) |
|
339 | Command(cwd).execute('hg push %s' % clone_url) | |
334 |
|
340 | |||
|
341 | ||||
335 | @test_wrapp |
|
342 | @test_wrapp | |
336 | def test_push_wrong_path(): |
|
343 | def test_push_wrong_path(): | |
337 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
344 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
@@ -366,10 +373,12 def test_push_wrong_path(): | |||||
366 | if not """abort: HTTP Error 403: Forbidden""" in stderr: |
|
373 | if not """abort: HTTP Error 403: Forbidden""" in stderr: | |
367 | raise Exception('Failure') |
|
374 | raise Exception('Failure') | |
368 |
|
375 | |||
|
376 | ||||
369 | @test_wrapp |
|
377 | @test_wrapp | |
370 | def get_logs(): |
|
378 | def get_logs(): | |
371 | return UserLog.query().all() |
|
379 | return UserLog.query().all() | |
372 |
|
380 | |||
|
381 | ||||
373 | @test_wrapp |
|
382 | @test_wrapp | |
374 | def test_logs(initial): |
|
383 | def test_logs(initial): | |
375 | logs = UserLog.query().all() |
|
384 | logs = UserLog.query().all() |
@@ -22,8 +22,8 function at ``tests/__init__.py``. | |||||
22 | import os |
|
22 | import os | |
23 | from rhodecode.lib import vcs |
|
23 | from rhodecode.lib import vcs | |
24 | from rhodecode.lib.vcs.utils.compat import unittest |
|
24 | from rhodecode.lib.vcs.utils.compat import unittest | |
|
25 | from utils import VCSTestError, SCMFetcher | |||
25 | from rhodecode.tests import * |
|
26 | from rhodecode.tests import * | |
26 | from utils import VCSTestError, SCMFetcher |
|
|||
27 |
|
27 | |||
28 |
|
28 | |||
29 | def setup_package(): |
|
29 | def setup_package(): |
General Comments 0
You need to be logged in to leave comments.
Login now