##// END OF EJS Templates
orginized test module...
marcink -
r2527:95624ce4 beta
parent child Browse files
Show More
1 NO CONTENT: new file 100644
@@ -0,0 +1,316 b''
1 import os
2 import unittest
3 from rhodecode.tests import *
4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm
8 from rhodecode.model.user import UserModel
9
10 from rhodecode.model.meta import Session
11 from rhodecode.model.users_group import UsersGroupModel
12 from rhodecode.lib.auth import AuthUser
13
14
15 def _make_group(path, desc='desc', parent_id=None,
16 skip_if_exists=False):
17
18 gr = RepoGroup.get_by_group_name(path)
19 if gr and skip_if_exists:
20 return gr
21
22 gr = ReposGroupModel().create(path, desc, parent_id)
23 return gr
24
25
26 class TestPermissions(unittest.TestCase):
27 def __init__(self, methodName='runTest'):
28 super(TestPermissions, self).__init__(methodName=methodName)
29
30 def setUp(self):
31 self.u1 = UserModel().create_or_update(
32 username=u'u1', password=u'qweqwe',
33 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
34 )
35 self.u2 = UserModel().create_or_update(
36 username=u'u2', password=u'qweqwe',
37 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
38 )
39 self.anon = User.get_by_username('default')
40 self.a1 = UserModel().create_or_update(
41 username=u'a1', password=u'qweqwe',
42 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
43 )
44 Session().commit()
45
46 def tearDown(self):
47 if hasattr(self, 'test_repo'):
48 RepoModel().delete(repo=self.test_repo)
49 UserModel().delete(self.u1)
50 UserModel().delete(self.u2)
51 UserModel().delete(self.a1)
52 if hasattr(self, 'g1'):
53 ReposGroupModel().delete(self.g1.group_id)
54 if hasattr(self, 'g2'):
55 ReposGroupModel().delete(self.g2.group_id)
56
57 if hasattr(self, 'ug1'):
58 UsersGroupModel().delete(self.ug1, force=True)
59
60 Session().commit()
61
62 def test_default_perms_set(self):
63 u1_auth = AuthUser(user_id=self.u1.user_id)
64 perms = {
65 'repositories_groups': {},
66 'global': set([u'hg.create.repository', u'repository.read',
67 u'hg.register.manual_activate']),
68 'repositories': {u'vcs_test_hg': u'repository.read'}
69 }
70 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
71 perms['repositories'][HG_REPO])
72 new_perm = 'repository.write'
73 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
74 perm=new_perm)
75 Session().commit()
76
77 u1_auth = AuthUser(user_id=self.u1.user_id)
78 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
79 new_perm)
80
81 def test_default_admin_perms_set(self):
82 a1_auth = AuthUser(user_id=self.a1.user_id)
83 perms = {
84 'repositories_groups': {},
85 'global': set([u'hg.admin']),
86 'repositories': {u'vcs_test_hg': u'repository.admin'}
87 }
88 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
89 perms['repositories'][HG_REPO])
90 new_perm = 'repository.write'
91 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
92 perm=new_perm)
93 Session().commit()
94 # cannot really downgrade admins permissions !? they still get's set as
95 # admin !
96 u1_auth = AuthUser(user_id=self.a1.user_id)
97 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
98 perms['repositories'][HG_REPO])
99
100 def test_default_group_perms(self):
101 self.g1 = _make_group('test1', skip_if_exists=True)
102 self.g2 = _make_group('test2', skip_if_exists=True)
103 u1_auth = AuthUser(user_id=self.u1.user_id)
104 perms = {
105 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
106 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
107 'repositories': {u'vcs_test_hg': u'repository.read'}
108 }
109 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
110 perms['repositories'][HG_REPO])
111 self.assertEqual(u1_auth.permissions['repositories_groups'],
112 perms['repositories_groups'])
113
114 def test_default_admin_group_perms(self):
115 self.g1 = _make_group('test1', skip_if_exists=True)
116 self.g2 = _make_group('test2', skip_if_exists=True)
117 a1_auth = AuthUser(user_id=self.a1.user_id)
118 perms = {
119 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
120 'global': set(['hg.admin']),
121 'repositories': {u'vcs_test_hg': 'repository.admin'}
122 }
123
124 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
125 perms['repositories'][HG_REPO])
126 self.assertEqual(a1_auth.permissions['repositories_groups'],
127 perms['repositories_groups'])
128
129 def test_propagated_permission_from_users_group(self):
130 # make group
131 self.ug1 = UsersGroupModel().create('G1')
132 # add user to group
133 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
134
135 # set permission to lower
136 new_perm = 'repository.none'
137 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
138 Session().commit()
139 u1_auth = AuthUser(user_id=self.u1.user_id)
140 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
141 new_perm)
142
143 # grant perm for group this should override permission from user
144 new_perm = 'repository.write'
145 RepoModel().grant_users_group_permission(repo=HG_REPO,
146 group_name=self.ug1,
147 perm=new_perm)
148 # check perms
149 u1_auth = AuthUser(user_id=self.u1.user_id)
150 perms = {
151 'repositories_groups': {},
152 'global': set([u'hg.create.repository', u'repository.read',
153 u'hg.register.manual_activate']),
154 'repositories': {u'vcs_test_hg': u'repository.read'}
155 }
156 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
157 new_perm)
158 self.assertEqual(u1_auth.permissions['repositories_groups'],
159 perms['repositories_groups'])
160
161 def test_propagated_permission_from_users_group_lower_weight(self):
162 # make group
163 self.ug1 = UsersGroupModel().create('G1')
164 # add user to group
165 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
166
167 # set permission to lower
168 new_perm_h = 'repository.write'
169 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
170 perm=new_perm_h)
171 Session().commit()
172 u1_auth = AuthUser(user_id=self.u1.user_id)
173 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
174 new_perm_h)
175
176 # grant perm for group this should NOT override permission from user
177 # since it's lower than granted
178 new_perm_l = 'repository.read'
179 RepoModel().grant_users_group_permission(repo=HG_REPO,
180 group_name=self.ug1,
181 perm=new_perm_l)
182 # check perms
183 u1_auth = AuthUser(user_id=self.u1.user_id)
184 perms = {
185 'repositories_groups': {},
186 'global': set([u'hg.create.repository', u'repository.read',
187 u'hg.register.manual_activate']),
188 'repositories': {u'vcs_test_hg': u'repository.write'}
189 }
190 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
191 new_perm_h)
192 self.assertEqual(u1_auth.permissions['repositories_groups'],
193 perms['repositories_groups'])
194
195 def test_repo_in_group_permissions(self):
196 self.g1 = _make_group('group1', skip_if_exists=True)
197 self.g2 = _make_group('group2', skip_if_exists=True)
198 Session().commit()
199 # both perms should be read !
200 u1_auth = AuthUser(user_id=self.u1.user_id)
201 self.assertEqual(u1_auth.permissions['repositories_groups'],
202 {u'group1': u'group.read', u'group2': u'group.read'})
203
204 a1_auth = AuthUser(user_id=self.anon.user_id)
205 self.assertEqual(a1_auth.permissions['repositories_groups'],
206 {u'group1': u'group.read', u'group2': u'group.read'})
207
208 #Change perms to none for both groups
209 ReposGroupModel().grant_user_permission(repos_group=self.g1,
210 user=self.anon,
211 perm='group.none')
212 ReposGroupModel().grant_user_permission(repos_group=self.g2,
213 user=self.anon,
214 perm='group.none')
215
216 u1_auth = AuthUser(user_id=self.u1.user_id)
217 self.assertEqual(u1_auth.permissions['repositories_groups'],
218 {u'group1': u'group.none', u'group2': u'group.none'})
219
220 a1_auth = AuthUser(user_id=self.anon.user_id)
221 self.assertEqual(a1_auth.permissions['repositories_groups'],
222 {u'group1': u'group.none', u'group2': u'group.none'})
223
224 # add repo to group
225 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
226 self.test_repo = RepoModel().create_repo(
227 repo_name=name,
228 repo_type='hg',
229 description='',
230 owner=self.u1,
231 )
232 Session().commit()
233
234 u1_auth = AuthUser(user_id=self.u1.user_id)
235 self.assertEqual(u1_auth.permissions['repositories_groups'],
236 {u'group1': u'group.none', u'group2': u'group.none'})
237
238 a1_auth = AuthUser(user_id=self.anon.user_id)
239 self.assertEqual(a1_auth.permissions['repositories_groups'],
240 {u'group1': u'group.none', u'group2': u'group.none'})
241
242 #grant permission for u2 !
243 ReposGroupModel().grant_user_permission(repos_group=self.g1,
244 user=self.u2,
245 perm='group.read')
246 ReposGroupModel().grant_user_permission(repos_group=self.g2,
247 user=self.u2,
248 perm='group.read')
249 Session().commit()
250 self.assertNotEqual(self.u1, self.u2)
251 #u1 and anon should have not change perms while u2 should !
252 u1_auth = AuthUser(user_id=self.u1.user_id)
253 self.assertEqual(u1_auth.permissions['repositories_groups'],
254 {u'group1': u'group.none', u'group2': u'group.none'})
255
256 u2_auth = AuthUser(user_id=self.u2.user_id)
257 self.assertEqual(u2_auth.permissions['repositories_groups'],
258 {u'group1': u'group.read', u'group2': u'group.read'})
259
260 a1_auth = AuthUser(user_id=self.anon.user_id)
261 self.assertEqual(a1_auth.permissions['repositories_groups'],
262 {u'group1': u'group.none', u'group2': u'group.none'})
263
264 def test_repo_group_user_as_user_group_member(self):
265 # create Group1
266 self.g1 = _make_group('group1', skip_if_exists=True)
267 Session().commit()
268 a1_auth = AuthUser(user_id=self.anon.user_id)
269
270 self.assertEqual(a1_auth.permissions['repositories_groups'],
271 {u'group1': u'group.read'})
272
273 # set default permission to none
274 ReposGroupModel().grant_user_permission(repos_group=self.g1,
275 user=self.anon,
276 perm='group.none')
277 # make group
278 self.ug1 = UsersGroupModel().create('G1')
279 # add user to group
280 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
281 Session().commit()
282
283 # check if user is in the group
284 membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
285 self.assertEqual(membrs, [self.u1.user_id])
286 # add some user to that group
287
288 # check his permissions
289 a1_auth = AuthUser(user_id=self.anon.user_id)
290 self.assertEqual(a1_auth.permissions['repositories_groups'],
291 {u'group1': u'group.none'})
292
293 u1_auth = AuthUser(user_id=self.u1.user_id)
294 self.assertEqual(u1_auth.permissions['repositories_groups'],
295 {u'group1': u'group.none'})
296
297 # grant ug1 read permissions for
298 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
299 group_name=self.ug1,
300 perm='group.read')
301 Session().commit()
302 # check if the
303 obj = Session().query(UsersGroupRepoGroupToPerm)\
304 .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
305 .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
306 .scalar()
307 self.assertEqual(obj.permission.permission_name, 'group.read')
308
309 a1_auth = AuthUser(user_id=self.anon.user_id)
310
311 self.assertEqual(a1_auth.permissions['repositories_groups'],
312 {u'group1': u'group.none'})
313
314 u1_auth = AuthUser(user_id=self.u1.user_id)
315 self.assertEqual(u1_auth.permissions['repositories_groups'],
316 {u'group1': u'group.read'})
@@ -0,0 +1,170 b''
1 import os
2 import unittest
3 from rhodecode.tests import *
4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User
8 from rhodecode.model.meta import Session
9 from sqlalchemy.exc import IntegrityError
10
11
12 def _make_group(path, desc='desc', parent_id=None,
13 skip_if_exists=False):
14
15 gr = RepoGroup.get_by_group_name(path)
16 if gr and skip_if_exists:
17 return gr
18
19 gr = ReposGroupModel().create(path, desc, parent_id)
20 return gr
21
22
23 class TestReposGroups(unittest.TestCase):
24
25 def setUp(self):
26 self.g1 = _make_group('test1', skip_if_exists=True)
27 Session().commit()
28 self.g2 = _make_group('test2', skip_if_exists=True)
29 Session().commit()
30 self.g3 = _make_group('test3', skip_if_exists=True)
31 Session().commit()
32
33 def tearDown(self):
34 print 'out'
35
36 def __check_path(self, *path):
37 """
38 Checks the path for existance !
39 """
40 path = [TESTS_TMP_PATH] + list(path)
41 path = os.path.join(*path)
42 return os.path.isdir(path)
43
44 def _check_folders(self):
45 print os.listdir(TESTS_TMP_PATH)
46
47 def __delete_group(self, id_):
48 ReposGroupModel().delete(id_)
49
50 def __update_group(self, id_, path, desc='desc', parent_id=None):
51 form_data = dict(
52 group_name=path,
53 group_description=desc,
54 group_parent_id=parent_id,
55 perms_updates=[],
56 perms_new=[]
57 )
58 gr = ReposGroupModel().update(id_, form_data)
59 return gr
60
61 def test_create_group(self):
62 g = _make_group('newGroup')
63 self.assertEqual(g.full_path, 'newGroup')
64
65 self.assertTrue(self.__check_path('newGroup'))
66
67 def test_create_same_name_group(self):
68 self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
69 Session().rollback()
70
71 def test_same_subgroup(self):
72 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
73 self.assertEqual(sg1.parent_group, self.g1)
74 self.assertEqual(sg1.full_path, 'test1/sub1')
75 self.assertTrue(self.__check_path('test1', 'sub1'))
76
77 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
78 self.assertEqual(ssg1.parent_group, sg1)
79 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
80 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
81
82 def test_remove_group(self):
83 sg1 = _make_group('deleteme')
84 self.__delete_group(sg1.group_id)
85
86 self.assertEqual(RepoGroup.get(sg1.group_id), None)
87 self.assertFalse(self.__check_path('deteteme'))
88
89 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
90 self.__delete_group(sg1.group_id)
91
92 self.assertEqual(RepoGroup.get(sg1.group_id), None)
93 self.assertFalse(self.__check_path('test1', 'deteteme'))
94
95 def test_rename_single_group(self):
96 sg1 = _make_group('initial')
97
98 new_sg1 = self.__update_group(sg1.group_id, 'after')
99 self.assertTrue(self.__check_path('after'))
100 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
101
102 def test_update_group_parent(self):
103
104 sg1 = _make_group('initial', parent_id=self.g1.group_id)
105
106 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
107 self.assertTrue(self.__check_path('test1', 'after'))
108 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
109
110 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
111 self.assertTrue(self.__check_path('test3', 'after'))
112 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
113
114 new_sg1 = self.__update_group(sg1.group_id, 'hello')
115 self.assertTrue(self.__check_path('hello'))
116
117 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
118
119 def test_subgrouping_with_repo(self):
120
121 g1 = _make_group('g1')
122 g2 = _make_group('g2')
123
124 # create new repo
125 form_data = dict(repo_name='john',
126 repo_name_full='john',
127 fork_name=None,
128 description=None,
129 repo_group=None,
130 private=False,
131 repo_type='hg',
132 clone_uri=None,
133 landing_rev='tip')
134 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
135 r = RepoModel().create(form_data, cur_user)
136
137 self.assertEqual(r.repo_name, 'john')
138
139 # put repo into group
140 form_data = form_data
141 form_data['repo_group'] = g1.group_id
142 form_data['perms_new'] = []
143 form_data['perms_updates'] = []
144 RepoModel().update(r.repo_name, form_data)
145 self.assertEqual(r.repo_name, 'g1/john')
146
147 self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
148 self.assertTrue(self.__check_path('g2', 'g1'))
149
150 # test repo
151 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
152 r.just_name]))
153
154 def test_move_to_root(self):
155 g1 = _make_group('t11')
156 Session().commit()
157 g2 = _make_group('t22', parent_id=g1.group_id)
158 Session().commit()
159
160 self.assertEqual(g2.full_path, 't11/t22')
161 self.assertTrue(self.__check_path('t11', 't22'))
162
163 g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
164 Session().commit()
165
166 self.assertEqual(g2.group_name, 'g22')
167 # we moved out group from t1 to '' so it's full path should be 'g2'
168 self.assertEqual(g2.full_path, 'g22')
169 self.assertFalse(self.__check_path('t11', 't22'))
170 self.assertTrue(self.__check_path('g22')) No newline at end of file
@@ -0,0 +1,124 b''
1 import unittest
2 from rhodecode.tests import *
3
4 from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\
5 Permission
6 from rhodecode.model.user import UserModel
7
8 from rhodecode.model.meta import Session
9 from rhodecode.model.users_group import UsersGroupModel
10
11
12 class TestUser(unittest.TestCase):
13 def __init__(self, methodName='runTest'):
14 Session.remove()
15 super(TestUser, self).__init__(methodName=methodName)
16
17 def test_create_and_remove(self):
18 usr = UserModel().create_or_update(username=u'test_user',
19 password=u'qweqwe',
20 email=u'u232@rhodecode.org',
21 firstname=u'u1', lastname=u'u1')
22 Session().commit()
23 self.assertEqual(User.get_by_username(u'test_user'), usr)
24
25 # make users group
26 users_group = UsersGroupModel().create('some_example_group')
27 Session().commit()
28
29 UsersGroupModel().add_user_to_group(users_group, usr)
30 Session().commit()
31
32 self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
33 self.assertEqual(UsersGroupMember.query().count(), 1)
34 UserModel().delete(usr.user_id)
35 Session().commit()
36
37 self.assertEqual(UsersGroupMember.query().all(), [])
38
39 def test_additonal_email_as_main(self):
40 usr = UserModel().create_or_update(username=u'test_user',
41 password=u'qweqwe',
42 email=u'main_email@rhodecode.org',
43 firstname=u'u1', lastname=u'u1')
44 Session().commit()
45
46 def do():
47 m = UserEmailMap()
48 m.email = u'main_email@rhodecode.org'
49 m.user = usr
50 Session().add(m)
51 Session().commit()
52 self.assertRaises(AttributeError, do)
53
54 UserModel().delete(usr.user_id)
55 Session().commit()
56
57 def test_extra_email_map(self):
58 usr = UserModel().create_or_update(username=u'test_user',
59 password=u'qweqwe',
60 email=u'main_email@rhodecode.org',
61 firstname=u'u1', lastname=u'u1')
62 Session().commit()
63
64 m = UserEmailMap()
65 m.email = u'main_email2@rhodecode.org'
66 m.user = usr
67 Session().add(m)
68 Session().commit()
69
70 u = User.get_by_email(email='main_email@rhodecode.org')
71 self.assertEqual(usr.user_id, u.user_id)
72 self.assertEqual(usr.username, u.username)
73
74 u = User.get_by_email(email='main_email2@rhodecode.org')
75 self.assertEqual(usr.user_id, u.user_id)
76 self.assertEqual(usr.username, u.username)
77 u = User.get_by_email(email='main_email3@rhodecode.org')
78 self.assertEqual(None, u)
79
80 UserModel().delete(usr.user_id)
81 Session().commit()
82
83
84 class TestUsers(unittest.TestCase):
85
86 def __init__(self, methodName='runTest'):
87 super(TestUsers, self).__init__(methodName=methodName)
88
89 def setUp(self):
90 self.u1 = UserModel().create_or_update(username=u'u1',
91 password=u'qweqwe',
92 email=u'u1@rhodecode.org',
93 firstname=u'u1', lastname=u'u1')
94
95 def tearDown(self):
96 perm = Permission.query().all()
97 for p in perm:
98 UserModel().revoke_perm(self.u1, p)
99
100 UserModel().delete(self.u1)
101 Session().commit()
102
103 def test_add_perm(self):
104 perm = Permission.query().all()[0]
105 UserModel().grant_perm(self.u1, perm)
106 Session().commit()
107 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
108
109 def test_has_perm(self):
110 perm = Permission.query().all()
111 for p in perm:
112 has_p = UserModel().has_perm(self.u1, p)
113 self.assertEqual(False, has_p)
114
115 def test_revoke_perm(self):
116 perm = Permission.query().all()[0]
117 UserModel().grant_perm(self.u1, perm)
118 Session().commit()
119 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
120
121 #revoke
122 UserModel().revoke_perm(self.u1, perm)
123 Session().commit()
124 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
This diff has been collapsed as it changes many lines, (604 lines changed) Show them Hide them
@@ -2,250 +2,11 b' import os'
2 2 import unittest
3 3 from rhodecode.tests import *
4 4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
8 UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\
9 Repository, UserEmailMap
10 from sqlalchemy.exc import IntegrityError, DatabaseError
5 from rhodecode.model.db import User, Notification, UserNotification
11 6 from rhodecode.model.user import UserModel
12 7
13 8 from rhodecode.model.meta import Session
14 9 from rhodecode.model.notification import NotificationModel
15 from rhodecode.model.users_group import UsersGroupModel
16 from rhodecode.lib.auth import AuthUser
17
18
19 def _make_group(path, desc='desc', parent_id=None,
20 skip_if_exists=False):
21
22 gr = RepoGroup.get_by_group_name(path)
23 if gr and skip_if_exists:
24 return gr
25
26 gr = ReposGroupModel().create(path, desc, parent_id)
27 return gr
28
29
30 class TestReposGroups(unittest.TestCase):
31
32 def setUp(self):
33 self.g1 = _make_group('test1', skip_if_exists=True)
34 Session.commit()
35 self.g2 = _make_group('test2', skip_if_exists=True)
36 Session.commit()
37 self.g3 = _make_group('test3', skip_if_exists=True)
38 Session.commit()
39
40 def tearDown(self):
41 print 'out'
42
43 def __check_path(self, *path):
44 """
45 Checks the path for existance !
46 """
47 path = [TESTS_TMP_PATH] + list(path)
48 path = os.path.join(*path)
49 return os.path.isdir(path)
50
51 def _check_folders(self):
52 print os.listdir(TESTS_TMP_PATH)
53
54 def __delete_group(self, id_):
55 ReposGroupModel().delete(id_)
56
57 def __update_group(self, id_, path, desc='desc', parent_id=None):
58 form_data = dict(
59 group_name=path,
60 group_description=desc,
61 group_parent_id=parent_id,
62 perms_updates=[],
63 perms_new=[]
64 )
65 gr = ReposGroupModel().update(id_, form_data)
66 return gr
67
68 def test_create_group(self):
69 g = _make_group('newGroup')
70 self.assertEqual(g.full_path, 'newGroup')
71
72 self.assertTrue(self.__check_path('newGroup'))
73
74 def test_create_same_name_group(self):
75 self.assertRaises(IntegrityError, lambda:_make_group('newGroup'))
76 Session.rollback()
77
78 def test_same_subgroup(self):
79 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
80 self.assertEqual(sg1.parent_group, self.g1)
81 self.assertEqual(sg1.full_path, 'test1/sub1')
82 self.assertTrue(self.__check_path('test1', 'sub1'))
83
84 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
85 self.assertEqual(ssg1.parent_group, sg1)
86 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
87 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
88
89 def test_remove_group(self):
90 sg1 = _make_group('deleteme')
91 self.__delete_group(sg1.group_id)
92
93 self.assertEqual(RepoGroup.get(sg1.group_id), None)
94 self.assertFalse(self.__check_path('deteteme'))
95
96 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
97 self.__delete_group(sg1.group_id)
98
99 self.assertEqual(RepoGroup.get(sg1.group_id), None)
100 self.assertFalse(self.__check_path('test1', 'deteteme'))
101
102 def test_rename_single_group(self):
103 sg1 = _make_group('initial')
104
105 new_sg1 = self.__update_group(sg1.group_id, 'after')
106 self.assertTrue(self.__check_path('after'))
107 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
108
109 def test_update_group_parent(self):
110
111 sg1 = _make_group('initial', parent_id=self.g1.group_id)
112
113 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
114 self.assertTrue(self.__check_path('test1', 'after'))
115 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
116
117 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
118 self.assertTrue(self.__check_path('test3', 'after'))
119 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
120
121 new_sg1 = self.__update_group(sg1.group_id, 'hello')
122 self.assertTrue(self.__check_path('hello'))
123
124 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
125
126 def test_subgrouping_with_repo(self):
127
128 g1 = _make_group('g1')
129 g2 = _make_group('g2')
130
131 # create new repo
132 form_data = dict(repo_name='john',
133 repo_name_full='john',
134 fork_name=None,
135 description=None,
136 repo_group=None,
137 private=False,
138 repo_type='hg',
139 clone_uri=None,
140 landing_rev='tip')
141 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
142 r = RepoModel().create(form_data, cur_user)
143
144 self.assertEqual(r.repo_name, 'john')
145
146 # put repo into group
147 form_data = form_data
148 form_data['repo_group'] = g1.group_id
149 form_data['perms_new'] = []
150 form_data['perms_updates'] = []
151 RepoModel().update(r.repo_name, form_data)
152 self.assertEqual(r.repo_name, 'g1/john')
153
154 self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
155 self.assertTrue(self.__check_path('g2', 'g1'))
156
157 # test repo
158 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name]))
159
160 def test_move_to_root(self):
161 g1 = _make_group('t11')
162 Session.commit()
163 g2 = _make_group('t22', parent_id=g1.group_id)
164 Session.commit()
165
166 self.assertEqual(g2.full_path, 't11/t22')
167 self.assertTrue(self.__check_path('t11', 't22'))
168
169 g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
170 Session.commit()
171
172 self.assertEqual(g2.group_name, 'g22')
173 # we moved out group from t1 to '' so it's full path should be 'g2'
174 self.assertEqual(g2.full_path, 'g22')
175 self.assertFalse(self.__check_path('t11', 't22'))
176 self.assertTrue(self.__check_path('g22'))
177
178
179 class TestUser(unittest.TestCase):
180 def __init__(self, methodName='runTest'):
181 Session.remove()
182 super(TestUser, self).__init__(methodName=methodName)
183
184 def test_create_and_remove(self):
185 usr = UserModel().create_or_update(username=u'test_user',
186 password=u'qweqwe',
187 email=u'u232@rhodecode.org',
188 firstname=u'u1', lastname=u'u1')
189 Session.commit()
190 self.assertEqual(User.get_by_username(u'test_user'), usr)
191
192 # make users group
193 users_group = UsersGroupModel().create('some_example_group')
194 Session.commit()
195
196 UsersGroupModel().add_user_to_group(users_group, usr)
197 Session.commit()
198
199 self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
200 self.assertEqual(UsersGroupMember.query().count(), 1)
201 UserModel().delete(usr.user_id)
202 Session.commit()
203
204 self.assertEqual(UsersGroupMember.query().all(), [])
205
206 def test_additonal_email_as_main(self):
207 usr = UserModel().create_or_update(username=u'test_user',
208 password=u'qweqwe',
209 email=u'main_email@rhodecode.org',
210 firstname=u'u1', lastname=u'u1')
211 Session.commit()
212
213 def do():
214 m = UserEmailMap()
215 m.email = u'main_email@rhodecode.org'
216 m.user = usr
217 Session.add(m)
218 Session.commit()
219 self.assertRaises(AttributeError, do)
220
221 UserModel().delete(usr.user_id)
222 Session.commit()
223
224 def test_extra_email_map(self):
225 usr = UserModel().create_or_update(username=u'test_user',
226 password=u'qweqwe',
227 email=u'main_email@rhodecode.org',
228 firstname=u'u1', lastname=u'u1')
229 Session.commit()
230
231 m = UserEmailMap()
232 m.email = u'main_email2@rhodecode.org'
233 m.user = usr
234 Session.add(m)
235 Session.commit()
236
237 u = User.get_by_email(email='main_email@rhodecode.org')
238 self.assertEqual(usr.user_id, u.user_id)
239 self.assertEqual(usr.username, u.username)
240
241 u = User.get_by_email(email='main_email2@rhodecode.org')
242 self.assertEqual(usr.user_id, u.user_id)
243 self.assertEqual(usr.username, u.username)
244 u = User.get_by_email(email='main_email3@rhodecode.org')
245 self.assertEqual(None, u)
246
247 UserModel().delete(usr.user_id)
248 Session.commit()
249 10
250 11
251 12 class TestNotifications(unittest.TestCase):
@@ -256,30 +17,30 b' class TestNotifications(unittest.TestCas'
256 17 password=u'qweqwe',
257 18 email=u'u1@rhodecode.org',
258 19 firstname=u'u1', lastname=u'u1')
259 Session.commit()
20 Session().commit()
260 21 self.u1 = self.u1.user_id
261 22
262 23 self.u2 = UserModel().create_or_update(username=u'u2',
263 24 password=u'qweqwe',
264 25 email=u'u2@rhodecode.org',
265 26 firstname=u'u2', lastname=u'u3')
266 Session.commit()
27 Session().commit()
267 28 self.u2 = self.u2.user_id
268 29
269 30 self.u3 = UserModel().create_or_update(username=u'u3',
270 31 password=u'qweqwe',
271 32 email=u'u3@rhodecode.org',
272 33 firstname=u'u3', lastname=u'u3')
273 Session.commit()
34 Session().commit()
274 35 self.u3 = self.u3.user_id
275 36
276 37 super(TestNotifications, self).__init__(methodName=methodName)
277 38
278 39 def _clean_notifications(self):
279 40 for n in Notification.query().all():
280 Session.delete(n)
41 Session().delete(n)
281 42
282 Session.commit()
43 Session().commit()
283 44 self.assertEqual(Notification.query().all(), [])
284 45
285 46 def tearDown(self):
@@ -293,7 +54,7 b' class TestNotifications(unittest.TestCas'
293 54 notification = NotificationModel().create(created_by=self.u1,
294 55 subject=u'subj', body=u'hi there',
295 56 recipients=usrs)
296 Session.commit()
57 Session().commit()
297 58 u1 = User.get(self.u1)
298 59 u2 = User.get(self.u2)
299 60 u3 = User.get(self.u3)
@@ -316,12 +77,12 b' class TestNotifications(unittest.TestCas'
316 77 notification1 = NotificationModel().create(created_by=self.u1,
317 78 subject=u'subj', body=u'hi there1',
318 79 recipients=[self.u3])
319 Session.commit()
80 Session().commit()
320 81 notification2 = NotificationModel().create(created_by=self.u1,
321 82 subject=u'subj', body=u'hi there2',
322 83 recipients=[self.u3])
323 Session.commit()
324 u3 = Session.query(User).get(self.u3)
84 Session().commit()
85 u3 = Session().query(User).get(self.u3)
325 86
326 87 self.assertEqual(sorted([x.notification for x in u3.notifications]),
327 88 sorted([notification2, notification1]))
@@ -333,12 +94,12 b' class TestNotifications(unittest.TestCas'
333 94 notification = NotificationModel().create(created_by=self.u1,
334 95 subject=u'title', body=u'hi there3',
335 96 recipients=[self.u3, self.u1, self.u2])
336 Session.commit()
97 Session().commit()
337 98 notifications = Notification.query().all()
338 99 self.assertTrue(notification in notifications)
339 100
340 101 Notification.delete(notification.notification_id)
341 Session.commit()
102 Session().commit()
342 103
343 104 notifications = Notification.query().all()
344 105 self.assertFalse(notification in notifications)
@@ -355,7 +116,7 b' class TestNotifications(unittest.TestCas'
355 116 notification = NotificationModel().create(created_by=self.u1,
356 117 subject=u'title', body=u'hi there3',
357 118 recipients=[self.u3, self.u1, self.u2])
358 Session.commit()
119 Session().commit()
359 120
360 121 unotification = UserNotification.query()\
361 122 .filter(UserNotification.notification ==
@@ -367,7 +128,7 b' class TestNotifications(unittest.TestCas'
367 128
368 129 NotificationModel().delete(self.u3,
369 130 notification.notification_id)
370 Session.commit()
131 Session().commit()
371 132
372 133 u3notification = UserNotification.query()\
373 134 .filter(UserNotification.notification ==
@@ -402,7 +163,7 b' class TestNotifications(unittest.TestCas'
402 163 NotificationModel().create(created_by=self.u1,
403 164 subject=u'title', body=u'hi there_delete',
404 165 recipients=[self.u3, self.u1])
405 Session.commit()
166 Session().commit()
406 167
407 168 self.assertEqual(NotificationModel()
408 169 .get_unread_cnt_for_user(self.u1), 1)
@@ -414,7 +175,7 b' class TestNotifications(unittest.TestCas'
414 175 notification = NotificationModel().create(created_by=self.u1,
415 176 subject=u'title', body=u'hi there3',
416 177 recipients=[self.u3, self.u1, self.u2])
417 Session.commit()
178 Session().commit()
418 179
419 180 self.assertEqual(NotificationModel()
420 181 .get_unread_cnt_for_user(self.u1), 2)
@@ -424,338 +185,5 b' class TestNotifications(unittest.TestCas'
424 185 .get_unread_cnt_for_user(self.u3), 2)
425 186
426 187
427 class TestUsers(unittest.TestCase):
428
429 def __init__(self, methodName='runTest'):
430 super(TestUsers, self).__init__(methodName=methodName)
431
432 def setUp(self):
433 self.u1 = UserModel().create_or_update(username=u'u1',
434 password=u'qweqwe',
435 email=u'u1@rhodecode.org',
436 firstname=u'u1', lastname=u'u1')
437
438 def tearDown(self):
439 perm = Permission.query().all()
440 for p in perm:
441 UserModel().revoke_perm(self.u1, p)
442
443 UserModel().delete(self.u1)
444 Session.commit()
445
446 def test_add_perm(self):
447 perm = Permission.query().all()[0]
448 UserModel().grant_perm(self.u1, perm)
449 Session.commit()
450 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
451
452 def test_has_perm(self):
453 perm = Permission.query().all()
454 for p in perm:
455 has_p = UserModel().has_perm(self.u1, p)
456 self.assertEqual(False, has_p)
457
458 def test_revoke_perm(self):
459 perm = Permission.query().all()[0]
460 UserModel().grant_perm(self.u1, perm)
461 Session.commit()
462 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
463
464 #revoke
465 UserModel().revoke_perm(self.u1, perm)
466 Session.commit()
467 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
468 188
469 189
470 class TestPermissions(unittest.TestCase):
471 def __init__(self, methodName='runTest'):
472 super(TestPermissions, self).__init__(methodName=methodName)
473
474 def setUp(self):
475 self.u1 = UserModel().create_or_update(
476 username=u'u1', password=u'qweqwe',
477 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
478 )
479 self.u2 = UserModel().create_or_update(
480 username=u'u2', password=u'qweqwe',
481 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
482 )
483 self.anon = User.get_by_username('default')
484 self.a1 = UserModel().create_or_update(
485 username=u'a1', password=u'qweqwe',
486 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
487 )
488 Session.commit()
489
490 def tearDown(self):
491 if hasattr(self, 'test_repo'):
492 RepoModel().delete(repo=self.test_repo)
493 UserModel().delete(self.u1)
494 UserModel().delete(self.u2)
495 UserModel().delete(self.a1)
496 if hasattr(self, 'g1'):
497 ReposGroupModel().delete(self.g1.group_id)
498 if hasattr(self, 'g2'):
499 ReposGroupModel().delete(self.g2.group_id)
500
501 if hasattr(self, 'ug1'):
502 UsersGroupModel().delete(self.ug1, force=True)
503
504 Session.commit()
505
506 def test_default_perms_set(self):
507 u1_auth = AuthUser(user_id=self.u1.user_id)
508 perms = {
509 'repositories_groups': {},
510 'global': set([u'hg.create.repository', u'repository.read',
511 u'hg.register.manual_activate']),
512 'repositories': {u'vcs_test_hg': u'repository.read'}
513 }
514 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
515 perms['repositories'][HG_REPO])
516 new_perm = 'repository.write'
517 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
518 Session.commit()
519
520 u1_auth = AuthUser(user_id=self.u1.user_id)
521 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm)
522
523 def test_default_admin_perms_set(self):
524 a1_auth = AuthUser(user_id=self.a1.user_id)
525 perms = {
526 'repositories_groups': {},
527 'global': set([u'hg.admin']),
528 'repositories': {u'vcs_test_hg': u'repository.admin'}
529 }
530 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
531 perms['repositories'][HG_REPO])
532 new_perm = 'repository.write'
533 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm)
534 Session.commit()
535 # cannot really downgrade admins permissions !? they still get's set as
536 # admin !
537 u1_auth = AuthUser(user_id=self.a1.user_id)
538 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
539 perms['repositories'][HG_REPO])
540
541 def test_default_group_perms(self):
542 self.g1 = _make_group('test1', skip_if_exists=True)
543 self.g2 = _make_group('test2', skip_if_exists=True)
544 u1_auth = AuthUser(user_id=self.u1.user_id)
545 perms = {
546 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
547 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
548 'repositories': {u'vcs_test_hg': u'repository.read'}
549 }
550 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
551 perms['repositories'][HG_REPO])
552 self.assertEqual(u1_auth.permissions['repositories_groups'],
553 perms['repositories_groups'])
554
555 def test_default_admin_group_perms(self):
556 self.g1 = _make_group('test1', skip_if_exists=True)
557 self.g2 = _make_group('test2', skip_if_exists=True)
558 a1_auth = AuthUser(user_id=self.a1.user_id)
559 perms = {
560 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
561 'global': set(['hg.admin']),
562 'repositories': {u'vcs_test_hg': 'repository.admin'}
563 }
564
565 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
566 perms['repositories'][HG_REPO])
567 self.assertEqual(a1_auth.permissions['repositories_groups'],
568 perms['repositories_groups'])
569
570 def test_propagated_permission_from_users_group(self):
571 # make group
572 self.ug1 = UsersGroupModel().create('G1')
573 # add user to group
574 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
575
576 # set permission to lower
577 new_perm = 'repository.none'
578 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
579 Session.commit()
580 u1_auth = AuthUser(user_id=self.u1.user_id)
581 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
582 new_perm)
583
584 # grant perm for group this should override permission from user
585 new_perm = 'repository.write'
586 RepoModel().grant_users_group_permission(repo=HG_REPO,
587 group_name=self.ug1,
588 perm=new_perm)
589 # check perms
590 u1_auth = AuthUser(user_id=self.u1.user_id)
591 perms = {
592 'repositories_groups': {},
593 'global': set([u'hg.create.repository', u'repository.read',
594 u'hg.register.manual_activate']),
595 'repositories': {u'vcs_test_hg': u'repository.read'}
596 }
597 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
598 new_perm)
599 self.assertEqual(u1_auth.permissions['repositories_groups'],
600 perms['repositories_groups'])
601
602 def test_propagated_permission_from_users_group_lower_weight(self):
603 # make group
604 self.ug1 = UsersGroupModel().create('G1')
605 # add user to group
606 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
607
608 # set permission to lower
609 new_perm_h = 'repository.write'
610 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
611 perm=new_perm_h)
612 Session.commit()
613 u1_auth = AuthUser(user_id=self.u1.user_id)
614 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
615 new_perm_h)
616
617 # grant perm for group this should NOT override permission from user
618 # since it's lower than granted
619 new_perm_l = 'repository.read'
620 RepoModel().grant_users_group_permission(repo=HG_REPO,
621 group_name=self.ug1,
622 perm=new_perm_l)
623 # check perms
624 u1_auth = AuthUser(user_id=self.u1.user_id)
625 perms = {
626 'repositories_groups': {},
627 'global': set([u'hg.create.repository', u'repository.read',
628 u'hg.register.manual_activate']),
629 'repositories': {u'vcs_test_hg': u'repository.write'}
630 }
631 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
632 new_perm_h)
633 self.assertEqual(u1_auth.permissions['repositories_groups'],
634 perms['repositories_groups'])
635
636 def test_repo_in_group_permissions(self):
637 self.g1 = _make_group('group1', skip_if_exists=True)
638 self.g2 = _make_group('group2', skip_if_exists=True)
639 Session.commit()
640 # both perms should be read !
641 u1_auth = AuthUser(user_id=self.u1.user_id)
642 self.assertEqual(u1_auth.permissions['repositories_groups'],
643 {u'group1': u'group.read', u'group2': u'group.read'})
644
645 a1_auth = AuthUser(user_id=self.anon.user_id)
646 self.assertEqual(a1_auth.permissions['repositories_groups'],
647 {u'group1': u'group.read', u'group2': u'group.read'})
648
649 #Change perms to none for both groups
650 ReposGroupModel().grant_user_permission(repos_group=self.g1,
651 user=self.anon,
652 perm='group.none')
653 ReposGroupModel().grant_user_permission(repos_group=self.g2,
654 user=self.anon,
655 perm='group.none')
656
657 u1_auth = AuthUser(user_id=self.u1.user_id)
658 self.assertEqual(u1_auth.permissions['repositories_groups'],
659 {u'group1': u'group.none', u'group2': u'group.none'})
660
661 a1_auth = AuthUser(user_id=self.anon.user_id)
662 self.assertEqual(a1_auth.permissions['repositories_groups'],
663 {u'group1': u'group.none', u'group2': u'group.none'})
664
665 # add repo to group
666 form_data = {
667 'repo_name': HG_REPO,
668 'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
669 'repo_type': 'hg',
670 'clone_uri': '',
671 'repo_group': self.g1.group_id,
672 'description': 'desc',
673 'private': False,
674 'landing_rev': 'tip'
675 }
676 self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
677 Session.commit()
678
679 u1_auth = AuthUser(user_id=self.u1.user_id)
680 self.assertEqual(u1_auth.permissions['repositories_groups'],
681 {u'group1': u'group.none', u'group2': u'group.none'})
682
683 a1_auth = AuthUser(user_id=self.anon.user_id)
684 self.assertEqual(a1_auth.permissions['repositories_groups'],
685 {u'group1': u'group.none', u'group2': u'group.none'})
686
687 #grant permission for u2 !
688 ReposGroupModel().grant_user_permission(repos_group=self.g1,
689 user=self.u2,
690 perm='group.read')
691 ReposGroupModel().grant_user_permission(repos_group=self.g2,
692 user=self.u2,
693 perm='group.read')
694 Session.commit()
695 self.assertNotEqual(self.u1, self.u2)
696 #u1 and anon should have not change perms while u2 should !
697 u1_auth = AuthUser(user_id=self.u1.user_id)
698 self.assertEqual(u1_auth.permissions['repositories_groups'],
699 {u'group1': u'group.none', u'group2': u'group.none'})
700
701 u2_auth = AuthUser(user_id=self.u2.user_id)
702 self.assertEqual(u2_auth.permissions['repositories_groups'],
703 {u'group1': u'group.read', u'group2': u'group.read'})
704
705 a1_auth = AuthUser(user_id=self.anon.user_id)
706 self.assertEqual(a1_auth.permissions['repositories_groups'],
707 {u'group1': u'group.none', u'group2': u'group.none'})
708
709 def test_repo_group_user_as_user_group_member(self):
710 # create Group1
711 self.g1 = _make_group('group1', skip_if_exists=True)
712 Session.commit()
713 a1_auth = AuthUser(user_id=self.anon.user_id)
714
715 self.assertEqual(a1_auth.permissions['repositories_groups'],
716 {u'group1': u'group.read'})
717
718 # set default permission to none
719 ReposGroupModel().grant_user_permission(repos_group=self.g1,
720 user=self.anon,
721 perm='group.none')
722 # make group
723 self.ug1 = UsersGroupModel().create('G1')
724 # add user to group
725 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
726 Session.commit()
727
728 # check if user is in the group
729 membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
730 self.assertEqual(membrs, [self.u1.user_id])
731 # add some user to that group
732
733 # check his permissions
734 a1_auth = AuthUser(user_id=self.anon.user_id)
735 self.assertEqual(a1_auth.permissions['repositories_groups'],
736 {u'group1': u'group.none'})
737
738 u1_auth = AuthUser(user_id=self.u1.user_id)
739 self.assertEqual(u1_auth.permissions['repositories_groups'],
740 {u'group1': u'group.none'})
741
742 # grant ug1 read permissions for
743 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
744 group_name=self.ug1,
745 perm='group.read')
746 Session.commit()
747 # check if the
748 obj = Session.query(UsersGroupRepoGroupToPerm)\
749 .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
750 .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
751 .scalar()
752 self.assertEqual(obj.permission.permission_name, 'group.read')
753
754 a1_auth = AuthUser(user_id=self.anon.user_id)
755
756 self.assertEqual(a1_auth.permissions['repositories_groups'],
757 {u'group1': u'group.none'})
758
759 u1_auth = AuthUser(user_id=self.u1.user_id)
760 self.assertEqual(u1_auth.permissions['repositories_groups'],
761 {u'group1': u'group.read'})
1 NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch
@@ -79,6 +79,7 b' class Command(object):'
79 79 print stdout, stderr
80 80 return stdout, stderr
81 81
82
82 83 def get_session():
83 84 engine = engine_from_config(conf, 'sqlalchemy.db1.')
84 85 init_model(engine)
@@ -124,7 +125,6 b' def create_test_repo(force=True):'
124 125 if user is None:
125 126 raise Exception('user not found')
126 127
127
128 128 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
129 129
130 130 if repo is None:
@@ -140,6 +140,7 b' def create_test_repo(force=True):'
140 140
141 141 print 'done'
142 142
143
143 144 def set_anonymous_access(enable=True):
144 145 sa = get_session()
145 146 user = sa.query(User).filter(User.username == 'default').one()
@@ -147,6 +148,7 b' def set_anonymous_access(enable=True):'
147 148 sa.add(user)
148 149 sa.commit()
149 150
151
150 152 def get_anonymous_access():
151 153 sa = get_session()
152 154 return sa.query(User).filter(User.username == 'default').one().active
1 NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py
@@ -64,7 +64,8 b' log = logging.getLogger(__name__)'
64 64
65 65 engine = engine_from_config(conf, 'sqlalchemy.db1.')
66 66 init_model(engine)
67 sa = meta.Session
67 sa = meta.Session()
68
68 69
69 70 class Command(object):
70 71
@@ -137,7 +138,6 b' def create_test_repo(force=True):'
137 138 if user is None:
138 139 raise Exception('user not found')
139 140
140
141 141 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
142 142
143 143 if repo is None:
@@ -161,6 +161,7 b' def set_anonymous_access(enable=True):'
161 161 if enable != User.get_by_username('default').active:
162 162 raise Exception('Cannot set anonymous access')
163 163
164
164 165 def get_anonymous_access():
165 166 user = User.get_by_username('default')
166 167 return user.active
@@ -235,6 +236,7 b' def test_clone_anonymous():'
235 236 print '\tdisabling anonymous access'
236 237 set_anonymous_access(enable=False)
237 238
239
238 240 @test_wrapp
239 241 def test_clone_wrong_credentials():
240 242 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
@@ -264,10 +266,12 b' def test_clone_wrong_credentials():'
264 266 if not """abort: authorization failed""" in stderr:
265 267 raise Exception('Failure')
266 268
269
267 270 @test_wrapp
268 271 def test_pull():
269 272 pass
270 273
274
271 275 @test_wrapp
272 276 def test_push_modify_file(f_name='setup.py'):
273 277 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
@@ -281,6 +285,7 b" def test_push_modify_file(f_name='setup."
281 285
282 286 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
283 287
288
284 289 @test_wrapp
285 290 def test_push_new_file(commits=15, with_clone=True):
286 291
@@ -312,6 +317,7 b' def test_push_new_file(commits=15, with_'
312 317
313 318 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
314 319
320
315 321 @test_wrapp
316 322 def test_push_wrong_credentials():
317 323 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
@@ -332,6 +338,7 b' def test_push_wrong_credentials():'
332 338
333 339 Command(cwd).execute('hg push %s' % clone_url)
334 340
341
335 342 @test_wrapp
336 343 def test_push_wrong_path():
337 344 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
@@ -366,10 +373,12 b' def test_push_wrong_path():'
366 373 if not """abort: HTTP Error 403: Forbidden""" in stderr:
367 374 raise Exception('Failure')
368 375
376
369 377 @test_wrapp
370 378 def get_logs():
371 379 return UserLog.query().all()
372 380
381
373 382 @test_wrapp
374 383 def test_logs(initial):
375 384 logs = UserLog.query().all()
@@ -22,8 +22,8 b' function at ``tests/__init__.py``.'
22 22 import os
23 23 from rhodecode.lib import vcs
24 24 from rhodecode.lib.vcs.utils.compat import unittest
25 from utils import VCSTestError, SCMFetcher
25 26 from rhodecode.tests import *
26 from utils import VCSTestError, SCMFetcher
27 27
28 28
29 29 def setup_package():
General Comments 0
You need to be logged in to leave comments. Login now