##// END OF EJS Templates
orginized test module...
marcink -
r2527:95624ce4 beta
parent child Browse files
Show More
1 NO CONTENT: new file 100644
@@ -0,0 +1,316
1 import os
2 import unittest
3 from rhodecode.tests import *
4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm
8 from rhodecode.model.user import UserModel
9
10 from rhodecode.model.meta import Session
11 from rhodecode.model.users_group import UsersGroupModel
12 from rhodecode.lib.auth import AuthUser
13
14
15 def _make_group(path, desc='desc', parent_id=None,
16 skip_if_exists=False):
17
18 gr = RepoGroup.get_by_group_name(path)
19 if gr and skip_if_exists:
20 return gr
21
22 gr = ReposGroupModel().create(path, desc, parent_id)
23 return gr
24
25
26 class TestPermissions(unittest.TestCase):
27 def __init__(self, methodName='runTest'):
28 super(TestPermissions, self).__init__(methodName=methodName)
29
30 def setUp(self):
31 self.u1 = UserModel().create_or_update(
32 username=u'u1', password=u'qweqwe',
33 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
34 )
35 self.u2 = UserModel().create_or_update(
36 username=u'u2', password=u'qweqwe',
37 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
38 )
39 self.anon = User.get_by_username('default')
40 self.a1 = UserModel().create_or_update(
41 username=u'a1', password=u'qweqwe',
42 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
43 )
44 Session().commit()
45
46 def tearDown(self):
47 if hasattr(self, 'test_repo'):
48 RepoModel().delete(repo=self.test_repo)
49 UserModel().delete(self.u1)
50 UserModel().delete(self.u2)
51 UserModel().delete(self.a1)
52 if hasattr(self, 'g1'):
53 ReposGroupModel().delete(self.g1.group_id)
54 if hasattr(self, 'g2'):
55 ReposGroupModel().delete(self.g2.group_id)
56
57 if hasattr(self, 'ug1'):
58 UsersGroupModel().delete(self.ug1, force=True)
59
60 Session().commit()
61
62 def test_default_perms_set(self):
63 u1_auth = AuthUser(user_id=self.u1.user_id)
64 perms = {
65 'repositories_groups': {},
66 'global': set([u'hg.create.repository', u'repository.read',
67 u'hg.register.manual_activate']),
68 'repositories': {u'vcs_test_hg': u'repository.read'}
69 }
70 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
71 perms['repositories'][HG_REPO])
72 new_perm = 'repository.write'
73 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
74 perm=new_perm)
75 Session().commit()
76
77 u1_auth = AuthUser(user_id=self.u1.user_id)
78 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
79 new_perm)
80
81 def test_default_admin_perms_set(self):
82 a1_auth = AuthUser(user_id=self.a1.user_id)
83 perms = {
84 'repositories_groups': {},
85 'global': set([u'hg.admin']),
86 'repositories': {u'vcs_test_hg': u'repository.admin'}
87 }
88 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
89 perms['repositories'][HG_REPO])
90 new_perm = 'repository.write'
91 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
92 perm=new_perm)
93 Session().commit()
94 # cannot really downgrade admins permissions !? they still get's set as
95 # admin !
96 u1_auth = AuthUser(user_id=self.a1.user_id)
97 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
98 perms['repositories'][HG_REPO])
99
100 def test_default_group_perms(self):
101 self.g1 = _make_group('test1', skip_if_exists=True)
102 self.g2 = _make_group('test2', skip_if_exists=True)
103 u1_auth = AuthUser(user_id=self.u1.user_id)
104 perms = {
105 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
106 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
107 'repositories': {u'vcs_test_hg': u'repository.read'}
108 }
109 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
110 perms['repositories'][HG_REPO])
111 self.assertEqual(u1_auth.permissions['repositories_groups'],
112 perms['repositories_groups'])
113
114 def test_default_admin_group_perms(self):
115 self.g1 = _make_group('test1', skip_if_exists=True)
116 self.g2 = _make_group('test2', skip_if_exists=True)
117 a1_auth = AuthUser(user_id=self.a1.user_id)
118 perms = {
119 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
120 'global': set(['hg.admin']),
121 'repositories': {u'vcs_test_hg': 'repository.admin'}
122 }
123
124 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
125 perms['repositories'][HG_REPO])
126 self.assertEqual(a1_auth.permissions['repositories_groups'],
127 perms['repositories_groups'])
128
129 def test_propagated_permission_from_users_group(self):
130 # make group
131 self.ug1 = UsersGroupModel().create('G1')
132 # add user to group
133 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
134
135 # set permission to lower
136 new_perm = 'repository.none'
137 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
138 Session().commit()
139 u1_auth = AuthUser(user_id=self.u1.user_id)
140 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
141 new_perm)
142
143 # grant perm for group this should override permission from user
144 new_perm = 'repository.write'
145 RepoModel().grant_users_group_permission(repo=HG_REPO,
146 group_name=self.ug1,
147 perm=new_perm)
148 # check perms
149 u1_auth = AuthUser(user_id=self.u1.user_id)
150 perms = {
151 'repositories_groups': {},
152 'global': set([u'hg.create.repository', u'repository.read',
153 u'hg.register.manual_activate']),
154 'repositories': {u'vcs_test_hg': u'repository.read'}
155 }
156 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
157 new_perm)
158 self.assertEqual(u1_auth.permissions['repositories_groups'],
159 perms['repositories_groups'])
160
161 def test_propagated_permission_from_users_group_lower_weight(self):
162 # make group
163 self.ug1 = UsersGroupModel().create('G1')
164 # add user to group
165 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
166
167 # set permission to lower
168 new_perm_h = 'repository.write'
169 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
170 perm=new_perm_h)
171 Session().commit()
172 u1_auth = AuthUser(user_id=self.u1.user_id)
173 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
174 new_perm_h)
175
176 # grant perm for group this should NOT override permission from user
177 # since it's lower than granted
178 new_perm_l = 'repository.read'
179 RepoModel().grant_users_group_permission(repo=HG_REPO,
180 group_name=self.ug1,
181 perm=new_perm_l)
182 # check perms
183 u1_auth = AuthUser(user_id=self.u1.user_id)
184 perms = {
185 'repositories_groups': {},
186 'global': set([u'hg.create.repository', u'repository.read',
187 u'hg.register.manual_activate']),
188 'repositories': {u'vcs_test_hg': u'repository.write'}
189 }
190 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
191 new_perm_h)
192 self.assertEqual(u1_auth.permissions['repositories_groups'],
193 perms['repositories_groups'])
194
195 def test_repo_in_group_permissions(self):
196 self.g1 = _make_group('group1', skip_if_exists=True)
197 self.g2 = _make_group('group2', skip_if_exists=True)
198 Session().commit()
199 # both perms should be read !
200 u1_auth = AuthUser(user_id=self.u1.user_id)
201 self.assertEqual(u1_auth.permissions['repositories_groups'],
202 {u'group1': u'group.read', u'group2': u'group.read'})
203
204 a1_auth = AuthUser(user_id=self.anon.user_id)
205 self.assertEqual(a1_auth.permissions['repositories_groups'],
206 {u'group1': u'group.read', u'group2': u'group.read'})
207
208 #Change perms to none for both groups
209 ReposGroupModel().grant_user_permission(repos_group=self.g1,
210 user=self.anon,
211 perm='group.none')
212 ReposGroupModel().grant_user_permission(repos_group=self.g2,
213 user=self.anon,
214 perm='group.none')
215
216 u1_auth = AuthUser(user_id=self.u1.user_id)
217 self.assertEqual(u1_auth.permissions['repositories_groups'],
218 {u'group1': u'group.none', u'group2': u'group.none'})
219
220 a1_auth = AuthUser(user_id=self.anon.user_id)
221 self.assertEqual(a1_auth.permissions['repositories_groups'],
222 {u'group1': u'group.none', u'group2': u'group.none'})
223
224 # add repo to group
225 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
226 self.test_repo = RepoModel().create_repo(
227 repo_name=name,
228 repo_type='hg',
229 description='',
230 owner=self.u1,
231 )
232 Session().commit()
233
234 u1_auth = AuthUser(user_id=self.u1.user_id)
235 self.assertEqual(u1_auth.permissions['repositories_groups'],
236 {u'group1': u'group.none', u'group2': u'group.none'})
237
238 a1_auth = AuthUser(user_id=self.anon.user_id)
239 self.assertEqual(a1_auth.permissions['repositories_groups'],
240 {u'group1': u'group.none', u'group2': u'group.none'})
241
242 #grant permission for u2 !
243 ReposGroupModel().grant_user_permission(repos_group=self.g1,
244 user=self.u2,
245 perm='group.read')
246 ReposGroupModel().grant_user_permission(repos_group=self.g2,
247 user=self.u2,
248 perm='group.read')
249 Session().commit()
250 self.assertNotEqual(self.u1, self.u2)
251 #u1 and anon should have not change perms while u2 should !
252 u1_auth = AuthUser(user_id=self.u1.user_id)
253 self.assertEqual(u1_auth.permissions['repositories_groups'],
254 {u'group1': u'group.none', u'group2': u'group.none'})
255
256 u2_auth = AuthUser(user_id=self.u2.user_id)
257 self.assertEqual(u2_auth.permissions['repositories_groups'],
258 {u'group1': u'group.read', u'group2': u'group.read'})
259
260 a1_auth = AuthUser(user_id=self.anon.user_id)
261 self.assertEqual(a1_auth.permissions['repositories_groups'],
262 {u'group1': u'group.none', u'group2': u'group.none'})
263
264 def test_repo_group_user_as_user_group_member(self):
265 # create Group1
266 self.g1 = _make_group('group1', skip_if_exists=True)
267 Session().commit()
268 a1_auth = AuthUser(user_id=self.anon.user_id)
269
270 self.assertEqual(a1_auth.permissions['repositories_groups'],
271 {u'group1': u'group.read'})
272
273 # set default permission to none
274 ReposGroupModel().grant_user_permission(repos_group=self.g1,
275 user=self.anon,
276 perm='group.none')
277 # make group
278 self.ug1 = UsersGroupModel().create('G1')
279 # add user to group
280 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
281 Session().commit()
282
283 # check if user is in the group
284 membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
285 self.assertEqual(membrs, [self.u1.user_id])
286 # add some user to that group
287
288 # check his permissions
289 a1_auth = AuthUser(user_id=self.anon.user_id)
290 self.assertEqual(a1_auth.permissions['repositories_groups'],
291 {u'group1': u'group.none'})
292
293 u1_auth = AuthUser(user_id=self.u1.user_id)
294 self.assertEqual(u1_auth.permissions['repositories_groups'],
295 {u'group1': u'group.none'})
296
297 # grant ug1 read permissions for
298 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
299 group_name=self.ug1,
300 perm='group.read')
301 Session().commit()
302 # check if the
303 obj = Session().query(UsersGroupRepoGroupToPerm)\
304 .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
305 .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
306 .scalar()
307 self.assertEqual(obj.permission.permission_name, 'group.read')
308
309 a1_auth = AuthUser(user_id=self.anon.user_id)
310
311 self.assertEqual(a1_auth.permissions['repositories_groups'],
312 {u'group1': u'group.none'})
313
314 u1_auth = AuthUser(user_id=self.u1.user_id)
315 self.assertEqual(u1_auth.permissions['repositories_groups'],
316 {u'group1': u'group.read'})
@@ -0,0 +1,170
1 import os
2 import unittest
3 from rhodecode.tests import *
4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User
8 from rhodecode.model.meta import Session
9 from sqlalchemy.exc import IntegrityError
10
11
12 def _make_group(path, desc='desc', parent_id=None,
13 skip_if_exists=False):
14
15 gr = RepoGroup.get_by_group_name(path)
16 if gr and skip_if_exists:
17 return gr
18
19 gr = ReposGroupModel().create(path, desc, parent_id)
20 return gr
21
22
23 class TestReposGroups(unittest.TestCase):
24
25 def setUp(self):
26 self.g1 = _make_group('test1', skip_if_exists=True)
27 Session().commit()
28 self.g2 = _make_group('test2', skip_if_exists=True)
29 Session().commit()
30 self.g3 = _make_group('test3', skip_if_exists=True)
31 Session().commit()
32
33 def tearDown(self):
34 print 'out'
35
36 def __check_path(self, *path):
37 """
38 Checks the path for existance !
39 """
40 path = [TESTS_TMP_PATH] + list(path)
41 path = os.path.join(*path)
42 return os.path.isdir(path)
43
44 def _check_folders(self):
45 print os.listdir(TESTS_TMP_PATH)
46
47 def __delete_group(self, id_):
48 ReposGroupModel().delete(id_)
49
50 def __update_group(self, id_, path, desc='desc', parent_id=None):
51 form_data = dict(
52 group_name=path,
53 group_description=desc,
54 group_parent_id=parent_id,
55 perms_updates=[],
56 perms_new=[]
57 )
58 gr = ReposGroupModel().update(id_, form_data)
59 return gr
60
61 def test_create_group(self):
62 g = _make_group('newGroup')
63 self.assertEqual(g.full_path, 'newGroup')
64
65 self.assertTrue(self.__check_path('newGroup'))
66
67 def test_create_same_name_group(self):
68 self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
69 Session().rollback()
70
71 def test_same_subgroup(self):
72 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
73 self.assertEqual(sg1.parent_group, self.g1)
74 self.assertEqual(sg1.full_path, 'test1/sub1')
75 self.assertTrue(self.__check_path('test1', 'sub1'))
76
77 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
78 self.assertEqual(ssg1.parent_group, sg1)
79 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
80 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
81
82 def test_remove_group(self):
83 sg1 = _make_group('deleteme')
84 self.__delete_group(sg1.group_id)
85
86 self.assertEqual(RepoGroup.get(sg1.group_id), None)
87 self.assertFalse(self.__check_path('deteteme'))
88
89 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
90 self.__delete_group(sg1.group_id)
91
92 self.assertEqual(RepoGroup.get(sg1.group_id), None)
93 self.assertFalse(self.__check_path('test1', 'deteteme'))
94
95 def test_rename_single_group(self):
96 sg1 = _make_group('initial')
97
98 new_sg1 = self.__update_group(sg1.group_id, 'after')
99 self.assertTrue(self.__check_path('after'))
100 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
101
102 def test_update_group_parent(self):
103
104 sg1 = _make_group('initial', parent_id=self.g1.group_id)
105
106 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
107 self.assertTrue(self.__check_path('test1', 'after'))
108 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
109
110 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
111 self.assertTrue(self.__check_path('test3', 'after'))
112 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
113
114 new_sg1 = self.__update_group(sg1.group_id, 'hello')
115 self.assertTrue(self.__check_path('hello'))
116
117 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
118
119 def test_subgrouping_with_repo(self):
120
121 g1 = _make_group('g1')
122 g2 = _make_group('g2')
123
124 # create new repo
125 form_data = dict(repo_name='john',
126 repo_name_full='john',
127 fork_name=None,
128 description=None,
129 repo_group=None,
130 private=False,
131 repo_type='hg',
132 clone_uri=None,
133 landing_rev='tip')
134 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
135 r = RepoModel().create(form_data, cur_user)
136
137 self.assertEqual(r.repo_name, 'john')
138
139 # put repo into group
140 form_data = form_data
141 form_data['repo_group'] = g1.group_id
142 form_data['perms_new'] = []
143 form_data['perms_updates'] = []
144 RepoModel().update(r.repo_name, form_data)
145 self.assertEqual(r.repo_name, 'g1/john')
146
147 self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
148 self.assertTrue(self.__check_path('g2', 'g1'))
149
150 # test repo
151 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
152 r.just_name]))
153
154 def test_move_to_root(self):
155 g1 = _make_group('t11')
156 Session().commit()
157 g2 = _make_group('t22', parent_id=g1.group_id)
158 Session().commit()
159
160 self.assertEqual(g2.full_path, 't11/t22')
161 self.assertTrue(self.__check_path('t11', 't22'))
162
163 g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
164 Session().commit()
165
166 self.assertEqual(g2.group_name, 'g22')
167 # we moved out group from t1 to '' so it's full path should be 'g2'
168 self.assertEqual(g2.full_path, 'g22')
169 self.assertFalse(self.__check_path('t11', 't22'))
170 self.assertTrue(self.__check_path('g22')) No newline at end of file
@@ -0,0 +1,124
1 import unittest
2 from rhodecode.tests import *
3
4 from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\
5 Permission
6 from rhodecode.model.user import UserModel
7
8 from rhodecode.model.meta import Session
9 from rhodecode.model.users_group import UsersGroupModel
10
11
12 class TestUser(unittest.TestCase):
13 def __init__(self, methodName='runTest'):
14 Session.remove()
15 super(TestUser, self).__init__(methodName=methodName)
16
17 def test_create_and_remove(self):
18 usr = UserModel().create_or_update(username=u'test_user',
19 password=u'qweqwe',
20 email=u'u232@rhodecode.org',
21 firstname=u'u1', lastname=u'u1')
22 Session().commit()
23 self.assertEqual(User.get_by_username(u'test_user'), usr)
24
25 # make users group
26 users_group = UsersGroupModel().create('some_example_group')
27 Session().commit()
28
29 UsersGroupModel().add_user_to_group(users_group, usr)
30 Session().commit()
31
32 self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
33 self.assertEqual(UsersGroupMember.query().count(), 1)
34 UserModel().delete(usr.user_id)
35 Session().commit()
36
37 self.assertEqual(UsersGroupMember.query().all(), [])
38
39 def test_additonal_email_as_main(self):
40 usr = UserModel().create_or_update(username=u'test_user',
41 password=u'qweqwe',
42 email=u'main_email@rhodecode.org',
43 firstname=u'u1', lastname=u'u1')
44 Session().commit()
45
46 def do():
47 m = UserEmailMap()
48 m.email = u'main_email@rhodecode.org'
49 m.user = usr
50 Session().add(m)
51 Session().commit()
52 self.assertRaises(AttributeError, do)
53
54 UserModel().delete(usr.user_id)
55 Session().commit()
56
57 def test_extra_email_map(self):
58 usr = UserModel().create_or_update(username=u'test_user',
59 password=u'qweqwe',
60 email=u'main_email@rhodecode.org',
61 firstname=u'u1', lastname=u'u1')
62 Session().commit()
63
64 m = UserEmailMap()
65 m.email = u'main_email2@rhodecode.org'
66 m.user = usr
67 Session().add(m)
68 Session().commit()
69
70 u = User.get_by_email(email='main_email@rhodecode.org')
71 self.assertEqual(usr.user_id, u.user_id)
72 self.assertEqual(usr.username, u.username)
73
74 u = User.get_by_email(email='main_email2@rhodecode.org')
75 self.assertEqual(usr.user_id, u.user_id)
76 self.assertEqual(usr.username, u.username)
77 u = User.get_by_email(email='main_email3@rhodecode.org')
78 self.assertEqual(None, u)
79
80 UserModel().delete(usr.user_id)
81 Session().commit()
82
83
84 class TestUsers(unittest.TestCase):
85
86 def __init__(self, methodName='runTest'):
87 super(TestUsers, self).__init__(methodName=methodName)
88
89 def setUp(self):
90 self.u1 = UserModel().create_or_update(username=u'u1',
91 password=u'qweqwe',
92 email=u'u1@rhodecode.org',
93 firstname=u'u1', lastname=u'u1')
94
95 def tearDown(self):
96 perm = Permission.query().all()
97 for p in perm:
98 UserModel().revoke_perm(self.u1, p)
99
100 UserModel().delete(self.u1)
101 Session().commit()
102
103 def test_add_perm(self):
104 perm = Permission.query().all()[0]
105 UserModel().grant_perm(self.u1, perm)
106 Session().commit()
107 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
108
109 def test_has_perm(self):
110 perm = Permission.query().all()
111 for p in perm:
112 has_p = UserModel().has_perm(self.u1, p)
113 self.assertEqual(False, has_p)
114
115 def test_revoke_perm(self):
116 perm = Permission.query().all()[0]
117 UserModel().grant_perm(self.u1, perm)
118 Session().commit()
119 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
120
121 #revoke
122 UserModel().revoke_perm(self.u1, perm)
123 Session().commit()
124 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
This diff has been collapsed as it changes many lines, (604 lines changed) Show them Hide them
@@ -1,761 +1,189
1 1 import os
2 2 import unittest
3 3 from rhodecode.tests import *
4 4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
8 UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\
9 Repository, UserEmailMap
10 from sqlalchemy.exc import IntegrityError, DatabaseError
5 from rhodecode.model.db import User, Notification, UserNotification
11 6 from rhodecode.model.user import UserModel
12 7
13 8 from rhodecode.model.meta import Session
14 9 from rhodecode.model.notification import NotificationModel
15 from rhodecode.model.users_group import UsersGroupModel
16 from rhodecode.lib.auth import AuthUser
17
18
19 def _make_group(path, desc='desc', parent_id=None,
20 skip_if_exists=False):
21
22 gr = RepoGroup.get_by_group_name(path)
23 if gr and skip_if_exists:
24 return gr
25
26 gr = ReposGroupModel().create(path, desc, parent_id)
27 return gr
28
29
30 class TestReposGroups(unittest.TestCase):
31
32 def setUp(self):
33 self.g1 = _make_group('test1', skip_if_exists=True)
34 Session.commit()
35 self.g2 = _make_group('test2', skip_if_exists=True)
36 Session.commit()
37 self.g3 = _make_group('test3', skip_if_exists=True)
38 Session.commit()
39
40 def tearDown(self):
41 print 'out'
42
43 def __check_path(self, *path):
44 """
45 Checks the path for existance !
46 """
47 path = [TESTS_TMP_PATH] + list(path)
48 path = os.path.join(*path)
49 return os.path.isdir(path)
50
51 def _check_folders(self):
52 print os.listdir(TESTS_TMP_PATH)
53
54 def __delete_group(self, id_):
55 ReposGroupModel().delete(id_)
56
57 def __update_group(self, id_, path, desc='desc', parent_id=None):
58 form_data = dict(
59 group_name=path,
60 group_description=desc,
61 group_parent_id=parent_id,
62 perms_updates=[],
63 perms_new=[]
64 )
65 gr = ReposGroupModel().update(id_, form_data)
66 return gr
67
68 def test_create_group(self):
69 g = _make_group('newGroup')
70 self.assertEqual(g.full_path, 'newGroup')
71
72 self.assertTrue(self.__check_path('newGroup'))
73
74 def test_create_same_name_group(self):
75 self.assertRaises(IntegrityError, lambda:_make_group('newGroup'))
76 Session.rollback()
77
78 def test_same_subgroup(self):
79 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
80 self.assertEqual(sg1.parent_group, self.g1)
81 self.assertEqual(sg1.full_path, 'test1/sub1')
82 self.assertTrue(self.__check_path('test1', 'sub1'))
83
84 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
85 self.assertEqual(ssg1.parent_group, sg1)
86 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
87 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
88
89 def test_remove_group(self):
90 sg1 = _make_group('deleteme')
91 self.__delete_group(sg1.group_id)
92
93 self.assertEqual(RepoGroup.get(sg1.group_id), None)
94 self.assertFalse(self.__check_path('deteteme'))
95
96 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
97 self.__delete_group(sg1.group_id)
98
99 self.assertEqual(RepoGroup.get(sg1.group_id), None)
100 self.assertFalse(self.__check_path('test1', 'deteteme'))
101
102 def test_rename_single_group(self):
103 sg1 = _make_group('initial')
104
105 new_sg1 = self.__update_group(sg1.group_id, 'after')
106 self.assertTrue(self.__check_path('after'))
107 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
108
109 def test_update_group_parent(self):
110
111 sg1 = _make_group('initial', parent_id=self.g1.group_id)
112
113 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
114 self.assertTrue(self.__check_path('test1', 'after'))
115 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
116
117 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
118 self.assertTrue(self.__check_path('test3', 'after'))
119 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
120
121 new_sg1 = self.__update_group(sg1.group_id, 'hello')
122 self.assertTrue(self.__check_path('hello'))
123
124 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
125
126 def test_subgrouping_with_repo(self):
127
128 g1 = _make_group('g1')
129 g2 = _make_group('g2')
130
131 # create new repo
132 form_data = dict(repo_name='john',
133 repo_name_full='john',
134 fork_name=None,
135 description=None,
136 repo_group=None,
137 private=False,
138 repo_type='hg',
139 clone_uri=None,
140 landing_rev='tip')
141 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
142 r = RepoModel().create(form_data, cur_user)
143
144 self.assertEqual(r.repo_name, 'john')
145
146 # put repo into group
147 form_data = form_data
148 form_data['repo_group'] = g1.group_id
149 form_data['perms_new'] = []
150 form_data['perms_updates'] = []
151 RepoModel().update(r.repo_name, form_data)
152 self.assertEqual(r.repo_name, 'g1/john')
153
154 self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
155 self.assertTrue(self.__check_path('g2', 'g1'))
156
157 # test repo
158 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name]))
159
160 def test_move_to_root(self):
161 g1 = _make_group('t11')
162 Session.commit()
163 g2 = _make_group('t22', parent_id=g1.group_id)
164 Session.commit()
165
166 self.assertEqual(g2.full_path, 't11/t22')
167 self.assertTrue(self.__check_path('t11', 't22'))
168
169 g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
170 Session.commit()
171
172 self.assertEqual(g2.group_name, 'g22')
173 # we moved out group from t1 to '' so it's full path should be 'g2'
174 self.assertEqual(g2.full_path, 'g22')
175 self.assertFalse(self.__check_path('t11', 't22'))
176 self.assertTrue(self.__check_path('g22'))
177
178
179 class TestUser(unittest.TestCase):
180 def __init__(self, methodName='runTest'):
181 Session.remove()
182 super(TestUser, self).__init__(methodName=methodName)
183
184 def test_create_and_remove(self):
185 usr = UserModel().create_or_update(username=u'test_user',
186 password=u'qweqwe',
187 email=u'u232@rhodecode.org',
188 firstname=u'u1', lastname=u'u1')
189 Session.commit()
190 self.assertEqual(User.get_by_username(u'test_user'), usr)
191
192 # make users group
193 users_group = UsersGroupModel().create('some_example_group')
194 Session.commit()
195
196 UsersGroupModel().add_user_to_group(users_group, usr)
197 Session.commit()
198
199 self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
200 self.assertEqual(UsersGroupMember.query().count(), 1)
201 UserModel().delete(usr.user_id)
202 Session.commit()
203
204 self.assertEqual(UsersGroupMember.query().all(), [])
205
206 def test_additonal_email_as_main(self):
207 usr = UserModel().create_or_update(username=u'test_user',
208 password=u'qweqwe',
209 email=u'main_email@rhodecode.org',
210 firstname=u'u1', lastname=u'u1')
211 Session.commit()
212
213 def do():
214 m = UserEmailMap()
215 m.email = u'main_email@rhodecode.org'
216 m.user = usr
217 Session.add(m)
218 Session.commit()
219 self.assertRaises(AttributeError, do)
220
221 UserModel().delete(usr.user_id)
222 Session.commit()
223
224 def test_extra_email_map(self):
225 usr = UserModel().create_or_update(username=u'test_user',
226 password=u'qweqwe',
227 email=u'main_email@rhodecode.org',
228 firstname=u'u1', lastname=u'u1')
229 Session.commit()
230
231 m = UserEmailMap()
232 m.email = u'main_email2@rhodecode.org'
233 m.user = usr
234 Session.add(m)
235 Session.commit()
236
237 u = User.get_by_email(email='main_email@rhodecode.org')
238 self.assertEqual(usr.user_id, u.user_id)
239 self.assertEqual(usr.username, u.username)
240
241 u = User.get_by_email(email='main_email2@rhodecode.org')
242 self.assertEqual(usr.user_id, u.user_id)
243 self.assertEqual(usr.username, u.username)
244 u = User.get_by_email(email='main_email3@rhodecode.org')
245 self.assertEqual(None, u)
246
247 UserModel().delete(usr.user_id)
248 Session.commit()
249 10
250 11
251 12 class TestNotifications(unittest.TestCase):
252 13
253 14 def __init__(self, methodName='runTest'):
254 15 Session.remove()
255 16 self.u1 = UserModel().create_or_update(username=u'u1',
256 17 password=u'qweqwe',
257 18 email=u'u1@rhodecode.org',
258 19 firstname=u'u1', lastname=u'u1')
259 Session.commit()
20 Session().commit()
260 21 self.u1 = self.u1.user_id
261 22
262 23 self.u2 = UserModel().create_or_update(username=u'u2',
263 24 password=u'qweqwe',
264 25 email=u'u2@rhodecode.org',
265 26 firstname=u'u2', lastname=u'u3')
266 Session.commit()
27 Session().commit()
267 28 self.u2 = self.u2.user_id
268 29
269 30 self.u3 = UserModel().create_or_update(username=u'u3',
270 31 password=u'qweqwe',
271 32 email=u'u3@rhodecode.org',
272 33 firstname=u'u3', lastname=u'u3')
273 Session.commit()
34 Session().commit()
274 35 self.u3 = self.u3.user_id
275 36
276 37 super(TestNotifications, self).__init__(methodName=methodName)
277 38
278 39 def _clean_notifications(self):
279 40 for n in Notification.query().all():
280 Session.delete(n)
41 Session().delete(n)
281 42
282 Session.commit()
43 Session().commit()
283 44 self.assertEqual(Notification.query().all(), [])
284 45
285 46 def tearDown(self):
286 47 self._clean_notifications()
287 48
288 49 def test_create_notification(self):
289 50 self.assertEqual([], Notification.query().all())
290 51 self.assertEqual([], UserNotification.query().all())
291 52
292 53 usrs = [self.u1, self.u2]
293 54 notification = NotificationModel().create(created_by=self.u1,
294 55 subject=u'subj', body=u'hi there',
295 56 recipients=usrs)
296 Session.commit()
57 Session().commit()
297 58 u1 = User.get(self.u1)
298 59 u2 = User.get(self.u2)
299 60 u3 = User.get(self.u3)
300 61 notifications = Notification.query().all()
301 62 self.assertEqual(len(notifications), 1)
302 63
303 64 unotification = UserNotification.query()\
304 65 .filter(UserNotification.notification == notification).all()
305 66
306 67 self.assertEqual(notifications[0].recipients, [u1, u2])
307 68 self.assertEqual(notification.notification_id,
308 69 notifications[0].notification_id)
309 70 self.assertEqual(len(unotification), len(usrs))
310 71 self.assertEqual([x.user.user_id for x in unotification], usrs)
311 72
312 73 def test_user_notifications(self):
313 74 self.assertEqual([], Notification.query().all())
314 75 self.assertEqual([], UserNotification.query().all())
315 76
316 77 notification1 = NotificationModel().create(created_by=self.u1,
317 78 subject=u'subj', body=u'hi there1',
318 79 recipients=[self.u3])
319 Session.commit()
80 Session().commit()
320 81 notification2 = NotificationModel().create(created_by=self.u1,
321 82 subject=u'subj', body=u'hi there2',
322 83 recipients=[self.u3])
323 Session.commit()
324 u3 = Session.query(User).get(self.u3)
84 Session().commit()
85 u3 = Session().query(User).get(self.u3)
325 86
326 87 self.assertEqual(sorted([x.notification for x in u3.notifications]),
327 88 sorted([notification2, notification1]))
328 89
329 90 def test_delete_notifications(self):
330 91 self.assertEqual([], Notification.query().all())
331 92 self.assertEqual([], UserNotification.query().all())
332 93
333 94 notification = NotificationModel().create(created_by=self.u1,
334 95 subject=u'title', body=u'hi there3',
335 96 recipients=[self.u3, self.u1, self.u2])
336 Session.commit()
97 Session().commit()
337 98 notifications = Notification.query().all()
338 99 self.assertTrue(notification in notifications)
339 100
340 101 Notification.delete(notification.notification_id)
341 Session.commit()
102 Session().commit()
342 103
343 104 notifications = Notification.query().all()
344 105 self.assertFalse(notification in notifications)
345 106
346 107 un = UserNotification.query().filter(UserNotification.notification
347 108 == notification).all()
348 109 self.assertEqual(un, [])
349 110
350 111 def test_delete_association(self):
351 112
352 113 self.assertEqual([], Notification.query().all())
353 114 self.assertEqual([], UserNotification.query().all())
354 115
355 116 notification = NotificationModel().create(created_by=self.u1,
356 117 subject=u'title', body=u'hi there3',
357 118 recipients=[self.u3, self.u1, self.u2])
358 Session.commit()
119 Session().commit()
359 120
360 121 unotification = UserNotification.query()\
361 122 .filter(UserNotification.notification ==
362 123 notification)\
363 124 .filter(UserNotification.user_id == self.u3)\
364 125 .scalar()
365 126
366 127 self.assertEqual(unotification.user_id, self.u3)
367 128
368 129 NotificationModel().delete(self.u3,
369 130 notification.notification_id)
370 Session.commit()
131 Session().commit()
371 132
372 133 u3notification = UserNotification.query()\
373 134 .filter(UserNotification.notification ==
374 135 notification)\
375 136 .filter(UserNotification.user_id == self.u3)\
376 137 .scalar()
377 138
378 139 self.assertEqual(u3notification, None)
379 140
380 141 # notification object is still there
381 142 self.assertEqual(Notification.query().all(), [notification])
382 143
383 144 #u1 and u2 still have assignments
384 145 u1notification = UserNotification.query()\
385 146 .filter(UserNotification.notification ==
386 147 notification)\
387 148 .filter(UserNotification.user_id == self.u1)\
388 149 .scalar()
389 150 self.assertNotEqual(u1notification, None)
390 151 u2notification = UserNotification.query()\
391 152 .filter(UserNotification.notification ==
392 153 notification)\
393 154 .filter(UserNotification.user_id == self.u2)\
394 155 .scalar()
395 156 self.assertNotEqual(u2notification, None)
396 157
397 158 def test_notification_counter(self):
398 159 self._clean_notifications()
399 160 self.assertEqual([], Notification.query().all())
400 161 self.assertEqual([], UserNotification.query().all())
401 162
402 163 NotificationModel().create(created_by=self.u1,
403 164 subject=u'title', body=u'hi there_delete',
404 165 recipients=[self.u3, self.u1])
405 Session.commit()
166 Session().commit()
406 167
407 168 self.assertEqual(NotificationModel()
408 169 .get_unread_cnt_for_user(self.u1), 1)
409 170 self.assertEqual(NotificationModel()
410 171 .get_unread_cnt_for_user(self.u2), 0)
411 172 self.assertEqual(NotificationModel()
412 173 .get_unread_cnt_for_user(self.u3), 1)
413 174
414 175 notification = NotificationModel().create(created_by=self.u1,
415 176 subject=u'title', body=u'hi there3',
416 177 recipients=[self.u3, self.u1, self.u2])
417 Session.commit()
178 Session().commit()
418 179
419 180 self.assertEqual(NotificationModel()
420 181 .get_unread_cnt_for_user(self.u1), 2)
421 182 self.assertEqual(NotificationModel()
422 183 .get_unread_cnt_for_user(self.u2), 1)
423 184 self.assertEqual(NotificationModel()
424 185 .get_unread_cnt_for_user(self.u3), 2)
425 186
426 187
427 class TestUsers(unittest.TestCase):
428
429 def __init__(self, methodName='runTest'):
430 super(TestUsers, self).__init__(methodName=methodName)
431
432 def setUp(self):
433 self.u1 = UserModel().create_or_update(username=u'u1',
434 password=u'qweqwe',
435 email=u'u1@rhodecode.org',
436 firstname=u'u1', lastname=u'u1')
437
438 def tearDown(self):
439 perm = Permission.query().all()
440 for p in perm:
441 UserModel().revoke_perm(self.u1, p)
442
443 UserModel().delete(self.u1)
444 Session.commit()
445
446 def test_add_perm(self):
447 perm = Permission.query().all()[0]
448 UserModel().grant_perm(self.u1, perm)
449 Session.commit()
450 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
451
452 def test_has_perm(self):
453 perm = Permission.query().all()
454 for p in perm:
455 has_p = UserModel().has_perm(self.u1, p)
456 self.assertEqual(False, has_p)
457
458 def test_revoke_perm(self):
459 perm = Permission.query().all()[0]
460 UserModel().grant_perm(self.u1, perm)
461 Session.commit()
462 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
463
464 #revoke
465 UserModel().revoke_perm(self.u1, perm)
466 Session.commit()
467 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
468 188
469 189
470 class TestPermissions(unittest.TestCase):
471 def __init__(self, methodName='runTest'):
472 super(TestPermissions, self).__init__(methodName=methodName)
473
474 def setUp(self):
475 self.u1 = UserModel().create_or_update(
476 username=u'u1', password=u'qweqwe',
477 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
478 )
479 self.u2 = UserModel().create_or_update(
480 username=u'u2', password=u'qweqwe',
481 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
482 )
483 self.anon = User.get_by_username('default')
484 self.a1 = UserModel().create_or_update(
485 username=u'a1', password=u'qweqwe',
486 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
487 )
488 Session.commit()
489
490 def tearDown(self):
491 if hasattr(self, 'test_repo'):
492 RepoModel().delete(repo=self.test_repo)
493 UserModel().delete(self.u1)
494 UserModel().delete(self.u2)
495 UserModel().delete(self.a1)
496 if hasattr(self, 'g1'):
497 ReposGroupModel().delete(self.g1.group_id)
498 if hasattr(self, 'g2'):
499 ReposGroupModel().delete(self.g2.group_id)
500
501 if hasattr(self, 'ug1'):
502 UsersGroupModel().delete(self.ug1, force=True)
503
504 Session.commit()
505
506 def test_default_perms_set(self):
507 u1_auth = AuthUser(user_id=self.u1.user_id)
508 perms = {
509 'repositories_groups': {},
510 'global': set([u'hg.create.repository', u'repository.read',
511 u'hg.register.manual_activate']),
512 'repositories': {u'vcs_test_hg': u'repository.read'}
513 }
514 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
515 perms['repositories'][HG_REPO])
516 new_perm = 'repository.write'
517 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
518 Session.commit()
519
520 u1_auth = AuthUser(user_id=self.u1.user_id)
521 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm)
522
523 def test_default_admin_perms_set(self):
524 a1_auth = AuthUser(user_id=self.a1.user_id)
525 perms = {
526 'repositories_groups': {},
527 'global': set([u'hg.admin']),
528 'repositories': {u'vcs_test_hg': u'repository.admin'}
529 }
530 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
531 perms['repositories'][HG_REPO])
532 new_perm = 'repository.write'
533 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm)
534 Session.commit()
535 # cannot really downgrade admins permissions !? they still get's set as
536 # admin !
537 u1_auth = AuthUser(user_id=self.a1.user_id)
538 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
539 perms['repositories'][HG_REPO])
540
541 def test_default_group_perms(self):
542 self.g1 = _make_group('test1', skip_if_exists=True)
543 self.g2 = _make_group('test2', skip_if_exists=True)
544 u1_auth = AuthUser(user_id=self.u1.user_id)
545 perms = {
546 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
547 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
548 'repositories': {u'vcs_test_hg': u'repository.read'}
549 }
550 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
551 perms['repositories'][HG_REPO])
552 self.assertEqual(u1_auth.permissions['repositories_groups'],
553 perms['repositories_groups'])
554
555 def test_default_admin_group_perms(self):
556 self.g1 = _make_group('test1', skip_if_exists=True)
557 self.g2 = _make_group('test2', skip_if_exists=True)
558 a1_auth = AuthUser(user_id=self.a1.user_id)
559 perms = {
560 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
561 'global': set(['hg.admin']),
562 'repositories': {u'vcs_test_hg': 'repository.admin'}
563 }
564
565 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
566 perms['repositories'][HG_REPO])
567 self.assertEqual(a1_auth.permissions['repositories_groups'],
568 perms['repositories_groups'])
569
570 def test_propagated_permission_from_users_group(self):
571 # make group
572 self.ug1 = UsersGroupModel().create('G1')
573 # add user to group
574 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
575
576 # set permission to lower
577 new_perm = 'repository.none'
578 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
579 Session.commit()
580 u1_auth = AuthUser(user_id=self.u1.user_id)
581 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
582 new_perm)
583
584 # grant perm for group this should override permission from user
585 new_perm = 'repository.write'
586 RepoModel().grant_users_group_permission(repo=HG_REPO,
587 group_name=self.ug1,
588 perm=new_perm)
589 # check perms
590 u1_auth = AuthUser(user_id=self.u1.user_id)
591 perms = {
592 'repositories_groups': {},
593 'global': set([u'hg.create.repository', u'repository.read',
594 u'hg.register.manual_activate']),
595 'repositories': {u'vcs_test_hg': u'repository.read'}
596 }
597 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
598 new_perm)
599 self.assertEqual(u1_auth.permissions['repositories_groups'],
600 perms['repositories_groups'])
601
602 def test_propagated_permission_from_users_group_lower_weight(self):
603 # make group
604 self.ug1 = UsersGroupModel().create('G1')
605 # add user to group
606 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
607
608 # set permission to lower
609 new_perm_h = 'repository.write'
610 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
611 perm=new_perm_h)
612 Session.commit()
613 u1_auth = AuthUser(user_id=self.u1.user_id)
614 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
615 new_perm_h)
616
617 # grant perm for group this should NOT override permission from user
618 # since it's lower than granted
619 new_perm_l = 'repository.read'
620 RepoModel().grant_users_group_permission(repo=HG_REPO,
621 group_name=self.ug1,
622 perm=new_perm_l)
623 # check perms
624 u1_auth = AuthUser(user_id=self.u1.user_id)
625 perms = {
626 'repositories_groups': {},
627 'global': set([u'hg.create.repository', u'repository.read',
628 u'hg.register.manual_activate']),
629 'repositories': {u'vcs_test_hg': u'repository.write'}
630 }
631 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
632 new_perm_h)
633 self.assertEqual(u1_auth.permissions['repositories_groups'],
634 perms['repositories_groups'])
635
636 def test_repo_in_group_permissions(self):
637 self.g1 = _make_group('group1', skip_if_exists=True)
638 self.g2 = _make_group('group2', skip_if_exists=True)
639 Session.commit()
640 # both perms should be read !
641 u1_auth = AuthUser(user_id=self.u1.user_id)
642 self.assertEqual(u1_auth.permissions['repositories_groups'],
643 {u'group1': u'group.read', u'group2': u'group.read'})
644
645 a1_auth = AuthUser(user_id=self.anon.user_id)
646 self.assertEqual(a1_auth.permissions['repositories_groups'],
647 {u'group1': u'group.read', u'group2': u'group.read'})
648
649 #Change perms to none for both groups
650 ReposGroupModel().grant_user_permission(repos_group=self.g1,
651 user=self.anon,
652 perm='group.none')
653 ReposGroupModel().grant_user_permission(repos_group=self.g2,
654 user=self.anon,
655 perm='group.none')
656
657 u1_auth = AuthUser(user_id=self.u1.user_id)
658 self.assertEqual(u1_auth.permissions['repositories_groups'],
659 {u'group1': u'group.none', u'group2': u'group.none'})
660
661 a1_auth = AuthUser(user_id=self.anon.user_id)
662 self.assertEqual(a1_auth.permissions['repositories_groups'],
663 {u'group1': u'group.none', u'group2': u'group.none'})
664
665 # add repo to group
666 form_data = {
667 'repo_name': HG_REPO,
668 'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
669 'repo_type': 'hg',
670 'clone_uri': '',
671 'repo_group': self.g1.group_id,
672 'description': 'desc',
673 'private': False,
674 'landing_rev': 'tip'
675 }
676 self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
677 Session.commit()
678
679 u1_auth = AuthUser(user_id=self.u1.user_id)
680 self.assertEqual(u1_auth.permissions['repositories_groups'],
681 {u'group1': u'group.none', u'group2': u'group.none'})
682
683 a1_auth = AuthUser(user_id=self.anon.user_id)
684 self.assertEqual(a1_auth.permissions['repositories_groups'],
685 {u'group1': u'group.none', u'group2': u'group.none'})
686
687 #grant permission for u2 !
688 ReposGroupModel().grant_user_permission(repos_group=self.g1,
689 user=self.u2,
690 perm='group.read')
691 ReposGroupModel().grant_user_permission(repos_group=self.g2,
692 user=self.u2,
693 perm='group.read')
694 Session.commit()
695 self.assertNotEqual(self.u1, self.u2)
696 #u1 and anon should have not change perms while u2 should !
697 u1_auth = AuthUser(user_id=self.u1.user_id)
698 self.assertEqual(u1_auth.permissions['repositories_groups'],
699 {u'group1': u'group.none', u'group2': u'group.none'})
700
701 u2_auth = AuthUser(user_id=self.u2.user_id)
702 self.assertEqual(u2_auth.permissions['repositories_groups'],
703 {u'group1': u'group.read', u'group2': u'group.read'})
704
705 a1_auth = AuthUser(user_id=self.anon.user_id)
706 self.assertEqual(a1_auth.permissions['repositories_groups'],
707 {u'group1': u'group.none', u'group2': u'group.none'})
708
709 def test_repo_group_user_as_user_group_member(self):
710 # create Group1
711 self.g1 = _make_group('group1', skip_if_exists=True)
712 Session.commit()
713 a1_auth = AuthUser(user_id=self.anon.user_id)
714
715 self.assertEqual(a1_auth.permissions['repositories_groups'],
716 {u'group1': u'group.read'})
717
718 # set default permission to none
719 ReposGroupModel().grant_user_permission(repos_group=self.g1,
720 user=self.anon,
721 perm='group.none')
722 # make group
723 self.ug1 = UsersGroupModel().create('G1')
724 # add user to group
725 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
726 Session.commit()
727
728 # check if user is in the group
729 membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
730 self.assertEqual(membrs, [self.u1.user_id])
731 # add some user to that group
732
733 # check his permissions
734 a1_auth = AuthUser(user_id=self.anon.user_id)
735 self.assertEqual(a1_auth.permissions['repositories_groups'],
736 {u'group1': u'group.none'})
737
738 u1_auth = AuthUser(user_id=self.u1.user_id)
739 self.assertEqual(u1_auth.permissions['repositories_groups'],
740 {u'group1': u'group.none'})
741
742 # grant ug1 read permissions for
743 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
744 group_name=self.ug1,
745 perm='group.read')
746 Session.commit()
747 # check if the
748 obj = Session.query(UsersGroupRepoGroupToPerm)\
749 .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
750 .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
751 .scalar()
752 self.assertEqual(obj.permission.permission_name, 'group.read')
753
754 a1_auth = AuthUser(user_id=self.anon.user_id)
755
756 self.assertEqual(a1_auth.permissions['repositories_groups'],
757 {u'group1': u'group.none'})
758
759 u1_auth = AuthUser(user_id=self.u1.user_id)
760 self.assertEqual(u1_auth.permissions['repositories_groups'],
761 {u'group1': u'group.read'})
1 NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch
@@ -1,211 +1,213
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_hg_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations
7 7
8 8 :created_on: Dec 30, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import sys
28 28 import shutil
29 29 import logging
30 30 from os.path import join as jn
31 31 from os.path import dirname as dn
32 32
33 33 from tempfile import _RandomNameSequence
34 34 from subprocess import Popen, PIPE
35 35
36 36 from paste.deploy import appconfig
37 37 from pylons import config
38 38 from sqlalchemy import engine_from_config
39 39
40 40 from rhodecode.lib.utils import add_cache
41 41 from rhodecode.model import init_model
42 42 from rhodecode.model import meta
43 43 from rhodecode.model.db import User, Repository
44 44 from rhodecode.lib.auth import get_crypt_password
45 45
46 46 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
47 47 from rhodecode.config.environment import load_environment
48 48
49 49 rel_path = dn(dn(dn(os.path.abspath(__file__))))
50 50 conf = appconfig('config:development.ini', relative_to=rel_path)
51 51 load_environment(conf.global_conf, conf.local_conf)
52 52
53 53 add_cache(conf)
54 54
55 55 USER = 'test_admin'
56 56 PASS = 'test12'
57 57 HOST = 'hg.local'
58 58 METHOD = 'pull'
59 59 DEBUG = True
60 60 log = logging.getLogger(__name__)
61 61
62 62
63 63 class Command(object):
64 64
65 65 def __init__(self, cwd):
66 66 self.cwd = cwd
67 67
68 68 def execute(self, cmd, *args):
69 69 """Runs command on the system with given ``args``.
70 70 """
71 71
72 72 command = cmd + ' ' + ' '.join(args)
73 73 log.debug('Executing %s' % command)
74 74 if DEBUG:
75 75 print command
76 76 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
77 77 stdout, stderr = p.communicate()
78 78 if DEBUG:
79 79 print stdout, stderr
80 80 return stdout, stderr
81 81
82
82 83 def get_session():
83 84 engine = engine_from_config(conf, 'sqlalchemy.db1.')
84 85 init_model(engine)
85 86 sa = meta.Session
86 87 return sa
87 88
88 89
89 90 def create_test_user(force=True):
90 91 print 'creating test user'
91 92 sa = get_session()
92 93
93 94 user = sa.query(User).filter(User.username == USER).scalar()
94 95
95 96 if force and user is not None:
96 97 print 'removing current user'
97 98 for repo in sa.query(Repository).filter(Repository.user == user).all():
98 99 sa.delete(repo)
99 100 sa.delete(user)
100 101 sa.commit()
101 102
102 103 if user is None or force:
103 104 print 'creating new one'
104 105 new_usr = User()
105 106 new_usr.username = USER
106 107 new_usr.password = get_crypt_password(PASS)
107 108 new_usr.email = 'mail@mail.com'
108 109 new_usr.name = 'test'
109 110 new_usr.lastname = 'lasttestname'
110 111 new_usr.active = True
111 112 new_usr.admin = True
112 113 sa.add(new_usr)
113 114 sa.commit()
114 115
115 116 print 'done'
116 117
117 118
118 119 def create_test_repo(force=True):
119 120 print 'creating test repo'
120 121 from rhodecode.model.repo import RepoModel
121 122 sa = get_session()
122 123
123 124 user = sa.query(User).filter(User.username == USER).scalar()
124 125 if user is None:
125 126 raise Exception('user not found')
126 127
127
128 128 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
129 129
130 130 if repo is None:
131 131 print 'repo not found creating'
132 132
133 133 form_data = {'repo_name':HG_REPO,
134 134 'repo_type':'hg',
135 135 'private':False,
136 136 'clone_uri':'' }
137 137 rm = RepoModel(sa)
138 138 rm.base_path = '/home/hg'
139 139 rm.create(form_data, user)
140 140
141 141 print 'done'
142 142
143
143 144 def set_anonymous_access(enable=True):
144 145 sa = get_session()
145 146 user = sa.query(User).filter(User.username == 'default').one()
146 147 user.active = enable
147 148 sa.add(user)
148 149 sa.commit()
149 150
151
150 152 def get_anonymous_access():
151 153 sa = get_session()
152 154 return sa.query(User).filter(User.username == 'default').one().active
153 155
154 156
155 157 #==============================================================================
156 158 # TESTS
157 159 #==============================================================================
158 160 def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
159 161 seq=None):
160 162 cwd = path = jn(TESTS_TMP_PATH, repo)
161 163
162 164 if seq == None:
163 165 seq = _RandomNameSequence().next()
164 166
165 167 try:
166 168 shutil.rmtree(path, ignore_errors=True)
167 169 os.makedirs(path)
168 170 #print 'made dirs %s' % jn(path)
169 171 except OSError:
170 172 raise
171 173
172 174 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
173 175 {'user':USER,
174 176 'pass':PASS,
175 177 'host':HOST,
176 178 'cloned_repo':repo, }
177 179
178 180 dest = path + seq
179 181 if method == 'pull':
180 182 stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url)
181 183 else:
182 184 stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest)
183 185
184 186 if no_errors is False:
185 187 assert """adding file changes""" in stdout, 'no messages about cloning'
186 188 assert """abort""" not in stderr , 'got error from clone'
187 189
188 190 if __name__ == '__main__':
189 191 try:
190 192 create_test_user(force=False)
191 193 seq = None
192 194 import time
193 195
194 196 try:
195 197 METHOD = sys.argv[3]
196 198 except:
197 199 pass
198 200
199 201 if METHOD == 'pull':
200 202 seq = _RandomNameSequence().next()
201 203 test_clone_with_credentials(repo=sys.argv[1], method='clone',
202 204 seq=seq)
203 205 s = time.time()
204 206 for i in range(1, int(sys.argv[2]) + 1):
205 207 print 'take', i
206 208 test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
207 209 seq=seq)
208 210 print 'time taken %.3f' % (time.time() - s)
209 211 except Exception, e:
210 212 raise
211 213 sys.exit('stop on %s' % e)
1 NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py
@@ -1,401 +1,410
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_hg_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations
7 7
8 8 :created_on: Dec 30, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import time
28 28 import sys
29 29 import shutil
30 30 import logging
31 31
32 32 from os.path import join as jn
33 33 from os.path import dirname as dn
34 34
35 35 from tempfile import _RandomNameSequence
36 36 from subprocess import Popen, PIPE
37 37
38 38 from paste.deploy import appconfig
39 39 from pylons import config
40 40 from sqlalchemy import engine_from_config
41 41
42 42 from rhodecode.lib.utils import add_cache
43 43 from rhodecode.model import init_model
44 44 from rhodecode.model import meta
45 45 from rhodecode.model.db import User, Repository, UserLog
46 46 from rhodecode.lib.auth import get_crypt_password
47 47
48 48 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
49 49 from rhodecode.config.environment import load_environment
50 50
51 51 rel_path = dn(dn(dn(os.path.abspath(__file__))))
52 52
53 53 conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
54 54 load_environment(conf.global_conf, conf.local_conf)
55 55
56 56 add_cache(conf)
57 57
58 58 USER = 'test_admin'
59 59 PASS = 'test12'
60 60 HOST = '127.0.0.1:5000'
61 61 DEBUG = False
62 62 print 'DEBUG:', DEBUG
63 63 log = logging.getLogger(__name__)
64 64
65 65 engine = engine_from_config(conf, 'sqlalchemy.db1.')
66 66 init_model(engine)
67 sa = meta.Session
67 sa = meta.Session()
68
68 69
69 70 class Command(object):
70 71
71 72 def __init__(self, cwd):
72 73 self.cwd = cwd
73 74
74 75 def execute(self, cmd, *args):
75 76 """Runs command on the system with given ``args``.
76 77 """
77 78
78 79 command = cmd + ' ' + ' '.join(args)
79 80 log.debug('Executing %s' % command)
80 81 if DEBUG:
81 82 print command
82 83 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
83 84 stdout, stderr = p.communicate()
84 85 if DEBUG:
85 86 print stdout, stderr
86 87 return stdout, stderr
87 88
88 89
89 90 def test_wrapp(func):
90 91
91 92 def __wrapp(*args, **kwargs):
92 93 print '>>>%s' % func.__name__
93 94 try:
94 95 res = func(*args, **kwargs)
95 96 except Exception, e:
96 97 print ('###############\n-'
97 98 '--%s failed %s--\n'
98 99 '###############\n' % (func.__name__, e))
99 100 sys.exit()
100 101 print '++OK++'
101 102 return res
102 103 return __wrapp
103 104
104 105
105 106 def create_test_user(force=True):
106 107 print '\tcreating test user'
107 108
108 109 user = User.get_by_username(USER)
109 110
110 111 if force and user is not None:
111 112 print '\tremoving current user'
112 113 for repo in Repository.query().filter(Repository.user == user).all():
113 114 sa.delete(repo)
114 115 sa.delete(user)
115 116 sa.commit()
116 117
117 118 if user is None or force:
118 119 print '\tcreating new one'
119 120 new_usr = User()
120 121 new_usr.username = USER
121 122 new_usr.password = get_crypt_password(PASS)
122 123 new_usr.email = 'mail@mail.com'
123 124 new_usr.name = 'test'
124 125 new_usr.lastname = 'lasttestname'
125 126 new_usr.active = True
126 127 new_usr.admin = True
127 128 sa.add(new_usr)
128 129 sa.commit()
129 130
130 131 print '\tdone'
131 132
132 133
133 134 def create_test_repo(force=True):
134 135 from rhodecode.model.repo import RepoModel
135 136
136 137 user = User.get_by_username(USER)
137 138 if user is None:
138 139 raise Exception('user not found')
139 140
140
141 141 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
142 142
143 143 if repo is None:
144 144 print '\trepo not found creating'
145 145
146 146 form_data = {'repo_name':HG_REPO,
147 147 'repo_type':'hg',
148 148 'private':False,
149 149 'clone_uri':'' }
150 150 rm = RepoModel(sa)
151 151 rm.base_path = '/home/hg'
152 152 rm.create(form_data, user)
153 153
154 154
155 155 def set_anonymous_access(enable=True):
156 156 user = User.get_by_username('default')
157 157 user.active = enable
158 158 sa.add(user)
159 159 sa.commit()
160 160 print '\tanonymous access is now:', enable
161 161 if enable != User.get_by_username('default').active:
162 162 raise Exception('Cannot set anonymous access')
163 163
164
164 165 def get_anonymous_access():
165 166 user = User.get_by_username('default')
166 167 return user.active
167 168
168 169
169 170 #==============================================================================
170 171 # TESTS
171 172 #==============================================================================
172 173 @test_wrapp
173 174 def test_clone_with_credentials(no_errors=False):
174 175 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
175 176
176 177 try:
177 178 shutil.rmtree(path, ignore_errors=True)
178 179 os.makedirs(path)
179 180 #print 'made dirs %s' % jn(path)
180 181 except OSError:
181 182 raise
182 183
183 184 print '\tchecking if anonymous access is enabled'
184 185 anonymous_access = get_anonymous_access()
185 186 if anonymous_access:
186 187 print '\tenabled, disabling it '
187 188 set_anonymous_access(enable=False)
188 189
189 190 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
190 191 {'user':USER,
191 192 'pass':PASS,
192 193 'host':HOST,
193 194 'cloned_repo':HG_REPO,
194 195 'dest':path}
195 196
196 197 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
197 198
198 199 if no_errors is False:
199 200 assert """adding file changes""" in stdout, 'no messages about cloning'
200 201 assert """abort""" not in stderr , 'got error from clone'
201 202
202 203
203 204 @test_wrapp
204 205 def test_clone_anonymous():
205 206 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
206 207
207 208 try:
208 209 shutil.rmtree(path, ignore_errors=True)
209 210 os.makedirs(path)
210 211 #print 'made dirs %s' % jn(path)
211 212 except OSError:
212 213 raise
213 214
214 215
215 216 print '\tchecking if anonymous access is enabled'
216 217 anonymous_access = get_anonymous_access()
217 218 if not anonymous_access:
218 219 print '\tnot enabled, enabling it '
219 220 set_anonymous_access(enable=True)
220 221
221 222 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
222 223 {'user':USER,
223 224 'pass':PASS,
224 225 'host':HOST,
225 226 'cloned_repo':HG_REPO,
226 227 'dest':path}
227 228
228 229 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
229 230
230 231 assert """adding file changes""" in stdout, 'no messages about cloning'
231 232 assert """abort""" not in stderr , 'got error from clone'
232 233
233 234 #disable if it was enabled
234 235 if not anonymous_access:
235 236 print '\tdisabling anonymous access'
236 237 set_anonymous_access(enable=False)
237 238
239
238 240 @test_wrapp
239 241 def test_clone_wrong_credentials():
240 242 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
241 243
242 244 try:
243 245 shutil.rmtree(path, ignore_errors=True)
244 246 os.makedirs(path)
245 247 #print 'made dirs %s' % jn(path)
246 248 except OSError:
247 249 raise
248 250
249 251 print '\tchecking if anonymous access is enabled'
250 252 anonymous_access = get_anonymous_access()
251 253 if anonymous_access:
252 254 print '\tenabled, disabling it '
253 255 set_anonymous_access(enable=False)
254 256
255 257 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
256 258 {'user':USER + 'error',
257 259 'pass':PASS,
258 260 'host':HOST,
259 261 'cloned_repo':HG_REPO,
260 262 'dest':path}
261 263
262 264 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
263 265
264 266 if not """abort: authorization failed""" in stderr:
265 267 raise Exception('Failure')
266 268
269
267 270 @test_wrapp
268 271 def test_pull():
269 272 pass
270 273
274
271 275 @test_wrapp
272 276 def test_push_modify_file(f_name='setup.py'):
273 277 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
274 278 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
275 279 for i in xrange(5):
276 280 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
277 281 Command(cwd).execute(cmd)
278 282
279 283 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
280 284 Command(cwd).execute(cmd)
281 285
282 286 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
283 287
288
284 289 @test_wrapp
285 290 def test_push_new_file(commits=15, with_clone=True):
286 291
287 292 if with_clone:
288 293 test_clone_with_credentials(no_errors=True)
289 294
290 295 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
291 296 added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
292 297
293 298 Command(cwd).execute('touch %s' % added_file)
294 299
295 300 Command(cwd).execute('hg add %s' % added_file)
296 301
297 302 for i in xrange(commits):
298 303 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
299 304 Command(cwd).execute(cmd)
300 305
301 306 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
302 307 'Marcin Kuźminski <marcin@python-blog.com>',
303 308 added_file)
304 309 Command(cwd).execute(cmd)
305 310
306 311 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
307 312 {'user':USER,
308 313 'pass':PASS,
309 314 'host':HOST,
310 315 'cloned_repo':HG_REPO,
311 316 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
312 317
313 318 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
314 319
320
315 321 @test_wrapp
316 322 def test_push_wrong_credentials():
317 323 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
318 324 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
319 325 {'user':USER + 'xxx',
320 326 'pass':PASS,
321 327 'host':HOST,
322 328 'cloned_repo':HG_REPO,
323 329 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
324 330
325 331 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
326 332 for i in xrange(5):
327 333 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
328 334 Command(cwd).execute(cmd)
329 335
330 336 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
331 337 Command(cwd).execute(cmd)
332 338
333 339 Command(cwd).execute('hg push %s' % clone_url)
334 340
341
335 342 @test_wrapp
336 343 def test_push_wrong_path():
337 344 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
338 345 added_file = jn(path, 'somefile.py')
339 346
340 347 try:
341 348 shutil.rmtree(path, ignore_errors=True)
342 349 os.makedirs(path)
343 350 print '\tmade dirs %s' % jn(path)
344 351 except OSError:
345 352 raise
346 353
347 354 Command(cwd).execute("""echo '' > %s""" % added_file)
348 355 Command(cwd).execute("""hg init %s""" % path)
349 356 Command(cwd).execute("""hg add %s""" % added_file)
350 357
351 358 for i in xrange(2):
352 359 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
353 360 Command(cwd).execute(cmd)
354 361
355 362 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
356 363 Command(cwd).execute(cmd)
357 364
358 365 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
359 366 {'user':USER,
360 367 'pass':PASS,
361 368 'host':HOST,
362 369 'cloned_repo':HG_REPO + '_error',
363 370 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
364 371
365 372 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
366 373 if not """abort: HTTP Error 403: Forbidden""" in stderr:
367 374 raise Exception('Failure')
368 375
376
369 377 @test_wrapp
370 378 def get_logs():
371 379 return UserLog.query().all()
372 380
381
373 382 @test_wrapp
374 383 def test_logs(initial):
375 384 logs = UserLog.query().all()
376 385 operations = 4
377 386 if len(initial) + operations != len(logs):
378 387 raise Exception("missing number of logs initial:%s vs current:%s" % \
379 388 (len(initial), len(logs)))
380 389
381 390
382 391 if __name__ == '__main__':
383 392 create_test_user(force=False)
384 393 create_test_repo()
385 394
386 395 initial_logs = get_logs()
387 396 print 'initial activity logs: %s' % len(initial_logs)
388 397 s = time.time()
389 398 #test_push_modify_file()
390 399 test_clone_with_credentials()
391 400 test_clone_wrong_credentials()
392 401
393 402 test_push_new_file(commits=2, with_clone=True)
394 403
395 404 test_clone_anonymous()
396 405 test_push_wrong_path()
397 406
398 407 test_push_wrong_credentials()
399 408
400 409 test_logs(initial_logs)
401 410 print 'finished ok in %.3f' % (time.time() - s)
@@ -1,56 +1,56
1 1 """
2 2 Unit tests for vcs_ library.
3 3
4 4 In order to run tests we need to prepare our environment first. Tests would be
5 5 run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from
6 6 ``vcs.backends.BACKENDS``.
7 7
8 8 For each SCM we run tests for, we need some repository. We would use
9 9 repositories location from system environment variables or test suite defaults
10 10 - see ``conf`` module for more detail. We simply try to check if repository at
11 11 certain location exists, if not we would try to fetch them. At ``test_vcs`` or
12 12 ``test_common`` we run unit tests common for each repository type and for
13 13 example specific mercurial tests are located at ``test_hg`` module.
14 14
15 15 Oh, and tests are run with ``unittest.collector`` wrapped by ``collector``
16 16 function at ``tests/__init__.py``.
17 17
18 18 .. _vcs: http://bitbucket.org/marcinkuzminski/vcs
19 19 .. _unittest: http://pypi.python.org/pypi/unittest
20 20
21 21 """
22 22 import os
23 23 from rhodecode.lib import vcs
24 24 from rhodecode.lib.vcs.utils.compat import unittest
25 from utils import VCSTestError, SCMFetcher
25 26 from rhodecode.tests import *
26 from utils import VCSTestError, SCMFetcher
27 27
28 28
29 29 def setup_package():
30 30 """
31 31 Prepares whole package for tests which mainly means it would try to fetch
32 32 test repositories or use already existing ones.
33 33 """
34 34 fetchers = {
35 35 'hg': {
36 36 'alias': 'hg',
37 37 'test_repo_path': TEST_HG_REPO,
38 38 'remote_repo': HG_REMOTE_REPO,
39 39 'clone_cmd': 'hg clone',
40 40 },
41 41 'git': {
42 42 'alias': 'git',
43 43 'test_repo_path': TEST_GIT_REPO,
44 44 'remote_repo': GIT_REMOTE_REPO,
45 45 'clone_cmd': 'git clone --bare',
46 46 },
47 47 }
48 48 try:
49 49 for scm, fetcher_info in fetchers.items():
50 50 fetcher = SCMFetcher(**fetcher_info)
51 51 fetcher.setup()
52 52 except VCSTestError, err:
53 53 raise RuntimeError(str(err))
54 54
55 55 #start_dir = os.path.abspath(os.path.dirname(__file__))
56 56 #unittest.defaultTestLoader.discover(start_dir)
General Comments 0
You need to be logged in to leave comments. Login now