Show More
|
1 | NO CONTENT: new file 100644 |
@@ -0,0 +1,316 b'' | |||
|
1 | import os | |
|
2 | import unittest | |
|
3 | from rhodecode.tests import * | |
|
4 | ||
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |
|
6 | from rhodecode.model.repo import RepoModel | |
|
7 | from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm | |
|
8 | from rhodecode.model.user import UserModel | |
|
9 | ||
|
10 | from rhodecode.model.meta import Session | |
|
11 | from rhodecode.model.users_group import UsersGroupModel | |
|
12 | from rhodecode.lib.auth import AuthUser | |
|
13 | ||
|
14 | ||
|
15 | def _make_group(path, desc='desc', parent_id=None, | |
|
16 | skip_if_exists=False): | |
|
17 | ||
|
18 | gr = RepoGroup.get_by_group_name(path) | |
|
19 | if gr and skip_if_exists: | |
|
20 | return gr | |
|
21 | ||
|
22 | gr = ReposGroupModel().create(path, desc, parent_id) | |
|
23 | return gr | |
|
24 | ||
|
25 | ||
|
26 | class TestPermissions(unittest.TestCase): | |
|
27 | def __init__(self, methodName='runTest'): | |
|
28 | super(TestPermissions, self).__init__(methodName=methodName) | |
|
29 | ||
|
30 | def setUp(self): | |
|
31 | self.u1 = UserModel().create_or_update( | |
|
32 | username=u'u1', password=u'qweqwe', | |
|
33 | email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1' | |
|
34 | ) | |
|
35 | self.u2 = UserModel().create_or_update( | |
|
36 | username=u'u2', password=u'qweqwe', | |
|
37 | email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2' | |
|
38 | ) | |
|
39 | self.anon = User.get_by_username('default') | |
|
40 | self.a1 = UserModel().create_or_update( | |
|
41 | username=u'a1', password=u'qweqwe', | |
|
42 | email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True | |
|
43 | ) | |
|
44 | Session().commit() | |
|
45 | ||
|
46 | def tearDown(self): | |
|
47 | if hasattr(self, 'test_repo'): | |
|
48 | RepoModel().delete(repo=self.test_repo) | |
|
49 | UserModel().delete(self.u1) | |
|
50 | UserModel().delete(self.u2) | |
|
51 | UserModel().delete(self.a1) | |
|
52 | if hasattr(self, 'g1'): | |
|
53 | ReposGroupModel().delete(self.g1.group_id) | |
|
54 | if hasattr(self, 'g2'): | |
|
55 | ReposGroupModel().delete(self.g2.group_id) | |
|
56 | ||
|
57 | if hasattr(self, 'ug1'): | |
|
58 | UsersGroupModel().delete(self.ug1, force=True) | |
|
59 | ||
|
60 | Session().commit() | |
|
61 | ||
|
62 | def test_default_perms_set(self): | |
|
63 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
64 | perms = { | |
|
65 | 'repositories_groups': {}, | |
|
66 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
67 | u'hg.register.manual_activate']), | |
|
68 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
69 | } | |
|
70 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
71 | perms['repositories'][HG_REPO]) | |
|
72 | new_perm = 'repository.write' | |
|
73 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |
|
74 | perm=new_perm) | |
|
75 | Session().commit() | |
|
76 | ||
|
77 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
78 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
79 | new_perm) | |
|
80 | ||
|
81 | def test_default_admin_perms_set(self): | |
|
82 | a1_auth = AuthUser(user_id=self.a1.user_id) | |
|
83 | perms = { | |
|
84 | 'repositories_groups': {}, | |
|
85 | 'global': set([u'hg.admin']), | |
|
86 | 'repositories': {u'vcs_test_hg': u'repository.admin'} | |
|
87 | } | |
|
88 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |
|
89 | perms['repositories'][HG_REPO]) | |
|
90 | new_perm = 'repository.write' | |
|
91 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, | |
|
92 | perm=new_perm) | |
|
93 | Session().commit() | |
|
94 | # cannot really downgrade admins permissions !? they still get's set as | |
|
95 | # admin ! | |
|
96 | u1_auth = AuthUser(user_id=self.a1.user_id) | |
|
97 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
98 | perms['repositories'][HG_REPO]) | |
|
99 | ||
|
100 | def test_default_group_perms(self): | |
|
101 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
102 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
103 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
104 | perms = { | |
|
105 | 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'}, | |
|
106 | 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']), | |
|
107 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
108 | } | |
|
109 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
110 | perms['repositories'][HG_REPO]) | |
|
111 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
112 | perms['repositories_groups']) | |
|
113 | ||
|
114 | def test_default_admin_group_perms(self): | |
|
115 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
116 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
117 | a1_auth = AuthUser(user_id=self.a1.user_id) | |
|
118 | perms = { | |
|
119 | 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'}, | |
|
120 | 'global': set(['hg.admin']), | |
|
121 | 'repositories': {u'vcs_test_hg': 'repository.admin'} | |
|
122 | } | |
|
123 | ||
|
124 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |
|
125 | perms['repositories'][HG_REPO]) | |
|
126 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
127 | perms['repositories_groups']) | |
|
128 | ||
|
129 | def test_propagated_permission_from_users_group(self): | |
|
130 | # make group | |
|
131 | self.ug1 = UsersGroupModel().create('G1') | |
|
132 | # add user to group | |
|
133 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
134 | ||
|
135 | # set permission to lower | |
|
136 | new_perm = 'repository.none' | |
|
137 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) | |
|
138 | Session().commit() | |
|
139 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
140 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
141 | new_perm) | |
|
142 | ||
|
143 | # grant perm for group this should override permission from user | |
|
144 | new_perm = 'repository.write' | |
|
145 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |
|
146 | group_name=self.ug1, | |
|
147 | perm=new_perm) | |
|
148 | # check perms | |
|
149 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
150 | perms = { | |
|
151 | 'repositories_groups': {}, | |
|
152 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
153 | u'hg.register.manual_activate']), | |
|
154 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
155 | } | |
|
156 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
157 | new_perm) | |
|
158 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
159 | perms['repositories_groups']) | |
|
160 | ||
|
161 | def test_propagated_permission_from_users_group_lower_weight(self): | |
|
162 | # make group | |
|
163 | self.ug1 = UsersGroupModel().create('G1') | |
|
164 | # add user to group | |
|
165 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
166 | ||
|
167 | # set permission to lower | |
|
168 | new_perm_h = 'repository.write' | |
|
169 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |
|
170 | perm=new_perm_h) | |
|
171 | Session().commit() | |
|
172 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
173 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
174 | new_perm_h) | |
|
175 | ||
|
176 | # grant perm for group this should NOT override permission from user | |
|
177 | # since it's lower than granted | |
|
178 | new_perm_l = 'repository.read' | |
|
179 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |
|
180 | group_name=self.ug1, | |
|
181 | perm=new_perm_l) | |
|
182 | # check perms | |
|
183 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
184 | perms = { | |
|
185 | 'repositories_groups': {}, | |
|
186 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
187 | u'hg.register.manual_activate']), | |
|
188 | 'repositories': {u'vcs_test_hg': u'repository.write'} | |
|
189 | } | |
|
190 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
191 | new_perm_h) | |
|
192 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
193 | perms['repositories_groups']) | |
|
194 | ||
|
195 | def test_repo_in_group_permissions(self): | |
|
196 | self.g1 = _make_group('group1', skip_if_exists=True) | |
|
197 | self.g2 = _make_group('group2', skip_if_exists=True) | |
|
198 | Session().commit() | |
|
199 | # both perms should be read ! | |
|
200 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
201 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
202 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
203 | ||
|
204 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
205 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
206 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
207 | ||
|
208 | #Change perms to none for both groups | |
|
209 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
210 | user=self.anon, | |
|
211 | perm='group.none') | |
|
212 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |
|
213 | user=self.anon, | |
|
214 | perm='group.none') | |
|
215 | ||
|
216 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
217 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
218 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
219 | ||
|
220 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
221 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
222 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
223 | ||
|
224 | # add repo to group | |
|
225 | name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm']) | |
|
226 | self.test_repo = RepoModel().create_repo( | |
|
227 | repo_name=name, | |
|
228 | repo_type='hg', | |
|
229 | description='', | |
|
230 | owner=self.u1, | |
|
231 | ) | |
|
232 | Session().commit() | |
|
233 | ||
|
234 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
235 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
236 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
237 | ||
|
238 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
239 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
240 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
241 | ||
|
242 | #grant permission for u2 ! | |
|
243 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
244 | user=self.u2, | |
|
245 | perm='group.read') | |
|
246 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |
|
247 | user=self.u2, | |
|
248 | perm='group.read') | |
|
249 | Session().commit() | |
|
250 | self.assertNotEqual(self.u1, self.u2) | |
|
251 | #u1 and anon should have not change perms while u2 should ! | |
|
252 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
253 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
254 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
255 | ||
|
256 | u2_auth = AuthUser(user_id=self.u2.user_id) | |
|
257 | self.assertEqual(u2_auth.permissions['repositories_groups'], | |
|
258 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
259 | ||
|
260 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
261 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
262 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
263 | ||
|
264 | def test_repo_group_user_as_user_group_member(self): | |
|
265 | # create Group1 | |
|
266 | self.g1 = _make_group('group1', skip_if_exists=True) | |
|
267 | Session().commit() | |
|
268 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
269 | ||
|
270 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
271 | {u'group1': u'group.read'}) | |
|
272 | ||
|
273 | # set default permission to none | |
|
274 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
275 | user=self.anon, | |
|
276 | perm='group.none') | |
|
277 | # make group | |
|
278 | self.ug1 = UsersGroupModel().create('G1') | |
|
279 | # add user to group | |
|
280 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
281 | Session().commit() | |
|
282 | ||
|
283 | # check if user is in the group | |
|
284 | membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members] | |
|
285 | self.assertEqual(membrs, [self.u1.user_id]) | |
|
286 | # add some user to that group | |
|
287 | ||
|
288 | # check his permissions | |
|
289 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
290 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
291 | {u'group1': u'group.none'}) | |
|
292 | ||
|
293 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
294 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
295 | {u'group1': u'group.none'}) | |
|
296 | ||
|
297 | # grant ug1 read permissions for | |
|
298 | ReposGroupModel().grant_users_group_permission(repos_group=self.g1, | |
|
299 | group_name=self.ug1, | |
|
300 | perm='group.read') | |
|
301 | Session().commit() | |
|
302 | # check if the | |
|
303 | obj = Session().query(UsersGroupRepoGroupToPerm)\ | |
|
304 | .filter(UsersGroupRepoGroupToPerm.group == self.g1)\ | |
|
305 | .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\ | |
|
306 | .scalar() | |
|
307 | self.assertEqual(obj.permission.permission_name, 'group.read') | |
|
308 | ||
|
309 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
310 | ||
|
311 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
312 | {u'group1': u'group.none'}) | |
|
313 | ||
|
314 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
315 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
316 | {u'group1': u'group.read'}) |
@@ -0,0 +1,170 b'' | |||
|
1 | import os | |
|
2 | import unittest | |
|
3 | from rhodecode.tests import * | |
|
4 | ||
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |
|
6 | from rhodecode.model.repo import RepoModel | |
|
7 | from rhodecode.model.db import RepoGroup, User | |
|
8 | from rhodecode.model.meta import Session | |
|
9 | from sqlalchemy.exc import IntegrityError | |
|
10 | ||
|
11 | ||
|
12 | def _make_group(path, desc='desc', parent_id=None, | |
|
13 | skip_if_exists=False): | |
|
14 | ||
|
15 | gr = RepoGroup.get_by_group_name(path) | |
|
16 | if gr and skip_if_exists: | |
|
17 | return gr | |
|
18 | ||
|
19 | gr = ReposGroupModel().create(path, desc, parent_id) | |
|
20 | return gr | |
|
21 | ||
|
22 | ||
|
23 | class TestReposGroups(unittest.TestCase): | |
|
24 | ||
|
25 | def setUp(self): | |
|
26 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
27 | Session().commit() | |
|
28 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
29 | Session().commit() | |
|
30 | self.g3 = _make_group('test3', skip_if_exists=True) | |
|
31 | Session().commit() | |
|
32 | ||
|
33 | def tearDown(self): | |
|
34 | print 'out' | |
|
35 | ||
|
36 | def __check_path(self, *path): | |
|
37 | """ | |
|
38 | Checks the path for existance ! | |
|
39 | """ | |
|
40 | path = [TESTS_TMP_PATH] + list(path) | |
|
41 | path = os.path.join(*path) | |
|
42 | return os.path.isdir(path) | |
|
43 | ||
|
44 | def _check_folders(self): | |
|
45 | print os.listdir(TESTS_TMP_PATH) | |
|
46 | ||
|
47 | def __delete_group(self, id_): | |
|
48 | ReposGroupModel().delete(id_) | |
|
49 | ||
|
50 | def __update_group(self, id_, path, desc='desc', parent_id=None): | |
|
51 | form_data = dict( | |
|
52 | group_name=path, | |
|
53 | group_description=desc, | |
|
54 | group_parent_id=parent_id, | |
|
55 | perms_updates=[], | |
|
56 | perms_new=[] | |
|
57 | ) | |
|
58 | gr = ReposGroupModel().update(id_, form_data) | |
|
59 | return gr | |
|
60 | ||
|
61 | def test_create_group(self): | |
|
62 | g = _make_group('newGroup') | |
|
63 | self.assertEqual(g.full_path, 'newGroup') | |
|
64 | ||
|
65 | self.assertTrue(self.__check_path('newGroup')) | |
|
66 | ||
|
67 | def test_create_same_name_group(self): | |
|
68 | self.assertRaises(IntegrityError, lambda: _make_group('newGroup')) | |
|
69 | Session().rollback() | |
|
70 | ||
|
71 | def test_same_subgroup(self): | |
|
72 | sg1 = _make_group('sub1', parent_id=self.g1.group_id) | |
|
73 | self.assertEqual(sg1.parent_group, self.g1) | |
|
74 | self.assertEqual(sg1.full_path, 'test1/sub1') | |
|
75 | self.assertTrue(self.__check_path('test1', 'sub1')) | |
|
76 | ||
|
77 | ssg1 = _make_group('subsub1', parent_id=sg1.group_id) | |
|
78 | self.assertEqual(ssg1.parent_group, sg1) | |
|
79 | self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1') | |
|
80 | self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1')) | |
|
81 | ||
|
82 | def test_remove_group(self): | |
|
83 | sg1 = _make_group('deleteme') | |
|
84 | self.__delete_group(sg1.group_id) | |
|
85 | ||
|
86 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |
|
87 | self.assertFalse(self.__check_path('deteteme')) | |
|
88 | ||
|
89 | sg1 = _make_group('deleteme', parent_id=self.g1.group_id) | |
|
90 | self.__delete_group(sg1.group_id) | |
|
91 | ||
|
92 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |
|
93 | self.assertFalse(self.__check_path('test1', 'deteteme')) | |
|
94 | ||
|
95 | def test_rename_single_group(self): | |
|
96 | sg1 = _make_group('initial') | |
|
97 | ||
|
98 | new_sg1 = self.__update_group(sg1.group_id, 'after') | |
|
99 | self.assertTrue(self.__check_path('after')) | |
|
100 | self.assertEqual(RepoGroup.get_by_group_name('initial'), None) | |
|
101 | ||
|
102 | def test_update_group_parent(self): | |
|
103 | ||
|
104 | sg1 = _make_group('initial', parent_id=self.g1.group_id) | |
|
105 | ||
|
106 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id) | |
|
107 | self.assertTrue(self.__check_path('test1', 'after')) | |
|
108 | self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None) | |
|
109 | ||
|
110 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id) | |
|
111 | self.assertTrue(self.__check_path('test3', 'after')) | |
|
112 | self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None) | |
|
113 | ||
|
114 | new_sg1 = self.__update_group(sg1.group_id, 'hello') | |
|
115 | self.assertTrue(self.__check_path('hello')) | |
|
116 | ||
|
117 | self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1) | |
|
118 | ||
|
119 | def test_subgrouping_with_repo(self): | |
|
120 | ||
|
121 | g1 = _make_group('g1') | |
|
122 | g2 = _make_group('g2') | |
|
123 | ||
|
124 | # create new repo | |
|
125 | form_data = dict(repo_name='john', | |
|
126 | repo_name_full='john', | |
|
127 | fork_name=None, | |
|
128 | description=None, | |
|
129 | repo_group=None, | |
|
130 | private=False, | |
|
131 | repo_type='hg', | |
|
132 | clone_uri=None, | |
|
133 | landing_rev='tip') | |
|
134 | cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) | |
|
135 | r = RepoModel().create(form_data, cur_user) | |
|
136 | ||
|
137 | self.assertEqual(r.repo_name, 'john') | |
|
138 | ||
|
139 | # put repo into group | |
|
140 | form_data = form_data | |
|
141 | form_data['repo_group'] = g1.group_id | |
|
142 | form_data['perms_new'] = [] | |
|
143 | form_data['perms_updates'] = [] | |
|
144 | RepoModel().update(r.repo_name, form_data) | |
|
145 | self.assertEqual(r.repo_name, 'g1/john') | |
|
146 | ||
|
147 | self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id) | |
|
148 | self.assertTrue(self.__check_path('g2', 'g1')) | |
|
149 | ||
|
150 | # test repo | |
|
151 | self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', | |
|
152 | r.just_name])) | |
|
153 | ||
|
154 | def test_move_to_root(self): | |
|
155 | g1 = _make_group('t11') | |
|
156 | Session().commit() | |
|
157 | g2 = _make_group('t22', parent_id=g1.group_id) | |
|
158 | Session().commit() | |
|
159 | ||
|
160 | self.assertEqual(g2.full_path, 't11/t22') | |
|
161 | self.assertTrue(self.__check_path('t11', 't22')) | |
|
162 | ||
|
163 | g2 = self.__update_group(g2.group_id, 'g22', parent_id=None) | |
|
164 | Session().commit() | |
|
165 | ||
|
166 | self.assertEqual(g2.group_name, 'g22') | |
|
167 | # we moved out group from t1 to '' so it's full path should be 'g2' | |
|
168 | self.assertEqual(g2.full_path, 'g22') | |
|
169 | self.assertFalse(self.__check_path('t11', 't22')) | |
|
170 | self.assertTrue(self.__check_path('g22')) No newline at end of file |
@@ -0,0 +1,124 b'' | |||
|
1 | import unittest | |
|
2 | from rhodecode.tests import * | |
|
3 | ||
|
4 | from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\ | |
|
5 | Permission | |
|
6 | from rhodecode.model.user import UserModel | |
|
7 | ||
|
8 | from rhodecode.model.meta import Session | |
|
9 | from rhodecode.model.users_group import UsersGroupModel | |
|
10 | ||
|
11 | ||
|
12 | class TestUser(unittest.TestCase): | |
|
13 | def __init__(self, methodName='runTest'): | |
|
14 | Session.remove() | |
|
15 | super(TestUser, self).__init__(methodName=methodName) | |
|
16 | ||
|
17 | def test_create_and_remove(self): | |
|
18 | usr = UserModel().create_or_update(username=u'test_user', | |
|
19 | password=u'qweqwe', | |
|
20 | email=u'u232@rhodecode.org', | |
|
21 | firstname=u'u1', lastname=u'u1') | |
|
22 | Session().commit() | |
|
23 | self.assertEqual(User.get_by_username(u'test_user'), usr) | |
|
24 | ||
|
25 | # make users group | |
|
26 | users_group = UsersGroupModel().create('some_example_group') | |
|
27 | Session().commit() | |
|
28 | ||
|
29 | UsersGroupModel().add_user_to_group(users_group, usr) | |
|
30 | Session().commit() | |
|
31 | ||
|
32 | self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group) | |
|
33 | self.assertEqual(UsersGroupMember.query().count(), 1) | |
|
34 | UserModel().delete(usr.user_id) | |
|
35 | Session().commit() | |
|
36 | ||
|
37 | self.assertEqual(UsersGroupMember.query().all(), []) | |
|
38 | ||
|
39 | def test_additonal_email_as_main(self): | |
|
40 | usr = UserModel().create_or_update(username=u'test_user', | |
|
41 | password=u'qweqwe', | |
|
42 | email=u'main_email@rhodecode.org', | |
|
43 | firstname=u'u1', lastname=u'u1') | |
|
44 | Session().commit() | |
|
45 | ||
|
46 | def do(): | |
|
47 | m = UserEmailMap() | |
|
48 | m.email = u'main_email@rhodecode.org' | |
|
49 | m.user = usr | |
|
50 | Session().add(m) | |
|
51 | Session().commit() | |
|
52 | self.assertRaises(AttributeError, do) | |
|
53 | ||
|
54 | UserModel().delete(usr.user_id) | |
|
55 | Session().commit() | |
|
56 | ||
|
57 | def test_extra_email_map(self): | |
|
58 | usr = UserModel().create_or_update(username=u'test_user', | |
|
59 | password=u'qweqwe', | |
|
60 | email=u'main_email@rhodecode.org', | |
|
61 | firstname=u'u1', lastname=u'u1') | |
|
62 | Session().commit() | |
|
63 | ||
|
64 | m = UserEmailMap() | |
|
65 | m.email = u'main_email2@rhodecode.org' | |
|
66 | m.user = usr | |
|
67 | Session().add(m) | |
|
68 | Session().commit() | |
|
69 | ||
|
70 | u = User.get_by_email(email='main_email@rhodecode.org') | |
|
71 | self.assertEqual(usr.user_id, u.user_id) | |
|
72 | self.assertEqual(usr.username, u.username) | |
|
73 | ||
|
74 | u = User.get_by_email(email='main_email2@rhodecode.org') | |
|
75 | self.assertEqual(usr.user_id, u.user_id) | |
|
76 | self.assertEqual(usr.username, u.username) | |
|
77 | u = User.get_by_email(email='main_email3@rhodecode.org') | |
|
78 | self.assertEqual(None, u) | |
|
79 | ||
|
80 | UserModel().delete(usr.user_id) | |
|
81 | Session().commit() | |
|
82 | ||
|
83 | ||
|
84 | class TestUsers(unittest.TestCase): | |
|
85 | ||
|
86 | def __init__(self, methodName='runTest'): | |
|
87 | super(TestUsers, self).__init__(methodName=methodName) | |
|
88 | ||
|
89 | def setUp(self): | |
|
90 | self.u1 = UserModel().create_or_update(username=u'u1', | |
|
91 | password=u'qweqwe', | |
|
92 | email=u'u1@rhodecode.org', | |
|
93 | firstname=u'u1', lastname=u'u1') | |
|
94 | ||
|
95 | def tearDown(self): | |
|
96 | perm = Permission.query().all() | |
|
97 | for p in perm: | |
|
98 | UserModel().revoke_perm(self.u1, p) | |
|
99 | ||
|
100 | UserModel().delete(self.u1) | |
|
101 | Session().commit() | |
|
102 | ||
|
103 | def test_add_perm(self): | |
|
104 | perm = Permission.query().all()[0] | |
|
105 | UserModel().grant_perm(self.u1, perm) | |
|
106 | Session().commit() | |
|
107 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |
|
108 | ||
|
109 | def test_has_perm(self): | |
|
110 | perm = Permission.query().all() | |
|
111 | for p in perm: | |
|
112 | has_p = UserModel().has_perm(self.u1, p) | |
|
113 | self.assertEqual(False, has_p) | |
|
114 | ||
|
115 | def test_revoke_perm(self): | |
|
116 | perm = Permission.query().all()[0] | |
|
117 | UserModel().grant_perm(self.u1, perm) | |
|
118 | Session().commit() | |
|
119 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |
|
120 | ||
|
121 | #revoke | |
|
122 | UserModel().revoke_perm(self.u1, perm) | |
|
123 | Session().commit() | |
|
124 | self.assertEqual(UserModel().has_perm(self.u1, perm), False) |
This diff has been collapsed as it changes many lines, (604 lines changed) Show them Hide them | |||
@@ -2,250 +2,11 b' import os' | |||
|
2 | 2 | import unittest |
|
3 | 3 | from rhodecode.tests import * |
|
4 | 4 | |
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |
|
6 | from rhodecode.model.repo import RepoModel | |
|
7 | from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \ | |
|
8 | UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\ | |
|
9 | Repository, UserEmailMap | |
|
10 | from sqlalchemy.exc import IntegrityError, DatabaseError | |
|
5 | from rhodecode.model.db import User, Notification, UserNotification | |
|
11 | 6 | from rhodecode.model.user import UserModel |
|
12 | 7 | |
|
13 | 8 | from rhodecode.model.meta import Session |
|
14 | 9 | from rhodecode.model.notification import NotificationModel |
|
15 | from rhodecode.model.users_group import UsersGroupModel | |
|
16 | from rhodecode.lib.auth import AuthUser | |
|
17 | ||
|
18 | ||
|
19 | def _make_group(path, desc='desc', parent_id=None, | |
|
20 | skip_if_exists=False): | |
|
21 | ||
|
22 | gr = RepoGroup.get_by_group_name(path) | |
|
23 | if gr and skip_if_exists: | |
|
24 | return gr | |
|
25 | ||
|
26 | gr = ReposGroupModel().create(path, desc, parent_id) | |
|
27 | return gr | |
|
28 | ||
|
29 | ||
|
30 | class TestReposGroups(unittest.TestCase): | |
|
31 | ||
|
32 | def setUp(self): | |
|
33 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
34 | Session.commit() | |
|
35 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
36 | Session.commit() | |
|
37 | self.g3 = _make_group('test3', skip_if_exists=True) | |
|
38 | Session.commit() | |
|
39 | ||
|
40 | def tearDown(self): | |
|
41 | print 'out' | |
|
42 | ||
|
43 | def __check_path(self, *path): | |
|
44 | """ | |
|
45 | Checks the path for existance ! | |
|
46 | """ | |
|
47 | path = [TESTS_TMP_PATH] + list(path) | |
|
48 | path = os.path.join(*path) | |
|
49 | return os.path.isdir(path) | |
|
50 | ||
|
51 | def _check_folders(self): | |
|
52 | print os.listdir(TESTS_TMP_PATH) | |
|
53 | ||
|
54 | def __delete_group(self, id_): | |
|
55 | ReposGroupModel().delete(id_) | |
|
56 | ||
|
57 | def __update_group(self, id_, path, desc='desc', parent_id=None): | |
|
58 | form_data = dict( | |
|
59 | group_name=path, | |
|
60 | group_description=desc, | |
|
61 | group_parent_id=parent_id, | |
|
62 | perms_updates=[], | |
|
63 | perms_new=[] | |
|
64 | ) | |
|
65 | gr = ReposGroupModel().update(id_, form_data) | |
|
66 | return gr | |
|
67 | ||
|
68 | def test_create_group(self): | |
|
69 | g = _make_group('newGroup') | |
|
70 | self.assertEqual(g.full_path, 'newGroup') | |
|
71 | ||
|
72 | self.assertTrue(self.__check_path('newGroup')) | |
|
73 | ||
|
74 | def test_create_same_name_group(self): | |
|
75 | self.assertRaises(IntegrityError, lambda:_make_group('newGroup')) | |
|
76 | Session.rollback() | |
|
77 | ||
|
78 | def test_same_subgroup(self): | |
|
79 | sg1 = _make_group('sub1', parent_id=self.g1.group_id) | |
|
80 | self.assertEqual(sg1.parent_group, self.g1) | |
|
81 | self.assertEqual(sg1.full_path, 'test1/sub1') | |
|
82 | self.assertTrue(self.__check_path('test1', 'sub1')) | |
|
83 | ||
|
84 | ssg1 = _make_group('subsub1', parent_id=sg1.group_id) | |
|
85 | self.assertEqual(ssg1.parent_group, sg1) | |
|
86 | self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1') | |
|
87 | self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1')) | |
|
88 | ||
|
89 | def test_remove_group(self): | |
|
90 | sg1 = _make_group('deleteme') | |
|
91 | self.__delete_group(sg1.group_id) | |
|
92 | ||
|
93 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |
|
94 | self.assertFalse(self.__check_path('deteteme')) | |
|
95 | ||
|
96 | sg1 = _make_group('deleteme', parent_id=self.g1.group_id) | |
|
97 | self.__delete_group(sg1.group_id) | |
|
98 | ||
|
99 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |
|
100 | self.assertFalse(self.__check_path('test1', 'deteteme')) | |
|
101 | ||
|
102 | def test_rename_single_group(self): | |
|
103 | sg1 = _make_group('initial') | |
|
104 | ||
|
105 | new_sg1 = self.__update_group(sg1.group_id, 'after') | |
|
106 | self.assertTrue(self.__check_path('after')) | |
|
107 | self.assertEqual(RepoGroup.get_by_group_name('initial'), None) | |
|
108 | ||
|
109 | def test_update_group_parent(self): | |
|
110 | ||
|
111 | sg1 = _make_group('initial', parent_id=self.g1.group_id) | |
|
112 | ||
|
113 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id) | |
|
114 | self.assertTrue(self.__check_path('test1', 'after')) | |
|
115 | self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None) | |
|
116 | ||
|
117 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id) | |
|
118 | self.assertTrue(self.__check_path('test3', 'after')) | |
|
119 | self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None) | |
|
120 | ||
|
121 | new_sg1 = self.__update_group(sg1.group_id, 'hello') | |
|
122 | self.assertTrue(self.__check_path('hello')) | |
|
123 | ||
|
124 | self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1) | |
|
125 | ||
|
126 | def test_subgrouping_with_repo(self): | |
|
127 | ||
|
128 | g1 = _make_group('g1') | |
|
129 | g2 = _make_group('g2') | |
|
130 | ||
|
131 | # create new repo | |
|
132 | form_data = dict(repo_name='john', | |
|
133 | repo_name_full='john', | |
|
134 | fork_name=None, | |
|
135 | description=None, | |
|
136 | repo_group=None, | |
|
137 | private=False, | |
|
138 | repo_type='hg', | |
|
139 | clone_uri=None, | |
|
140 | landing_rev='tip') | |
|
141 | cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) | |
|
142 | r = RepoModel().create(form_data, cur_user) | |
|
143 | ||
|
144 | self.assertEqual(r.repo_name, 'john') | |
|
145 | ||
|
146 | # put repo into group | |
|
147 | form_data = form_data | |
|
148 | form_data['repo_group'] = g1.group_id | |
|
149 | form_data['perms_new'] = [] | |
|
150 | form_data['perms_updates'] = [] | |
|
151 | RepoModel().update(r.repo_name, form_data) | |
|
152 | self.assertEqual(r.repo_name, 'g1/john') | |
|
153 | ||
|
154 | self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id) | |
|
155 | self.assertTrue(self.__check_path('g2', 'g1')) | |
|
156 | ||
|
157 | # test repo | |
|
158 | self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name])) | |
|
159 | ||
|
160 | def test_move_to_root(self): | |
|
161 | g1 = _make_group('t11') | |
|
162 | Session.commit() | |
|
163 | g2 = _make_group('t22', parent_id=g1.group_id) | |
|
164 | Session.commit() | |
|
165 | ||
|
166 | self.assertEqual(g2.full_path, 't11/t22') | |
|
167 | self.assertTrue(self.__check_path('t11', 't22')) | |
|
168 | ||
|
169 | g2 = self.__update_group(g2.group_id, 'g22', parent_id=None) | |
|
170 | Session.commit() | |
|
171 | ||
|
172 | self.assertEqual(g2.group_name, 'g22') | |
|
173 | # we moved out group from t1 to '' so it's full path should be 'g2' | |
|
174 | self.assertEqual(g2.full_path, 'g22') | |
|
175 | self.assertFalse(self.__check_path('t11', 't22')) | |
|
176 | self.assertTrue(self.__check_path('g22')) | |
|
177 | ||
|
178 | ||
|
179 | class TestUser(unittest.TestCase): | |
|
180 | def __init__(self, methodName='runTest'): | |
|
181 | Session.remove() | |
|
182 | super(TestUser, self).__init__(methodName=methodName) | |
|
183 | ||
|
184 | def test_create_and_remove(self): | |
|
185 | usr = UserModel().create_or_update(username=u'test_user', | |
|
186 | password=u'qweqwe', | |
|
187 | email=u'u232@rhodecode.org', | |
|
188 | firstname=u'u1', lastname=u'u1') | |
|
189 | Session.commit() | |
|
190 | self.assertEqual(User.get_by_username(u'test_user'), usr) | |
|
191 | ||
|
192 | # make users group | |
|
193 | users_group = UsersGroupModel().create('some_example_group') | |
|
194 | Session.commit() | |
|
195 | ||
|
196 | UsersGroupModel().add_user_to_group(users_group, usr) | |
|
197 | Session.commit() | |
|
198 | ||
|
199 | self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group) | |
|
200 | self.assertEqual(UsersGroupMember.query().count(), 1) | |
|
201 | UserModel().delete(usr.user_id) | |
|
202 | Session.commit() | |
|
203 | ||
|
204 | self.assertEqual(UsersGroupMember.query().all(), []) | |
|
205 | ||
|
206 | def test_additonal_email_as_main(self): | |
|
207 | usr = UserModel().create_or_update(username=u'test_user', | |
|
208 | password=u'qweqwe', | |
|
209 | email=u'main_email@rhodecode.org', | |
|
210 | firstname=u'u1', lastname=u'u1') | |
|
211 | Session.commit() | |
|
212 | ||
|
213 | def do(): | |
|
214 | m = UserEmailMap() | |
|
215 | m.email = u'main_email@rhodecode.org' | |
|
216 | m.user = usr | |
|
217 | Session.add(m) | |
|
218 | Session.commit() | |
|
219 | self.assertRaises(AttributeError, do) | |
|
220 | ||
|
221 | UserModel().delete(usr.user_id) | |
|
222 | Session.commit() | |
|
223 | ||
|
224 | def test_extra_email_map(self): | |
|
225 | usr = UserModel().create_or_update(username=u'test_user', | |
|
226 | password=u'qweqwe', | |
|
227 | email=u'main_email@rhodecode.org', | |
|
228 | firstname=u'u1', lastname=u'u1') | |
|
229 | Session.commit() | |
|
230 | ||
|
231 | m = UserEmailMap() | |
|
232 | m.email = u'main_email2@rhodecode.org' | |
|
233 | m.user = usr | |
|
234 | Session.add(m) | |
|
235 | Session.commit() | |
|
236 | ||
|
237 | u = User.get_by_email(email='main_email@rhodecode.org') | |
|
238 | self.assertEqual(usr.user_id, u.user_id) | |
|
239 | self.assertEqual(usr.username, u.username) | |
|
240 | ||
|
241 | u = User.get_by_email(email='main_email2@rhodecode.org') | |
|
242 | self.assertEqual(usr.user_id, u.user_id) | |
|
243 | self.assertEqual(usr.username, u.username) | |
|
244 | u = User.get_by_email(email='main_email3@rhodecode.org') | |
|
245 | self.assertEqual(None, u) | |
|
246 | ||
|
247 | UserModel().delete(usr.user_id) | |
|
248 | Session.commit() | |
|
249 | 10 | |
|
250 | 11 | |
|
251 | 12 | class TestNotifications(unittest.TestCase): |
@@ -256,30 +17,30 b' class TestNotifications(unittest.TestCas' | |||
|
256 | 17 | password=u'qweqwe', |
|
257 | 18 | email=u'u1@rhodecode.org', |
|
258 | 19 | firstname=u'u1', lastname=u'u1') |
|
259 | Session.commit() | |
|
20 | Session().commit() | |
|
260 | 21 | self.u1 = self.u1.user_id |
|
261 | 22 | |
|
262 | 23 | self.u2 = UserModel().create_or_update(username=u'u2', |
|
263 | 24 | password=u'qweqwe', |
|
264 | 25 | email=u'u2@rhodecode.org', |
|
265 | 26 | firstname=u'u2', lastname=u'u3') |
|
266 | Session.commit() | |
|
27 | Session().commit() | |
|
267 | 28 | self.u2 = self.u2.user_id |
|
268 | 29 | |
|
269 | 30 | self.u3 = UserModel().create_or_update(username=u'u3', |
|
270 | 31 | password=u'qweqwe', |
|
271 | 32 | email=u'u3@rhodecode.org', |
|
272 | 33 | firstname=u'u3', lastname=u'u3') |
|
273 | Session.commit() | |
|
34 | Session().commit() | |
|
274 | 35 | self.u3 = self.u3.user_id |
|
275 | 36 | |
|
276 | 37 | super(TestNotifications, self).__init__(methodName=methodName) |
|
277 | 38 | |
|
278 | 39 | def _clean_notifications(self): |
|
279 | 40 | for n in Notification.query().all(): |
|
280 | Session.delete(n) | |
|
41 | Session().delete(n) | |
|
281 | 42 | |
|
282 | Session.commit() | |
|
43 | Session().commit() | |
|
283 | 44 | self.assertEqual(Notification.query().all(), []) |
|
284 | 45 | |
|
285 | 46 | def tearDown(self): |
@@ -293,7 +54,7 b' class TestNotifications(unittest.TestCas' | |||
|
293 | 54 | notification = NotificationModel().create(created_by=self.u1, |
|
294 | 55 | subject=u'subj', body=u'hi there', |
|
295 | 56 | recipients=usrs) |
|
296 | Session.commit() | |
|
57 | Session().commit() | |
|
297 | 58 | u1 = User.get(self.u1) |
|
298 | 59 | u2 = User.get(self.u2) |
|
299 | 60 | u3 = User.get(self.u3) |
@@ -316,12 +77,12 b' class TestNotifications(unittest.TestCas' | |||
|
316 | 77 | notification1 = NotificationModel().create(created_by=self.u1, |
|
317 | 78 | subject=u'subj', body=u'hi there1', |
|
318 | 79 | recipients=[self.u3]) |
|
319 | Session.commit() | |
|
80 | Session().commit() | |
|
320 | 81 | notification2 = NotificationModel().create(created_by=self.u1, |
|
321 | 82 | subject=u'subj', body=u'hi there2', |
|
322 | 83 | recipients=[self.u3]) |
|
323 | Session.commit() | |
|
324 | u3 = Session.query(User).get(self.u3) | |
|
84 | Session().commit() | |
|
85 | u3 = Session().query(User).get(self.u3) | |
|
325 | 86 | |
|
326 | 87 | self.assertEqual(sorted([x.notification for x in u3.notifications]), |
|
327 | 88 | sorted([notification2, notification1])) |
@@ -333,12 +94,12 b' class TestNotifications(unittest.TestCas' | |||
|
333 | 94 | notification = NotificationModel().create(created_by=self.u1, |
|
334 | 95 | subject=u'title', body=u'hi there3', |
|
335 | 96 | recipients=[self.u3, self.u1, self.u2]) |
|
336 | Session.commit() | |
|
97 | Session().commit() | |
|
337 | 98 | notifications = Notification.query().all() |
|
338 | 99 | self.assertTrue(notification in notifications) |
|
339 | 100 | |
|
340 | 101 | Notification.delete(notification.notification_id) |
|
341 | Session.commit() | |
|
102 | Session().commit() | |
|
342 | 103 | |
|
343 | 104 | notifications = Notification.query().all() |
|
344 | 105 | self.assertFalse(notification in notifications) |
@@ -355,7 +116,7 b' class TestNotifications(unittest.TestCas' | |||
|
355 | 116 | notification = NotificationModel().create(created_by=self.u1, |
|
356 | 117 | subject=u'title', body=u'hi there3', |
|
357 | 118 | recipients=[self.u3, self.u1, self.u2]) |
|
358 | Session.commit() | |
|
119 | Session().commit() | |
|
359 | 120 | |
|
360 | 121 | unotification = UserNotification.query()\ |
|
361 | 122 | .filter(UserNotification.notification == |
@@ -367,7 +128,7 b' class TestNotifications(unittest.TestCas' | |||
|
367 | 128 | |
|
368 | 129 | NotificationModel().delete(self.u3, |
|
369 | 130 | notification.notification_id) |
|
370 | Session.commit() | |
|
131 | Session().commit() | |
|
371 | 132 | |
|
372 | 133 | u3notification = UserNotification.query()\ |
|
373 | 134 | .filter(UserNotification.notification == |
@@ -402,7 +163,7 b' class TestNotifications(unittest.TestCas' | |||
|
402 | 163 | NotificationModel().create(created_by=self.u1, |
|
403 | 164 | subject=u'title', body=u'hi there_delete', |
|
404 | 165 | recipients=[self.u3, self.u1]) |
|
405 | Session.commit() | |
|
166 | Session().commit() | |
|
406 | 167 | |
|
407 | 168 | self.assertEqual(NotificationModel() |
|
408 | 169 | .get_unread_cnt_for_user(self.u1), 1) |
@@ -414,7 +175,7 b' class TestNotifications(unittest.TestCas' | |||
|
414 | 175 | notification = NotificationModel().create(created_by=self.u1, |
|
415 | 176 | subject=u'title', body=u'hi there3', |
|
416 | 177 | recipients=[self.u3, self.u1, self.u2]) |
|
417 | Session.commit() | |
|
178 | Session().commit() | |
|
418 | 179 | |
|
419 | 180 | self.assertEqual(NotificationModel() |
|
420 | 181 | .get_unread_cnt_for_user(self.u1), 2) |
@@ -424,338 +185,5 b' class TestNotifications(unittest.TestCas' | |||
|
424 | 185 | .get_unread_cnt_for_user(self.u3), 2) |
|
425 | 186 | |
|
426 | 187 | |
|
427 | class TestUsers(unittest.TestCase): | |
|
428 | ||
|
429 | def __init__(self, methodName='runTest'): | |
|
430 | super(TestUsers, self).__init__(methodName=methodName) | |
|
431 | ||
|
432 | def setUp(self): | |
|
433 | self.u1 = UserModel().create_or_update(username=u'u1', | |
|
434 | password=u'qweqwe', | |
|
435 | email=u'u1@rhodecode.org', | |
|
436 | firstname=u'u1', lastname=u'u1') | |
|
437 | ||
|
438 | def tearDown(self): | |
|
439 | perm = Permission.query().all() | |
|
440 | for p in perm: | |
|
441 | UserModel().revoke_perm(self.u1, p) | |
|
442 | ||
|
443 | UserModel().delete(self.u1) | |
|
444 | Session.commit() | |
|
445 | ||
|
446 | def test_add_perm(self): | |
|
447 | perm = Permission.query().all()[0] | |
|
448 | UserModel().grant_perm(self.u1, perm) | |
|
449 | Session.commit() | |
|
450 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |
|
451 | ||
|
452 | def test_has_perm(self): | |
|
453 | perm = Permission.query().all() | |
|
454 | for p in perm: | |
|
455 | has_p = UserModel().has_perm(self.u1, p) | |
|
456 | self.assertEqual(False, has_p) | |
|
457 | ||
|
458 | def test_revoke_perm(self): | |
|
459 | perm = Permission.query().all()[0] | |
|
460 | UserModel().grant_perm(self.u1, perm) | |
|
461 | Session.commit() | |
|
462 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |
|
463 | ||
|
464 | #revoke | |
|
465 | UserModel().revoke_perm(self.u1, perm) | |
|
466 | Session.commit() | |
|
467 | self.assertEqual(UserModel().has_perm(self.u1, perm), False) | |
|
468 | 188 | |
|
469 | 189 | |
|
470 | class TestPermissions(unittest.TestCase): | |
|
471 | def __init__(self, methodName='runTest'): | |
|
472 | super(TestPermissions, self).__init__(methodName=methodName) | |
|
473 | ||
|
474 | def setUp(self): | |
|
475 | self.u1 = UserModel().create_or_update( | |
|
476 | username=u'u1', password=u'qweqwe', | |
|
477 | email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1' | |
|
478 | ) | |
|
479 | self.u2 = UserModel().create_or_update( | |
|
480 | username=u'u2', password=u'qweqwe', | |
|
481 | email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2' | |
|
482 | ) | |
|
483 | self.anon = User.get_by_username('default') | |
|
484 | self.a1 = UserModel().create_or_update( | |
|
485 | username=u'a1', password=u'qweqwe', | |
|
486 | email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True | |
|
487 | ) | |
|
488 | Session.commit() | |
|
489 | ||
|
490 | def tearDown(self): | |
|
491 | if hasattr(self, 'test_repo'): | |
|
492 | RepoModel().delete(repo=self.test_repo) | |
|
493 | UserModel().delete(self.u1) | |
|
494 | UserModel().delete(self.u2) | |
|
495 | UserModel().delete(self.a1) | |
|
496 | if hasattr(self, 'g1'): | |
|
497 | ReposGroupModel().delete(self.g1.group_id) | |
|
498 | if hasattr(self, 'g2'): | |
|
499 | ReposGroupModel().delete(self.g2.group_id) | |
|
500 | ||
|
501 | if hasattr(self, 'ug1'): | |
|
502 | UsersGroupModel().delete(self.ug1, force=True) | |
|
503 | ||
|
504 | Session.commit() | |
|
505 | ||
|
506 | def test_default_perms_set(self): | |
|
507 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
508 | perms = { | |
|
509 | 'repositories_groups': {}, | |
|
510 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
511 | u'hg.register.manual_activate']), | |
|
512 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
513 | } | |
|
514 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
515 | perms['repositories'][HG_REPO]) | |
|
516 | new_perm = 'repository.write' | |
|
517 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) | |
|
518 | Session.commit() | |
|
519 | ||
|
520 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
521 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm) | |
|
522 | ||
|
523 | def test_default_admin_perms_set(self): | |
|
524 | a1_auth = AuthUser(user_id=self.a1.user_id) | |
|
525 | perms = { | |
|
526 | 'repositories_groups': {}, | |
|
527 | 'global': set([u'hg.admin']), | |
|
528 | 'repositories': {u'vcs_test_hg': u'repository.admin'} | |
|
529 | } | |
|
530 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |
|
531 | perms['repositories'][HG_REPO]) | |
|
532 | new_perm = 'repository.write' | |
|
533 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm) | |
|
534 | Session.commit() | |
|
535 | # cannot really downgrade admins permissions !? they still get's set as | |
|
536 | # admin ! | |
|
537 | u1_auth = AuthUser(user_id=self.a1.user_id) | |
|
538 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
539 | perms['repositories'][HG_REPO]) | |
|
540 | ||
|
541 | def test_default_group_perms(self): | |
|
542 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
543 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
544 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
545 | perms = { | |
|
546 | 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'}, | |
|
547 | 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']), | |
|
548 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
549 | } | |
|
550 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
551 | perms['repositories'][HG_REPO]) | |
|
552 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
553 | perms['repositories_groups']) | |
|
554 | ||
|
555 | def test_default_admin_group_perms(self): | |
|
556 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
557 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
558 | a1_auth = AuthUser(user_id=self.a1.user_id) | |
|
559 | perms = { | |
|
560 | 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'}, | |
|
561 | 'global': set(['hg.admin']), | |
|
562 | 'repositories': {u'vcs_test_hg': 'repository.admin'} | |
|
563 | } | |
|
564 | ||
|
565 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |
|
566 | perms['repositories'][HG_REPO]) | |
|
567 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
568 | perms['repositories_groups']) | |
|
569 | ||
|
570 | def test_propagated_permission_from_users_group(self): | |
|
571 | # make group | |
|
572 | self.ug1 = UsersGroupModel().create('G1') | |
|
573 | # add user to group | |
|
574 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
575 | ||
|
576 | # set permission to lower | |
|
577 | new_perm = 'repository.none' | |
|
578 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) | |
|
579 | Session.commit() | |
|
580 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
581 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
582 | new_perm) | |
|
583 | ||
|
584 | # grant perm for group this should override permission from user | |
|
585 | new_perm = 'repository.write' | |
|
586 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |
|
587 | group_name=self.ug1, | |
|
588 | perm=new_perm) | |
|
589 | # check perms | |
|
590 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
591 | perms = { | |
|
592 | 'repositories_groups': {}, | |
|
593 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
594 | u'hg.register.manual_activate']), | |
|
595 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
596 | } | |
|
597 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
598 | new_perm) | |
|
599 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
600 | perms['repositories_groups']) | |
|
601 | ||
|
602 | def test_propagated_permission_from_users_group_lower_weight(self): | |
|
603 | # make group | |
|
604 | self.ug1 = UsersGroupModel().create('G1') | |
|
605 | # add user to group | |
|
606 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
607 | ||
|
608 | # set permission to lower | |
|
609 | new_perm_h = 'repository.write' | |
|
610 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |
|
611 | perm=new_perm_h) | |
|
612 | Session.commit() | |
|
613 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
614 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
615 | new_perm_h) | |
|
616 | ||
|
617 | # grant perm for group this should NOT override permission from user | |
|
618 | # since it's lower than granted | |
|
619 | new_perm_l = 'repository.read' | |
|
620 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |
|
621 | group_name=self.ug1, | |
|
622 | perm=new_perm_l) | |
|
623 | # check perms | |
|
624 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
625 | perms = { | |
|
626 | 'repositories_groups': {}, | |
|
627 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
628 | u'hg.register.manual_activate']), | |
|
629 | 'repositories': {u'vcs_test_hg': u'repository.write'} | |
|
630 | } | |
|
631 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
632 | new_perm_h) | |
|
633 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
634 | perms['repositories_groups']) | |
|
635 | ||
|
636 | def test_repo_in_group_permissions(self): | |
|
637 | self.g1 = _make_group('group1', skip_if_exists=True) | |
|
638 | self.g2 = _make_group('group2', skip_if_exists=True) | |
|
639 | Session.commit() | |
|
640 | # both perms should be read ! | |
|
641 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
642 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
643 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
644 | ||
|
645 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
646 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
647 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
648 | ||
|
649 | #Change perms to none for both groups | |
|
650 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
651 | user=self.anon, | |
|
652 | perm='group.none') | |
|
653 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |
|
654 | user=self.anon, | |
|
655 | perm='group.none') | |
|
656 | ||
|
657 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
658 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
659 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
660 | ||
|
661 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
662 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
663 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
664 | ||
|
665 | # add repo to group | |
|
666 | form_data = { | |
|
667 | 'repo_name': HG_REPO, | |
|
668 | 'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]), | |
|
669 | 'repo_type': 'hg', | |
|
670 | 'clone_uri': '', | |
|
671 | 'repo_group': self.g1.group_id, | |
|
672 | 'description': 'desc', | |
|
673 | 'private': False, | |
|
674 | 'landing_rev': 'tip' | |
|
675 | } | |
|
676 | self.test_repo = RepoModel().create(form_data, cur_user=self.u1) | |
|
677 | Session.commit() | |
|
678 | ||
|
679 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
680 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
681 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
682 | ||
|
683 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
684 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
685 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
686 | ||
|
687 | #grant permission for u2 ! | |
|
688 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
689 | user=self.u2, | |
|
690 | perm='group.read') | |
|
691 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |
|
692 | user=self.u2, | |
|
693 | perm='group.read') | |
|
694 | Session.commit() | |
|
695 | self.assertNotEqual(self.u1, self.u2) | |
|
696 | #u1 and anon should have not change perms while u2 should ! | |
|
697 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
698 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
699 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
700 | ||
|
701 | u2_auth = AuthUser(user_id=self.u2.user_id) | |
|
702 | self.assertEqual(u2_auth.permissions['repositories_groups'], | |
|
703 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
704 | ||
|
705 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
706 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
707 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
708 | ||
|
709 | def test_repo_group_user_as_user_group_member(self): | |
|
710 | # create Group1 | |
|
711 | self.g1 = _make_group('group1', skip_if_exists=True) | |
|
712 | Session.commit() | |
|
713 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
714 | ||
|
715 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
716 | {u'group1': u'group.read'}) | |
|
717 | ||
|
718 | # set default permission to none | |
|
719 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
720 | user=self.anon, | |
|
721 | perm='group.none') | |
|
722 | # make group | |
|
723 | self.ug1 = UsersGroupModel().create('G1') | |
|
724 | # add user to group | |
|
725 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
726 | Session.commit() | |
|
727 | ||
|
728 | # check if user is in the group | |
|
729 | membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members] | |
|
730 | self.assertEqual(membrs, [self.u1.user_id]) | |
|
731 | # add some user to that group | |
|
732 | ||
|
733 | # check his permissions | |
|
734 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
735 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
736 | {u'group1': u'group.none'}) | |
|
737 | ||
|
738 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
739 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
740 | {u'group1': u'group.none'}) | |
|
741 | ||
|
742 | # grant ug1 read permissions for | |
|
743 | ReposGroupModel().grant_users_group_permission(repos_group=self.g1, | |
|
744 | group_name=self.ug1, | |
|
745 | perm='group.read') | |
|
746 | Session.commit() | |
|
747 | # check if the | |
|
748 | obj = Session.query(UsersGroupRepoGroupToPerm)\ | |
|
749 | .filter(UsersGroupRepoGroupToPerm.group == self.g1)\ | |
|
750 | .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\ | |
|
751 | .scalar() | |
|
752 | self.assertEqual(obj.permission.permission_name, 'group.read') | |
|
753 | ||
|
754 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
755 | ||
|
756 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
757 | {u'group1': u'group.none'}) | |
|
758 | ||
|
759 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
760 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
761 | {u'group1': u'group.read'}) |
|
1 | NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch |
@@ -79,6 +79,7 b' class Command(object):' | |||
|
79 | 79 | print stdout, stderr |
|
80 | 80 | return stdout, stderr |
|
81 | 81 | |
|
82 | ||
|
82 | 83 | def get_session(): |
|
83 | 84 | engine = engine_from_config(conf, 'sqlalchemy.db1.') |
|
84 | 85 | init_model(engine) |
@@ -124,7 +125,6 b' def create_test_repo(force=True):' | |||
|
124 | 125 | if user is None: |
|
125 | 126 | raise Exception('user not found') |
|
126 | 127 | |
|
127 | ||
|
128 | 128 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
|
129 | 129 | |
|
130 | 130 | if repo is None: |
@@ -140,6 +140,7 b' def create_test_repo(force=True):' | |||
|
140 | 140 | |
|
141 | 141 | print 'done' |
|
142 | 142 | |
|
143 | ||
|
143 | 144 | def set_anonymous_access(enable=True): |
|
144 | 145 | sa = get_session() |
|
145 | 146 | user = sa.query(User).filter(User.username == 'default').one() |
@@ -147,6 +148,7 b' def set_anonymous_access(enable=True):' | |||
|
147 | 148 | sa.add(user) |
|
148 | 149 | sa.commit() |
|
149 | 150 | |
|
151 | ||
|
150 | 152 | def get_anonymous_access(): |
|
151 | 153 | sa = get_session() |
|
152 | 154 | return sa.query(User).filter(User.username == 'default').one().active |
|
1 | NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py |
@@ -64,7 +64,8 b' log = logging.getLogger(__name__)' | |||
|
64 | 64 | |
|
65 | 65 | engine = engine_from_config(conf, 'sqlalchemy.db1.') |
|
66 | 66 | init_model(engine) |
|
67 | sa = meta.Session | |
|
67 | sa = meta.Session() | |
|
68 | ||
|
68 | 69 | |
|
69 | 70 | class Command(object): |
|
70 | 71 | |
@@ -137,7 +138,6 b' def create_test_repo(force=True):' | |||
|
137 | 138 | if user is None: |
|
138 | 139 | raise Exception('user not found') |
|
139 | 140 | |
|
140 | ||
|
141 | 141 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
|
142 | 142 | |
|
143 | 143 | if repo is None: |
@@ -161,6 +161,7 b' def set_anonymous_access(enable=True):' | |||
|
161 | 161 | if enable != User.get_by_username('default').active: |
|
162 | 162 | raise Exception('Cannot set anonymous access') |
|
163 | 163 | |
|
164 | ||
|
164 | 165 | def get_anonymous_access(): |
|
165 | 166 | user = User.get_by_username('default') |
|
166 | 167 | return user.active |
@@ -235,6 +236,7 b' def test_clone_anonymous():' | |||
|
235 | 236 | print '\tdisabling anonymous access' |
|
236 | 237 | set_anonymous_access(enable=False) |
|
237 | 238 | |
|
239 | ||
|
238 | 240 | @test_wrapp |
|
239 | 241 | def test_clone_wrong_credentials(): |
|
240 | 242 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
@@ -264,10 +266,12 b' def test_clone_wrong_credentials():' | |||
|
264 | 266 | if not """abort: authorization failed""" in stderr: |
|
265 | 267 | raise Exception('Failure') |
|
266 | 268 | |
|
269 | ||
|
267 | 270 | @test_wrapp |
|
268 | 271 | def test_pull(): |
|
269 | 272 | pass |
|
270 | 273 | |
|
274 | ||
|
271 | 275 | @test_wrapp |
|
272 | 276 | def test_push_modify_file(f_name='setup.py'): |
|
273 | 277 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
@@ -281,6 +285,7 b" def test_push_modify_file(f_name='setup." | |||
|
281 | 285 | |
|
282 | 286 | Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) |
|
283 | 287 | |
|
288 | ||
|
284 | 289 | @test_wrapp |
|
285 | 290 | def test_push_new_file(commits=15, with_clone=True): |
|
286 | 291 | |
@@ -312,6 +317,7 b' def test_push_new_file(commits=15, with_' | |||
|
312 | 317 | |
|
313 | 318 | Command(cwd).execute('hg push --verbose --debug %s' % push_url) |
|
314 | 319 | |
|
320 | ||
|
315 | 321 | @test_wrapp |
|
316 | 322 | def test_push_wrong_credentials(): |
|
317 | 323 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
@@ -332,6 +338,7 b' def test_push_wrong_credentials():' | |||
|
332 | 338 | |
|
333 | 339 | Command(cwd).execute('hg push %s' % clone_url) |
|
334 | 340 | |
|
341 | ||
|
335 | 342 | @test_wrapp |
|
336 | 343 | def test_push_wrong_path(): |
|
337 | 344 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
@@ -366,10 +373,12 b' def test_push_wrong_path():' | |||
|
366 | 373 | if not """abort: HTTP Error 403: Forbidden""" in stderr: |
|
367 | 374 | raise Exception('Failure') |
|
368 | 375 | |
|
376 | ||
|
369 | 377 | @test_wrapp |
|
370 | 378 | def get_logs(): |
|
371 | 379 | return UserLog.query().all() |
|
372 | 380 | |
|
381 | ||
|
373 | 382 | @test_wrapp |
|
374 | 383 | def test_logs(initial): |
|
375 | 384 | logs = UserLog.query().all() |
@@ -22,8 +22,8 b' function at ``tests/__init__.py``.' | |||
|
22 | 22 | import os |
|
23 | 23 | from rhodecode.lib import vcs |
|
24 | 24 | from rhodecode.lib.vcs.utils.compat import unittest |
|
25 | from utils import VCSTestError, SCMFetcher | |
|
25 | 26 | from rhodecode.tests import * |
|
26 | from utils import VCSTestError, SCMFetcher | |
|
27 | 27 | |
|
28 | 28 | |
|
29 | 29 | def setup_package(): |
General Comments 0
You need to be logged in to leave comments.
Login now