##// END OF EJS Templates
orginized test module...
marcink -
r2527:95624ce4 beta
parent child Browse files
Show More
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
@@ -0,0 +1,316 b''
1 import os
2 import unittest
3 from rhodecode.tests import *
4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm
8 from rhodecode.model.user import UserModel
9
10 from rhodecode.model.meta import Session
11 from rhodecode.model.users_group import UsersGroupModel
12 from rhodecode.lib.auth import AuthUser
13
14
15 def _make_group(path, desc='desc', parent_id=None,
16 skip_if_exists=False):
17
18 gr = RepoGroup.get_by_group_name(path)
19 if gr and skip_if_exists:
20 return gr
21
22 gr = ReposGroupModel().create(path, desc, parent_id)
23 return gr
24
25
26 class TestPermissions(unittest.TestCase):
27 def __init__(self, methodName='runTest'):
28 super(TestPermissions, self).__init__(methodName=methodName)
29
30 def setUp(self):
31 self.u1 = UserModel().create_or_update(
32 username=u'u1', password=u'qweqwe',
33 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
34 )
35 self.u2 = UserModel().create_or_update(
36 username=u'u2', password=u'qweqwe',
37 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
38 )
39 self.anon = User.get_by_username('default')
40 self.a1 = UserModel().create_or_update(
41 username=u'a1', password=u'qweqwe',
42 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
43 )
44 Session().commit()
45
46 def tearDown(self):
47 if hasattr(self, 'test_repo'):
48 RepoModel().delete(repo=self.test_repo)
49 UserModel().delete(self.u1)
50 UserModel().delete(self.u2)
51 UserModel().delete(self.a1)
52 if hasattr(self, 'g1'):
53 ReposGroupModel().delete(self.g1.group_id)
54 if hasattr(self, 'g2'):
55 ReposGroupModel().delete(self.g2.group_id)
56
57 if hasattr(self, 'ug1'):
58 UsersGroupModel().delete(self.ug1, force=True)
59
60 Session().commit()
61
62 def test_default_perms_set(self):
63 u1_auth = AuthUser(user_id=self.u1.user_id)
64 perms = {
65 'repositories_groups': {},
66 'global': set([u'hg.create.repository', u'repository.read',
67 u'hg.register.manual_activate']),
68 'repositories': {u'vcs_test_hg': u'repository.read'}
69 }
70 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
71 perms['repositories'][HG_REPO])
72 new_perm = 'repository.write'
73 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
74 perm=new_perm)
75 Session().commit()
76
77 u1_auth = AuthUser(user_id=self.u1.user_id)
78 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
79 new_perm)
80
81 def test_default_admin_perms_set(self):
82 a1_auth = AuthUser(user_id=self.a1.user_id)
83 perms = {
84 'repositories_groups': {},
85 'global': set([u'hg.admin']),
86 'repositories': {u'vcs_test_hg': u'repository.admin'}
87 }
88 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
89 perms['repositories'][HG_REPO])
90 new_perm = 'repository.write'
91 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
92 perm=new_perm)
93 Session().commit()
94 # cannot really downgrade admins permissions !? they still get's set as
95 # admin !
96 u1_auth = AuthUser(user_id=self.a1.user_id)
97 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
98 perms['repositories'][HG_REPO])
99
100 def test_default_group_perms(self):
101 self.g1 = _make_group('test1', skip_if_exists=True)
102 self.g2 = _make_group('test2', skip_if_exists=True)
103 u1_auth = AuthUser(user_id=self.u1.user_id)
104 perms = {
105 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
106 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
107 'repositories': {u'vcs_test_hg': u'repository.read'}
108 }
109 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
110 perms['repositories'][HG_REPO])
111 self.assertEqual(u1_auth.permissions['repositories_groups'],
112 perms['repositories_groups'])
113
114 def test_default_admin_group_perms(self):
115 self.g1 = _make_group('test1', skip_if_exists=True)
116 self.g2 = _make_group('test2', skip_if_exists=True)
117 a1_auth = AuthUser(user_id=self.a1.user_id)
118 perms = {
119 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
120 'global': set(['hg.admin']),
121 'repositories': {u'vcs_test_hg': 'repository.admin'}
122 }
123
124 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
125 perms['repositories'][HG_REPO])
126 self.assertEqual(a1_auth.permissions['repositories_groups'],
127 perms['repositories_groups'])
128
129 def test_propagated_permission_from_users_group(self):
130 # make group
131 self.ug1 = UsersGroupModel().create('G1')
132 # add user to group
133 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
134
135 # set permission to lower
136 new_perm = 'repository.none'
137 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
138 Session().commit()
139 u1_auth = AuthUser(user_id=self.u1.user_id)
140 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
141 new_perm)
142
143 # grant perm for group this should override permission from user
144 new_perm = 'repository.write'
145 RepoModel().grant_users_group_permission(repo=HG_REPO,
146 group_name=self.ug1,
147 perm=new_perm)
148 # check perms
149 u1_auth = AuthUser(user_id=self.u1.user_id)
150 perms = {
151 'repositories_groups': {},
152 'global': set([u'hg.create.repository', u'repository.read',
153 u'hg.register.manual_activate']),
154 'repositories': {u'vcs_test_hg': u'repository.read'}
155 }
156 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
157 new_perm)
158 self.assertEqual(u1_auth.permissions['repositories_groups'],
159 perms['repositories_groups'])
160
161 def test_propagated_permission_from_users_group_lower_weight(self):
162 # make group
163 self.ug1 = UsersGroupModel().create('G1')
164 # add user to group
165 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
166
167 # set permission to lower
168 new_perm_h = 'repository.write'
169 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
170 perm=new_perm_h)
171 Session().commit()
172 u1_auth = AuthUser(user_id=self.u1.user_id)
173 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
174 new_perm_h)
175
176 # grant perm for group this should NOT override permission from user
177 # since it's lower than granted
178 new_perm_l = 'repository.read'
179 RepoModel().grant_users_group_permission(repo=HG_REPO,
180 group_name=self.ug1,
181 perm=new_perm_l)
182 # check perms
183 u1_auth = AuthUser(user_id=self.u1.user_id)
184 perms = {
185 'repositories_groups': {},
186 'global': set([u'hg.create.repository', u'repository.read',
187 u'hg.register.manual_activate']),
188 'repositories': {u'vcs_test_hg': u'repository.write'}
189 }
190 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
191 new_perm_h)
192 self.assertEqual(u1_auth.permissions['repositories_groups'],
193 perms['repositories_groups'])
194
195 def test_repo_in_group_permissions(self):
196 self.g1 = _make_group('group1', skip_if_exists=True)
197 self.g2 = _make_group('group2', skip_if_exists=True)
198 Session().commit()
199 # both perms should be read !
200 u1_auth = AuthUser(user_id=self.u1.user_id)
201 self.assertEqual(u1_auth.permissions['repositories_groups'],
202 {u'group1': u'group.read', u'group2': u'group.read'})
203
204 a1_auth = AuthUser(user_id=self.anon.user_id)
205 self.assertEqual(a1_auth.permissions['repositories_groups'],
206 {u'group1': u'group.read', u'group2': u'group.read'})
207
208 #Change perms to none for both groups
209 ReposGroupModel().grant_user_permission(repos_group=self.g1,
210 user=self.anon,
211 perm='group.none')
212 ReposGroupModel().grant_user_permission(repos_group=self.g2,
213 user=self.anon,
214 perm='group.none')
215
216 u1_auth = AuthUser(user_id=self.u1.user_id)
217 self.assertEqual(u1_auth.permissions['repositories_groups'],
218 {u'group1': u'group.none', u'group2': u'group.none'})
219
220 a1_auth = AuthUser(user_id=self.anon.user_id)
221 self.assertEqual(a1_auth.permissions['repositories_groups'],
222 {u'group1': u'group.none', u'group2': u'group.none'})
223
224 # add repo to group
225 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
226 self.test_repo = RepoModel().create_repo(
227 repo_name=name,
228 repo_type='hg',
229 description='',
230 owner=self.u1,
231 )
232 Session().commit()
233
234 u1_auth = AuthUser(user_id=self.u1.user_id)
235 self.assertEqual(u1_auth.permissions['repositories_groups'],
236 {u'group1': u'group.none', u'group2': u'group.none'})
237
238 a1_auth = AuthUser(user_id=self.anon.user_id)
239 self.assertEqual(a1_auth.permissions['repositories_groups'],
240 {u'group1': u'group.none', u'group2': u'group.none'})
241
242 #grant permission for u2 !
243 ReposGroupModel().grant_user_permission(repos_group=self.g1,
244 user=self.u2,
245 perm='group.read')
246 ReposGroupModel().grant_user_permission(repos_group=self.g2,
247 user=self.u2,
248 perm='group.read')
249 Session().commit()
250 self.assertNotEqual(self.u1, self.u2)
251 #u1 and anon should have not change perms while u2 should !
252 u1_auth = AuthUser(user_id=self.u1.user_id)
253 self.assertEqual(u1_auth.permissions['repositories_groups'],
254 {u'group1': u'group.none', u'group2': u'group.none'})
255
256 u2_auth = AuthUser(user_id=self.u2.user_id)
257 self.assertEqual(u2_auth.permissions['repositories_groups'],
258 {u'group1': u'group.read', u'group2': u'group.read'})
259
260 a1_auth = AuthUser(user_id=self.anon.user_id)
261 self.assertEqual(a1_auth.permissions['repositories_groups'],
262 {u'group1': u'group.none', u'group2': u'group.none'})
263
264 def test_repo_group_user_as_user_group_member(self):
265 # create Group1
266 self.g1 = _make_group('group1', skip_if_exists=True)
267 Session().commit()
268 a1_auth = AuthUser(user_id=self.anon.user_id)
269
270 self.assertEqual(a1_auth.permissions['repositories_groups'],
271 {u'group1': u'group.read'})
272
273 # set default permission to none
274 ReposGroupModel().grant_user_permission(repos_group=self.g1,
275 user=self.anon,
276 perm='group.none')
277 # make group
278 self.ug1 = UsersGroupModel().create('G1')
279 # add user to group
280 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
281 Session().commit()
282
283 # check if user is in the group
284 membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
285 self.assertEqual(membrs, [self.u1.user_id])
286 # add some user to that group
287
288 # check his permissions
289 a1_auth = AuthUser(user_id=self.anon.user_id)
290 self.assertEqual(a1_auth.permissions['repositories_groups'],
291 {u'group1': u'group.none'})
292
293 u1_auth = AuthUser(user_id=self.u1.user_id)
294 self.assertEqual(u1_auth.permissions['repositories_groups'],
295 {u'group1': u'group.none'})
296
297 # grant ug1 read permissions for
298 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
299 group_name=self.ug1,
300 perm='group.read')
301 Session().commit()
302 # check if the
303 obj = Session().query(UsersGroupRepoGroupToPerm)\
304 .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
305 .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
306 .scalar()
307 self.assertEqual(obj.permission.permission_name, 'group.read')
308
309 a1_auth = AuthUser(user_id=self.anon.user_id)
310
311 self.assertEqual(a1_auth.permissions['repositories_groups'],
312 {u'group1': u'group.none'})
313
314 u1_auth = AuthUser(user_id=self.u1.user_id)
315 self.assertEqual(u1_auth.permissions['repositories_groups'],
316 {u'group1': u'group.read'})
@@ -0,0 +1,170 b''
1 import os
2 import unittest
3 from rhodecode.tests import *
4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User
8 from rhodecode.model.meta import Session
9 from sqlalchemy.exc import IntegrityError
10
11
12 def _make_group(path, desc='desc', parent_id=None,
13 skip_if_exists=False):
14
15 gr = RepoGroup.get_by_group_name(path)
16 if gr and skip_if_exists:
17 return gr
18
19 gr = ReposGroupModel().create(path, desc, parent_id)
20 return gr
21
22
23 class TestReposGroups(unittest.TestCase):
24
25 def setUp(self):
26 self.g1 = _make_group('test1', skip_if_exists=True)
27 Session().commit()
28 self.g2 = _make_group('test2', skip_if_exists=True)
29 Session().commit()
30 self.g3 = _make_group('test3', skip_if_exists=True)
31 Session().commit()
32
33 def tearDown(self):
34 print 'out'
35
36 def __check_path(self, *path):
37 """
38 Checks the path for existance !
39 """
40 path = [TESTS_TMP_PATH] + list(path)
41 path = os.path.join(*path)
42 return os.path.isdir(path)
43
44 def _check_folders(self):
45 print os.listdir(TESTS_TMP_PATH)
46
47 def __delete_group(self, id_):
48 ReposGroupModel().delete(id_)
49
50 def __update_group(self, id_, path, desc='desc', parent_id=None):
51 form_data = dict(
52 group_name=path,
53 group_description=desc,
54 group_parent_id=parent_id,
55 perms_updates=[],
56 perms_new=[]
57 )
58 gr = ReposGroupModel().update(id_, form_data)
59 return gr
60
61 def test_create_group(self):
62 g = _make_group('newGroup')
63 self.assertEqual(g.full_path, 'newGroup')
64
65 self.assertTrue(self.__check_path('newGroup'))
66
67 def test_create_same_name_group(self):
68 self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
69 Session().rollback()
70
71 def test_same_subgroup(self):
72 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
73 self.assertEqual(sg1.parent_group, self.g1)
74 self.assertEqual(sg1.full_path, 'test1/sub1')
75 self.assertTrue(self.__check_path('test1', 'sub1'))
76
77 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
78 self.assertEqual(ssg1.parent_group, sg1)
79 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
80 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
81
82 def test_remove_group(self):
83 sg1 = _make_group('deleteme')
84 self.__delete_group(sg1.group_id)
85
86 self.assertEqual(RepoGroup.get(sg1.group_id), None)
87 self.assertFalse(self.__check_path('deteteme'))
88
89 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
90 self.__delete_group(sg1.group_id)
91
92 self.assertEqual(RepoGroup.get(sg1.group_id), None)
93 self.assertFalse(self.__check_path('test1', 'deteteme'))
94
95 def test_rename_single_group(self):
96 sg1 = _make_group('initial')
97
98 new_sg1 = self.__update_group(sg1.group_id, 'after')
99 self.assertTrue(self.__check_path('after'))
100 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
101
102 def test_update_group_parent(self):
103
104 sg1 = _make_group('initial', parent_id=self.g1.group_id)
105
106 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
107 self.assertTrue(self.__check_path('test1', 'after'))
108 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
109
110 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
111 self.assertTrue(self.__check_path('test3', 'after'))
112 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
113
114 new_sg1 = self.__update_group(sg1.group_id, 'hello')
115 self.assertTrue(self.__check_path('hello'))
116
117 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
118
119 def test_subgrouping_with_repo(self):
120
121 g1 = _make_group('g1')
122 g2 = _make_group('g2')
123
124 # create new repo
125 form_data = dict(repo_name='john',
126 repo_name_full='john',
127 fork_name=None,
128 description=None,
129 repo_group=None,
130 private=False,
131 repo_type='hg',
132 clone_uri=None,
133 landing_rev='tip')
134 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
135 r = RepoModel().create(form_data, cur_user)
136
137 self.assertEqual(r.repo_name, 'john')
138
139 # put repo into group
140 form_data = form_data
141 form_data['repo_group'] = g1.group_id
142 form_data['perms_new'] = []
143 form_data['perms_updates'] = []
144 RepoModel().update(r.repo_name, form_data)
145 self.assertEqual(r.repo_name, 'g1/john')
146
147 self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
148 self.assertTrue(self.__check_path('g2', 'g1'))
149
150 # test repo
151 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
152 r.just_name]))
153
154 def test_move_to_root(self):
155 g1 = _make_group('t11')
156 Session().commit()
157 g2 = _make_group('t22', parent_id=g1.group_id)
158 Session().commit()
159
160 self.assertEqual(g2.full_path, 't11/t22')
161 self.assertTrue(self.__check_path('t11', 't22'))
162
163 g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
164 Session().commit()
165
166 self.assertEqual(g2.group_name, 'g22')
167 # we moved out group from t1 to '' so it's full path should be 'g2'
168 self.assertEqual(g2.full_path, 'g22')
169 self.assertFalse(self.__check_path('t11', 't22'))
170 self.assertTrue(self.__check_path('g22')) No newline at end of file
@@ -0,0 +1,124 b''
1 import unittest
2 from rhodecode.tests import *
3
4 from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\
5 Permission
6 from rhodecode.model.user import UserModel
7
8 from rhodecode.model.meta import Session
9 from rhodecode.model.users_group import UsersGroupModel
10
11
12 class TestUser(unittest.TestCase):
13 def __init__(self, methodName='runTest'):
14 Session.remove()
15 super(TestUser, self).__init__(methodName=methodName)
16
17 def test_create_and_remove(self):
18 usr = UserModel().create_or_update(username=u'test_user',
19 password=u'qweqwe',
20 email=u'u232@rhodecode.org',
21 firstname=u'u1', lastname=u'u1')
22 Session().commit()
23 self.assertEqual(User.get_by_username(u'test_user'), usr)
24
25 # make users group
26 users_group = UsersGroupModel().create('some_example_group')
27 Session().commit()
28
29 UsersGroupModel().add_user_to_group(users_group, usr)
30 Session().commit()
31
32 self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
33 self.assertEqual(UsersGroupMember.query().count(), 1)
34 UserModel().delete(usr.user_id)
35 Session().commit()
36
37 self.assertEqual(UsersGroupMember.query().all(), [])
38
39 def test_additonal_email_as_main(self):
40 usr = UserModel().create_or_update(username=u'test_user',
41 password=u'qweqwe',
42 email=u'main_email@rhodecode.org',
43 firstname=u'u1', lastname=u'u1')
44 Session().commit()
45
46 def do():
47 m = UserEmailMap()
48 m.email = u'main_email@rhodecode.org'
49 m.user = usr
50 Session().add(m)
51 Session().commit()
52 self.assertRaises(AttributeError, do)
53
54 UserModel().delete(usr.user_id)
55 Session().commit()
56
57 def test_extra_email_map(self):
58 usr = UserModel().create_or_update(username=u'test_user',
59 password=u'qweqwe',
60 email=u'main_email@rhodecode.org',
61 firstname=u'u1', lastname=u'u1')
62 Session().commit()
63
64 m = UserEmailMap()
65 m.email = u'main_email2@rhodecode.org'
66 m.user = usr
67 Session().add(m)
68 Session().commit()
69
70 u = User.get_by_email(email='main_email@rhodecode.org')
71 self.assertEqual(usr.user_id, u.user_id)
72 self.assertEqual(usr.username, u.username)
73
74 u = User.get_by_email(email='main_email2@rhodecode.org')
75 self.assertEqual(usr.user_id, u.user_id)
76 self.assertEqual(usr.username, u.username)
77 u = User.get_by_email(email='main_email3@rhodecode.org')
78 self.assertEqual(None, u)
79
80 UserModel().delete(usr.user_id)
81 Session().commit()
82
83
84 class TestUsers(unittest.TestCase):
85
86 def __init__(self, methodName='runTest'):
87 super(TestUsers, self).__init__(methodName=methodName)
88
89 def setUp(self):
90 self.u1 = UserModel().create_or_update(username=u'u1',
91 password=u'qweqwe',
92 email=u'u1@rhodecode.org',
93 firstname=u'u1', lastname=u'u1')
94
95 def tearDown(self):
96 perm = Permission.query().all()
97 for p in perm:
98 UserModel().revoke_perm(self.u1, p)
99
100 UserModel().delete(self.u1)
101 Session().commit()
102
103 def test_add_perm(self):
104 perm = Permission.query().all()[0]
105 UserModel().grant_perm(self.u1, perm)
106 Session().commit()
107 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
108
109 def test_has_perm(self):
110 perm = Permission.query().all()
111 for p in perm:
112 has_p = UserModel().has_perm(self.u1, p)
113 self.assertEqual(False, has_p)
114
115 def test_revoke_perm(self):
116 perm = Permission.query().all()[0]
117 UserModel().grant_perm(self.u1, perm)
118 Session().commit()
119 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
120
121 #revoke
122 UserModel().revoke_perm(self.u1, perm)
123 Session().commit()
124 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
This diff has been collapsed as it changes many lines, (604 lines changed) Show them Hide them
@@ -2,250 +2,11 b' import os'
2 import unittest
2 import unittest
3 from rhodecode.tests import *
3 from rhodecode.tests import *
4
4
5 from rhodecode.model.repos_group import ReposGroupModel
5 from rhodecode.model.db import User, Notification, UserNotification
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
8 UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\
9 Repository, UserEmailMap
10 from sqlalchemy.exc import IntegrityError, DatabaseError
11 from rhodecode.model.user import UserModel
6 from rhodecode.model.user import UserModel
12
7
13 from rhodecode.model.meta import Session
8 from rhodecode.model.meta import Session
14 from rhodecode.model.notification import NotificationModel
9 from rhodecode.model.notification import NotificationModel
15 from rhodecode.model.users_group import UsersGroupModel
16 from rhodecode.lib.auth import AuthUser
17
18
19 def _make_group(path, desc='desc', parent_id=None,
20 skip_if_exists=False):
21
22 gr = RepoGroup.get_by_group_name(path)
23 if gr and skip_if_exists:
24 return gr
25
26 gr = ReposGroupModel().create(path, desc, parent_id)
27 return gr
28
29
30 class TestReposGroups(unittest.TestCase):
31
32 def setUp(self):
33 self.g1 = _make_group('test1', skip_if_exists=True)
34 Session.commit()
35 self.g2 = _make_group('test2', skip_if_exists=True)
36 Session.commit()
37 self.g3 = _make_group('test3', skip_if_exists=True)
38 Session.commit()
39
40 def tearDown(self):
41 print 'out'
42
43 def __check_path(self, *path):
44 """
45 Checks the path for existance !
46 """
47 path = [TESTS_TMP_PATH] + list(path)
48 path = os.path.join(*path)
49 return os.path.isdir(path)
50
51 def _check_folders(self):
52 print os.listdir(TESTS_TMP_PATH)
53
54 def __delete_group(self, id_):
55 ReposGroupModel().delete(id_)
56
57 def __update_group(self, id_, path, desc='desc', parent_id=None):
58 form_data = dict(
59 group_name=path,
60 group_description=desc,
61 group_parent_id=parent_id,
62 perms_updates=[],
63 perms_new=[]
64 )
65 gr = ReposGroupModel().update(id_, form_data)
66 return gr
67
68 def test_create_group(self):
69 g = _make_group('newGroup')
70 self.assertEqual(g.full_path, 'newGroup')
71
72 self.assertTrue(self.__check_path('newGroup'))
73
74 def test_create_same_name_group(self):
75 self.assertRaises(IntegrityError, lambda:_make_group('newGroup'))
76 Session.rollback()
77
78 def test_same_subgroup(self):
79 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
80 self.assertEqual(sg1.parent_group, self.g1)
81 self.assertEqual(sg1.full_path, 'test1/sub1')
82 self.assertTrue(self.__check_path('test1', 'sub1'))
83
84 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
85 self.assertEqual(ssg1.parent_group, sg1)
86 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
87 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
88
89 def test_remove_group(self):
90 sg1 = _make_group('deleteme')
91 self.__delete_group(sg1.group_id)
92
93 self.assertEqual(RepoGroup.get(sg1.group_id), None)
94 self.assertFalse(self.__check_path('deteteme'))
95
96 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
97 self.__delete_group(sg1.group_id)
98
99 self.assertEqual(RepoGroup.get(sg1.group_id), None)
100 self.assertFalse(self.__check_path('test1', 'deteteme'))
101
102 def test_rename_single_group(self):
103 sg1 = _make_group('initial')
104
105 new_sg1 = self.__update_group(sg1.group_id, 'after')
106 self.assertTrue(self.__check_path('after'))
107 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
108
109 def test_update_group_parent(self):
110
111 sg1 = _make_group('initial', parent_id=self.g1.group_id)
112
113 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
114 self.assertTrue(self.__check_path('test1', 'after'))
115 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
116
117 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
118 self.assertTrue(self.__check_path('test3', 'after'))
119 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
120
121 new_sg1 = self.__update_group(sg1.group_id, 'hello')
122 self.assertTrue(self.__check_path('hello'))
123
124 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
125
126 def test_subgrouping_with_repo(self):
127
128 g1 = _make_group('g1')
129 g2 = _make_group('g2')
130
131 # create new repo
132 form_data = dict(repo_name='john',
133 repo_name_full='john',
134 fork_name=None,
135 description=None,
136 repo_group=None,
137 private=False,
138 repo_type='hg',
139 clone_uri=None,
140 landing_rev='tip')
141 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
142 r = RepoModel().create(form_data, cur_user)
143
144 self.assertEqual(r.repo_name, 'john')
145
146 # put repo into group
147 form_data = form_data
148 form_data['repo_group'] = g1.group_id
149 form_data['perms_new'] = []
150 form_data['perms_updates'] = []
151 RepoModel().update(r.repo_name, form_data)
152 self.assertEqual(r.repo_name, 'g1/john')
153
154 self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
155 self.assertTrue(self.__check_path('g2', 'g1'))
156
157 # test repo
158 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name]))
159
160 def test_move_to_root(self):
161 g1 = _make_group('t11')
162 Session.commit()
163 g2 = _make_group('t22', parent_id=g1.group_id)
164 Session.commit()
165
166 self.assertEqual(g2.full_path, 't11/t22')
167 self.assertTrue(self.__check_path('t11', 't22'))
168
169 g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
170 Session.commit()
171
172 self.assertEqual(g2.group_name, 'g22')
173 # we moved out group from t1 to '' so it's full path should be 'g2'
174 self.assertEqual(g2.full_path, 'g22')
175 self.assertFalse(self.__check_path('t11', 't22'))
176 self.assertTrue(self.__check_path('g22'))
177
178
179 class TestUser(unittest.TestCase):
180 def __init__(self, methodName='runTest'):
181 Session.remove()
182 super(TestUser, self).__init__(methodName=methodName)
183
184 def test_create_and_remove(self):
185 usr = UserModel().create_or_update(username=u'test_user',
186 password=u'qweqwe',
187 email=u'u232@rhodecode.org',
188 firstname=u'u1', lastname=u'u1')
189 Session.commit()
190 self.assertEqual(User.get_by_username(u'test_user'), usr)
191
192 # make users group
193 users_group = UsersGroupModel().create('some_example_group')
194 Session.commit()
195
196 UsersGroupModel().add_user_to_group(users_group, usr)
197 Session.commit()
198
199 self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
200 self.assertEqual(UsersGroupMember.query().count(), 1)
201 UserModel().delete(usr.user_id)
202 Session.commit()
203
204 self.assertEqual(UsersGroupMember.query().all(), [])
205
206 def test_additonal_email_as_main(self):
207 usr = UserModel().create_or_update(username=u'test_user',
208 password=u'qweqwe',
209 email=u'main_email@rhodecode.org',
210 firstname=u'u1', lastname=u'u1')
211 Session.commit()
212
213 def do():
214 m = UserEmailMap()
215 m.email = u'main_email@rhodecode.org'
216 m.user = usr
217 Session.add(m)
218 Session.commit()
219 self.assertRaises(AttributeError, do)
220
221 UserModel().delete(usr.user_id)
222 Session.commit()
223
224 def test_extra_email_map(self):
225 usr = UserModel().create_or_update(username=u'test_user',
226 password=u'qweqwe',
227 email=u'main_email@rhodecode.org',
228 firstname=u'u1', lastname=u'u1')
229 Session.commit()
230
231 m = UserEmailMap()
232 m.email = u'main_email2@rhodecode.org'
233 m.user = usr
234 Session.add(m)
235 Session.commit()
236
237 u = User.get_by_email(email='main_email@rhodecode.org')
238 self.assertEqual(usr.user_id, u.user_id)
239 self.assertEqual(usr.username, u.username)
240
241 u = User.get_by_email(email='main_email2@rhodecode.org')
242 self.assertEqual(usr.user_id, u.user_id)
243 self.assertEqual(usr.username, u.username)
244 u = User.get_by_email(email='main_email3@rhodecode.org')
245 self.assertEqual(None, u)
246
247 UserModel().delete(usr.user_id)
248 Session.commit()
249
10
250
11
251 class TestNotifications(unittest.TestCase):
12 class TestNotifications(unittest.TestCase):
@@ -256,30 +17,30 b' class TestNotifications(unittest.TestCas'
256 password=u'qweqwe',
17 password=u'qweqwe',
257 email=u'u1@rhodecode.org',
18 email=u'u1@rhodecode.org',
258 firstname=u'u1', lastname=u'u1')
19 firstname=u'u1', lastname=u'u1')
259 Session.commit()
20 Session().commit()
260 self.u1 = self.u1.user_id
21 self.u1 = self.u1.user_id
261
22
262 self.u2 = UserModel().create_or_update(username=u'u2',
23 self.u2 = UserModel().create_or_update(username=u'u2',
263 password=u'qweqwe',
24 password=u'qweqwe',
264 email=u'u2@rhodecode.org',
25 email=u'u2@rhodecode.org',
265 firstname=u'u2', lastname=u'u3')
26 firstname=u'u2', lastname=u'u3')
266 Session.commit()
27 Session().commit()
267 self.u2 = self.u2.user_id
28 self.u2 = self.u2.user_id
268
29
269 self.u3 = UserModel().create_or_update(username=u'u3',
30 self.u3 = UserModel().create_or_update(username=u'u3',
270 password=u'qweqwe',
31 password=u'qweqwe',
271 email=u'u3@rhodecode.org',
32 email=u'u3@rhodecode.org',
272 firstname=u'u3', lastname=u'u3')
33 firstname=u'u3', lastname=u'u3')
273 Session.commit()
34 Session().commit()
274 self.u3 = self.u3.user_id
35 self.u3 = self.u3.user_id
275
36
276 super(TestNotifications, self).__init__(methodName=methodName)
37 super(TestNotifications, self).__init__(methodName=methodName)
277
38
278 def _clean_notifications(self):
39 def _clean_notifications(self):
279 for n in Notification.query().all():
40 for n in Notification.query().all():
280 Session.delete(n)
41 Session().delete(n)
281
42
282 Session.commit()
43 Session().commit()
283 self.assertEqual(Notification.query().all(), [])
44 self.assertEqual(Notification.query().all(), [])
284
45
285 def tearDown(self):
46 def tearDown(self):
@@ -293,7 +54,7 b' class TestNotifications(unittest.TestCas'
293 notification = NotificationModel().create(created_by=self.u1,
54 notification = NotificationModel().create(created_by=self.u1,
294 subject=u'subj', body=u'hi there',
55 subject=u'subj', body=u'hi there',
295 recipients=usrs)
56 recipients=usrs)
296 Session.commit()
57 Session().commit()
297 u1 = User.get(self.u1)
58 u1 = User.get(self.u1)
298 u2 = User.get(self.u2)
59 u2 = User.get(self.u2)
299 u3 = User.get(self.u3)
60 u3 = User.get(self.u3)
@@ -316,12 +77,12 b' class TestNotifications(unittest.TestCas'
316 notification1 = NotificationModel().create(created_by=self.u1,
77 notification1 = NotificationModel().create(created_by=self.u1,
317 subject=u'subj', body=u'hi there1',
78 subject=u'subj', body=u'hi there1',
318 recipients=[self.u3])
79 recipients=[self.u3])
319 Session.commit()
80 Session().commit()
320 notification2 = NotificationModel().create(created_by=self.u1,
81 notification2 = NotificationModel().create(created_by=self.u1,
321 subject=u'subj', body=u'hi there2',
82 subject=u'subj', body=u'hi there2',
322 recipients=[self.u3])
83 recipients=[self.u3])
323 Session.commit()
84 Session().commit()
324 u3 = Session.query(User).get(self.u3)
85 u3 = Session().query(User).get(self.u3)
325
86
326 self.assertEqual(sorted([x.notification for x in u3.notifications]),
87 self.assertEqual(sorted([x.notification for x in u3.notifications]),
327 sorted([notification2, notification1]))
88 sorted([notification2, notification1]))
@@ -333,12 +94,12 b' class TestNotifications(unittest.TestCas'
333 notification = NotificationModel().create(created_by=self.u1,
94 notification = NotificationModel().create(created_by=self.u1,
334 subject=u'title', body=u'hi there3',
95 subject=u'title', body=u'hi there3',
335 recipients=[self.u3, self.u1, self.u2])
96 recipients=[self.u3, self.u1, self.u2])
336 Session.commit()
97 Session().commit()
337 notifications = Notification.query().all()
98 notifications = Notification.query().all()
338 self.assertTrue(notification in notifications)
99 self.assertTrue(notification in notifications)
339
100
340 Notification.delete(notification.notification_id)
101 Notification.delete(notification.notification_id)
341 Session.commit()
102 Session().commit()
342
103
343 notifications = Notification.query().all()
104 notifications = Notification.query().all()
344 self.assertFalse(notification in notifications)
105 self.assertFalse(notification in notifications)
@@ -355,7 +116,7 b' class TestNotifications(unittest.TestCas'
355 notification = NotificationModel().create(created_by=self.u1,
116 notification = NotificationModel().create(created_by=self.u1,
356 subject=u'title', body=u'hi there3',
117 subject=u'title', body=u'hi there3',
357 recipients=[self.u3, self.u1, self.u2])
118 recipients=[self.u3, self.u1, self.u2])
358 Session.commit()
119 Session().commit()
359
120
360 unotification = UserNotification.query()\
121 unotification = UserNotification.query()\
361 .filter(UserNotification.notification ==
122 .filter(UserNotification.notification ==
@@ -367,7 +128,7 b' class TestNotifications(unittest.TestCas'
367
128
368 NotificationModel().delete(self.u3,
129 NotificationModel().delete(self.u3,
369 notification.notification_id)
130 notification.notification_id)
370 Session.commit()
131 Session().commit()
371
132
372 u3notification = UserNotification.query()\
133 u3notification = UserNotification.query()\
373 .filter(UserNotification.notification ==
134 .filter(UserNotification.notification ==
@@ -402,7 +163,7 b' class TestNotifications(unittest.TestCas'
402 NotificationModel().create(created_by=self.u1,
163 NotificationModel().create(created_by=self.u1,
403 subject=u'title', body=u'hi there_delete',
164 subject=u'title', body=u'hi there_delete',
404 recipients=[self.u3, self.u1])
165 recipients=[self.u3, self.u1])
405 Session.commit()
166 Session().commit()
406
167
407 self.assertEqual(NotificationModel()
168 self.assertEqual(NotificationModel()
408 .get_unread_cnt_for_user(self.u1), 1)
169 .get_unread_cnt_for_user(self.u1), 1)
@@ -414,7 +175,7 b' class TestNotifications(unittest.TestCas'
414 notification = NotificationModel().create(created_by=self.u1,
175 notification = NotificationModel().create(created_by=self.u1,
415 subject=u'title', body=u'hi there3',
176 subject=u'title', body=u'hi there3',
416 recipients=[self.u3, self.u1, self.u2])
177 recipients=[self.u3, self.u1, self.u2])
417 Session.commit()
178 Session().commit()
418
179
419 self.assertEqual(NotificationModel()
180 self.assertEqual(NotificationModel()
420 .get_unread_cnt_for_user(self.u1), 2)
181 .get_unread_cnt_for_user(self.u1), 2)
@@ -424,338 +185,5 b' class TestNotifications(unittest.TestCas'
424 .get_unread_cnt_for_user(self.u3), 2)
185 .get_unread_cnt_for_user(self.u3), 2)
425
186
426
187
427 class TestUsers(unittest.TestCase):
428
429 def __init__(self, methodName='runTest'):
430 super(TestUsers, self).__init__(methodName=methodName)
431
432 def setUp(self):
433 self.u1 = UserModel().create_or_update(username=u'u1',
434 password=u'qweqwe',
435 email=u'u1@rhodecode.org',
436 firstname=u'u1', lastname=u'u1')
437
438 def tearDown(self):
439 perm = Permission.query().all()
440 for p in perm:
441 UserModel().revoke_perm(self.u1, p)
442
443 UserModel().delete(self.u1)
444 Session.commit()
445
446 def test_add_perm(self):
447 perm = Permission.query().all()[0]
448 UserModel().grant_perm(self.u1, perm)
449 Session.commit()
450 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
451
452 def test_has_perm(self):
453 perm = Permission.query().all()
454 for p in perm:
455 has_p = UserModel().has_perm(self.u1, p)
456 self.assertEqual(False, has_p)
457
458 def test_revoke_perm(self):
459 perm = Permission.query().all()[0]
460 UserModel().grant_perm(self.u1, perm)
461 Session.commit()
462 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
463
464 #revoke
465 UserModel().revoke_perm(self.u1, perm)
466 Session.commit()
467 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
468
188
469
189
470 class TestPermissions(unittest.TestCase):
471 def __init__(self, methodName='runTest'):
472 super(TestPermissions, self).__init__(methodName=methodName)
473
474 def setUp(self):
475 self.u1 = UserModel().create_or_update(
476 username=u'u1', password=u'qweqwe',
477 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
478 )
479 self.u2 = UserModel().create_or_update(
480 username=u'u2', password=u'qweqwe',
481 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
482 )
483 self.anon = User.get_by_username('default')
484 self.a1 = UserModel().create_or_update(
485 username=u'a1', password=u'qweqwe',
486 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
487 )
488 Session.commit()
489
490 def tearDown(self):
491 if hasattr(self, 'test_repo'):
492 RepoModel().delete(repo=self.test_repo)
493 UserModel().delete(self.u1)
494 UserModel().delete(self.u2)
495 UserModel().delete(self.a1)
496 if hasattr(self, 'g1'):
497 ReposGroupModel().delete(self.g1.group_id)
498 if hasattr(self, 'g2'):
499 ReposGroupModel().delete(self.g2.group_id)
500
501 if hasattr(self, 'ug1'):
502 UsersGroupModel().delete(self.ug1, force=True)
503
504 Session.commit()
505
506 def test_default_perms_set(self):
507 u1_auth = AuthUser(user_id=self.u1.user_id)
508 perms = {
509 'repositories_groups': {},
510 'global': set([u'hg.create.repository', u'repository.read',
511 u'hg.register.manual_activate']),
512 'repositories': {u'vcs_test_hg': u'repository.read'}
513 }
514 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
515 perms['repositories'][HG_REPO])
516 new_perm = 'repository.write'
517 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
518 Session.commit()
519
520 u1_auth = AuthUser(user_id=self.u1.user_id)
521 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm)
522
523 def test_default_admin_perms_set(self):
524 a1_auth = AuthUser(user_id=self.a1.user_id)
525 perms = {
526 'repositories_groups': {},
527 'global': set([u'hg.admin']),
528 'repositories': {u'vcs_test_hg': u'repository.admin'}
529 }
530 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
531 perms['repositories'][HG_REPO])
532 new_perm = 'repository.write'
533 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm)
534 Session.commit()
535 # cannot really downgrade admins permissions !? they still get's set as
536 # admin !
537 u1_auth = AuthUser(user_id=self.a1.user_id)
538 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
539 perms['repositories'][HG_REPO])
540
541 def test_default_group_perms(self):
542 self.g1 = _make_group('test1', skip_if_exists=True)
543 self.g2 = _make_group('test2', skip_if_exists=True)
544 u1_auth = AuthUser(user_id=self.u1.user_id)
545 perms = {
546 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
547 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
548 'repositories': {u'vcs_test_hg': u'repository.read'}
549 }
550 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
551 perms['repositories'][HG_REPO])
552 self.assertEqual(u1_auth.permissions['repositories_groups'],
553 perms['repositories_groups'])
554
555 def test_default_admin_group_perms(self):
556 self.g1 = _make_group('test1', skip_if_exists=True)
557 self.g2 = _make_group('test2', skip_if_exists=True)
558 a1_auth = AuthUser(user_id=self.a1.user_id)
559 perms = {
560 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
561 'global': set(['hg.admin']),
562 'repositories': {u'vcs_test_hg': 'repository.admin'}
563 }
564
565 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
566 perms['repositories'][HG_REPO])
567 self.assertEqual(a1_auth.permissions['repositories_groups'],
568 perms['repositories_groups'])
569
570 def test_propagated_permission_from_users_group(self):
571 # make group
572 self.ug1 = UsersGroupModel().create('G1')
573 # add user to group
574 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
575
576 # set permission to lower
577 new_perm = 'repository.none'
578 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
579 Session.commit()
580 u1_auth = AuthUser(user_id=self.u1.user_id)
581 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
582 new_perm)
583
584 # grant perm for group this should override permission from user
585 new_perm = 'repository.write'
586 RepoModel().grant_users_group_permission(repo=HG_REPO,
587 group_name=self.ug1,
588 perm=new_perm)
589 # check perms
590 u1_auth = AuthUser(user_id=self.u1.user_id)
591 perms = {
592 'repositories_groups': {},
593 'global': set([u'hg.create.repository', u'repository.read',
594 u'hg.register.manual_activate']),
595 'repositories': {u'vcs_test_hg': u'repository.read'}
596 }
597 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
598 new_perm)
599 self.assertEqual(u1_auth.permissions['repositories_groups'],
600 perms['repositories_groups'])
601
602 def test_propagated_permission_from_users_group_lower_weight(self):
603 # make group
604 self.ug1 = UsersGroupModel().create('G1')
605 # add user to group
606 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
607
608 # set permission to lower
609 new_perm_h = 'repository.write'
610 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
611 perm=new_perm_h)
612 Session.commit()
613 u1_auth = AuthUser(user_id=self.u1.user_id)
614 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
615 new_perm_h)
616
617 # grant perm for group this should NOT override permission from user
618 # since it's lower than granted
619 new_perm_l = 'repository.read'
620 RepoModel().grant_users_group_permission(repo=HG_REPO,
621 group_name=self.ug1,
622 perm=new_perm_l)
623 # check perms
624 u1_auth = AuthUser(user_id=self.u1.user_id)
625 perms = {
626 'repositories_groups': {},
627 'global': set([u'hg.create.repository', u'repository.read',
628 u'hg.register.manual_activate']),
629 'repositories': {u'vcs_test_hg': u'repository.write'}
630 }
631 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
632 new_perm_h)
633 self.assertEqual(u1_auth.permissions['repositories_groups'],
634 perms['repositories_groups'])
635
636 def test_repo_in_group_permissions(self):
637 self.g1 = _make_group('group1', skip_if_exists=True)
638 self.g2 = _make_group('group2', skip_if_exists=True)
639 Session.commit()
640 # both perms should be read !
641 u1_auth = AuthUser(user_id=self.u1.user_id)
642 self.assertEqual(u1_auth.permissions['repositories_groups'],
643 {u'group1': u'group.read', u'group2': u'group.read'})
644
645 a1_auth = AuthUser(user_id=self.anon.user_id)
646 self.assertEqual(a1_auth.permissions['repositories_groups'],
647 {u'group1': u'group.read', u'group2': u'group.read'})
648
649 #Change perms to none for both groups
650 ReposGroupModel().grant_user_permission(repos_group=self.g1,
651 user=self.anon,
652 perm='group.none')
653 ReposGroupModel().grant_user_permission(repos_group=self.g2,
654 user=self.anon,
655 perm='group.none')
656
657 u1_auth = AuthUser(user_id=self.u1.user_id)
658 self.assertEqual(u1_auth.permissions['repositories_groups'],
659 {u'group1': u'group.none', u'group2': u'group.none'})
660
661 a1_auth = AuthUser(user_id=self.anon.user_id)
662 self.assertEqual(a1_auth.permissions['repositories_groups'],
663 {u'group1': u'group.none', u'group2': u'group.none'})
664
665 # add repo to group
666 form_data = {
667 'repo_name': HG_REPO,
668 'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
669 'repo_type': 'hg',
670 'clone_uri': '',
671 'repo_group': self.g1.group_id,
672 'description': 'desc',
673 'private': False,
674 'landing_rev': 'tip'
675 }
676 self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
677 Session.commit()
678
679 u1_auth = AuthUser(user_id=self.u1.user_id)
680 self.assertEqual(u1_auth.permissions['repositories_groups'],
681 {u'group1': u'group.none', u'group2': u'group.none'})
682
683 a1_auth = AuthUser(user_id=self.anon.user_id)
684 self.assertEqual(a1_auth.permissions['repositories_groups'],
685 {u'group1': u'group.none', u'group2': u'group.none'})
686
687 #grant permission for u2 !
688 ReposGroupModel().grant_user_permission(repos_group=self.g1,
689 user=self.u2,
690 perm='group.read')
691 ReposGroupModel().grant_user_permission(repos_group=self.g2,
692 user=self.u2,
693 perm='group.read')
694 Session.commit()
695 self.assertNotEqual(self.u1, self.u2)
696 #u1 and anon should have not change perms while u2 should !
697 u1_auth = AuthUser(user_id=self.u1.user_id)
698 self.assertEqual(u1_auth.permissions['repositories_groups'],
699 {u'group1': u'group.none', u'group2': u'group.none'})
700
701 u2_auth = AuthUser(user_id=self.u2.user_id)
702 self.assertEqual(u2_auth.permissions['repositories_groups'],
703 {u'group1': u'group.read', u'group2': u'group.read'})
704
705 a1_auth = AuthUser(user_id=self.anon.user_id)
706 self.assertEqual(a1_auth.permissions['repositories_groups'],
707 {u'group1': u'group.none', u'group2': u'group.none'})
708
709 def test_repo_group_user_as_user_group_member(self):
710 # create Group1
711 self.g1 = _make_group('group1', skip_if_exists=True)
712 Session.commit()
713 a1_auth = AuthUser(user_id=self.anon.user_id)
714
715 self.assertEqual(a1_auth.permissions['repositories_groups'],
716 {u'group1': u'group.read'})
717
718 # set default permission to none
719 ReposGroupModel().grant_user_permission(repos_group=self.g1,
720 user=self.anon,
721 perm='group.none')
722 # make group
723 self.ug1 = UsersGroupModel().create('G1')
724 # add user to group
725 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
726 Session.commit()
727
728 # check if user is in the group
729 membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
730 self.assertEqual(membrs, [self.u1.user_id])
731 # add some user to that group
732
733 # check his permissions
734 a1_auth = AuthUser(user_id=self.anon.user_id)
735 self.assertEqual(a1_auth.permissions['repositories_groups'],
736 {u'group1': u'group.none'})
737
738 u1_auth = AuthUser(user_id=self.u1.user_id)
739 self.assertEqual(u1_auth.permissions['repositories_groups'],
740 {u'group1': u'group.none'})
741
742 # grant ug1 read permissions for
743 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
744 group_name=self.ug1,
745 perm='group.read')
746 Session.commit()
747 # check if the
748 obj = Session.query(UsersGroupRepoGroupToPerm)\
749 .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
750 .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
751 .scalar()
752 self.assertEqual(obj.permission.permission_name, 'group.read')
753
754 a1_auth = AuthUser(user_id=self.anon.user_id)
755
756 self.assertEqual(a1_auth.permissions['repositories_groups'],
757 {u'group1': u'group.none'})
758
759 u1_auth = AuthUser(user_id=self.u1.user_id)
760 self.assertEqual(u1_auth.permissions['repositories_groups'],
761 {u'group1': u'group.read'})
1 NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch
NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch
@@ -79,6 +79,7 b' class Command(object):'
79 print stdout, stderr
79 print stdout, stderr
80 return stdout, stderr
80 return stdout, stderr
81
81
82
82 def get_session():
83 def get_session():
83 engine = engine_from_config(conf, 'sqlalchemy.db1.')
84 engine = engine_from_config(conf, 'sqlalchemy.db1.')
84 init_model(engine)
85 init_model(engine)
@@ -124,7 +125,6 b' def create_test_repo(force=True):'
124 if user is None:
125 if user is None:
125 raise Exception('user not found')
126 raise Exception('user not found')
126
127
127
128 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
128 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
129
129
130 if repo is None:
130 if repo is None:
@@ -140,6 +140,7 b' def create_test_repo(force=True):'
140
140
141 print 'done'
141 print 'done'
142
142
143
143 def set_anonymous_access(enable=True):
144 def set_anonymous_access(enable=True):
144 sa = get_session()
145 sa = get_session()
145 user = sa.query(User).filter(User.username == 'default').one()
146 user = sa.query(User).filter(User.username == 'default').one()
@@ -147,6 +148,7 b' def set_anonymous_access(enable=True):'
147 sa.add(user)
148 sa.add(user)
148 sa.commit()
149 sa.commit()
149
150
151
150 def get_anonymous_access():
152 def get_anonymous_access():
151 sa = get_session()
153 sa = get_session()
152 return sa.query(User).filter(User.username == 'default').one().active
154 return sa.query(User).filter(User.username == 'default').one().active
1 NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py
NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py
@@ -64,7 +64,8 b' log = logging.getLogger(__name__)'
64
64
65 engine = engine_from_config(conf, 'sqlalchemy.db1.')
65 engine = engine_from_config(conf, 'sqlalchemy.db1.')
66 init_model(engine)
66 init_model(engine)
67 sa = meta.Session
67 sa = meta.Session()
68
68
69
69 class Command(object):
70 class Command(object):
70
71
@@ -137,7 +138,6 b' def create_test_repo(force=True):'
137 if user is None:
138 if user is None:
138 raise Exception('user not found')
139 raise Exception('user not found')
139
140
140
141 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
141 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
142
142
143 if repo is None:
143 if repo is None:
@@ -161,6 +161,7 b' def set_anonymous_access(enable=True):'
161 if enable != User.get_by_username('default').active:
161 if enable != User.get_by_username('default').active:
162 raise Exception('Cannot set anonymous access')
162 raise Exception('Cannot set anonymous access')
163
163
164
164 def get_anonymous_access():
165 def get_anonymous_access():
165 user = User.get_by_username('default')
166 user = User.get_by_username('default')
166 return user.active
167 return user.active
@@ -235,6 +236,7 b' def test_clone_anonymous():'
235 print '\tdisabling anonymous access'
236 print '\tdisabling anonymous access'
236 set_anonymous_access(enable=False)
237 set_anonymous_access(enable=False)
237
238
239
238 @test_wrapp
240 @test_wrapp
239 def test_clone_wrong_credentials():
241 def test_clone_wrong_credentials():
240 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
242 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
@@ -264,10 +266,12 b' def test_clone_wrong_credentials():'
264 if not """abort: authorization failed""" in stderr:
266 if not """abort: authorization failed""" in stderr:
265 raise Exception('Failure')
267 raise Exception('Failure')
266
268
269
267 @test_wrapp
270 @test_wrapp
268 def test_pull():
271 def test_pull():
269 pass
272 pass
270
273
274
271 @test_wrapp
275 @test_wrapp
272 def test_push_modify_file(f_name='setup.py'):
276 def test_push_modify_file(f_name='setup.py'):
273 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
277 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
@@ -281,6 +285,7 b" def test_push_modify_file(f_name='setup."
281
285
282 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
286 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
283
287
288
284 @test_wrapp
289 @test_wrapp
285 def test_push_new_file(commits=15, with_clone=True):
290 def test_push_new_file(commits=15, with_clone=True):
286
291
@@ -312,6 +317,7 b' def test_push_new_file(commits=15, with_'
312
317
313 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
318 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
314
319
320
315 @test_wrapp
321 @test_wrapp
316 def test_push_wrong_credentials():
322 def test_push_wrong_credentials():
317 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
323 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
@@ -332,6 +338,7 b' def test_push_wrong_credentials():'
332
338
333 Command(cwd).execute('hg push %s' % clone_url)
339 Command(cwd).execute('hg push %s' % clone_url)
334
340
341
335 @test_wrapp
342 @test_wrapp
336 def test_push_wrong_path():
343 def test_push_wrong_path():
337 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
344 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
@@ -366,10 +373,12 b' def test_push_wrong_path():'
366 if not """abort: HTTP Error 403: Forbidden""" in stderr:
373 if not """abort: HTTP Error 403: Forbidden""" in stderr:
367 raise Exception('Failure')
374 raise Exception('Failure')
368
375
376
369 @test_wrapp
377 @test_wrapp
370 def get_logs():
378 def get_logs():
371 return UserLog.query().all()
379 return UserLog.query().all()
372
380
381
373 @test_wrapp
382 @test_wrapp
374 def test_logs(initial):
383 def test_logs(initial):
375 logs = UserLog.query().all()
384 logs = UserLog.query().all()
@@ -22,8 +22,8 b' function at ``tests/__init__.py``.'
22 import os
22 import os
23 from rhodecode.lib import vcs
23 from rhodecode.lib import vcs
24 from rhodecode.lib.vcs.utils.compat import unittest
24 from rhodecode.lib.vcs.utils.compat import unittest
25 from utils import VCSTestError, SCMFetcher
25 from rhodecode.tests import *
26 from rhodecode.tests import *
26 from utils import VCSTestError, SCMFetcher
27
27
28
28
29 def setup_package():
29 def setup_package():
General Comments 0
You need to be logged in to leave comments. Login now