##// END OF EJS Templates
orginized test module...
marcink -
r2527:95624ce4 beta
parent child Browse files
Show More
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
@@ -0,0 +1,316
1 import os
2 import unittest
3 from rhodecode.tests import *
4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm
8 from rhodecode.model.user import UserModel
9
10 from rhodecode.model.meta import Session
11 from rhodecode.model.users_group import UsersGroupModel
12 from rhodecode.lib.auth import AuthUser
13
14
15 def _make_group(path, desc='desc', parent_id=None,
16 skip_if_exists=False):
17
18 gr = RepoGroup.get_by_group_name(path)
19 if gr and skip_if_exists:
20 return gr
21
22 gr = ReposGroupModel().create(path, desc, parent_id)
23 return gr
24
25
26 class TestPermissions(unittest.TestCase):
27 def __init__(self, methodName='runTest'):
28 super(TestPermissions, self).__init__(methodName=methodName)
29
30 def setUp(self):
31 self.u1 = UserModel().create_or_update(
32 username=u'u1', password=u'qweqwe',
33 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
34 )
35 self.u2 = UserModel().create_or_update(
36 username=u'u2', password=u'qweqwe',
37 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
38 )
39 self.anon = User.get_by_username('default')
40 self.a1 = UserModel().create_or_update(
41 username=u'a1', password=u'qweqwe',
42 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
43 )
44 Session().commit()
45
46 def tearDown(self):
47 if hasattr(self, 'test_repo'):
48 RepoModel().delete(repo=self.test_repo)
49 UserModel().delete(self.u1)
50 UserModel().delete(self.u2)
51 UserModel().delete(self.a1)
52 if hasattr(self, 'g1'):
53 ReposGroupModel().delete(self.g1.group_id)
54 if hasattr(self, 'g2'):
55 ReposGroupModel().delete(self.g2.group_id)
56
57 if hasattr(self, 'ug1'):
58 UsersGroupModel().delete(self.ug1, force=True)
59
60 Session().commit()
61
62 def test_default_perms_set(self):
63 u1_auth = AuthUser(user_id=self.u1.user_id)
64 perms = {
65 'repositories_groups': {},
66 'global': set([u'hg.create.repository', u'repository.read',
67 u'hg.register.manual_activate']),
68 'repositories': {u'vcs_test_hg': u'repository.read'}
69 }
70 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
71 perms['repositories'][HG_REPO])
72 new_perm = 'repository.write'
73 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
74 perm=new_perm)
75 Session().commit()
76
77 u1_auth = AuthUser(user_id=self.u1.user_id)
78 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
79 new_perm)
80
81 def test_default_admin_perms_set(self):
82 a1_auth = AuthUser(user_id=self.a1.user_id)
83 perms = {
84 'repositories_groups': {},
85 'global': set([u'hg.admin']),
86 'repositories': {u'vcs_test_hg': u'repository.admin'}
87 }
88 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
89 perms['repositories'][HG_REPO])
90 new_perm = 'repository.write'
91 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
92 perm=new_perm)
93 Session().commit()
94 # cannot really downgrade admins permissions !? they still get's set as
95 # admin !
96 u1_auth = AuthUser(user_id=self.a1.user_id)
97 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
98 perms['repositories'][HG_REPO])
99
100 def test_default_group_perms(self):
101 self.g1 = _make_group('test1', skip_if_exists=True)
102 self.g2 = _make_group('test2', skip_if_exists=True)
103 u1_auth = AuthUser(user_id=self.u1.user_id)
104 perms = {
105 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
106 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
107 'repositories': {u'vcs_test_hg': u'repository.read'}
108 }
109 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
110 perms['repositories'][HG_REPO])
111 self.assertEqual(u1_auth.permissions['repositories_groups'],
112 perms['repositories_groups'])
113
114 def test_default_admin_group_perms(self):
115 self.g1 = _make_group('test1', skip_if_exists=True)
116 self.g2 = _make_group('test2', skip_if_exists=True)
117 a1_auth = AuthUser(user_id=self.a1.user_id)
118 perms = {
119 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
120 'global': set(['hg.admin']),
121 'repositories': {u'vcs_test_hg': 'repository.admin'}
122 }
123
124 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
125 perms['repositories'][HG_REPO])
126 self.assertEqual(a1_auth.permissions['repositories_groups'],
127 perms['repositories_groups'])
128
129 def test_propagated_permission_from_users_group(self):
130 # make group
131 self.ug1 = UsersGroupModel().create('G1')
132 # add user to group
133 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
134
135 # set permission to lower
136 new_perm = 'repository.none'
137 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
138 Session().commit()
139 u1_auth = AuthUser(user_id=self.u1.user_id)
140 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
141 new_perm)
142
143 # grant perm for group this should override permission from user
144 new_perm = 'repository.write'
145 RepoModel().grant_users_group_permission(repo=HG_REPO,
146 group_name=self.ug1,
147 perm=new_perm)
148 # check perms
149 u1_auth = AuthUser(user_id=self.u1.user_id)
150 perms = {
151 'repositories_groups': {},
152 'global': set([u'hg.create.repository', u'repository.read',
153 u'hg.register.manual_activate']),
154 'repositories': {u'vcs_test_hg': u'repository.read'}
155 }
156 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
157 new_perm)
158 self.assertEqual(u1_auth.permissions['repositories_groups'],
159 perms['repositories_groups'])
160
161 def test_propagated_permission_from_users_group_lower_weight(self):
162 # make group
163 self.ug1 = UsersGroupModel().create('G1')
164 # add user to group
165 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
166
167 # set permission to lower
168 new_perm_h = 'repository.write'
169 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
170 perm=new_perm_h)
171 Session().commit()
172 u1_auth = AuthUser(user_id=self.u1.user_id)
173 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
174 new_perm_h)
175
176 # grant perm for group this should NOT override permission from user
177 # since it's lower than granted
178 new_perm_l = 'repository.read'
179 RepoModel().grant_users_group_permission(repo=HG_REPO,
180 group_name=self.ug1,
181 perm=new_perm_l)
182 # check perms
183 u1_auth = AuthUser(user_id=self.u1.user_id)
184 perms = {
185 'repositories_groups': {},
186 'global': set([u'hg.create.repository', u'repository.read',
187 u'hg.register.manual_activate']),
188 'repositories': {u'vcs_test_hg': u'repository.write'}
189 }
190 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
191 new_perm_h)
192 self.assertEqual(u1_auth.permissions['repositories_groups'],
193 perms['repositories_groups'])
194
195 def test_repo_in_group_permissions(self):
196 self.g1 = _make_group('group1', skip_if_exists=True)
197 self.g2 = _make_group('group2', skip_if_exists=True)
198 Session().commit()
199 # both perms should be read !
200 u1_auth = AuthUser(user_id=self.u1.user_id)
201 self.assertEqual(u1_auth.permissions['repositories_groups'],
202 {u'group1': u'group.read', u'group2': u'group.read'})
203
204 a1_auth = AuthUser(user_id=self.anon.user_id)
205 self.assertEqual(a1_auth.permissions['repositories_groups'],
206 {u'group1': u'group.read', u'group2': u'group.read'})
207
208 #Change perms to none for both groups
209 ReposGroupModel().grant_user_permission(repos_group=self.g1,
210 user=self.anon,
211 perm='group.none')
212 ReposGroupModel().grant_user_permission(repos_group=self.g2,
213 user=self.anon,
214 perm='group.none')
215
216 u1_auth = AuthUser(user_id=self.u1.user_id)
217 self.assertEqual(u1_auth.permissions['repositories_groups'],
218 {u'group1': u'group.none', u'group2': u'group.none'})
219
220 a1_auth = AuthUser(user_id=self.anon.user_id)
221 self.assertEqual(a1_auth.permissions['repositories_groups'],
222 {u'group1': u'group.none', u'group2': u'group.none'})
223
224 # add repo to group
225 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
226 self.test_repo = RepoModel().create_repo(
227 repo_name=name,
228 repo_type='hg',
229 description='',
230 owner=self.u1,
231 )
232 Session().commit()
233
234 u1_auth = AuthUser(user_id=self.u1.user_id)
235 self.assertEqual(u1_auth.permissions['repositories_groups'],
236 {u'group1': u'group.none', u'group2': u'group.none'})
237
238 a1_auth = AuthUser(user_id=self.anon.user_id)
239 self.assertEqual(a1_auth.permissions['repositories_groups'],
240 {u'group1': u'group.none', u'group2': u'group.none'})
241
242 #grant permission for u2 !
243 ReposGroupModel().grant_user_permission(repos_group=self.g1,
244 user=self.u2,
245 perm='group.read')
246 ReposGroupModel().grant_user_permission(repos_group=self.g2,
247 user=self.u2,
248 perm='group.read')
249 Session().commit()
250 self.assertNotEqual(self.u1, self.u2)
251 #u1 and anon should have not change perms while u2 should !
252 u1_auth = AuthUser(user_id=self.u1.user_id)
253 self.assertEqual(u1_auth.permissions['repositories_groups'],
254 {u'group1': u'group.none', u'group2': u'group.none'})
255
256 u2_auth = AuthUser(user_id=self.u2.user_id)
257 self.assertEqual(u2_auth.permissions['repositories_groups'],
258 {u'group1': u'group.read', u'group2': u'group.read'})
259
260 a1_auth = AuthUser(user_id=self.anon.user_id)
261 self.assertEqual(a1_auth.permissions['repositories_groups'],
262 {u'group1': u'group.none', u'group2': u'group.none'})
263
264 def test_repo_group_user_as_user_group_member(self):
265 # create Group1
266 self.g1 = _make_group('group1', skip_if_exists=True)
267 Session().commit()
268 a1_auth = AuthUser(user_id=self.anon.user_id)
269
270 self.assertEqual(a1_auth.permissions['repositories_groups'],
271 {u'group1': u'group.read'})
272
273 # set default permission to none
274 ReposGroupModel().grant_user_permission(repos_group=self.g1,
275 user=self.anon,
276 perm='group.none')
277 # make group
278 self.ug1 = UsersGroupModel().create('G1')
279 # add user to group
280 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
281 Session().commit()
282
283 # check if user is in the group
284 membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
285 self.assertEqual(membrs, [self.u1.user_id])
286 # add some user to that group
287
288 # check his permissions
289 a1_auth = AuthUser(user_id=self.anon.user_id)
290 self.assertEqual(a1_auth.permissions['repositories_groups'],
291 {u'group1': u'group.none'})
292
293 u1_auth = AuthUser(user_id=self.u1.user_id)
294 self.assertEqual(u1_auth.permissions['repositories_groups'],
295 {u'group1': u'group.none'})
296
297 # grant ug1 read permissions for
298 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
299 group_name=self.ug1,
300 perm='group.read')
301 Session().commit()
302 # check if the
303 obj = Session().query(UsersGroupRepoGroupToPerm)\
304 .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
305 .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
306 .scalar()
307 self.assertEqual(obj.permission.permission_name, 'group.read')
308
309 a1_auth = AuthUser(user_id=self.anon.user_id)
310
311 self.assertEqual(a1_auth.permissions['repositories_groups'],
312 {u'group1': u'group.none'})
313
314 u1_auth = AuthUser(user_id=self.u1.user_id)
315 self.assertEqual(u1_auth.permissions['repositories_groups'],
316 {u'group1': u'group.read'})
@@ -0,0 +1,170
1 import os
2 import unittest
3 from rhodecode.tests import *
4
5 from rhodecode.model.repos_group import ReposGroupModel
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User
8 from rhodecode.model.meta import Session
9 from sqlalchemy.exc import IntegrityError
10
11
12 def _make_group(path, desc='desc', parent_id=None,
13 skip_if_exists=False):
14
15 gr = RepoGroup.get_by_group_name(path)
16 if gr and skip_if_exists:
17 return gr
18
19 gr = ReposGroupModel().create(path, desc, parent_id)
20 return gr
21
22
23 class TestReposGroups(unittest.TestCase):
24
25 def setUp(self):
26 self.g1 = _make_group('test1', skip_if_exists=True)
27 Session().commit()
28 self.g2 = _make_group('test2', skip_if_exists=True)
29 Session().commit()
30 self.g3 = _make_group('test3', skip_if_exists=True)
31 Session().commit()
32
33 def tearDown(self):
34 print 'out'
35
36 def __check_path(self, *path):
37 """
38 Checks the path for existance !
39 """
40 path = [TESTS_TMP_PATH] + list(path)
41 path = os.path.join(*path)
42 return os.path.isdir(path)
43
44 def _check_folders(self):
45 print os.listdir(TESTS_TMP_PATH)
46
47 def __delete_group(self, id_):
48 ReposGroupModel().delete(id_)
49
50 def __update_group(self, id_, path, desc='desc', parent_id=None):
51 form_data = dict(
52 group_name=path,
53 group_description=desc,
54 group_parent_id=parent_id,
55 perms_updates=[],
56 perms_new=[]
57 )
58 gr = ReposGroupModel().update(id_, form_data)
59 return gr
60
61 def test_create_group(self):
62 g = _make_group('newGroup')
63 self.assertEqual(g.full_path, 'newGroup')
64
65 self.assertTrue(self.__check_path('newGroup'))
66
67 def test_create_same_name_group(self):
68 self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
69 Session().rollback()
70
71 def test_same_subgroup(self):
72 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
73 self.assertEqual(sg1.parent_group, self.g1)
74 self.assertEqual(sg1.full_path, 'test1/sub1')
75 self.assertTrue(self.__check_path('test1', 'sub1'))
76
77 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
78 self.assertEqual(ssg1.parent_group, sg1)
79 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
80 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
81
82 def test_remove_group(self):
83 sg1 = _make_group('deleteme')
84 self.__delete_group(sg1.group_id)
85
86 self.assertEqual(RepoGroup.get(sg1.group_id), None)
87 self.assertFalse(self.__check_path('deteteme'))
88
89 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
90 self.__delete_group(sg1.group_id)
91
92 self.assertEqual(RepoGroup.get(sg1.group_id), None)
93 self.assertFalse(self.__check_path('test1', 'deteteme'))
94
95 def test_rename_single_group(self):
96 sg1 = _make_group('initial')
97
98 new_sg1 = self.__update_group(sg1.group_id, 'after')
99 self.assertTrue(self.__check_path('after'))
100 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
101
102 def test_update_group_parent(self):
103
104 sg1 = _make_group('initial', parent_id=self.g1.group_id)
105
106 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
107 self.assertTrue(self.__check_path('test1', 'after'))
108 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
109
110 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
111 self.assertTrue(self.__check_path('test3', 'after'))
112 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
113
114 new_sg1 = self.__update_group(sg1.group_id, 'hello')
115 self.assertTrue(self.__check_path('hello'))
116
117 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
118
119 def test_subgrouping_with_repo(self):
120
121 g1 = _make_group('g1')
122 g2 = _make_group('g2')
123
124 # create new repo
125 form_data = dict(repo_name='john',
126 repo_name_full='john',
127 fork_name=None,
128 description=None,
129 repo_group=None,
130 private=False,
131 repo_type='hg',
132 clone_uri=None,
133 landing_rev='tip')
134 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
135 r = RepoModel().create(form_data, cur_user)
136
137 self.assertEqual(r.repo_name, 'john')
138
139 # put repo into group
140 form_data = form_data
141 form_data['repo_group'] = g1.group_id
142 form_data['perms_new'] = []
143 form_data['perms_updates'] = []
144 RepoModel().update(r.repo_name, form_data)
145 self.assertEqual(r.repo_name, 'g1/john')
146
147 self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
148 self.assertTrue(self.__check_path('g2', 'g1'))
149
150 # test repo
151 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
152 r.just_name]))
153
154 def test_move_to_root(self):
155 g1 = _make_group('t11')
156 Session().commit()
157 g2 = _make_group('t22', parent_id=g1.group_id)
158 Session().commit()
159
160 self.assertEqual(g2.full_path, 't11/t22')
161 self.assertTrue(self.__check_path('t11', 't22'))
162
163 g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
164 Session().commit()
165
166 self.assertEqual(g2.group_name, 'g22')
167 # we moved out group from t1 to '' so it's full path should be 'g2'
168 self.assertEqual(g2.full_path, 'g22')
169 self.assertFalse(self.__check_path('t11', 't22'))
170 self.assertTrue(self.__check_path('g22')) No newline at end of file
@@ -0,0 +1,124
1 import unittest
2 from rhodecode.tests import *
3
4 from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\
5 Permission
6 from rhodecode.model.user import UserModel
7
8 from rhodecode.model.meta import Session
9 from rhodecode.model.users_group import UsersGroupModel
10
11
12 class TestUser(unittest.TestCase):
13 def __init__(self, methodName='runTest'):
14 Session.remove()
15 super(TestUser, self).__init__(methodName=methodName)
16
17 def test_create_and_remove(self):
18 usr = UserModel().create_or_update(username=u'test_user',
19 password=u'qweqwe',
20 email=u'u232@rhodecode.org',
21 firstname=u'u1', lastname=u'u1')
22 Session().commit()
23 self.assertEqual(User.get_by_username(u'test_user'), usr)
24
25 # make users group
26 users_group = UsersGroupModel().create('some_example_group')
27 Session().commit()
28
29 UsersGroupModel().add_user_to_group(users_group, usr)
30 Session().commit()
31
32 self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
33 self.assertEqual(UsersGroupMember.query().count(), 1)
34 UserModel().delete(usr.user_id)
35 Session().commit()
36
37 self.assertEqual(UsersGroupMember.query().all(), [])
38
39 def test_additonal_email_as_main(self):
40 usr = UserModel().create_or_update(username=u'test_user',
41 password=u'qweqwe',
42 email=u'main_email@rhodecode.org',
43 firstname=u'u1', lastname=u'u1')
44 Session().commit()
45
46 def do():
47 m = UserEmailMap()
48 m.email = u'main_email@rhodecode.org'
49 m.user = usr
50 Session().add(m)
51 Session().commit()
52 self.assertRaises(AttributeError, do)
53
54 UserModel().delete(usr.user_id)
55 Session().commit()
56
57 def test_extra_email_map(self):
58 usr = UserModel().create_or_update(username=u'test_user',
59 password=u'qweqwe',
60 email=u'main_email@rhodecode.org',
61 firstname=u'u1', lastname=u'u1')
62 Session().commit()
63
64 m = UserEmailMap()
65 m.email = u'main_email2@rhodecode.org'
66 m.user = usr
67 Session().add(m)
68 Session().commit()
69
70 u = User.get_by_email(email='main_email@rhodecode.org')
71 self.assertEqual(usr.user_id, u.user_id)
72 self.assertEqual(usr.username, u.username)
73
74 u = User.get_by_email(email='main_email2@rhodecode.org')
75 self.assertEqual(usr.user_id, u.user_id)
76 self.assertEqual(usr.username, u.username)
77 u = User.get_by_email(email='main_email3@rhodecode.org')
78 self.assertEqual(None, u)
79
80 UserModel().delete(usr.user_id)
81 Session().commit()
82
83
84 class TestUsers(unittest.TestCase):
85
86 def __init__(self, methodName='runTest'):
87 super(TestUsers, self).__init__(methodName=methodName)
88
89 def setUp(self):
90 self.u1 = UserModel().create_or_update(username=u'u1',
91 password=u'qweqwe',
92 email=u'u1@rhodecode.org',
93 firstname=u'u1', lastname=u'u1')
94
95 def tearDown(self):
96 perm = Permission.query().all()
97 for p in perm:
98 UserModel().revoke_perm(self.u1, p)
99
100 UserModel().delete(self.u1)
101 Session().commit()
102
103 def test_add_perm(self):
104 perm = Permission.query().all()[0]
105 UserModel().grant_perm(self.u1, perm)
106 Session().commit()
107 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
108
109 def test_has_perm(self):
110 perm = Permission.query().all()
111 for p in perm:
112 has_p = UserModel().has_perm(self.u1, p)
113 self.assertEqual(False, has_p)
114
115 def test_revoke_perm(self):
116 perm = Permission.query().all()[0]
117 UserModel().grant_perm(self.u1, perm)
118 Session().commit()
119 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
120
121 #revoke
122 UserModel().revoke_perm(self.u1, perm)
123 Session().commit()
124 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
This diff has been collapsed as it changes many lines, (604 lines changed) Show them Hide them
@@ -1,761 +1,189
1 import os
1 import os
2 import unittest
2 import unittest
3 from rhodecode.tests import *
3 from rhodecode.tests import *
4
4
5 from rhodecode.model.repos_group import ReposGroupModel
5 from rhodecode.model.db import User, Notification, UserNotification
6 from rhodecode.model.repo import RepoModel
7 from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
8 UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\
9 Repository, UserEmailMap
10 from sqlalchemy.exc import IntegrityError, DatabaseError
11 from rhodecode.model.user import UserModel
6 from rhodecode.model.user import UserModel
12
7
13 from rhodecode.model.meta import Session
8 from rhodecode.model.meta import Session
14 from rhodecode.model.notification import NotificationModel
9 from rhodecode.model.notification import NotificationModel
15 from rhodecode.model.users_group import UsersGroupModel
16 from rhodecode.lib.auth import AuthUser
17
18
19 def _make_group(path, desc='desc', parent_id=None,
20 skip_if_exists=False):
21
22 gr = RepoGroup.get_by_group_name(path)
23 if gr and skip_if_exists:
24 return gr
25
26 gr = ReposGroupModel().create(path, desc, parent_id)
27 return gr
28
29
30 class TestReposGroups(unittest.TestCase):
31
32 def setUp(self):
33 self.g1 = _make_group('test1', skip_if_exists=True)
34 Session.commit()
35 self.g2 = _make_group('test2', skip_if_exists=True)
36 Session.commit()
37 self.g3 = _make_group('test3', skip_if_exists=True)
38 Session.commit()
39
40 def tearDown(self):
41 print 'out'
42
43 def __check_path(self, *path):
44 """
45 Checks the path for existance !
46 """
47 path = [TESTS_TMP_PATH] + list(path)
48 path = os.path.join(*path)
49 return os.path.isdir(path)
50
51 def _check_folders(self):
52 print os.listdir(TESTS_TMP_PATH)
53
54 def __delete_group(self, id_):
55 ReposGroupModel().delete(id_)
56
57 def __update_group(self, id_, path, desc='desc', parent_id=None):
58 form_data = dict(
59 group_name=path,
60 group_description=desc,
61 group_parent_id=parent_id,
62 perms_updates=[],
63 perms_new=[]
64 )
65 gr = ReposGroupModel().update(id_, form_data)
66 return gr
67
68 def test_create_group(self):
69 g = _make_group('newGroup')
70 self.assertEqual(g.full_path, 'newGroup')
71
72 self.assertTrue(self.__check_path('newGroup'))
73
74 def test_create_same_name_group(self):
75 self.assertRaises(IntegrityError, lambda:_make_group('newGroup'))
76 Session.rollback()
77
78 def test_same_subgroup(self):
79 sg1 = _make_group('sub1', parent_id=self.g1.group_id)
80 self.assertEqual(sg1.parent_group, self.g1)
81 self.assertEqual(sg1.full_path, 'test1/sub1')
82 self.assertTrue(self.__check_path('test1', 'sub1'))
83
84 ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
85 self.assertEqual(ssg1.parent_group, sg1)
86 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
87 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
88
89 def test_remove_group(self):
90 sg1 = _make_group('deleteme')
91 self.__delete_group(sg1.group_id)
92
93 self.assertEqual(RepoGroup.get(sg1.group_id), None)
94 self.assertFalse(self.__check_path('deteteme'))
95
96 sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
97 self.__delete_group(sg1.group_id)
98
99 self.assertEqual(RepoGroup.get(sg1.group_id), None)
100 self.assertFalse(self.__check_path('test1', 'deteteme'))
101
102 def test_rename_single_group(self):
103 sg1 = _make_group('initial')
104
105 new_sg1 = self.__update_group(sg1.group_id, 'after')
106 self.assertTrue(self.__check_path('after'))
107 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
108
109 def test_update_group_parent(self):
110
111 sg1 = _make_group('initial', parent_id=self.g1.group_id)
112
113 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
114 self.assertTrue(self.__check_path('test1', 'after'))
115 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
116
117 new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
118 self.assertTrue(self.__check_path('test3', 'after'))
119 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
120
121 new_sg1 = self.__update_group(sg1.group_id, 'hello')
122 self.assertTrue(self.__check_path('hello'))
123
124 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
125
126 def test_subgrouping_with_repo(self):
127
128 g1 = _make_group('g1')
129 g2 = _make_group('g2')
130
131 # create new repo
132 form_data = dict(repo_name='john',
133 repo_name_full='john',
134 fork_name=None,
135 description=None,
136 repo_group=None,
137 private=False,
138 repo_type='hg',
139 clone_uri=None,
140 landing_rev='tip')
141 cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
142 r = RepoModel().create(form_data, cur_user)
143
144 self.assertEqual(r.repo_name, 'john')
145
146 # put repo into group
147 form_data = form_data
148 form_data['repo_group'] = g1.group_id
149 form_data['perms_new'] = []
150 form_data['perms_updates'] = []
151 RepoModel().update(r.repo_name, form_data)
152 self.assertEqual(r.repo_name, 'g1/john')
153
154 self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
155 self.assertTrue(self.__check_path('g2', 'g1'))
156
157 # test repo
158 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name]))
159
160 def test_move_to_root(self):
161 g1 = _make_group('t11')
162 Session.commit()
163 g2 = _make_group('t22', parent_id=g1.group_id)
164 Session.commit()
165
166 self.assertEqual(g2.full_path, 't11/t22')
167 self.assertTrue(self.__check_path('t11', 't22'))
168
169 g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
170 Session.commit()
171
172 self.assertEqual(g2.group_name, 'g22')
173 # we moved out group from t1 to '' so it's full path should be 'g2'
174 self.assertEqual(g2.full_path, 'g22')
175 self.assertFalse(self.__check_path('t11', 't22'))
176 self.assertTrue(self.__check_path('g22'))
177
178
179 class TestUser(unittest.TestCase):
180 def __init__(self, methodName='runTest'):
181 Session.remove()
182 super(TestUser, self).__init__(methodName=methodName)
183
184 def test_create_and_remove(self):
185 usr = UserModel().create_or_update(username=u'test_user',
186 password=u'qweqwe',
187 email=u'u232@rhodecode.org',
188 firstname=u'u1', lastname=u'u1')
189 Session.commit()
190 self.assertEqual(User.get_by_username(u'test_user'), usr)
191
192 # make users group
193 users_group = UsersGroupModel().create('some_example_group')
194 Session.commit()
195
196 UsersGroupModel().add_user_to_group(users_group, usr)
197 Session.commit()
198
199 self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
200 self.assertEqual(UsersGroupMember.query().count(), 1)
201 UserModel().delete(usr.user_id)
202 Session.commit()
203
204 self.assertEqual(UsersGroupMember.query().all(), [])
205
206 def test_additonal_email_as_main(self):
207 usr = UserModel().create_or_update(username=u'test_user',
208 password=u'qweqwe',
209 email=u'main_email@rhodecode.org',
210 firstname=u'u1', lastname=u'u1')
211 Session.commit()
212
213 def do():
214 m = UserEmailMap()
215 m.email = u'main_email@rhodecode.org'
216 m.user = usr
217 Session.add(m)
218 Session.commit()
219 self.assertRaises(AttributeError, do)
220
221 UserModel().delete(usr.user_id)
222 Session.commit()
223
224 def test_extra_email_map(self):
225 usr = UserModel().create_or_update(username=u'test_user',
226 password=u'qweqwe',
227 email=u'main_email@rhodecode.org',
228 firstname=u'u1', lastname=u'u1')
229 Session.commit()
230
231 m = UserEmailMap()
232 m.email = u'main_email2@rhodecode.org'
233 m.user = usr
234 Session.add(m)
235 Session.commit()
236
237 u = User.get_by_email(email='main_email@rhodecode.org')
238 self.assertEqual(usr.user_id, u.user_id)
239 self.assertEqual(usr.username, u.username)
240
241 u = User.get_by_email(email='main_email2@rhodecode.org')
242 self.assertEqual(usr.user_id, u.user_id)
243 self.assertEqual(usr.username, u.username)
244 u = User.get_by_email(email='main_email3@rhodecode.org')
245 self.assertEqual(None, u)
246
247 UserModel().delete(usr.user_id)
248 Session.commit()
249
10
250
11
251 class TestNotifications(unittest.TestCase):
12 class TestNotifications(unittest.TestCase):
252
13
253 def __init__(self, methodName='runTest'):
14 def __init__(self, methodName='runTest'):
254 Session.remove()
15 Session.remove()
255 self.u1 = UserModel().create_or_update(username=u'u1',
16 self.u1 = UserModel().create_or_update(username=u'u1',
256 password=u'qweqwe',
17 password=u'qweqwe',
257 email=u'u1@rhodecode.org',
18 email=u'u1@rhodecode.org',
258 firstname=u'u1', lastname=u'u1')
19 firstname=u'u1', lastname=u'u1')
259 Session.commit()
20 Session().commit()
260 self.u1 = self.u1.user_id
21 self.u1 = self.u1.user_id
261
22
262 self.u2 = UserModel().create_or_update(username=u'u2',
23 self.u2 = UserModel().create_or_update(username=u'u2',
263 password=u'qweqwe',
24 password=u'qweqwe',
264 email=u'u2@rhodecode.org',
25 email=u'u2@rhodecode.org',
265 firstname=u'u2', lastname=u'u3')
26 firstname=u'u2', lastname=u'u3')
266 Session.commit()
27 Session().commit()
267 self.u2 = self.u2.user_id
28 self.u2 = self.u2.user_id
268
29
269 self.u3 = UserModel().create_or_update(username=u'u3',
30 self.u3 = UserModel().create_or_update(username=u'u3',
270 password=u'qweqwe',
31 password=u'qweqwe',
271 email=u'u3@rhodecode.org',
32 email=u'u3@rhodecode.org',
272 firstname=u'u3', lastname=u'u3')
33 firstname=u'u3', lastname=u'u3')
273 Session.commit()
34 Session().commit()
274 self.u3 = self.u3.user_id
35 self.u3 = self.u3.user_id
275
36
276 super(TestNotifications, self).__init__(methodName=methodName)
37 super(TestNotifications, self).__init__(methodName=methodName)
277
38
278 def _clean_notifications(self):
39 def _clean_notifications(self):
279 for n in Notification.query().all():
40 for n in Notification.query().all():
280 Session.delete(n)
41 Session().delete(n)
281
42
282 Session.commit()
43 Session().commit()
283 self.assertEqual(Notification.query().all(), [])
44 self.assertEqual(Notification.query().all(), [])
284
45
285 def tearDown(self):
46 def tearDown(self):
286 self._clean_notifications()
47 self._clean_notifications()
287
48
288 def test_create_notification(self):
49 def test_create_notification(self):
289 self.assertEqual([], Notification.query().all())
50 self.assertEqual([], Notification.query().all())
290 self.assertEqual([], UserNotification.query().all())
51 self.assertEqual([], UserNotification.query().all())
291
52
292 usrs = [self.u1, self.u2]
53 usrs = [self.u1, self.u2]
293 notification = NotificationModel().create(created_by=self.u1,
54 notification = NotificationModel().create(created_by=self.u1,
294 subject=u'subj', body=u'hi there',
55 subject=u'subj', body=u'hi there',
295 recipients=usrs)
56 recipients=usrs)
296 Session.commit()
57 Session().commit()
297 u1 = User.get(self.u1)
58 u1 = User.get(self.u1)
298 u2 = User.get(self.u2)
59 u2 = User.get(self.u2)
299 u3 = User.get(self.u3)
60 u3 = User.get(self.u3)
300 notifications = Notification.query().all()
61 notifications = Notification.query().all()
301 self.assertEqual(len(notifications), 1)
62 self.assertEqual(len(notifications), 1)
302
63
303 unotification = UserNotification.query()\
64 unotification = UserNotification.query()\
304 .filter(UserNotification.notification == notification).all()
65 .filter(UserNotification.notification == notification).all()
305
66
306 self.assertEqual(notifications[0].recipients, [u1, u2])
67 self.assertEqual(notifications[0].recipients, [u1, u2])
307 self.assertEqual(notification.notification_id,
68 self.assertEqual(notification.notification_id,
308 notifications[0].notification_id)
69 notifications[0].notification_id)
309 self.assertEqual(len(unotification), len(usrs))
70 self.assertEqual(len(unotification), len(usrs))
310 self.assertEqual([x.user.user_id for x in unotification], usrs)
71 self.assertEqual([x.user.user_id for x in unotification], usrs)
311
72
312 def test_user_notifications(self):
73 def test_user_notifications(self):
313 self.assertEqual([], Notification.query().all())
74 self.assertEqual([], Notification.query().all())
314 self.assertEqual([], UserNotification.query().all())
75 self.assertEqual([], UserNotification.query().all())
315
76
316 notification1 = NotificationModel().create(created_by=self.u1,
77 notification1 = NotificationModel().create(created_by=self.u1,
317 subject=u'subj', body=u'hi there1',
78 subject=u'subj', body=u'hi there1',
318 recipients=[self.u3])
79 recipients=[self.u3])
319 Session.commit()
80 Session().commit()
320 notification2 = NotificationModel().create(created_by=self.u1,
81 notification2 = NotificationModel().create(created_by=self.u1,
321 subject=u'subj', body=u'hi there2',
82 subject=u'subj', body=u'hi there2',
322 recipients=[self.u3])
83 recipients=[self.u3])
323 Session.commit()
84 Session().commit()
324 u3 = Session.query(User).get(self.u3)
85 u3 = Session().query(User).get(self.u3)
325
86
326 self.assertEqual(sorted([x.notification for x in u3.notifications]),
87 self.assertEqual(sorted([x.notification for x in u3.notifications]),
327 sorted([notification2, notification1]))
88 sorted([notification2, notification1]))
328
89
329 def test_delete_notifications(self):
90 def test_delete_notifications(self):
330 self.assertEqual([], Notification.query().all())
91 self.assertEqual([], Notification.query().all())
331 self.assertEqual([], UserNotification.query().all())
92 self.assertEqual([], UserNotification.query().all())
332
93
333 notification = NotificationModel().create(created_by=self.u1,
94 notification = NotificationModel().create(created_by=self.u1,
334 subject=u'title', body=u'hi there3',
95 subject=u'title', body=u'hi there3',
335 recipients=[self.u3, self.u1, self.u2])
96 recipients=[self.u3, self.u1, self.u2])
336 Session.commit()
97 Session().commit()
337 notifications = Notification.query().all()
98 notifications = Notification.query().all()
338 self.assertTrue(notification in notifications)
99 self.assertTrue(notification in notifications)
339
100
340 Notification.delete(notification.notification_id)
101 Notification.delete(notification.notification_id)
341 Session.commit()
102 Session().commit()
342
103
343 notifications = Notification.query().all()
104 notifications = Notification.query().all()
344 self.assertFalse(notification in notifications)
105 self.assertFalse(notification in notifications)
345
106
346 un = UserNotification.query().filter(UserNotification.notification
107 un = UserNotification.query().filter(UserNotification.notification
347 == notification).all()
108 == notification).all()
348 self.assertEqual(un, [])
109 self.assertEqual(un, [])
349
110
350 def test_delete_association(self):
111 def test_delete_association(self):
351
112
352 self.assertEqual([], Notification.query().all())
113 self.assertEqual([], Notification.query().all())
353 self.assertEqual([], UserNotification.query().all())
114 self.assertEqual([], UserNotification.query().all())
354
115
355 notification = NotificationModel().create(created_by=self.u1,
116 notification = NotificationModel().create(created_by=self.u1,
356 subject=u'title', body=u'hi there3',
117 subject=u'title', body=u'hi there3',
357 recipients=[self.u3, self.u1, self.u2])
118 recipients=[self.u3, self.u1, self.u2])
358 Session.commit()
119 Session().commit()
359
120
360 unotification = UserNotification.query()\
121 unotification = UserNotification.query()\
361 .filter(UserNotification.notification ==
122 .filter(UserNotification.notification ==
362 notification)\
123 notification)\
363 .filter(UserNotification.user_id == self.u3)\
124 .filter(UserNotification.user_id == self.u3)\
364 .scalar()
125 .scalar()
365
126
366 self.assertEqual(unotification.user_id, self.u3)
127 self.assertEqual(unotification.user_id, self.u3)
367
128
368 NotificationModel().delete(self.u3,
129 NotificationModel().delete(self.u3,
369 notification.notification_id)
130 notification.notification_id)
370 Session.commit()
131 Session().commit()
371
132
372 u3notification = UserNotification.query()\
133 u3notification = UserNotification.query()\
373 .filter(UserNotification.notification ==
134 .filter(UserNotification.notification ==
374 notification)\
135 notification)\
375 .filter(UserNotification.user_id == self.u3)\
136 .filter(UserNotification.user_id == self.u3)\
376 .scalar()
137 .scalar()
377
138
378 self.assertEqual(u3notification, None)
139 self.assertEqual(u3notification, None)
379
140
380 # notification object is still there
141 # notification object is still there
381 self.assertEqual(Notification.query().all(), [notification])
142 self.assertEqual(Notification.query().all(), [notification])
382
143
383 #u1 and u2 still have assignments
144 #u1 and u2 still have assignments
384 u1notification = UserNotification.query()\
145 u1notification = UserNotification.query()\
385 .filter(UserNotification.notification ==
146 .filter(UserNotification.notification ==
386 notification)\
147 notification)\
387 .filter(UserNotification.user_id == self.u1)\
148 .filter(UserNotification.user_id == self.u1)\
388 .scalar()
149 .scalar()
389 self.assertNotEqual(u1notification, None)
150 self.assertNotEqual(u1notification, None)
390 u2notification = UserNotification.query()\
151 u2notification = UserNotification.query()\
391 .filter(UserNotification.notification ==
152 .filter(UserNotification.notification ==
392 notification)\
153 notification)\
393 .filter(UserNotification.user_id == self.u2)\
154 .filter(UserNotification.user_id == self.u2)\
394 .scalar()
155 .scalar()
395 self.assertNotEqual(u2notification, None)
156 self.assertNotEqual(u2notification, None)
396
157
397 def test_notification_counter(self):
158 def test_notification_counter(self):
398 self._clean_notifications()
159 self._clean_notifications()
399 self.assertEqual([], Notification.query().all())
160 self.assertEqual([], Notification.query().all())
400 self.assertEqual([], UserNotification.query().all())
161 self.assertEqual([], UserNotification.query().all())
401
162
402 NotificationModel().create(created_by=self.u1,
163 NotificationModel().create(created_by=self.u1,
403 subject=u'title', body=u'hi there_delete',
164 subject=u'title', body=u'hi there_delete',
404 recipients=[self.u3, self.u1])
165 recipients=[self.u3, self.u1])
405 Session.commit()
166 Session().commit()
406
167
407 self.assertEqual(NotificationModel()
168 self.assertEqual(NotificationModel()
408 .get_unread_cnt_for_user(self.u1), 1)
169 .get_unread_cnt_for_user(self.u1), 1)
409 self.assertEqual(NotificationModel()
170 self.assertEqual(NotificationModel()
410 .get_unread_cnt_for_user(self.u2), 0)
171 .get_unread_cnt_for_user(self.u2), 0)
411 self.assertEqual(NotificationModel()
172 self.assertEqual(NotificationModel()
412 .get_unread_cnt_for_user(self.u3), 1)
173 .get_unread_cnt_for_user(self.u3), 1)
413
174
414 notification = NotificationModel().create(created_by=self.u1,
175 notification = NotificationModel().create(created_by=self.u1,
415 subject=u'title', body=u'hi there3',
176 subject=u'title', body=u'hi there3',
416 recipients=[self.u3, self.u1, self.u2])
177 recipients=[self.u3, self.u1, self.u2])
417 Session.commit()
178 Session().commit()
418
179
419 self.assertEqual(NotificationModel()
180 self.assertEqual(NotificationModel()
420 .get_unread_cnt_for_user(self.u1), 2)
181 .get_unread_cnt_for_user(self.u1), 2)
421 self.assertEqual(NotificationModel()
182 self.assertEqual(NotificationModel()
422 .get_unread_cnt_for_user(self.u2), 1)
183 .get_unread_cnt_for_user(self.u2), 1)
423 self.assertEqual(NotificationModel()
184 self.assertEqual(NotificationModel()
424 .get_unread_cnt_for_user(self.u3), 2)
185 .get_unread_cnt_for_user(self.u3), 2)
425
186
426
187
427 class TestUsers(unittest.TestCase):
428
429 def __init__(self, methodName='runTest'):
430 super(TestUsers, self).__init__(methodName=methodName)
431
432 def setUp(self):
433 self.u1 = UserModel().create_or_update(username=u'u1',
434 password=u'qweqwe',
435 email=u'u1@rhodecode.org',
436 firstname=u'u1', lastname=u'u1')
437
438 def tearDown(self):
439 perm = Permission.query().all()
440 for p in perm:
441 UserModel().revoke_perm(self.u1, p)
442
443 UserModel().delete(self.u1)
444 Session.commit()
445
446 def test_add_perm(self):
447 perm = Permission.query().all()[0]
448 UserModel().grant_perm(self.u1, perm)
449 Session.commit()
450 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
451
452 def test_has_perm(self):
453 perm = Permission.query().all()
454 for p in perm:
455 has_p = UserModel().has_perm(self.u1, p)
456 self.assertEqual(False, has_p)
457
458 def test_revoke_perm(self):
459 perm = Permission.query().all()[0]
460 UserModel().grant_perm(self.u1, perm)
461 Session.commit()
462 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
463
464 #revoke
465 UserModel().revoke_perm(self.u1, perm)
466 Session.commit()
467 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
468
188
469
189
470 class TestPermissions(unittest.TestCase):
471 def __init__(self, methodName='runTest'):
472 super(TestPermissions, self).__init__(methodName=methodName)
473
474 def setUp(self):
475 self.u1 = UserModel().create_or_update(
476 username=u'u1', password=u'qweqwe',
477 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
478 )
479 self.u2 = UserModel().create_or_update(
480 username=u'u2', password=u'qweqwe',
481 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
482 )
483 self.anon = User.get_by_username('default')
484 self.a1 = UserModel().create_or_update(
485 username=u'a1', password=u'qweqwe',
486 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
487 )
488 Session.commit()
489
490 def tearDown(self):
491 if hasattr(self, 'test_repo'):
492 RepoModel().delete(repo=self.test_repo)
493 UserModel().delete(self.u1)
494 UserModel().delete(self.u2)
495 UserModel().delete(self.a1)
496 if hasattr(self, 'g1'):
497 ReposGroupModel().delete(self.g1.group_id)
498 if hasattr(self, 'g2'):
499 ReposGroupModel().delete(self.g2.group_id)
500
501 if hasattr(self, 'ug1'):
502 UsersGroupModel().delete(self.ug1, force=True)
503
504 Session.commit()
505
506 def test_default_perms_set(self):
507 u1_auth = AuthUser(user_id=self.u1.user_id)
508 perms = {
509 'repositories_groups': {},
510 'global': set([u'hg.create.repository', u'repository.read',
511 u'hg.register.manual_activate']),
512 'repositories': {u'vcs_test_hg': u'repository.read'}
513 }
514 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
515 perms['repositories'][HG_REPO])
516 new_perm = 'repository.write'
517 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
518 Session.commit()
519
520 u1_auth = AuthUser(user_id=self.u1.user_id)
521 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm)
522
523 def test_default_admin_perms_set(self):
524 a1_auth = AuthUser(user_id=self.a1.user_id)
525 perms = {
526 'repositories_groups': {},
527 'global': set([u'hg.admin']),
528 'repositories': {u'vcs_test_hg': u'repository.admin'}
529 }
530 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
531 perms['repositories'][HG_REPO])
532 new_perm = 'repository.write'
533 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm)
534 Session.commit()
535 # cannot really downgrade admins permissions !? they still get's set as
536 # admin !
537 u1_auth = AuthUser(user_id=self.a1.user_id)
538 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
539 perms['repositories'][HG_REPO])
540
541 def test_default_group_perms(self):
542 self.g1 = _make_group('test1', skip_if_exists=True)
543 self.g2 = _make_group('test2', skip_if_exists=True)
544 u1_auth = AuthUser(user_id=self.u1.user_id)
545 perms = {
546 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
547 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
548 'repositories': {u'vcs_test_hg': u'repository.read'}
549 }
550 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
551 perms['repositories'][HG_REPO])
552 self.assertEqual(u1_auth.permissions['repositories_groups'],
553 perms['repositories_groups'])
554
555 def test_default_admin_group_perms(self):
556 self.g1 = _make_group('test1', skip_if_exists=True)
557 self.g2 = _make_group('test2', skip_if_exists=True)
558 a1_auth = AuthUser(user_id=self.a1.user_id)
559 perms = {
560 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
561 'global': set(['hg.admin']),
562 'repositories': {u'vcs_test_hg': 'repository.admin'}
563 }
564
565 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
566 perms['repositories'][HG_REPO])
567 self.assertEqual(a1_auth.permissions['repositories_groups'],
568 perms['repositories_groups'])
569
570 def test_propagated_permission_from_users_group(self):
571 # make group
572 self.ug1 = UsersGroupModel().create('G1')
573 # add user to group
574 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
575
576 # set permission to lower
577 new_perm = 'repository.none'
578 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
579 Session.commit()
580 u1_auth = AuthUser(user_id=self.u1.user_id)
581 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
582 new_perm)
583
584 # grant perm for group this should override permission from user
585 new_perm = 'repository.write'
586 RepoModel().grant_users_group_permission(repo=HG_REPO,
587 group_name=self.ug1,
588 perm=new_perm)
589 # check perms
590 u1_auth = AuthUser(user_id=self.u1.user_id)
591 perms = {
592 'repositories_groups': {},
593 'global': set([u'hg.create.repository', u'repository.read',
594 u'hg.register.manual_activate']),
595 'repositories': {u'vcs_test_hg': u'repository.read'}
596 }
597 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
598 new_perm)
599 self.assertEqual(u1_auth.permissions['repositories_groups'],
600 perms['repositories_groups'])
601
602 def test_propagated_permission_from_users_group_lower_weight(self):
603 # make group
604 self.ug1 = UsersGroupModel().create('G1')
605 # add user to group
606 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
607
608 # set permission to lower
609 new_perm_h = 'repository.write'
610 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
611 perm=new_perm_h)
612 Session.commit()
613 u1_auth = AuthUser(user_id=self.u1.user_id)
614 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
615 new_perm_h)
616
617 # grant perm for group this should NOT override permission from user
618 # since it's lower than granted
619 new_perm_l = 'repository.read'
620 RepoModel().grant_users_group_permission(repo=HG_REPO,
621 group_name=self.ug1,
622 perm=new_perm_l)
623 # check perms
624 u1_auth = AuthUser(user_id=self.u1.user_id)
625 perms = {
626 'repositories_groups': {},
627 'global': set([u'hg.create.repository', u'repository.read',
628 u'hg.register.manual_activate']),
629 'repositories': {u'vcs_test_hg': u'repository.write'}
630 }
631 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
632 new_perm_h)
633 self.assertEqual(u1_auth.permissions['repositories_groups'],
634 perms['repositories_groups'])
635
636 def test_repo_in_group_permissions(self):
637 self.g1 = _make_group('group1', skip_if_exists=True)
638 self.g2 = _make_group('group2', skip_if_exists=True)
639 Session.commit()
640 # both perms should be read !
641 u1_auth = AuthUser(user_id=self.u1.user_id)
642 self.assertEqual(u1_auth.permissions['repositories_groups'],
643 {u'group1': u'group.read', u'group2': u'group.read'})
644
645 a1_auth = AuthUser(user_id=self.anon.user_id)
646 self.assertEqual(a1_auth.permissions['repositories_groups'],
647 {u'group1': u'group.read', u'group2': u'group.read'})
648
649 #Change perms to none for both groups
650 ReposGroupModel().grant_user_permission(repos_group=self.g1,
651 user=self.anon,
652 perm='group.none')
653 ReposGroupModel().grant_user_permission(repos_group=self.g2,
654 user=self.anon,
655 perm='group.none')
656
657 u1_auth = AuthUser(user_id=self.u1.user_id)
658 self.assertEqual(u1_auth.permissions['repositories_groups'],
659 {u'group1': u'group.none', u'group2': u'group.none'})
660
661 a1_auth = AuthUser(user_id=self.anon.user_id)
662 self.assertEqual(a1_auth.permissions['repositories_groups'],
663 {u'group1': u'group.none', u'group2': u'group.none'})
664
665 # add repo to group
666 form_data = {
667 'repo_name': HG_REPO,
668 'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
669 'repo_type': 'hg',
670 'clone_uri': '',
671 'repo_group': self.g1.group_id,
672 'description': 'desc',
673 'private': False,
674 'landing_rev': 'tip'
675 }
676 self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
677 Session.commit()
678
679 u1_auth = AuthUser(user_id=self.u1.user_id)
680 self.assertEqual(u1_auth.permissions['repositories_groups'],
681 {u'group1': u'group.none', u'group2': u'group.none'})
682
683 a1_auth = AuthUser(user_id=self.anon.user_id)
684 self.assertEqual(a1_auth.permissions['repositories_groups'],
685 {u'group1': u'group.none', u'group2': u'group.none'})
686
687 #grant permission for u2 !
688 ReposGroupModel().grant_user_permission(repos_group=self.g1,
689 user=self.u2,
690 perm='group.read')
691 ReposGroupModel().grant_user_permission(repos_group=self.g2,
692 user=self.u2,
693 perm='group.read')
694 Session.commit()
695 self.assertNotEqual(self.u1, self.u2)
696 #u1 and anon should have not change perms while u2 should !
697 u1_auth = AuthUser(user_id=self.u1.user_id)
698 self.assertEqual(u1_auth.permissions['repositories_groups'],
699 {u'group1': u'group.none', u'group2': u'group.none'})
700
701 u2_auth = AuthUser(user_id=self.u2.user_id)
702 self.assertEqual(u2_auth.permissions['repositories_groups'],
703 {u'group1': u'group.read', u'group2': u'group.read'})
704
705 a1_auth = AuthUser(user_id=self.anon.user_id)
706 self.assertEqual(a1_auth.permissions['repositories_groups'],
707 {u'group1': u'group.none', u'group2': u'group.none'})
708
709 def test_repo_group_user_as_user_group_member(self):
710 # create Group1
711 self.g1 = _make_group('group1', skip_if_exists=True)
712 Session.commit()
713 a1_auth = AuthUser(user_id=self.anon.user_id)
714
715 self.assertEqual(a1_auth.permissions['repositories_groups'],
716 {u'group1': u'group.read'})
717
718 # set default permission to none
719 ReposGroupModel().grant_user_permission(repos_group=self.g1,
720 user=self.anon,
721 perm='group.none')
722 # make group
723 self.ug1 = UsersGroupModel().create('G1')
724 # add user to group
725 UsersGroupModel().add_user_to_group(self.ug1, self.u1)
726 Session.commit()
727
728 # check if user is in the group
729 membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
730 self.assertEqual(membrs, [self.u1.user_id])
731 # add some user to that group
732
733 # check his permissions
734 a1_auth = AuthUser(user_id=self.anon.user_id)
735 self.assertEqual(a1_auth.permissions['repositories_groups'],
736 {u'group1': u'group.none'})
737
738 u1_auth = AuthUser(user_id=self.u1.user_id)
739 self.assertEqual(u1_auth.permissions['repositories_groups'],
740 {u'group1': u'group.none'})
741
742 # grant ug1 read permissions for
743 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
744 group_name=self.ug1,
745 perm='group.read')
746 Session.commit()
747 # check if the
748 obj = Session.query(UsersGroupRepoGroupToPerm)\
749 .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
750 .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
751 .scalar()
752 self.assertEqual(obj.permission.permission_name, 'group.read')
753
754 a1_auth = AuthUser(user_id=self.anon.user_id)
755
756 self.assertEqual(a1_auth.permissions['repositories_groups'],
757 {u'group1': u'group.none'})
758
759 u1_auth = AuthUser(user_id=self.u1.user_id)
760 self.assertEqual(u1_auth.permissions['repositories_groups'],
761 {u'group1': u'group.read'})
1 NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch
NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch
@@ -1,211 +1,213
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.tests.test_hg_operations
3 rhodecode.tests.test_hg_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 Test suite for making push/pull operations
6 Test suite for making push/pull operations
7
7
8 :created_on: Dec 30, 2010
8 :created_on: Dec 30, 2010
9 :author: marcink
9 :author: marcink
10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
11 :license: GPLv3, see COPYING for more details.
12 """
12 """
13 # This program is free software: you can redistribute it and/or modify
13 # This program is free software: you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation, either version 3 of the License, or
15 # the Free Software Foundation, either version 3 of the License, or
16 # (at your option) any later version.
16 # (at your option) any later version.
17 #
17 #
18 # This program is distributed in the hope that it will be useful,
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
21 # GNU General Public License for more details.
22 #
22 #
23 # You should have received a copy of the GNU General Public License
23 # You should have received a copy of the GNU General Public License
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25
25
26 import os
26 import os
27 import sys
27 import sys
28 import shutil
28 import shutil
29 import logging
29 import logging
30 from os.path import join as jn
30 from os.path import join as jn
31 from os.path import dirname as dn
31 from os.path import dirname as dn
32
32
33 from tempfile import _RandomNameSequence
33 from tempfile import _RandomNameSequence
34 from subprocess import Popen, PIPE
34 from subprocess import Popen, PIPE
35
35
36 from paste.deploy import appconfig
36 from paste.deploy import appconfig
37 from pylons import config
37 from pylons import config
38 from sqlalchemy import engine_from_config
38 from sqlalchemy import engine_from_config
39
39
40 from rhodecode.lib.utils import add_cache
40 from rhodecode.lib.utils import add_cache
41 from rhodecode.model import init_model
41 from rhodecode.model import init_model
42 from rhodecode.model import meta
42 from rhodecode.model import meta
43 from rhodecode.model.db import User, Repository
43 from rhodecode.model.db import User, Repository
44 from rhodecode.lib.auth import get_crypt_password
44 from rhodecode.lib.auth import get_crypt_password
45
45
46 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
46 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
47 from rhodecode.config.environment import load_environment
47 from rhodecode.config.environment import load_environment
48
48
49 rel_path = dn(dn(dn(os.path.abspath(__file__))))
49 rel_path = dn(dn(dn(os.path.abspath(__file__))))
50 conf = appconfig('config:development.ini', relative_to=rel_path)
50 conf = appconfig('config:development.ini', relative_to=rel_path)
51 load_environment(conf.global_conf, conf.local_conf)
51 load_environment(conf.global_conf, conf.local_conf)
52
52
53 add_cache(conf)
53 add_cache(conf)
54
54
55 USER = 'test_admin'
55 USER = 'test_admin'
56 PASS = 'test12'
56 PASS = 'test12'
57 HOST = 'hg.local'
57 HOST = 'hg.local'
58 METHOD = 'pull'
58 METHOD = 'pull'
59 DEBUG = True
59 DEBUG = True
60 log = logging.getLogger(__name__)
60 log = logging.getLogger(__name__)
61
61
62
62
63 class Command(object):
63 class Command(object):
64
64
65 def __init__(self, cwd):
65 def __init__(self, cwd):
66 self.cwd = cwd
66 self.cwd = cwd
67
67
68 def execute(self, cmd, *args):
68 def execute(self, cmd, *args):
69 """Runs command on the system with given ``args``.
69 """Runs command on the system with given ``args``.
70 """
70 """
71
71
72 command = cmd + ' ' + ' '.join(args)
72 command = cmd + ' ' + ' '.join(args)
73 log.debug('Executing %s' % command)
73 log.debug('Executing %s' % command)
74 if DEBUG:
74 if DEBUG:
75 print command
75 print command
76 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
76 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
77 stdout, stderr = p.communicate()
77 stdout, stderr = p.communicate()
78 if DEBUG:
78 if DEBUG:
79 print stdout, stderr
79 print stdout, stderr
80 return stdout, stderr
80 return stdout, stderr
81
81
82
82 def get_session():
83 def get_session():
83 engine = engine_from_config(conf, 'sqlalchemy.db1.')
84 engine = engine_from_config(conf, 'sqlalchemy.db1.')
84 init_model(engine)
85 init_model(engine)
85 sa = meta.Session
86 sa = meta.Session
86 return sa
87 return sa
87
88
88
89
89 def create_test_user(force=True):
90 def create_test_user(force=True):
90 print 'creating test user'
91 print 'creating test user'
91 sa = get_session()
92 sa = get_session()
92
93
93 user = sa.query(User).filter(User.username == USER).scalar()
94 user = sa.query(User).filter(User.username == USER).scalar()
94
95
95 if force and user is not None:
96 if force and user is not None:
96 print 'removing current user'
97 print 'removing current user'
97 for repo in sa.query(Repository).filter(Repository.user == user).all():
98 for repo in sa.query(Repository).filter(Repository.user == user).all():
98 sa.delete(repo)
99 sa.delete(repo)
99 sa.delete(user)
100 sa.delete(user)
100 sa.commit()
101 sa.commit()
101
102
102 if user is None or force:
103 if user is None or force:
103 print 'creating new one'
104 print 'creating new one'
104 new_usr = User()
105 new_usr = User()
105 new_usr.username = USER
106 new_usr.username = USER
106 new_usr.password = get_crypt_password(PASS)
107 new_usr.password = get_crypt_password(PASS)
107 new_usr.email = 'mail@mail.com'
108 new_usr.email = 'mail@mail.com'
108 new_usr.name = 'test'
109 new_usr.name = 'test'
109 new_usr.lastname = 'lasttestname'
110 new_usr.lastname = 'lasttestname'
110 new_usr.active = True
111 new_usr.active = True
111 new_usr.admin = True
112 new_usr.admin = True
112 sa.add(new_usr)
113 sa.add(new_usr)
113 sa.commit()
114 sa.commit()
114
115
115 print 'done'
116 print 'done'
116
117
117
118
118 def create_test_repo(force=True):
119 def create_test_repo(force=True):
119 print 'creating test repo'
120 print 'creating test repo'
120 from rhodecode.model.repo import RepoModel
121 from rhodecode.model.repo import RepoModel
121 sa = get_session()
122 sa = get_session()
122
123
123 user = sa.query(User).filter(User.username == USER).scalar()
124 user = sa.query(User).filter(User.username == USER).scalar()
124 if user is None:
125 if user is None:
125 raise Exception('user not found')
126 raise Exception('user not found')
126
127
127
128 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
128 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
129
129
130 if repo is None:
130 if repo is None:
131 print 'repo not found creating'
131 print 'repo not found creating'
132
132
133 form_data = {'repo_name':HG_REPO,
133 form_data = {'repo_name':HG_REPO,
134 'repo_type':'hg',
134 'repo_type':'hg',
135 'private':False,
135 'private':False,
136 'clone_uri':'' }
136 'clone_uri':'' }
137 rm = RepoModel(sa)
137 rm = RepoModel(sa)
138 rm.base_path = '/home/hg'
138 rm.base_path = '/home/hg'
139 rm.create(form_data, user)
139 rm.create(form_data, user)
140
140
141 print 'done'
141 print 'done'
142
142
143
143 def set_anonymous_access(enable=True):
144 def set_anonymous_access(enable=True):
144 sa = get_session()
145 sa = get_session()
145 user = sa.query(User).filter(User.username == 'default').one()
146 user = sa.query(User).filter(User.username == 'default').one()
146 user.active = enable
147 user.active = enable
147 sa.add(user)
148 sa.add(user)
148 sa.commit()
149 sa.commit()
149
150
151
150 def get_anonymous_access():
152 def get_anonymous_access():
151 sa = get_session()
153 sa = get_session()
152 return sa.query(User).filter(User.username == 'default').one().active
154 return sa.query(User).filter(User.username == 'default').one().active
153
155
154
156
155 #==============================================================================
157 #==============================================================================
156 # TESTS
158 # TESTS
157 #==============================================================================
159 #==============================================================================
158 def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
160 def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
159 seq=None):
161 seq=None):
160 cwd = path = jn(TESTS_TMP_PATH, repo)
162 cwd = path = jn(TESTS_TMP_PATH, repo)
161
163
162 if seq == None:
164 if seq == None:
163 seq = _RandomNameSequence().next()
165 seq = _RandomNameSequence().next()
164
166
165 try:
167 try:
166 shutil.rmtree(path, ignore_errors=True)
168 shutil.rmtree(path, ignore_errors=True)
167 os.makedirs(path)
169 os.makedirs(path)
168 #print 'made dirs %s' % jn(path)
170 #print 'made dirs %s' % jn(path)
169 except OSError:
171 except OSError:
170 raise
172 raise
171
173
172 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
174 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
173 {'user':USER,
175 {'user':USER,
174 'pass':PASS,
176 'pass':PASS,
175 'host':HOST,
177 'host':HOST,
176 'cloned_repo':repo, }
178 'cloned_repo':repo, }
177
179
178 dest = path + seq
180 dest = path + seq
179 if method == 'pull':
181 if method == 'pull':
180 stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url)
182 stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url)
181 else:
183 else:
182 stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest)
184 stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest)
183
185
184 if no_errors is False:
186 if no_errors is False:
185 assert """adding file changes""" in stdout, 'no messages about cloning'
187 assert """adding file changes""" in stdout, 'no messages about cloning'
186 assert """abort""" not in stderr , 'got error from clone'
188 assert """abort""" not in stderr , 'got error from clone'
187
189
188 if __name__ == '__main__':
190 if __name__ == '__main__':
189 try:
191 try:
190 create_test_user(force=False)
192 create_test_user(force=False)
191 seq = None
193 seq = None
192 import time
194 import time
193
195
194 try:
196 try:
195 METHOD = sys.argv[3]
197 METHOD = sys.argv[3]
196 except:
198 except:
197 pass
199 pass
198
200
199 if METHOD == 'pull':
201 if METHOD == 'pull':
200 seq = _RandomNameSequence().next()
202 seq = _RandomNameSequence().next()
201 test_clone_with_credentials(repo=sys.argv[1], method='clone',
203 test_clone_with_credentials(repo=sys.argv[1], method='clone',
202 seq=seq)
204 seq=seq)
203 s = time.time()
205 s = time.time()
204 for i in range(1, int(sys.argv[2]) + 1):
206 for i in range(1, int(sys.argv[2]) + 1):
205 print 'take', i
207 print 'take', i
206 test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
208 test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
207 seq=seq)
209 seq=seq)
208 print 'time taken %.3f' % (time.time() - s)
210 print 'time taken %.3f' % (time.time() - s)
209 except Exception, e:
211 except Exception, e:
210 raise
212 raise
211 sys.exit('stop on %s' % e)
213 sys.exit('stop on %s' % e)
1 NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py
NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py
@@ -1,401 +1,410
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.tests.test_hg_operations
3 rhodecode.tests.test_hg_operations
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 Test suite for making push/pull operations
6 Test suite for making push/pull operations
7
7
8 :created_on: Dec 30, 2010
8 :created_on: Dec 30, 2010
9 :author: marcink
9 :author: marcink
10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
11 :license: GPLv3, see COPYING for more details.
12 """
12 """
13 # This program is free software: you can redistribute it and/or modify
13 # This program is free software: you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation, either version 3 of the License, or
15 # the Free Software Foundation, either version 3 of the License, or
16 # (at your option) any later version.
16 # (at your option) any later version.
17 #
17 #
18 # This program is distributed in the hope that it will be useful,
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
21 # GNU General Public License for more details.
22 #
22 #
23 # You should have received a copy of the GNU General Public License
23 # You should have received a copy of the GNU General Public License
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25
25
26 import os
26 import os
27 import time
27 import time
28 import sys
28 import sys
29 import shutil
29 import shutil
30 import logging
30 import logging
31
31
32 from os.path import join as jn
32 from os.path import join as jn
33 from os.path import dirname as dn
33 from os.path import dirname as dn
34
34
35 from tempfile import _RandomNameSequence
35 from tempfile import _RandomNameSequence
36 from subprocess import Popen, PIPE
36 from subprocess import Popen, PIPE
37
37
38 from paste.deploy import appconfig
38 from paste.deploy import appconfig
39 from pylons import config
39 from pylons import config
40 from sqlalchemy import engine_from_config
40 from sqlalchemy import engine_from_config
41
41
42 from rhodecode.lib.utils import add_cache
42 from rhodecode.lib.utils import add_cache
43 from rhodecode.model import init_model
43 from rhodecode.model import init_model
44 from rhodecode.model import meta
44 from rhodecode.model import meta
45 from rhodecode.model.db import User, Repository, UserLog
45 from rhodecode.model.db import User, Repository, UserLog
46 from rhodecode.lib.auth import get_crypt_password
46 from rhodecode.lib.auth import get_crypt_password
47
47
48 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
48 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
49 from rhodecode.config.environment import load_environment
49 from rhodecode.config.environment import load_environment
50
50
51 rel_path = dn(dn(dn(os.path.abspath(__file__))))
51 rel_path = dn(dn(dn(os.path.abspath(__file__))))
52
52
53 conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
53 conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
54 load_environment(conf.global_conf, conf.local_conf)
54 load_environment(conf.global_conf, conf.local_conf)
55
55
56 add_cache(conf)
56 add_cache(conf)
57
57
58 USER = 'test_admin'
58 USER = 'test_admin'
59 PASS = 'test12'
59 PASS = 'test12'
60 HOST = '127.0.0.1:5000'
60 HOST = '127.0.0.1:5000'
61 DEBUG = False
61 DEBUG = False
62 print 'DEBUG:', DEBUG
62 print 'DEBUG:', DEBUG
63 log = logging.getLogger(__name__)
63 log = logging.getLogger(__name__)
64
64
65 engine = engine_from_config(conf, 'sqlalchemy.db1.')
65 engine = engine_from_config(conf, 'sqlalchemy.db1.')
66 init_model(engine)
66 init_model(engine)
67 sa = meta.Session
67 sa = meta.Session()
68
68
69
69 class Command(object):
70 class Command(object):
70
71
71 def __init__(self, cwd):
72 def __init__(self, cwd):
72 self.cwd = cwd
73 self.cwd = cwd
73
74
74 def execute(self, cmd, *args):
75 def execute(self, cmd, *args):
75 """Runs command on the system with given ``args``.
76 """Runs command on the system with given ``args``.
76 """
77 """
77
78
78 command = cmd + ' ' + ' '.join(args)
79 command = cmd + ' ' + ' '.join(args)
79 log.debug('Executing %s' % command)
80 log.debug('Executing %s' % command)
80 if DEBUG:
81 if DEBUG:
81 print command
82 print command
82 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
83 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
83 stdout, stderr = p.communicate()
84 stdout, stderr = p.communicate()
84 if DEBUG:
85 if DEBUG:
85 print stdout, stderr
86 print stdout, stderr
86 return stdout, stderr
87 return stdout, stderr
87
88
88
89
89 def test_wrapp(func):
90 def test_wrapp(func):
90
91
91 def __wrapp(*args, **kwargs):
92 def __wrapp(*args, **kwargs):
92 print '>>>%s' % func.__name__
93 print '>>>%s' % func.__name__
93 try:
94 try:
94 res = func(*args, **kwargs)
95 res = func(*args, **kwargs)
95 except Exception, e:
96 except Exception, e:
96 print ('###############\n-'
97 print ('###############\n-'
97 '--%s failed %s--\n'
98 '--%s failed %s--\n'
98 '###############\n' % (func.__name__, e))
99 '###############\n' % (func.__name__, e))
99 sys.exit()
100 sys.exit()
100 print '++OK++'
101 print '++OK++'
101 return res
102 return res
102 return __wrapp
103 return __wrapp
103
104
104
105
105 def create_test_user(force=True):
106 def create_test_user(force=True):
106 print '\tcreating test user'
107 print '\tcreating test user'
107
108
108 user = User.get_by_username(USER)
109 user = User.get_by_username(USER)
109
110
110 if force and user is not None:
111 if force and user is not None:
111 print '\tremoving current user'
112 print '\tremoving current user'
112 for repo in Repository.query().filter(Repository.user == user).all():
113 for repo in Repository.query().filter(Repository.user == user).all():
113 sa.delete(repo)
114 sa.delete(repo)
114 sa.delete(user)
115 sa.delete(user)
115 sa.commit()
116 sa.commit()
116
117
117 if user is None or force:
118 if user is None or force:
118 print '\tcreating new one'
119 print '\tcreating new one'
119 new_usr = User()
120 new_usr = User()
120 new_usr.username = USER
121 new_usr.username = USER
121 new_usr.password = get_crypt_password(PASS)
122 new_usr.password = get_crypt_password(PASS)
122 new_usr.email = 'mail@mail.com'
123 new_usr.email = 'mail@mail.com'
123 new_usr.name = 'test'
124 new_usr.name = 'test'
124 new_usr.lastname = 'lasttestname'
125 new_usr.lastname = 'lasttestname'
125 new_usr.active = True
126 new_usr.active = True
126 new_usr.admin = True
127 new_usr.admin = True
127 sa.add(new_usr)
128 sa.add(new_usr)
128 sa.commit()
129 sa.commit()
129
130
130 print '\tdone'
131 print '\tdone'
131
132
132
133
133 def create_test_repo(force=True):
134 def create_test_repo(force=True):
134 from rhodecode.model.repo import RepoModel
135 from rhodecode.model.repo import RepoModel
135
136
136 user = User.get_by_username(USER)
137 user = User.get_by_username(USER)
137 if user is None:
138 if user is None:
138 raise Exception('user not found')
139 raise Exception('user not found')
139
140
140
141 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
141 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
142
142
143 if repo is None:
143 if repo is None:
144 print '\trepo not found creating'
144 print '\trepo not found creating'
145
145
146 form_data = {'repo_name':HG_REPO,
146 form_data = {'repo_name':HG_REPO,
147 'repo_type':'hg',
147 'repo_type':'hg',
148 'private':False,
148 'private':False,
149 'clone_uri':'' }
149 'clone_uri':'' }
150 rm = RepoModel(sa)
150 rm = RepoModel(sa)
151 rm.base_path = '/home/hg'
151 rm.base_path = '/home/hg'
152 rm.create(form_data, user)
152 rm.create(form_data, user)
153
153
154
154
155 def set_anonymous_access(enable=True):
155 def set_anonymous_access(enable=True):
156 user = User.get_by_username('default')
156 user = User.get_by_username('default')
157 user.active = enable
157 user.active = enable
158 sa.add(user)
158 sa.add(user)
159 sa.commit()
159 sa.commit()
160 print '\tanonymous access is now:', enable
160 print '\tanonymous access is now:', enable
161 if enable != User.get_by_username('default').active:
161 if enable != User.get_by_username('default').active:
162 raise Exception('Cannot set anonymous access')
162 raise Exception('Cannot set anonymous access')
163
163
164
164 def get_anonymous_access():
165 def get_anonymous_access():
165 user = User.get_by_username('default')
166 user = User.get_by_username('default')
166 return user.active
167 return user.active
167
168
168
169
169 #==============================================================================
170 #==============================================================================
170 # TESTS
171 # TESTS
171 #==============================================================================
172 #==============================================================================
172 @test_wrapp
173 @test_wrapp
173 def test_clone_with_credentials(no_errors=False):
174 def test_clone_with_credentials(no_errors=False):
174 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
175 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
175
176
176 try:
177 try:
177 shutil.rmtree(path, ignore_errors=True)
178 shutil.rmtree(path, ignore_errors=True)
178 os.makedirs(path)
179 os.makedirs(path)
179 #print 'made dirs %s' % jn(path)
180 #print 'made dirs %s' % jn(path)
180 except OSError:
181 except OSError:
181 raise
182 raise
182
183
183 print '\tchecking if anonymous access is enabled'
184 print '\tchecking if anonymous access is enabled'
184 anonymous_access = get_anonymous_access()
185 anonymous_access = get_anonymous_access()
185 if anonymous_access:
186 if anonymous_access:
186 print '\tenabled, disabling it '
187 print '\tenabled, disabling it '
187 set_anonymous_access(enable=False)
188 set_anonymous_access(enable=False)
188
189
189 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
190 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
190 {'user':USER,
191 {'user':USER,
191 'pass':PASS,
192 'pass':PASS,
192 'host':HOST,
193 'host':HOST,
193 'cloned_repo':HG_REPO,
194 'cloned_repo':HG_REPO,
194 'dest':path}
195 'dest':path}
195
196
196 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
197 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
197
198
198 if no_errors is False:
199 if no_errors is False:
199 assert """adding file changes""" in stdout, 'no messages about cloning'
200 assert """adding file changes""" in stdout, 'no messages about cloning'
200 assert """abort""" not in stderr , 'got error from clone'
201 assert """abort""" not in stderr , 'got error from clone'
201
202
202
203
203 @test_wrapp
204 @test_wrapp
204 def test_clone_anonymous():
205 def test_clone_anonymous():
205 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
206 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
206
207
207 try:
208 try:
208 shutil.rmtree(path, ignore_errors=True)
209 shutil.rmtree(path, ignore_errors=True)
209 os.makedirs(path)
210 os.makedirs(path)
210 #print 'made dirs %s' % jn(path)
211 #print 'made dirs %s' % jn(path)
211 except OSError:
212 except OSError:
212 raise
213 raise
213
214
214
215
215 print '\tchecking if anonymous access is enabled'
216 print '\tchecking if anonymous access is enabled'
216 anonymous_access = get_anonymous_access()
217 anonymous_access = get_anonymous_access()
217 if not anonymous_access:
218 if not anonymous_access:
218 print '\tnot enabled, enabling it '
219 print '\tnot enabled, enabling it '
219 set_anonymous_access(enable=True)
220 set_anonymous_access(enable=True)
220
221
221 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
222 clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
222 {'user':USER,
223 {'user':USER,
223 'pass':PASS,
224 'pass':PASS,
224 'host':HOST,
225 'host':HOST,
225 'cloned_repo':HG_REPO,
226 'cloned_repo':HG_REPO,
226 'dest':path}
227 'dest':path}
227
228
228 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
229 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
229
230
230 assert """adding file changes""" in stdout, 'no messages about cloning'
231 assert """adding file changes""" in stdout, 'no messages about cloning'
231 assert """abort""" not in stderr , 'got error from clone'
232 assert """abort""" not in stderr , 'got error from clone'
232
233
233 #disable if it was enabled
234 #disable if it was enabled
234 if not anonymous_access:
235 if not anonymous_access:
235 print '\tdisabling anonymous access'
236 print '\tdisabling anonymous access'
236 set_anonymous_access(enable=False)
237 set_anonymous_access(enable=False)
237
238
239
238 @test_wrapp
240 @test_wrapp
239 def test_clone_wrong_credentials():
241 def test_clone_wrong_credentials():
240 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
242 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
241
243
242 try:
244 try:
243 shutil.rmtree(path, ignore_errors=True)
245 shutil.rmtree(path, ignore_errors=True)
244 os.makedirs(path)
246 os.makedirs(path)
245 #print 'made dirs %s' % jn(path)
247 #print 'made dirs %s' % jn(path)
246 except OSError:
248 except OSError:
247 raise
249 raise
248
250
249 print '\tchecking if anonymous access is enabled'
251 print '\tchecking if anonymous access is enabled'
250 anonymous_access = get_anonymous_access()
252 anonymous_access = get_anonymous_access()
251 if anonymous_access:
253 if anonymous_access:
252 print '\tenabled, disabling it '
254 print '\tenabled, disabling it '
253 set_anonymous_access(enable=False)
255 set_anonymous_access(enable=False)
254
256
255 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
257 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
256 {'user':USER + 'error',
258 {'user':USER + 'error',
257 'pass':PASS,
259 'pass':PASS,
258 'host':HOST,
260 'host':HOST,
259 'cloned_repo':HG_REPO,
261 'cloned_repo':HG_REPO,
260 'dest':path}
262 'dest':path}
261
263
262 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
264 stdout, stderr = Command(cwd).execute('hg clone', clone_url)
263
265
264 if not """abort: authorization failed""" in stderr:
266 if not """abort: authorization failed""" in stderr:
265 raise Exception('Failure')
267 raise Exception('Failure')
266
268
269
267 @test_wrapp
270 @test_wrapp
268 def test_pull():
271 def test_pull():
269 pass
272 pass
270
273
274
271 @test_wrapp
275 @test_wrapp
272 def test_push_modify_file(f_name='setup.py'):
276 def test_push_modify_file(f_name='setup.py'):
273 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
277 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
274 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
278 modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
275 for i in xrange(5):
279 for i in xrange(5):
276 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
280 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
277 Command(cwd).execute(cmd)
281 Command(cwd).execute(cmd)
278
282
279 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
283 cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
280 Command(cwd).execute(cmd)
284 Command(cwd).execute(cmd)
281
285
282 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
286 Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
283
287
288
284 @test_wrapp
289 @test_wrapp
285 def test_push_new_file(commits=15, with_clone=True):
290 def test_push_new_file(commits=15, with_clone=True):
286
291
287 if with_clone:
292 if with_clone:
288 test_clone_with_credentials(no_errors=True)
293 test_clone_with_credentials(no_errors=True)
289
294
290 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
295 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
291 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
296 added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
292
297
293 Command(cwd).execute('touch %s' % added_file)
298 Command(cwd).execute('touch %s' % added_file)
294
299
295 Command(cwd).execute('hg add %s' % added_file)
300 Command(cwd).execute('hg add %s' % added_file)
296
301
297 for i in xrange(commits):
302 for i in xrange(commits):
298 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
303 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
299 Command(cwd).execute(cmd)
304 Command(cwd).execute(cmd)
300
305
301 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
306 cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
302 'Marcin KuΕΊminski <marcin@python-blog.com>',
307 'Marcin KuΕΊminski <marcin@python-blog.com>',
303 added_file)
308 added_file)
304 Command(cwd).execute(cmd)
309 Command(cwd).execute(cmd)
305
310
306 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
311 push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
307 {'user':USER,
312 {'user':USER,
308 'pass':PASS,
313 'pass':PASS,
309 'host':HOST,
314 'host':HOST,
310 'cloned_repo':HG_REPO,
315 'cloned_repo':HG_REPO,
311 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
316 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
312
317
313 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
318 Command(cwd).execute('hg push --verbose --debug %s' % push_url)
314
319
320
315 @test_wrapp
321 @test_wrapp
316 def test_push_wrong_credentials():
322 def test_push_wrong_credentials():
317 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
323 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
318 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
324 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
319 {'user':USER + 'xxx',
325 {'user':USER + 'xxx',
320 'pass':PASS,
326 'pass':PASS,
321 'host':HOST,
327 'host':HOST,
322 'cloned_repo':HG_REPO,
328 'cloned_repo':HG_REPO,
323 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
329 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
324
330
325 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
331 modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
326 for i in xrange(5):
332 for i in xrange(5):
327 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
333 cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
328 Command(cwd).execute(cmd)
334 Command(cwd).execute(cmd)
329
335
330 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
336 cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
331 Command(cwd).execute(cmd)
337 Command(cwd).execute(cmd)
332
338
333 Command(cwd).execute('hg push %s' % clone_url)
339 Command(cwd).execute('hg push %s' % clone_url)
334
340
341
335 @test_wrapp
342 @test_wrapp
336 def test_push_wrong_path():
343 def test_push_wrong_path():
337 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
344 cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
338 added_file = jn(path, 'somefile.py')
345 added_file = jn(path, 'somefile.py')
339
346
340 try:
347 try:
341 shutil.rmtree(path, ignore_errors=True)
348 shutil.rmtree(path, ignore_errors=True)
342 os.makedirs(path)
349 os.makedirs(path)
343 print '\tmade dirs %s' % jn(path)
350 print '\tmade dirs %s' % jn(path)
344 except OSError:
351 except OSError:
345 raise
352 raise
346
353
347 Command(cwd).execute("""echo '' > %s""" % added_file)
354 Command(cwd).execute("""echo '' > %s""" % added_file)
348 Command(cwd).execute("""hg init %s""" % path)
355 Command(cwd).execute("""hg init %s""" % path)
349 Command(cwd).execute("""hg add %s""" % added_file)
356 Command(cwd).execute("""hg add %s""" % added_file)
350
357
351 for i in xrange(2):
358 for i in xrange(2):
352 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
359 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
353 Command(cwd).execute(cmd)
360 Command(cwd).execute(cmd)
354
361
355 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
362 cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
356 Command(cwd).execute(cmd)
363 Command(cwd).execute(cmd)
357
364
358 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
365 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
359 {'user':USER,
366 {'user':USER,
360 'pass':PASS,
367 'pass':PASS,
361 'host':HOST,
368 'host':HOST,
362 'cloned_repo':HG_REPO + '_error',
369 'cloned_repo':HG_REPO + '_error',
363 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
370 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
364
371
365 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
372 stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
366 if not """abort: HTTP Error 403: Forbidden""" in stderr:
373 if not """abort: HTTP Error 403: Forbidden""" in stderr:
367 raise Exception('Failure')
374 raise Exception('Failure')
368
375
376
369 @test_wrapp
377 @test_wrapp
370 def get_logs():
378 def get_logs():
371 return UserLog.query().all()
379 return UserLog.query().all()
372
380
381
373 @test_wrapp
382 @test_wrapp
374 def test_logs(initial):
383 def test_logs(initial):
375 logs = UserLog.query().all()
384 logs = UserLog.query().all()
376 operations = 4
385 operations = 4
377 if len(initial) + operations != len(logs):
386 if len(initial) + operations != len(logs):
378 raise Exception("missing number of logs initial:%s vs current:%s" % \
387 raise Exception("missing number of logs initial:%s vs current:%s" % \
379 (len(initial), len(logs)))
388 (len(initial), len(logs)))
380
389
381
390
382 if __name__ == '__main__':
391 if __name__ == '__main__':
383 create_test_user(force=False)
392 create_test_user(force=False)
384 create_test_repo()
393 create_test_repo()
385
394
386 initial_logs = get_logs()
395 initial_logs = get_logs()
387 print 'initial activity logs: %s' % len(initial_logs)
396 print 'initial activity logs: %s' % len(initial_logs)
388 s = time.time()
397 s = time.time()
389 #test_push_modify_file()
398 #test_push_modify_file()
390 test_clone_with_credentials()
399 test_clone_with_credentials()
391 test_clone_wrong_credentials()
400 test_clone_wrong_credentials()
392
401
393 test_push_new_file(commits=2, with_clone=True)
402 test_push_new_file(commits=2, with_clone=True)
394
403
395 test_clone_anonymous()
404 test_clone_anonymous()
396 test_push_wrong_path()
405 test_push_wrong_path()
397
406
398 test_push_wrong_credentials()
407 test_push_wrong_credentials()
399
408
400 test_logs(initial_logs)
409 test_logs(initial_logs)
401 print 'finished ok in %.3f' % (time.time() - s)
410 print 'finished ok in %.3f' % (time.time() - s)
@@ -1,56 +1,56
1 """
1 """
2 Unit tests for vcs_ library.
2 Unit tests for vcs_ library.
3
3
4 In order to run tests we need to prepare our environment first. Tests would be
4 In order to run tests we need to prepare our environment first. Tests would be
5 run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from
5 run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from
6 ``vcs.backends.BACKENDS``.
6 ``vcs.backends.BACKENDS``.
7
7
8 For each SCM we run tests for, we need some repository. We would use
8 For each SCM we run tests for, we need some repository. We would use
9 repositories location from system environment variables or test suite defaults
9 repositories location from system environment variables or test suite defaults
10 - see ``conf`` module for more detail. We simply try to check if repository at
10 - see ``conf`` module for more detail. We simply try to check if repository at
11 certain location exists, if not we would try to fetch them. At ``test_vcs`` or
11 certain location exists, if not we would try to fetch them. At ``test_vcs`` or
12 ``test_common`` we run unit tests common for each repository type and for
12 ``test_common`` we run unit tests common for each repository type and for
13 example specific mercurial tests are located at ``test_hg`` module.
13 example specific mercurial tests are located at ``test_hg`` module.
14
14
15 Oh, and tests are run with ``unittest.collector`` wrapped by ``collector``
15 Oh, and tests are run with ``unittest.collector`` wrapped by ``collector``
16 function at ``tests/__init__.py``.
16 function at ``tests/__init__.py``.
17
17
18 .. _vcs: http://bitbucket.org/marcinkuzminski/vcs
18 .. _vcs: http://bitbucket.org/marcinkuzminski/vcs
19 .. _unittest: http://pypi.python.org/pypi/unittest
19 .. _unittest: http://pypi.python.org/pypi/unittest
20
20
21 """
21 """
22 import os
22 import os
23 from rhodecode.lib import vcs
23 from rhodecode.lib import vcs
24 from rhodecode.lib.vcs.utils.compat import unittest
24 from rhodecode.lib.vcs.utils.compat import unittest
25 from utils import VCSTestError, SCMFetcher
25 from rhodecode.tests import *
26 from rhodecode.tests import *
26 from utils import VCSTestError, SCMFetcher
27
27
28
28
29 def setup_package():
29 def setup_package():
30 """
30 """
31 Prepares whole package for tests which mainly means it would try to fetch
31 Prepares whole package for tests which mainly means it would try to fetch
32 test repositories or use already existing ones.
32 test repositories or use already existing ones.
33 """
33 """
34 fetchers = {
34 fetchers = {
35 'hg': {
35 'hg': {
36 'alias': 'hg',
36 'alias': 'hg',
37 'test_repo_path': TEST_HG_REPO,
37 'test_repo_path': TEST_HG_REPO,
38 'remote_repo': HG_REMOTE_REPO,
38 'remote_repo': HG_REMOTE_REPO,
39 'clone_cmd': 'hg clone',
39 'clone_cmd': 'hg clone',
40 },
40 },
41 'git': {
41 'git': {
42 'alias': 'git',
42 'alias': 'git',
43 'test_repo_path': TEST_GIT_REPO,
43 'test_repo_path': TEST_GIT_REPO,
44 'remote_repo': GIT_REMOTE_REPO,
44 'remote_repo': GIT_REMOTE_REPO,
45 'clone_cmd': 'git clone --bare',
45 'clone_cmd': 'git clone --bare',
46 },
46 },
47 }
47 }
48 try:
48 try:
49 for scm, fetcher_info in fetchers.items():
49 for scm, fetcher_info in fetchers.items():
50 fetcher = SCMFetcher(**fetcher_info)
50 fetcher = SCMFetcher(**fetcher_info)
51 fetcher.setup()
51 fetcher.setup()
52 except VCSTestError, err:
52 except VCSTestError, err:
53 raise RuntimeError(str(err))
53 raise RuntimeError(str(err))
54
54
55 #start_dir = os.path.abspath(os.path.dirname(__file__))
55 #start_dir = os.path.abspath(os.path.dirname(__file__))
56 #unittest.defaultTestLoader.discover(start_dir)
56 #unittest.defaultTestLoader.discover(start_dir)
General Comments 0
You need to be logged in to leave comments. Login now