Show More
1 | NO CONTENT: new file 100644 |
|
NO CONTENT: new file 100644 |
@@ -0,0 +1,316 | |||||
|
1 | import os | |||
|
2 | import unittest | |||
|
3 | from rhodecode.tests import * | |||
|
4 | ||||
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |||
|
6 | from rhodecode.model.repo import RepoModel | |||
|
7 | from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm | |||
|
8 | from rhodecode.model.user import UserModel | |||
|
9 | ||||
|
10 | from rhodecode.model.meta import Session | |||
|
11 | from rhodecode.model.users_group import UsersGroupModel | |||
|
12 | from rhodecode.lib.auth import AuthUser | |||
|
13 | ||||
|
14 | ||||
|
15 | def _make_group(path, desc='desc', parent_id=None, | |||
|
16 | skip_if_exists=False): | |||
|
17 | ||||
|
18 | gr = RepoGroup.get_by_group_name(path) | |||
|
19 | if gr and skip_if_exists: | |||
|
20 | return gr | |||
|
21 | ||||
|
22 | gr = ReposGroupModel().create(path, desc, parent_id) | |||
|
23 | return gr | |||
|
24 | ||||
|
25 | ||||
|
26 | class TestPermissions(unittest.TestCase): | |||
|
27 | def __init__(self, methodName='runTest'): | |||
|
28 | super(TestPermissions, self).__init__(methodName=methodName) | |||
|
29 | ||||
|
30 | def setUp(self): | |||
|
31 | self.u1 = UserModel().create_or_update( | |||
|
32 | username=u'u1', password=u'qweqwe', | |||
|
33 | email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1' | |||
|
34 | ) | |||
|
35 | self.u2 = UserModel().create_or_update( | |||
|
36 | username=u'u2', password=u'qweqwe', | |||
|
37 | email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2' | |||
|
38 | ) | |||
|
39 | self.anon = User.get_by_username('default') | |||
|
40 | self.a1 = UserModel().create_or_update( | |||
|
41 | username=u'a1', password=u'qweqwe', | |||
|
42 | email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True | |||
|
43 | ) | |||
|
44 | Session().commit() | |||
|
45 | ||||
|
46 | def tearDown(self): | |||
|
47 | if hasattr(self, 'test_repo'): | |||
|
48 | RepoModel().delete(repo=self.test_repo) | |||
|
49 | UserModel().delete(self.u1) | |||
|
50 | UserModel().delete(self.u2) | |||
|
51 | UserModel().delete(self.a1) | |||
|
52 | if hasattr(self, 'g1'): | |||
|
53 | ReposGroupModel().delete(self.g1.group_id) | |||
|
54 | if hasattr(self, 'g2'): | |||
|
55 | ReposGroupModel().delete(self.g2.group_id) | |||
|
56 | ||||
|
57 | if hasattr(self, 'ug1'): | |||
|
58 | UsersGroupModel().delete(self.ug1, force=True) | |||
|
59 | ||||
|
60 | Session().commit() | |||
|
61 | ||||
|
62 | def test_default_perms_set(self): | |||
|
63 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
64 | perms = { | |||
|
65 | 'repositories_groups': {}, | |||
|
66 | 'global': set([u'hg.create.repository', u'repository.read', | |||
|
67 | u'hg.register.manual_activate']), | |||
|
68 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |||
|
69 | } | |||
|
70 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
71 | perms['repositories'][HG_REPO]) | |||
|
72 | new_perm = 'repository.write' | |||
|
73 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |||
|
74 | perm=new_perm) | |||
|
75 | Session().commit() | |||
|
76 | ||||
|
77 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
78 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
79 | new_perm) | |||
|
80 | ||||
|
81 | def test_default_admin_perms_set(self): | |||
|
82 | a1_auth = AuthUser(user_id=self.a1.user_id) | |||
|
83 | perms = { | |||
|
84 | 'repositories_groups': {}, | |||
|
85 | 'global': set([u'hg.admin']), | |||
|
86 | 'repositories': {u'vcs_test_hg': u'repository.admin'} | |||
|
87 | } | |||
|
88 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |||
|
89 | perms['repositories'][HG_REPO]) | |||
|
90 | new_perm = 'repository.write' | |||
|
91 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, | |||
|
92 | perm=new_perm) | |||
|
93 | Session().commit() | |||
|
94 | # cannot really downgrade admins permissions !? they still get's set as | |||
|
95 | # admin ! | |||
|
96 | u1_auth = AuthUser(user_id=self.a1.user_id) | |||
|
97 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
98 | perms['repositories'][HG_REPO]) | |||
|
99 | ||||
|
100 | def test_default_group_perms(self): | |||
|
101 | self.g1 = _make_group('test1', skip_if_exists=True) | |||
|
102 | self.g2 = _make_group('test2', skip_if_exists=True) | |||
|
103 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
104 | perms = { | |||
|
105 | 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'}, | |||
|
106 | 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']), | |||
|
107 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |||
|
108 | } | |||
|
109 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
110 | perms['repositories'][HG_REPO]) | |||
|
111 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
112 | perms['repositories_groups']) | |||
|
113 | ||||
|
114 | def test_default_admin_group_perms(self): | |||
|
115 | self.g1 = _make_group('test1', skip_if_exists=True) | |||
|
116 | self.g2 = _make_group('test2', skip_if_exists=True) | |||
|
117 | a1_auth = AuthUser(user_id=self.a1.user_id) | |||
|
118 | perms = { | |||
|
119 | 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'}, | |||
|
120 | 'global': set(['hg.admin']), | |||
|
121 | 'repositories': {u'vcs_test_hg': 'repository.admin'} | |||
|
122 | } | |||
|
123 | ||||
|
124 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |||
|
125 | perms['repositories'][HG_REPO]) | |||
|
126 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
127 | perms['repositories_groups']) | |||
|
128 | ||||
|
129 | def test_propagated_permission_from_users_group(self): | |||
|
130 | # make group | |||
|
131 | self.ug1 = UsersGroupModel().create('G1') | |||
|
132 | # add user to group | |||
|
133 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |||
|
134 | ||||
|
135 | # set permission to lower | |||
|
136 | new_perm = 'repository.none' | |||
|
137 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) | |||
|
138 | Session().commit() | |||
|
139 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
140 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
141 | new_perm) | |||
|
142 | ||||
|
143 | # grant perm for group this should override permission from user | |||
|
144 | new_perm = 'repository.write' | |||
|
145 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |||
|
146 | group_name=self.ug1, | |||
|
147 | perm=new_perm) | |||
|
148 | # check perms | |||
|
149 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
150 | perms = { | |||
|
151 | 'repositories_groups': {}, | |||
|
152 | 'global': set([u'hg.create.repository', u'repository.read', | |||
|
153 | u'hg.register.manual_activate']), | |||
|
154 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |||
|
155 | } | |||
|
156 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
157 | new_perm) | |||
|
158 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
159 | perms['repositories_groups']) | |||
|
160 | ||||
|
161 | def test_propagated_permission_from_users_group_lower_weight(self): | |||
|
162 | # make group | |||
|
163 | self.ug1 = UsersGroupModel().create('G1') | |||
|
164 | # add user to group | |||
|
165 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |||
|
166 | ||||
|
167 | # set permission to lower | |||
|
168 | new_perm_h = 'repository.write' | |||
|
169 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |||
|
170 | perm=new_perm_h) | |||
|
171 | Session().commit() | |||
|
172 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
173 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
174 | new_perm_h) | |||
|
175 | ||||
|
176 | # grant perm for group this should NOT override permission from user | |||
|
177 | # since it's lower than granted | |||
|
178 | new_perm_l = 'repository.read' | |||
|
179 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |||
|
180 | group_name=self.ug1, | |||
|
181 | perm=new_perm_l) | |||
|
182 | # check perms | |||
|
183 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
184 | perms = { | |||
|
185 | 'repositories_groups': {}, | |||
|
186 | 'global': set([u'hg.create.repository', u'repository.read', | |||
|
187 | u'hg.register.manual_activate']), | |||
|
188 | 'repositories': {u'vcs_test_hg': u'repository.write'} | |||
|
189 | } | |||
|
190 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |||
|
191 | new_perm_h) | |||
|
192 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
193 | perms['repositories_groups']) | |||
|
194 | ||||
|
195 | def test_repo_in_group_permissions(self): | |||
|
196 | self.g1 = _make_group('group1', skip_if_exists=True) | |||
|
197 | self.g2 = _make_group('group2', skip_if_exists=True) | |||
|
198 | Session().commit() | |||
|
199 | # both perms should be read ! | |||
|
200 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
201 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
202 | {u'group1': u'group.read', u'group2': u'group.read'}) | |||
|
203 | ||||
|
204 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
205 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
206 | {u'group1': u'group.read', u'group2': u'group.read'}) | |||
|
207 | ||||
|
208 | #Change perms to none for both groups | |||
|
209 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |||
|
210 | user=self.anon, | |||
|
211 | perm='group.none') | |||
|
212 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |||
|
213 | user=self.anon, | |||
|
214 | perm='group.none') | |||
|
215 | ||||
|
216 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
217 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
218 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
219 | ||||
|
220 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
221 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
222 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
223 | ||||
|
224 | # add repo to group | |||
|
225 | name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm']) | |||
|
226 | self.test_repo = RepoModel().create_repo( | |||
|
227 | repo_name=name, | |||
|
228 | repo_type='hg', | |||
|
229 | description='', | |||
|
230 | owner=self.u1, | |||
|
231 | ) | |||
|
232 | Session().commit() | |||
|
233 | ||||
|
234 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
235 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
236 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
237 | ||||
|
238 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
239 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
240 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
241 | ||||
|
242 | #grant permission for u2 ! | |||
|
243 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |||
|
244 | user=self.u2, | |||
|
245 | perm='group.read') | |||
|
246 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |||
|
247 | user=self.u2, | |||
|
248 | perm='group.read') | |||
|
249 | Session().commit() | |||
|
250 | self.assertNotEqual(self.u1, self.u2) | |||
|
251 | #u1 and anon should have not change perms while u2 should ! | |||
|
252 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
253 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
254 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
255 | ||||
|
256 | u2_auth = AuthUser(user_id=self.u2.user_id) | |||
|
257 | self.assertEqual(u2_auth.permissions['repositories_groups'], | |||
|
258 | {u'group1': u'group.read', u'group2': u'group.read'}) | |||
|
259 | ||||
|
260 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
261 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
262 | {u'group1': u'group.none', u'group2': u'group.none'}) | |||
|
263 | ||||
|
264 | def test_repo_group_user_as_user_group_member(self): | |||
|
265 | # create Group1 | |||
|
266 | self.g1 = _make_group('group1', skip_if_exists=True) | |||
|
267 | Session().commit() | |||
|
268 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
269 | ||||
|
270 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
271 | {u'group1': u'group.read'}) | |||
|
272 | ||||
|
273 | # set default permission to none | |||
|
274 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |||
|
275 | user=self.anon, | |||
|
276 | perm='group.none') | |||
|
277 | # make group | |||
|
278 | self.ug1 = UsersGroupModel().create('G1') | |||
|
279 | # add user to group | |||
|
280 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |||
|
281 | Session().commit() | |||
|
282 | ||||
|
283 | # check if user is in the group | |||
|
284 | membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members] | |||
|
285 | self.assertEqual(membrs, [self.u1.user_id]) | |||
|
286 | # add some user to that group | |||
|
287 | ||||
|
288 | # check his permissions | |||
|
289 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
290 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
291 | {u'group1': u'group.none'}) | |||
|
292 | ||||
|
293 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
294 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
295 | {u'group1': u'group.none'}) | |||
|
296 | ||||
|
297 | # grant ug1 read permissions for | |||
|
298 | ReposGroupModel().grant_users_group_permission(repos_group=self.g1, | |||
|
299 | group_name=self.ug1, | |||
|
300 | perm='group.read') | |||
|
301 | Session().commit() | |||
|
302 | # check if the | |||
|
303 | obj = Session().query(UsersGroupRepoGroupToPerm)\ | |||
|
304 | .filter(UsersGroupRepoGroupToPerm.group == self.g1)\ | |||
|
305 | .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\ | |||
|
306 | .scalar() | |||
|
307 | self.assertEqual(obj.permission.permission_name, 'group.read') | |||
|
308 | ||||
|
309 | a1_auth = AuthUser(user_id=self.anon.user_id) | |||
|
310 | ||||
|
311 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |||
|
312 | {u'group1': u'group.none'}) | |||
|
313 | ||||
|
314 | u1_auth = AuthUser(user_id=self.u1.user_id) | |||
|
315 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |||
|
316 | {u'group1': u'group.read'}) |
@@ -0,0 +1,170 | |||||
|
1 | import os | |||
|
2 | import unittest | |||
|
3 | from rhodecode.tests import * | |||
|
4 | ||||
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |||
|
6 | from rhodecode.model.repo import RepoModel | |||
|
7 | from rhodecode.model.db import RepoGroup, User | |||
|
8 | from rhodecode.model.meta import Session | |||
|
9 | from sqlalchemy.exc import IntegrityError | |||
|
10 | ||||
|
11 | ||||
|
12 | def _make_group(path, desc='desc', parent_id=None, | |||
|
13 | skip_if_exists=False): | |||
|
14 | ||||
|
15 | gr = RepoGroup.get_by_group_name(path) | |||
|
16 | if gr and skip_if_exists: | |||
|
17 | return gr | |||
|
18 | ||||
|
19 | gr = ReposGroupModel().create(path, desc, parent_id) | |||
|
20 | return gr | |||
|
21 | ||||
|
22 | ||||
|
23 | class TestReposGroups(unittest.TestCase): | |||
|
24 | ||||
|
25 | def setUp(self): | |||
|
26 | self.g1 = _make_group('test1', skip_if_exists=True) | |||
|
27 | Session().commit() | |||
|
28 | self.g2 = _make_group('test2', skip_if_exists=True) | |||
|
29 | Session().commit() | |||
|
30 | self.g3 = _make_group('test3', skip_if_exists=True) | |||
|
31 | Session().commit() | |||
|
32 | ||||
|
33 | def tearDown(self): | |||
|
34 | print 'out' | |||
|
35 | ||||
|
36 | def __check_path(self, *path): | |||
|
37 | """ | |||
|
38 | Checks the path for existance ! | |||
|
39 | """ | |||
|
40 | path = [TESTS_TMP_PATH] + list(path) | |||
|
41 | path = os.path.join(*path) | |||
|
42 | return os.path.isdir(path) | |||
|
43 | ||||
|
44 | def _check_folders(self): | |||
|
45 | print os.listdir(TESTS_TMP_PATH) | |||
|
46 | ||||
|
47 | def __delete_group(self, id_): | |||
|
48 | ReposGroupModel().delete(id_) | |||
|
49 | ||||
|
50 | def __update_group(self, id_, path, desc='desc', parent_id=None): | |||
|
51 | form_data = dict( | |||
|
52 | group_name=path, | |||
|
53 | group_description=desc, | |||
|
54 | group_parent_id=parent_id, | |||
|
55 | perms_updates=[], | |||
|
56 | perms_new=[] | |||
|
57 | ) | |||
|
58 | gr = ReposGroupModel().update(id_, form_data) | |||
|
59 | return gr | |||
|
60 | ||||
|
61 | def test_create_group(self): | |||
|
62 | g = _make_group('newGroup') | |||
|
63 | self.assertEqual(g.full_path, 'newGroup') | |||
|
64 | ||||
|
65 | self.assertTrue(self.__check_path('newGroup')) | |||
|
66 | ||||
|
67 | def test_create_same_name_group(self): | |||
|
68 | self.assertRaises(IntegrityError, lambda: _make_group('newGroup')) | |||
|
69 | Session().rollback() | |||
|
70 | ||||
|
71 | def test_same_subgroup(self): | |||
|
72 | sg1 = _make_group('sub1', parent_id=self.g1.group_id) | |||
|
73 | self.assertEqual(sg1.parent_group, self.g1) | |||
|
74 | self.assertEqual(sg1.full_path, 'test1/sub1') | |||
|
75 | self.assertTrue(self.__check_path('test1', 'sub1')) | |||
|
76 | ||||
|
77 | ssg1 = _make_group('subsub1', parent_id=sg1.group_id) | |||
|
78 | self.assertEqual(ssg1.parent_group, sg1) | |||
|
79 | self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1') | |||
|
80 | self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1')) | |||
|
81 | ||||
|
82 | def test_remove_group(self): | |||
|
83 | sg1 = _make_group('deleteme') | |||
|
84 | self.__delete_group(sg1.group_id) | |||
|
85 | ||||
|
86 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |||
|
87 | self.assertFalse(self.__check_path('deteteme')) | |||
|
88 | ||||
|
89 | sg1 = _make_group('deleteme', parent_id=self.g1.group_id) | |||
|
90 | self.__delete_group(sg1.group_id) | |||
|
91 | ||||
|
92 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |||
|
93 | self.assertFalse(self.__check_path('test1', 'deteteme')) | |||
|
94 | ||||
|
95 | def test_rename_single_group(self): | |||
|
96 | sg1 = _make_group('initial') | |||
|
97 | ||||
|
98 | new_sg1 = self.__update_group(sg1.group_id, 'after') | |||
|
99 | self.assertTrue(self.__check_path('after')) | |||
|
100 | self.assertEqual(RepoGroup.get_by_group_name('initial'), None) | |||
|
101 | ||||
|
102 | def test_update_group_parent(self): | |||
|
103 | ||||
|
104 | sg1 = _make_group('initial', parent_id=self.g1.group_id) | |||
|
105 | ||||
|
106 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id) | |||
|
107 | self.assertTrue(self.__check_path('test1', 'after')) | |||
|
108 | self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None) | |||
|
109 | ||||
|
110 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id) | |||
|
111 | self.assertTrue(self.__check_path('test3', 'after')) | |||
|
112 | self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None) | |||
|
113 | ||||
|
114 | new_sg1 = self.__update_group(sg1.group_id, 'hello') | |||
|
115 | self.assertTrue(self.__check_path('hello')) | |||
|
116 | ||||
|
117 | self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1) | |||
|
118 | ||||
|
119 | def test_subgrouping_with_repo(self): | |||
|
120 | ||||
|
121 | g1 = _make_group('g1') | |||
|
122 | g2 = _make_group('g2') | |||
|
123 | ||||
|
124 | # create new repo | |||
|
125 | form_data = dict(repo_name='john', | |||
|
126 | repo_name_full='john', | |||
|
127 | fork_name=None, | |||
|
128 | description=None, | |||
|
129 | repo_group=None, | |||
|
130 | private=False, | |||
|
131 | repo_type='hg', | |||
|
132 | clone_uri=None, | |||
|
133 | landing_rev='tip') | |||
|
134 | cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) | |||
|
135 | r = RepoModel().create(form_data, cur_user) | |||
|
136 | ||||
|
137 | self.assertEqual(r.repo_name, 'john') | |||
|
138 | ||||
|
139 | # put repo into group | |||
|
140 | form_data = form_data | |||
|
141 | form_data['repo_group'] = g1.group_id | |||
|
142 | form_data['perms_new'] = [] | |||
|
143 | form_data['perms_updates'] = [] | |||
|
144 | RepoModel().update(r.repo_name, form_data) | |||
|
145 | self.assertEqual(r.repo_name, 'g1/john') | |||
|
146 | ||||
|
147 | self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id) | |||
|
148 | self.assertTrue(self.__check_path('g2', 'g1')) | |||
|
149 | ||||
|
150 | # test repo | |||
|
151 | self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', | |||
|
152 | r.just_name])) | |||
|
153 | ||||
|
154 | def test_move_to_root(self): | |||
|
155 | g1 = _make_group('t11') | |||
|
156 | Session().commit() | |||
|
157 | g2 = _make_group('t22', parent_id=g1.group_id) | |||
|
158 | Session().commit() | |||
|
159 | ||||
|
160 | self.assertEqual(g2.full_path, 't11/t22') | |||
|
161 | self.assertTrue(self.__check_path('t11', 't22')) | |||
|
162 | ||||
|
163 | g2 = self.__update_group(g2.group_id, 'g22', parent_id=None) | |||
|
164 | Session().commit() | |||
|
165 | ||||
|
166 | self.assertEqual(g2.group_name, 'g22') | |||
|
167 | # we moved out group from t1 to '' so it's full path should be 'g2' | |||
|
168 | self.assertEqual(g2.full_path, 'g22') | |||
|
169 | self.assertFalse(self.__check_path('t11', 't22')) | |||
|
170 | self.assertTrue(self.__check_path('g22')) No newline at end of file |
@@ -0,0 +1,124 | |||||
|
1 | import unittest | |||
|
2 | from rhodecode.tests import * | |||
|
3 | ||||
|
4 | from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\ | |||
|
5 | Permission | |||
|
6 | from rhodecode.model.user import UserModel | |||
|
7 | ||||
|
8 | from rhodecode.model.meta import Session | |||
|
9 | from rhodecode.model.users_group import UsersGroupModel | |||
|
10 | ||||
|
11 | ||||
|
12 | class TestUser(unittest.TestCase): | |||
|
13 | def __init__(self, methodName='runTest'): | |||
|
14 | Session.remove() | |||
|
15 | super(TestUser, self).__init__(methodName=methodName) | |||
|
16 | ||||
|
17 | def test_create_and_remove(self): | |||
|
18 | usr = UserModel().create_or_update(username=u'test_user', | |||
|
19 | password=u'qweqwe', | |||
|
20 | email=u'u232@rhodecode.org', | |||
|
21 | firstname=u'u1', lastname=u'u1') | |||
|
22 | Session().commit() | |||
|
23 | self.assertEqual(User.get_by_username(u'test_user'), usr) | |||
|
24 | ||||
|
25 | # make users group | |||
|
26 | users_group = UsersGroupModel().create('some_example_group') | |||
|
27 | Session().commit() | |||
|
28 | ||||
|
29 | UsersGroupModel().add_user_to_group(users_group, usr) | |||
|
30 | Session().commit() | |||
|
31 | ||||
|
32 | self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group) | |||
|
33 | self.assertEqual(UsersGroupMember.query().count(), 1) | |||
|
34 | UserModel().delete(usr.user_id) | |||
|
35 | Session().commit() | |||
|
36 | ||||
|
37 | self.assertEqual(UsersGroupMember.query().all(), []) | |||
|
38 | ||||
|
39 | def test_additonal_email_as_main(self): | |||
|
40 | usr = UserModel().create_or_update(username=u'test_user', | |||
|
41 | password=u'qweqwe', | |||
|
42 | email=u'main_email@rhodecode.org', | |||
|
43 | firstname=u'u1', lastname=u'u1') | |||
|
44 | Session().commit() | |||
|
45 | ||||
|
46 | def do(): | |||
|
47 | m = UserEmailMap() | |||
|
48 | m.email = u'main_email@rhodecode.org' | |||
|
49 | m.user = usr | |||
|
50 | Session().add(m) | |||
|
51 | Session().commit() | |||
|
52 | self.assertRaises(AttributeError, do) | |||
|
53 | ||||
|
54 | UserModel().delete(usr.user_id) | |||
|
55 | Session().commit() | |||
|
56 | ||||
|
57 | def test_extra_email_map(self): | |||
|
58 | usr = UserModel().create_or_update(username=u'test_user', | |||
|
59 | password=u'qweqwe', | |||
|
60 | email=u'main_email@rhodecode.org', | |||
|
61 | firstname=u'u1', lastname=u'u1') | |||
|
62 | Session().commit() | |||
|
63 | ||||
|
64 | m = UserEmailMap() | |||
|
65 | m.email = u'main_email2@rhodecode.org' | |||
|
66 | m.user = usr | |||
|
67 | Session().add(m) | |||
|
68 | Session().commit() | |||
|
69 | ||||
|
70 | u = User.get_by_email(email='main_email@rhodecode.org') | |||
|
71 | self.assertEqual(usr.user_id, u.user_id) | |||
|
72 | self.assertEqual(usr.username, u.username) | |||
|
73 | ||||
|
74 | u = User.get_by_email(email='main_email2@rhodecode.org') | |||
|
75 | self.assertEqual(usr.user_id, u.user_id) | |||
|
76 | self.assertEqual(usr.username, u.username) | |||
|
77 | u = User.get_by_email(email='main_email3@rhodecode.org') | |||
|
78 | self.assertEqual(None, u) | |||
|
79 | ||||
|
80 | UserModel().delete(usr.user_id) | |||
|
81 | Session().commit() | |||
|
82 | ||||
|
83 | ||||
|
84 | class TestUsers(unittest.TestCase): | |||
|
85 | ||||
|
86 | def __init__(self, methodName='runTest'): | |||
|
87 | super(TestUsers, self).__init__(methodName=methodName) | |||
|
88 | ||||
|
89 | def setUp(self): | |||
|
90 | self.u1 = UserModel().create_or_update(username=u'u1', | |||
|
91 | password=u'qweqwe', | |||
|
92 | email=u'u1@rhodecode.org', | |||
|
93 | firstname=u'u1', lastname=u'u1') | |||
|
94 | ||||
|
95 | def tearDown(self): | |||
|
96 | perm = Permission.query().all() | |||
|
97 | for p in perm: | |||
|
98 | UserModel().revoke_perm(self.u1, p) | |||
|
99 | ||||
|
100 | UserModel().delete(self.u1) | |||
|
101 | Session().commit() | |||
|
102 | ||||
|
103 | def test_add_perm(self): | |||
|
104 | perm = Permission.query().all()[0] | |||
|
105 | UserModel().grant_perm(self.u1, perm) | |||
|
106 | Session().commit() | |||
|
107 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |||
|
108 | ||||
|
109 | def test_has_perm(self): | |||
|
110 | perm = Permission.query().all() | |||
|
111 | for p in perm: | |||
|
112 | has_p = UserModel().has_perm(self.u1, p) | |||
|
113 | self.assertEqual(False, has_p) | |||
|
114 | ||||
|
115 | def test_revoke_perm(self): | |||
|
116 | perm = Permission.query().all()[0] | |||
|
117 | UserModel().grant_perm(self.u1, perm) | |||
|
118 | Session().commit() | |||
|
119 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |||
|
120 | ||||
|
121 | #revoke | |||
|
122 | UserModel().revoke_perm(self.u1, perm) | |||
|
123 | Session().commit() | |||
|
124 | self.assertEqual(UserModel().has_perm(self.u1, perm), False) |
This diff has been collapsed as it changes many lines, (604 lines changed) Show them Hide them | |||||
@@ -1,761 +1,189 | |||||
1 | import os |
|
1 | import os | |
2 | import unittest |
|
2 | import unittest | |
3 | from rhodecode.tests import * |
|
3 | from rhodecode.tests import * | |
4 |
|
4 | |||
5 | from rhodecode.model.repos_group import ReposGroupModel |
|
5 | from rhodecode.model.db import User, Notification, UserNotification | |
6 | from rhodecode.model.repo import RepoModel |
|
|||
7 | from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \ |
|
|||
8 | UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\ |
|
|||
9 | Repository, UserEmailMap |
|
|||
10 | from sqlalchemy.exc import IntegrityError, DatabaseError |
|
|||
11 | from rhodecode.model.user import UserModel |
|
6 | from rhodecode.model.user import UserModel | |
12 |
|
7 | |||
13 | from rhodecode.model.meta import Session |
|
8 | from rhodecode.model.meta import Session | |
14 | from rhodecode.model.notification import NotificationModel |
|
9 | from rhodecode.model.notification import NotificationModel | |
15 | from rhodecode.model.users_group import UsersGroupModel |
|
|||
16 | from rhodecode.lib.auth import AuthUser |
|
|||
17 |
|
||||
18 |
|
||||
19 | def _make_group(path, desc='desc', parent_id=None, |
|
|||
20 | skip_if_exists=False): |
|
|||
21 |
|
||||
22 | gr = RepoGroup.get_by_group_name(path) |
|
|||
23 | if gr and skip_if_exists: |
|
|||
24 | return gr |
|
|||
25 |
|
||||
26 | gr = ReposGroupModel().create(path, desc, parent_id) |
|
|||
27 | return gr |
|
|||
28 |
|
||||
29 |
|
||||
30 | class TestReposGroups(unittest.TestCase): |
|
|||
31 |
|
||||
32 | def setUp(self): |
|
|||
33 | self.g1 = _make_group('test1', skip_if_exists=True) |
|
|||
34 | Session.commit() |
|
|||
35 | self.g2 = _make_group('test2', skip_if_exists=True) |
|
|||
36 | Session.commit() |
|
|||
37 | self.g3 = _make_group('test3', skip_if_exists=True) |
|
|||
38 | Session.commit() |
|
|||
39 |
|
||||
40 | def tearDown(self): |
|
|||
41 | print 'out' |
|
|||
42 |
|
||||
43 | def __check_path(self, *path): |
|
|||
44 | """ |
|
|||
45 | Checks the path for existance ! |
|
|||
46 | """ |
|
|||
47 | path = [TESTS_TMP_PATH] + list(path) |
|
|||
48 | path = os.path.join(*path) |
|
|||
49 | return os.path.isdir(path) |
|
|||
50 |
|
||||
51 | def _check_folders(self): |
|
|||
52 | print os.listdir(TESTS_TMP_PATH) |
|
|||
53 |
|
||||
54 | def __delete_group(self, id_): |
|
|||
55 | ReposGroupModel().delete(id_) |
|
|||
56 |
|
||||
57 | def __update_group(self, id_, path, desc='desc', parent_id=None): |
|
|||
58 | form_data = dict( |
|
|||
59 | group_name=path, |
|
|||
60 | group_description=desc, |
|
|||
61 | group_parent_id=parent_id, |
|
|||
62 | perms_updates=[], |
|
|||
63 | perms_new=[] |
|
|||
64 | ) |
|
|||
65 | gr = ReposGroupModel().update(id_, form_data) |
|
|||
66 | return gr |
|
|||
67 |
|
||||
68 | def test_create_group(self): |
|
|||
69 | g = _make_group('newGroup') |
|
|||
70 | self.assertEqual(g.full_path, 'newGroup') |
|
|||
71 |
|
||||
72 | self.assertTrue(self.__check_path('newGroup')) |
|
|||
73 |
|
||||
74 | def test_create_same_name_group(self): |
|
|||
75 | self.assertRaises(IntegrityError, lambda:_make_group('newGroup')) |
|
|||
76 | Session.rollback() |
|
|||
77 |
|
||||
78 | def test_same_subgroup(self): |
|
|||
79 | sg1 = _make_group('sub1', parent_id=self.g1.group_id) |
|
|||
80 | self.assertEqual(sg1.parent_group, self.g1) |
|
|||
81 | self.assertEqual(sg1.full_path, 'test1/sub1') |
|
|||
82 | self.assertTrue(self.__check_path('test1', 'sub1')) |
|
|||
83 |
|
||||
84 | ssg1 = _make_group('subsub1', parent_id=sg1.group_id) |
|
|||
85 | self.assertEqual(ssg1.parent_group, sg1) |
|
|||
86 | self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1') |
|
|||
87 | self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1')) |
|
|||
88 |
|
||||
89 | def test_remove_group(self): |
|
|||
90 | sg1 = _make_group('deleteme') |
|
|||
91 | self.__delete_group(sg1.group_id) |
|
|||
92 |
|
||||
93 | self.assertEqual(RepoGroup.get(sg1.group_id), None) |
|
|||
94 | self.assertFalse(self.__check_path('deteteme')) |
|
|||
95 |
|
||||
96 | sg1 = _make_group('deleteme', parent_id=self.g1.group_id) |
|
|||
97 | self.__delete_group(sg1.group_id) |
|
|||
98 |
|
||||
99 | self.assertEqual(RepoGroup.get(sg1.group_id), None) |
|
|||
100 | self.assertFalse(self.__check_path('test1', 'deteteme')) |
|
|||
101 |
|
||||
102 | def test_rename_single_group(self): |
|
|||
103 | sg1 = _make_group('initial') |
|
|||
104 |
|
||||
105 | new_sg1 = self.__update_group(sg1.group_id, 'after') |
|
|||
106 | self.assertTrue(self.__check_path('after')) |
|
|||
107 | self.assertEqual(RepoGroup.get_by_group_name('initial'), None) |
|
|||
108 |
|
||||
109 | def test_update_group_parent(self): |
|
|||
110 |
|
||||
111 | sg1 = _make_group('initial', parent_id=self.g1.group_id) |
|
|||
112 |
|
||||
113 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id) |
|
|||
114 | self.assertTrue(self.__check_path('test1', 'after')) |
|
|||
115 | self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None) |
|
|||
116 |
|
||||
117 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id) |
|
|||
118 | self.assertTrue(self.__check_path('test3', 'after')) |
|
|||
119 | self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None) |
|
|||
120 |
|
||||
121 | new_sg1 = self.__update_group(sg1.group_id, 'hello') |
|
|||
122 | self.assertTrue(self.__check_path('hello')) |
|
|||
123 |
|
||||
124 | self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1) |
|
|||
125 |
|
||||
126 | def test_subgrouping_with_repo(self): |
|
|||
127 |
|
||||
128 | g1 = _make_group('g1') |
|
|||
129 | g2 = _make_group('g2') |
|
|||
130 |
|
||||
131 | # create new repo |
|
|||
132 | form_data = dict(repo_name='john', |
|
|||
133 | repo_name_full='john', |
|
|||
134 | fork_name=None, |
|
|||
135 | description=None, |
|
|||
136 | repo_group=None, |
|
|||
137 | private=False, |
|
|||
138 | repo_type='hg', |
|
|||
139 | clone_uri=None, |
|
|||
140 | landing_rev='tip') |
|
|||
141 | cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) |
|
|||
142 | r = RepoModel().create(form_data, cur_user) |
|
|||
143 |
|
||||
144 | self.assertEqual(r.repo_name, 'john') |
|
|||
145 |
|
||||
146 | # put repo into group |
|
|||
147 | form_data = form_data |
|
|||
148 | form_data['repo_group'] = g1.group_id |
|
|||
149 | form_data['perms_new'] = [] |
|
|||
150 | form_data['perms_updates'] = [] |
|
|||
151 | RepoModel().update(r.repo_name, form_data) |
|
|||
152 | self.assertEqual(r.repo_name, 'g1/john') |
|
|||
153 |
|
||||
154 | self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id) |
|
|||
155 | self.assertTrue(self.__check_path('g2', 'g1')) |
|
|||
156 |
|
||||
157 | # test repo |
|
|||
158 | self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name])) |
|
|||
159 |
|
||||
160 | def test_move_to_root(self): |
|
|||
161 | g1 = _make_group('t11') |
|
|||
162 | Session.commit() |
|
|||
163 | g2 = _make_group('t22', parent_id=g1.group_id) |
|
|||
164 | Session.commit() |
|
|||
165 |
|
||||
166 | self.assertEqual(g2.full_path, 't11/t22') |
|
|||
167 | self.assertTrue(self.__check_path('t11', 't22')) |
|
|||
168 |
|
||||
169 | g2 = self.__update_group(g2.group_id, 'g22', parent_id=None) |
|
|||
170 | Session.commit() |
|
|||
171 |
|
||||
172 | self.assertEqual(g2.group_name, 'g22') |
|
|||
173 | # we moved out group from t1 to '' so it's full path should be 'g2' |
|
|||
174 | self.assertEqual(g2.full_path, 'g22') |
|
|||
175 | self.assertFalse(self.__check_path('t11', 't22')) |
|
|||
176 | self.assertTrue(self.__check_path('g22')) |
|
|||
177 |
|
||||
178 |
|
||||
179 | class TestUser(unittest.TestCase): |
|
|||
180 | def __init__(self, methodName='runTest'): |
|
|||
181 | Session.remove() |
|
|||
182 | super(TestUser, self).__init__(methodName=methodName) |
|
|||
183 |
|
||||
184 | def test_create_and_remove(self): |
|
|||
185 | usr = UserModel().create_or_update(username=u'test_user', |
|
|||
186 | password=u'qweqwe', |
|
|||
187 | email=u'u232@rhodecode.org', |
|
|||
188 | firstname=u'u1', lastname=u'u1') |
|
|||
189 | Session.commit() |
|
|||
190 | self.assertEqual(User.get_by_username(u'test_user'), usr) |
|
|||
191 |
|
||||
192 | # make users group |
|
|||
193 | users_group = UsersGroupModel().create('some_example_group') |
|
|||
194 | Session.commit() |
|
|||
195 |
|
||||
196 | UsersGroupModel().add_user_to_group(users_group, usr) |
|
|||
197 | Session.commit() |
|
|||
198 |
|
||||
199 | self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group) |
|
|||
200 | self.assertEqual(UsersGroupMember.query().count(), 1) |
|
|||
201 | UserModel().delete(usr.user_id) |
|
|||
202 | Session.commit() |
|
|||
203 |
|
||||
204 | self.assertEqual(UsersGroupMember.query().all(), []) |
|
|||
205 |
|
||||
206 | def test_additonal_email_as_main(self): |
|
|||
207 | usr = UserModel().create_or_update(username=u'test_user', |
|
|||
208 | password=u'qweqwe', |
|
|||
209 | email=u'main_email@rhodecode.org', |
|
|||
210 | firstname=u'u1', lastname=u'u1') |
|
|||
211 | Session.commit() |
|
|||
212 |
|
||||
213 | def do(): |
|
|||
214 | m = UserEmailMap() |
|
|||
215 | m.email = u'main_email@rhodecode.org' |
|
|||
216 | m.user = usr |
|
|||
217 | Session.add(m) |
|
|||
218 | Session.commit() |
|
|||
219 | self.assertRaises(AttributeError, do) |
|
|||
220 |
|
||||
221 | UserModel().delete(usr.user_id) |
|
|||
222 | Session.commit() |
|
|||
223 |
|
||||
224 | def test_extra_email_map(self): |
|
|||
225 | usr = UserModel().create_or_update(username=u'test_user', |
|
|||
226 | password=u'qweqwe', |
|
|||
227 | email=u'main_email@rhodecode.org', |
|
|||
228 | firstname=u'u1', lastname=u'u1') |
|
|||
229 | Session.commit() |
|
|||
230 |
|
||||
231 | m = UserEmailMap() |
|
|||
232 | m.email = u'main_email2@rhodecode.org' |
|
|||
233 | m.user = usr |
|
|||
234 | Session.add(m) |
|
|||
235 | Session.commit() |
|
|||
236 |
|
||||
237 | u = User.get_by_email(email='main_email@rhodecode.org') |
|
|||
238 | self.assertEqual(usr.user_id, u.user_id) |
|
|||
239 | self.assertEqual(usr.username, u.username) |
|
|||
240 |
|
||||
241 | u = User.get_by_email(email='main_email2@rhodecode.org') |
|
|||
242 | self.assertEqual(usr.user_id, u.user_id) |
|
|||
243 | self.assertEqual(usr.username, u.username) |
|
|||
244 | u = User.get_by_email(email='main_email3@rhodecode.org') |
|
|||
245 | self.assertEqual(None, u) |
|
|||
246 |
|
||||
247 | UserModel().delete(usr.user_id) |
|
|||
248 | Session.commit() |
|
|||
249 |
|
10 | |||
250 |
|
11 | |||
251 | class TestNotifications(unittest.TestCase): |
|
12 | class TestNotifications(unittest.TestCase): | |
252 |
|
13 | |||
253 | def __init__(self, methodName='runTest'): |
|
14 | def __init__(self, methodName='runTest'): | |
254 | Session.remove() |
|
15 | Session.remove() | |
255 | self.u1 = UserModel().create_or_update(username=u'u1', |
|
16 | self.u1 = UserModel().create_or_update(username=u'u1', | |
256 | password=u'qweqwe', |
|
17 | password=u'qweqwe', | |
257 | email=u'u1@rhodecode.org', |
|
18 | email=u'u1@rhodecode.org', | |
258 | firstname=u'u1', lastname=u'u1') |
|
19 | firstname=u'u1', lastname=u'u1') | |
259 | Session.commit() |
|
20 | Session().commit() | |
260 | self.u1 = self.u1.user_id |
|
21 | self.u1 = self.u1.user_id | |
261 |
|
22 | |||
262 | self.u2 = UserModel().create_or_update(username=u'u2', |
|
23 | self.u2 = UserModel().create_or_update(username=u'u2', | |
263 | password=u'qweqwe', |
|
24 | password=u'qweqwe', | |
264 | email=u'u2@rhodecode.org', |
|
25 | email=u'u2@rhodecode.org', | |
265 | firstname=u'u2', lastname=u'u3') |
|
26 | firstname=u'u2', lastname=u'u3') | |
266 | Session.commit() |
|
27 | Session().commit() | |
267 | self.u2 = self.u2.user_id |
|
28 | self.u2 = self.u2.user_id | |
268 |
|
29 | |||
269 | self.u3 = UserModel().create_or_update(username=u'u3', |
|
30 | self.u3 = UserModel().create_or_update(username=u'u3', | |
270 | password=u'qweqwe', |
|
31 | password=u'qweqwe', | |
271 | email=u'u3@rhodecode.org', |
|
32 | email=u'u3@rhodecode.org', | |
272 | firstname=u'u3', lastname=u'u3') |
|
33 | firstname=u'u3', lastname=u'u3') | |
273 | Session.commit() |
|
34 | Session().commit() | |
274 | self.u3 = self.u3.user_id |
|
35 | self.u3 = self.u3.user_id | |
275 |
|
36 | |||
276 | super(TestNotifications, self).__init__(methodName=methodName) |
|
37 | super(TestNotifications, self).__init__(methodName=methodName) | |
277 |
|
38 | |||
278 | def _clean_notifications(self): |
|
39 | def _clean_notifications(self): | |
279 | for n in Notification.query().all(): |
|
40 | for n in Notification.query().all(): | |
280 | Session.delete(n) |
|
41 | Session().delete(n) | |
281 |
|
42 | |||
282 | Session.commit() |
|
43 | Session().commit() | |
283 | self.assertEqual(Notification.query().all(), []) |
|
44 | self.assertEqual(Notification.query().all(), []) | |
284 |
|
45 | |||
285 | def tearDown(self): |
|
46 | def tearDown(self): | |
286 | self._clean_notifications() |
|
47 | self._clean_notifications() | |
287 |
|
48 | |||
288 | def test_create_notification(self): |
|
49 | def test_create_notification(self): | |
289 | self.assertEqual([], Notification.query().all()) |
|
50 | self.assertEqual([], Notification.query().all()) | |
290 | self.assertEqual([], UserNotification.query().all()) |
|
51 | self.assertEqual([], UserNotification.query().all()) | |
291 |
|
52 | |||
292 | usrs = [self.u1, self.u2] |
|
53 | usrs = [self.u1, self.u2] | |
293 | notification = NotificationModel().create(created_by=self.u1, |
|
54 | notification = NotificationModel().create(created_by=self.u1, | |
294 | subject=u'subj', body=u'hi there', |
|
55 | subject=u'subj', body=u'hi there', | |
295 | recipients=usrs) |
|
56 | recipients=usrs) | |
296 | Session.commit() |
|
57 | Session().commit() | |
297 | u1 = User.get(self.u1) |
|
58 | u1 = User.get(self.u1) | |
298 | u2 = User.get(self.u2) |
|
59 | u2 = User.get(self.u2) | |
299 | u3 = User.get(self.u3) |
|
60 | u3 = User.get(self.u3) | |
300 | notifications = Notification.query().all() |
|
61 | notifications = Notification.query().all() | |
301 | self.assertEqual(len(notifications), 1) |
|
62 | self.assertEqual(len(notifications), 1) | |
302 |
|
63 | |||
303 | unotification = UserNotification.query()\ |
|
64 | unotification = UserNotification.query()\ | |
304 | .filter(UserNotification.notification == notification).all() |
|
65 | .filter(UserNotification.notification == notification).all() | |
305 |
|
66 | |||
306 | self.assertEqual(notifications[0].recipients, [u1, u2]) |
|
67 | self.assertEqual(notifications[0].recipients, [u1, u2]) | |
307 | self.assertEqual(notification.notification_id, |
|
68 | self.assertEqual(notification.notification_id, | |
308 | notifications[0].notification_id) |
|
69 | notifications[0].notification_id) | |
309 | self.assertEqual(len(unotification), len(usrs)) |
|
70 | self.assertEqual(len(unotification), len(usrs)) | |
310 | self.assertEqual([x.user.user_id for x in unotification], usrs) |
|
71 | self.assertEqual([x.user.user_id for x in unotification], usrs) | |
311 |
|
72 | |||
312 | def test_user_notifications(self): |
|
73 | def test_user_notifications(self): | |
313 | self.assertEqual([], Notification.query().all()) |
|
74 | self.assertEqual([], Notification.query().all()) | |
314 | self.assertEqual([], UserNotification.query().all()) |
|
75 | self.assertEqual([], UserNotification.query().all()) | |
315 |
|
76 | |||
316 | notification1 = NotificationModel().create(created_by=self.u1, |
|
77 | notification1 = NotificationModel().create(created_by=self.u1, | |
317 | subject=u'subj', body=u'hi there1', |
|
78 | subject=u'subj', body=u'hi there1', | |
318 | recipients=[self.u3]) |
|
79 | recipients=[self.u3]) | |
319 | Session.commit() |
|
80 | Session().commit() | |
320 | notification2 = NotificationModel().create(created_by=self.u1, |
|
81 | notification2 = NotificationModel().create(created_by=self.u1, | |
321 | subject=u'subj', body=u'hi there2', |
|
82 | subject=u'subj', body=u'hi there2', | |
322 | recipients=[self.u3]) |
|
83 | recipients=[self.u3]) | |
323 | Session.commit() |
|
84 | Session().commit() | |
324 | u3 = Session.query(User).get(self.u3) |
|
85 | u3 = Session().query(User).get(self.u3) | |
325 |
|
86 | |||
326 | self.assertEqual(sorted([x.notification for x in u3.notifications]), |
|
87 | self.assertEqual(sorted([x.notification for x in u3.notifications]), | |
327 | sorted([notification2, notification1])) |
|
88 | sorted([notification2, notification1])) | |
328 |
|
89 | |||
329 | def test_delete_notifications(self): |
|
90 | def test_delete_notifications(self): | |
330 | self.assertEqual([], Notification.query().all()) |
|
91 | self.assertEqual([], Notification.query().all()) | |
331 | self.assertEqual([], UserNotification.query().all()) |
|
92 | self.assertEqual([], UserNotification.query().all()) | |
332 |
|
93 | |||
333 | notification = NotificationModel().create(created_by=self.u1, |
|
94 | notification = NotificationModel().create(created_by=self.u1, | |
334 | subject=u'title', body=u'hi there3', |
|
95 | subject=u'title', body=u'hi there3', | |
335 | recipients=[self.u3, self.u1, self.u2]) |
|
96 | recipients=[self.u3, self.u1, self.u2]) | |
336 | Session.commit() |
|
97 | Session().commit() | |
337 | notifications = Notification.query().all() |
|
98 | notifications = Notification.query().all() | |
338 | self.assertTrue(notification in notifications) |
|
99 | self.assertTrue(notification in notifications) | |
339 |
|
100 | |||
340 | Notification.delete(notification.notification_id) |
|
101 | Notification.delete(notification.notification_id) | |
341 | Session.commit() |
|
102 | Session().commit() | |
342 |
|
103 | |||
343 | notifications = Notification.query().all() |
|
104 | notifications = Notification.query().all() | |
344 | self.assertFalse(notification in notifications) |
|
105 | self.assertFalse(notification in notifications) | |
345 |
|
106 | |||
346 | un = UserNotification.query().filter(UserNotification.notification |
|
107 | un = UserNotification.query().filter(UserNotification.notification | |
347 | == notification).all() |
|
108 | == notification).all() | |
348 | self.assertEqual(un, []) |
|
109 | self.assertEqual(un, []) | |
349 |
|
110 | |||
350 | def test_delete_association(self): |
|
111 | def test_delete_association(self): | |
351 |
|
112 | |||
352 | self.assertEqual([], Notification.query().all()) |
|
113 | self.assertEqual([], Notification.query().all()) | |
353 | self.assertEqual([], UserNotification.query().all()) |
|
114 | self.assertEqual([], UserNotification.query().all()) | |
354 |
|
115 | |||
355 | notification = NotificationModel().create(created_by=self.u1, |
|
116 | notification = NotificationModel().create(created_by=self.u1, | |
356 | subject=u'title', body=u'hi there3', |
|
117 | subject=u'title', body=u'hi there3', | |
357 | recipients=[self.u3, self.u1, self.u2]) |
|
118 | recipients=[self.u3, self.u1, self.u2]) | |
358 | Session.commit() |
|
119 | Session().commit() | |
359 |
|
120 | |||
360 | unotification = UserNotification.query()\ |
|
121 | unotification = UserNotification.query()\ | |
361 | .filter(UserNotification.notification == |
|
122 | .filter(UserNotification.notification == | |
362 | notification)\ |
|
123 | notification)\ | |
363 | .filter(UserNotification.user_id == self.u3)\ |
|
124 | .filter(UserNotification.user_id == self.u3)\ | |
364 | .scalar() |
|
125 | .scalar() | |
365 |
|
126 | |||
366 | self.assertEqual(unotification.user_id, self.u3) |
|
127 | self.assertEqual(unotification.user_id, self.u3) | |
367 |
|
128 | |||
368 | NotificationModel().delete(self.u3, |
|
129 | NotificationModel().delete(self.u3, | |
369 | notification.notification_id) |
|
130 | notification.notification_id) | |
370 | Session.commit() |
|
131 | Session().commit() | |
371 |
|
132 | |||
372 | u3notification = UserNotification.query()\ |
|
133 | u3notification = UserNotification.query()\ | |
373 | .filter(UserNotification.notification == |
|
134 | .filter(UserNotification.notification == | |
374 | notification)\ |
|
135 | notification)\ | |
375 | .filter(UserNotification.user_id == self.u3)\ |
|
136 | .filter(UserNotification.user_id == self.u3)\ | |
376 | .scalar() |
|
137 | .scalar() | |
377 |
|
138 | |||
378 | self.assertEqual(u3notification, None) |
|
139 | self.assertEqual(u3notification, None) | |
379 |
|
140 | |||
380 | # notification object is still there |
|
141 | # notification object is still there | |
381 | self.assertEqual(Notification.query().all(), [notification]) |
|
142 | self.assertEqual(Notification.query().all(), [notification]) | |
382 |
|
143 | |||
383 | #u1 and u2 still have assignments |
|
144 | #u1 and u2 still have assignments | |
384 | u1notification = UserNotification.query()\ |
|
145 | u1notification = UserNotification.query()\ | |
385 | .filter(UserNotification.notification == |
|
146 | .filter(UserNotification.notification == | |
386 | notification)\ |
|
147 | notification)\ | |
387 | .filter(UserNotification.user_id == self.u1)\ |
|
148 | .filter(UserNotification.user_id == self.u1)\ | |
388 | .scalar() |
|
149 | .scalar() | |
389 | self.assertNotEqual(u1notification, None) |
|
150 | self.assertNotEqual(u1notification, None) | |
390 | u2notification = UserNotification.query()\ |
|
151 | u2notification = UserNotification.query()\ | |
391 | .filter(UserNotification.notification == |
|
152 | .filter(UserNotification.notification == | |
392 | notification)\ |
|
153 | notification)\ | |
393 | .filter(UserNotification.user_id == self.u2)\ |
|
154 | .filter(UserNotification.user_id == self.u2)\ | |
394 | .scalar() |
|
155 | .scalar() | |
395 | self.assertNotEqual(u2notification, None) |
|
156 | self.assertNotEqual(u2notification, None) | |
396 |
|
157 | |||
397 | def test_notification_counter(self): |
|
158 | def test_notification_counter(self): | |
398 | self._clean_notifications() |
|
159 | self._clean_notifications() | |
399 | self.assertEqual([], Notification.query().all()) |
|
160 | self.assertEqual([], Notification.query().all()) | |
400 | self.assertEqual([], UserNotification.query().all()) |
|
161 | self.assertEqual([], UserNotification.query().all()) | |
401 |
|
162 | |||
402 | NotificationModel().create(created_by=self.u1, |
|
163 | NotificationModel().create(created_by=self.u1, | |
403 | subject=u'title', body=u'hi there_delete', |
|
164 | subject=u'title', body=u'hi there_delete', | |
404 | recipients=[self.u3, self.u1]) |
|
165 | recipients=[self.u3, self.u1]) | |
405 | Session.commit() |
|
166 | Session().commit() | |
406 |
|
167 | |||
407 | self.assertEqual(NotificationModel() |
|
168 | self.assertEqual(NotificationModel() | |
408 | .get_unread_cnt_for_user(self.u1), 1) |
|
169 | .get_unread_cnt_for_user(self.u1), 1) | |
409 | self.assertEqual(NotificationModel() |
|
170 | self.assertEqual(NotificationModel() | |
410 | .get_unread_cnt_for_user(self.u2), 0) |
|
171 | .get_unread_cnt_for_user(self.u2), 0) | |
411 | self.assertEqual(NotificationModel() |
|
172 | self.assertEqual(NotificationModel() | |
412 | .get_unread_cnt_for_user(self.u3), 1) |
|
173 | .get_unread_cnt_for_user(self.u3), 1) | |
413 |
|
174 | |||
414 | notification = NotificationModel().create(created_by=self.u1, |
|
175 | notification = NotificationModel().create(created_by=self.u1, | |
415 | subject=u'title', body=u'hi there3', |
|
176 | subject=u'title', body=u'hi there3', | |
416 | recipients=[self.u3, self.u1, self.u2]) |
|
177 | recipients=[self.u3, self.u1, self.u2]) | |
417 | Session.commit() |
|
178 | Session().commit() | |
418 |
|
179 | |||
419 | self.assertEqual(NotificationModel() |
|
180 | self.assertEqual(NotificationModel() | |
420 | .get_unread_cnt_for_user(self.u1), 2) |
|
181 | .get_unread_cnt_for_user(self.u1), 2) | |
421 | self.assertEqual(NotificationModel() |
|
182 | self.assertEqual(NotificationModel() | |
422 | .get_unread_cnt_for_user(self.u2), 1) |
|
183 | .get_unread_cnt_for_user(self.u2), 1) | |
423 | self.assertEqual(NotificationModel() |
|
184 | self.assertEqual(NotificationModel() | |
424 | .get_unread_cnt_for_user(self.u3), 2) |
|
185 | .get_unread_cnt_for_user(self.u3), 2) | |
425 |
|
186 | |||
426 |
|
187 | |||
427 | class TestUsers(unittest.TestCase): |
|
|||
428 |
|
||||
429 | def __init__(self, methodName='runTest'): |
|
|||
430 | super(TestUsers, self).__init__(methodName=methodName) |
|
|||
431 |
|
||||
432 | def setUp(self): |
|
|||
433 | self.u1 = UserModel().create_or_update(username=u'u1', |
|
|||
434 | password=u'qweqwe', |
|
|||
435 | email=u'u1@rhodecode.org', |
|
|||
436 | firstname=u'u1', lastname=u'u1') |
|
|||
437 |
|
||||
438 | def tearDown(self): |
|
|||
439 | perm = Permission.query().all() |
|
|||
440 | for p in perm: |
|
|||
441 | UserModel().revoke_perm(self.u1, p) |
|
|||
442 |
|
||||
443 | UserModel().delete(self.u1) |
|
|||
444 | Session.commit() |
|
|||
445 |
|
||||
446 | def test_add_perm(self): |
|
|||
447 | perm = Permission.query().all()[0] |
|
|||
448 | UserModel().grant_perm(self.u1, perm) |
|
|||
449 | Session.commit() |
|
|||
450 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) |
|
|||
451 |
|
||||
452 | def test_has_perm(self): |
|
|||
453 | perm = Permission.query().all() |
|
|||
454 | for p in perm: |
|
|||
455 | has_p = UserModel().has_perm(self.u1, p) |
|
|||
456 | self.assertEqual(False, has_p) |
|
|||
457 |
|
||||
458 | def test_revoke_perm(self): |
|
|||
459 | perm = Permission.query().all()[0] |
|
|||
460 | UserModel().grant_perm(self.u1, perm) |
|
|||
461 | Session.commit() |
|
|||
462 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) |
|
|||
463 |
|
||||
464 | #revoke |
|
|||
465 | UserModel().revoke_perm(self.u1, perm) |
|
|||
466 | Session.commit() |
|
|||
467 | self.assertEqual(UserModel().has_perm(self.u1, perm), False) |
|
|||
468 |
|
188 | |||
469 |
|
189 | |||
470 | class TestPermissions(unittest.TestCase): |
|
|||
471 | def __init__(self, methodName='runTest'): |
|
|||
472 | super(TestPermissions, self).__init__(methodName=methodName) |
|
|||
473 |
|
||||
474 | def setUp(self): |
|
|||
475 | self.u1 = UserModel().create_or_update( |
|
|||
476 | username=u'u1', password=u'qweqwe', |
|
|||
477 | email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1' |
|
|||
478 | ) |
|
|||
479 | self.u2 = UserModel().create_or_update( |
|
|||
480 | username=u'u2', password=u'qweqwe', |
|
|||
481 | email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2' |
|
|||
482 | ) |
|
|||
483 | self.anon = User.get_by_username('default') |
|
|||
484 | self.a1 = UserModel().create_or_update( |
|
|||
485 | username=u'a1', password=u'qweqwe', |
|
|||
486 | email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True |
|
|||
487 | ) |
|
|||
488 | Session.commit() |
|
|||
489 |
|
||||
490 | def tearDown(self): |
|
|||
491 | if hasattr(self, 'test_repo'): |
|
|||
492 | RepoModel().delete(repo=self.test_repo) |
|
|||
493 | UserModel().delete(self.u1) |
|
|||
494 | UserModel().delete(self.u2) |
|
|||
495 | UserModel().delete(self.a1) |
|
|||
496 | if hasattr(self, 'g1'): |
|
|||
497 | ReposGroupModel().delete(self.g1.group_id) |
|
|||
498 | if hasattr(self, 'g2'): |
|
|||
499 | ReposGroupModel().delete(self.g2.group_id) |
|
|||
500 |
|
||||
501 | if hasattr(self, 'ug1'): |
|
|||
502 | UsersGroupModel().delete(self.ug1, force=True) |
|
|||
503 |
|
||||
504 | Session.commit() |
|
|||
505 |
|
||||
506 | def test_default_perms_set(self): |
|
|||
507 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
508 | perms = { |
|
|||
509 | 'repositories_groups': {}, |
|
|||
510 | 'global': set([u'hg.create.repository', u'repository.read', |
|
|||
511 | u'hg.register.manual_activate']), |
|
|||
512 | 'repositories': {u'vcs_test_hg': u'repository.read'} |
|
|||
513 | } |
|
|||
514 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
515 | perms['repositories'][HG_REPO]) |
|
|||
516 | new_perm = 'repository.write' |
|
|||
517 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) |
|
|||
518 | Session.commit() |
|
|||
519 |
|
||||
520 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
521 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm) |
|
|||
522 |
|
||||
523 | def test_default_admin_perms_set(self): |
|
|||
524 | a1_auth = AuthUser(user_id=self.a1.user_id) |
|
|||
525 | perms = { |
|
|||
526 | 'repositories_groups': {}, |
|
|||
527 | 'global': set([u'hg.admin']), |
|
|||
528 | 'repositories': {u'vcs_test_hg': u'repository.admin'} |
|
|||
529 | } |
|
|||
530 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], |
|
|||
531 | perms['repositories'][HG_REPO]) |
|
|||
532 | new_perm = 'repository.write' |
|
|||
533 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm) |
|
|||
534 | Session.commit() |
|
|||
535 | # cannot really downgrade admins permissions !? they still get's set as |
|
|||
536 | # admin ! |
|
|||
537 | u1_auth = AuthUser(user_id=self.a1.user_id) |
|
|||
538 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
539 | perms['repositories'][HG_REPO]) |
|
|||
540 |
|
||||
541 | def test_default_group_perms(self): |
|
|||
542 | self.g1 = _make_group('test1', skip_if_exists=True) |
|
|||
543 | self.g2 = _make_group('test2', skip_if_exists=True) |
|
|||
544 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
545 | perms = { |
|
|||
546 | 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'}, |
|
|||
547 | 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']), |
|
|||
548 | 'repositories': {u'vcs_test_hg': u'repository.read'} |
|
|||
549 | } |
|
|||
550 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
551 | perms['repositories'][HG_REPO]) |
|
|||
552 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
553 | perms['repositories_groups']) |
|
|||
554 |
|
||||
555 | def test_default_admin_group_perms(self): |
|
|||
556 | self.g1 = _make_group('test1', skip_if_exists=True) |
|
|||
557 | self.g2 = _make_group('test2', skip_if_exists=True) |
|
|||
558 | a1_auth = AuthUser(user_id=self.a1.user_id) |
|
|||
559 | perms = { |
|
|||
560 | 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'}, |
|
|||
561 | 'global': set(['hg.admin']), |
|
|||
562 | 'repositories': {u'vcs_test_hg': 'repository.admin'} |
|
|||
563 | } |
|
|||
564 |
|
||||
565 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], |
|
|||
566 | perms['repositories'][HG_REPO]) |
|
|||
567 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
568 | perms['repositories_groups']) |
|
|||
569 |
|
||||
570 | def test_propagated_permission_from_users_group(self): |
|
|||
571 | # make group |
|
|||
572 | self.ug1 = UsersGroupModel().create('G1') |
|
|||
573 | # add user to group |
|
|||
574 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) |
|
|||
575 |
|
||||
576 | # set permission to lower |
|
|||
577 | new_perm = 'repository.none' |
|
|||
578 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) |
|
|||
579 | Session.commit() |
|
|||
580 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
581 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
582 | new_perm) |
|
|||
583 |
|
||||
584 | # grant perm for group this should override permission from user |
|
|||
585 | new_perm = 'repository.write' |
|
|||
586 | RepoModel().grant_users_group_permission(repo=HG_REPO, |
|
|||
587 | group_name=self.ug1, |
|
|||
588 | perm=new_perm) |
|
|||
589 | # check perms |
|
|||
590 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
591 | perms = { |
|
|||
592 | 'repositories_groups': {}, |
|
|||
593 | 'global': set([u'hg.create.repository', u'repository.read', |
|
|||
594 | u'hg.register.manual_activate']), |
|
|||
595 | 'repositories': {u'vcs_test_hg': u'repository.read'} |
|
|||
596 | } |
|
|||
597 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
598 | new_perm) |
|
|||
599 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
600 | perms['repositories_groups']) |
|
|||
601 |
|
||||
602 | def test_propagated_permission_from_users_group_lower_weight(self): |
|
|||
603 | # make group |
|
|||
604 | self.ug1 = UsersGroupModel().create('G1') |
|
|||
605 | # add user to group |
|
|||
606 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) |
|
|||
607 |
|
||||
608 | # set permission to lower |
|
|||
609 | new_perm_h = 'repository.write' |
|
|||
610 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, |
|
|||
611 | perm=new_perm_h) |
|
|||
612 | Session.commit() |
|
|||
613 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
614 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
615 | new_perm_h) |
|
|||
616 |
|
||||
617 | # grant perm for group this should NOT override permission from user |
|
|||
618 | # since it's lower than granted |
|
|||
619 | new_perm_l = 'repository.read' |
|
|||
620 | RepoModel().grant_users_group_permission(repo=HG_REPO, |
|
|||
621 | group_name=self.ug1, |
|
|||
622 | perm=new_perm_l) |
|
|||
623 | # check perms |
|
|||
624 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
625 | perms = { |
|
|||
626 | 'repositories_groups': {}, |
|
|||
627 | 'global': set([u'hg.create.repository', u'repository.read', |
|
|||
628 | u'hg.register.manual_activate']), |
|
|||
629 | 'repositories': {u'vcs_test_hg': u'repository.write'} |
|
|||
630 | } |
|
|||
631 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], |
|
|||
632 | new_perm_h) |
|
|||
633 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
634 | perms['repositories_groups']) |
|
|||
635 |
|
||||
636 | def test_repo_in_group_permissions(self): |
|
|||
637 | self.g1 = _make_group('group1', skip_if_exists=True) |
|
|||
638 | self.g2 = _make_group('group2', skip_if_exists=True) |
|
|||
639 | Session.commit() |
|
|||
640 | # both perms should be read ! |
|
|||
641 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
642 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
643 | {u'group1': u'group.read', u'group2': u'group.read'}) |
|
|||
644 |
|
||||
645 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
646 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
647 | {u'group1': u'group.read', u'group2': u'group.read'}) |
|
|||
648 |
|
||||
649 | #Change perms to none for both groups |
|
|||
650 | ReposGroupModel().grant_user_permission(repos_group=self.g1, |
|
|||
651 | user=self.anon, |
|
|||
652 | perm='group.none') |
|
|||
653 | ReposGroupModel().grant_user_permission(repos_group=self.g2, |
|
|||
654 | user=self.anon, |
|
|||
655 | perm='group.none') |
|
|||
656 |
|
||||
657 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
658 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
659 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
660 |
|
||||
661 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
662 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
663 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
664 |
|
||||
665 | # add repo to group |
|
|||
666 | form_data = { |
|
|||
667 | 'repo_name': HG_REPO, |
|
|||
668 | 'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]), |
|
|||
669 | 'repo_type': 'hg', |
|
|||
670 | 'clone_uri': '', |
|
|||
671 | 'repo_group': self.g1.group_id, |
|
|||
672 | 'description': 'desc', |
|
|||
673 | 'private': False, |
|
|||
674 | 'landing_rev': 'tip' |
|
|||
675 | } |
|
|||
676 | self.test_repo = RepoModel().create(form_data, cur_user=self.u1) |
|
|||
677 | Session.commit() |
|
|||
678 |
|
||||
679 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
680 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
681 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
682 |
|
||||
683 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
684 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
685 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
686 |
|
||||
687 | #grant permission for u2 ! |
|
|||
688 | ReposGroupModel().grant_user_permission(repos_group=self.g1, |
|
|||
689 | user=self.u2, |
|
|||
690 | perm='group.read') |
|
|||
691 | ReposGroupModel().grant_user_permission(repos_group=self.g2, |
|
|||
692 | user=self.u2, |
|
|||
693 | perm='group.read') |
|
|||
694 | Session.commit() |
|
|||
695 | self.assertNotEqual(self.u1, self.u2) |
|
|||
696 | #u1 and anon should have not change perms while u2 should ! |
|
|||
697 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
698 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
699 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
700 |
|
||||
701 | u2_auth = AuthUser(user_id=self.u2.user_id) |
|
|||
702 | self.assertEqual(u2_auth.permissions['repositories_groups'], |
|
|||
703 | {u'group1': u'group.read', u'group2': u'group.read'}) |
|
|||
704 |
|
||||
705 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
706 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
707 | {u'group1': u'group.none', u'group2': u'group.none'}) |
|
|||
708 |
|
||||
709 | def test_repo_group_user_as_user_group_member(self): |
|
|||
710 | # create Group1 |
|
|||
711 | self.g1 = _make_group('group1', skip_if_exists=True) |
|
|||
712 | Session.commit() |
|
|||
713 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
714 |
|
||||
715 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
716 | {u'group1': u'group.read'}) |
|
|||
717 |
|
||||
718 | # set default permission to none |
|
|||
719 | ReposGroupModel().grant_user_permission(repos_group=self.g1, |
|
|||
720 | user=self.anon, |
|
|||
721 | perm='group.none') |
|
|||
722 | # make group |
|
|||
723 | self.ug1 = UsersGroupModel().create('G1') |
|
|||
724 | # add user to group |
|
|||
725 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) |
|
|||
726 | Session.commit() |
|
|||
727 |
|
||||
728 | # check if user is in the group |
|
|||
729 | membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members] |
|
|||
730 | self.assertEqual(membrs, [self.u1.user_id]) |
|
|||
731 | # add some user to that group |
|
|||
732 |
|
||||
733 | # check his permissions |
|
|||
734 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
735 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
736 | {u'group1': u'group.none'}) |
|
|||
737 |
|
||||
738 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
739 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
740 | {u'group1': u'group.none'}) |
|
|||
741 |
|
||||
742 | # grant ug1 read permissions for |
|
|||
743 | ReposGroupModel().grant_users_group_permission(repos_group=self.g1, |
|
|||
744 | group_name=self.ug1, |
|
|||
745 | perm='group.read') |
|
|||
746 | Session.commit() |
|
|||
747 | # check if the |
|
|||
748 | obj = Session.query(UsersGroupRepoGroupToPerm)\ |
|
|||
749 | .filter(UsersGroupRepoGroupToPerm.group == self.g1)\ |
|
|||
750 | .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\ |
|
|||
751 | .scalar() |
|
|||
752 | self.assertEqual(obj.permission.permission_name, 'group.read') |
|
|||
753 |
|
||||
754 | a1_auth = AuthUser(user_id=self.anon.user_id) |
|
|||
755 |
|
||||
756 | self.assertEqual(a1_auth.permissions['repositories_groups'], |
|
|||
757 | {u'group1': u'group.none'}) |
|
|||
758 |
|
||||
759 | u1_auth = AuthUser(user_id=self.u1.user_id) |
|
|||
760 | self.assertEqual(u1_auth.permissions['repositories_groups'], |
|
|||
761 | {u'group1': u'group.read'}) |
|
1 | NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch |
|
NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch |
@@ -1,211 +1,213 | |||||
1 | # -*- coding: utf-8 -*- |
|
1 | # -*- coding: utf-8 -*- | |
2 | """ |
|
2 | """ | |
3 | rhodecode.tests.test_hg_operations |
|
3 | rhodecode.tests.test_hg_operations | |
4 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
4 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
5 |
|
5 | |||
6 | Test suite for making push/pull operations |
|
6 | Test suite for making push/pull operations | |
7 |
|
7 | |||
8 | :created_on: Dec 30, 2010 |
|
8 | :created_on: Dec 30, 2010 | |
9 | :author: marcink |
|
9 | :author: marcink | |
10 | :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> |
|
10 | :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> | |
11 | :license: GPLv3, see COPYING for more details. |
|
11 | :license: GPLv3, see COPYING for more details. | |
12 | """ |
|
12 | """ | |
13 | # This program is free software: you can redistribute it and/or modify |
|
13 | # This program is free software: you can redistribute it and/or modify | |
14 | # it under the terms of the GNU General Public License as published by |
|
14 | # it under the terms of the GNU General Public License as published by | |
15 | # the Free Software Foundation, either version 3 of the License, or |
|
15 | # the Free Software Foundation, either version 3 of the License, or | |
16 | # (at your option) any later version. |
|
16 | # (at your option) any later version. | |
17 | # |
|
17 | # | |
18 | # This program is distributed in the hope that it will be useful, |
|
18 | # This program is distributed in the hope that it will be useful, | |
19 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
19 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
20 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
20 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
21 | # GNU General Public License for more details. |
|
21 | # GNU General Public License for more details. | |
22 | # |
|
22 | # | |
23 | # You should have received a copy of the GNU General Public License |
|
23 | # You should have received a copy of the GNU General Public License | |
24 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
24 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
25 |
|
25 | |||
26 | import os |
|
26 | import os | |
27 | import sys |
|
27 | import sys | |
28 | import shutil |
|
28 | import shutil | |
29 | import logging |
|
29 | import logging | |
30 | from os.path import join as jn |
|
30 | from os.path import join as jn | |
31 | from os.path import dirname as dn |
|
31 | from os.path import dirname as dn | |
32 |
|
32 | |||
33 | from tempfile import _RandomNameSequence |
|
33 | from tempfile import _RandomNameSequence | |
34 | from subprocess import Popen, PIPE |
|
34 | from subprocess import Popen, PIPE | |
35 |
|
35 | |||
36 | from paste.deploy import appconfig |
|
36 | from paste.deploy import appconfig | |
37 | from pylons import config |
|
37 | from pylons import config | |
38 | from sqlalchemy import engine_from_config |
|
38 | from sqlalchemy import engine_from_config | |
39 |
|
39 | |||
40 | from rhodecode.lib.utils import add_cache |
|
40 | from rhodecode.lib.utils import add_cache | |
41 | from rhodecode.model import init_model |
|
41 | from rhodecode.model import init_model | |
42 | from rhodecode.model import meta |
|
42 | from rhodecode.model import meta | |
43 | from rhodecode.model.db import User, Repository |
|
43 | from rhodecode.model.db import User, Repository | |
44 | from rhodecode.lib.auth import get_crypt_password |
|
44 | from rhodecode.lib.auth import get_crypt_password | |
45 |
|
45 | |||
46 | from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO |
|
46 | from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO | |
47 | from rhodecode.config.environment import load_environment |
|
47 | from rhodecode.config.environment import load_environment | |
48 |
|
48 | |||
49 | rel_path = dn(dn(dn(os.path.abspath(__file__)))) |
|
49 | rel_path = dn(dn(dn(os.path.abspath(__file__)))) | |
50 | conf = appconfig('config:development.ini', relative_to=rel_path) |
|
50 | conf = appconfig('config:development.ini', relative_to=rel_path) | |
51 | load_environment(conf.global_conf, conf.local_conf) |
|
51 | load_environment(conf.global_conf, conf.local_conf) | |
52 |
|
52 | |||
53 | add_cache(conf) |
|
53 | add_cache(conf) | |
54 |
|
54 | |||
55 | USER = 'test_admin' |
|
55 | USER = 'test_admin' | |
56 | PASS = 'test12' |
|
56 | PASS = 'test12' | |
57 | HOST = 'hg.local' |
|
57 | HOST = 'hg.local' | |
58 | METHOD = 'pull' |
|
58 | METHOD = 'pull' | |
59 | DEBUG = True |
|
59 | DEBUG = True | |
60 | log = logging.getLogger(__name__) |
|
60 | log = logging.getLogger(__name__) | |
61 |
|
61 | |||
62 |
|
62 | |||
63 | class Command(object): |
|
63 | class Command(object): | |
64 |
|
64 | |||
65 | def __init__(self, cwd): |
|
65 | def __init__(self, cwd): | |
66 | self.cwd = cwd |
|
66 | self.cwd = cwd | |
67 |
|
67 | |||
68 | def execute(self, cmd, *args): |
|
68 | def execute(self, cmd, *args): | |
69 | """Runs command on the system with given ``args``. |
|
69 | """Runs command on the system with given ``args``. | |
70 | """ |
|
70 | """ | |
71 |
|
71 | |||
72 | command = cmd + ' ' + ' '.join(args) |
|
72 | command = cmd + ' ' + ' '.join(args) | |
73 | log.debug('Executing %s' % command) |
|
73 | log.debug('Executing %s' % command) | |
74 | if DEBUG: |
|
74 | if DEBUG: | |
75 | print command |
|
75 | print command | |
76 | p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) |
|
76 | p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) | |
77 | stdout, stderr = p.communicate() |
|
77 | stdout, stderr = p.communicate() | |
78 | if DEBUG: |
|
78 | if DEBUG: | |
79 | print stdout, stderr |
|
79 | print stdout, stderr | |
80 | return stdout, stderr |
|
80 | return stdout, stderr | |
81 |
|
81 | |||
|
82 | ||||
82 | def get_session(): |
|
83 | def get_session(): | |
83 | engine = engine_from_config(conf, 'sqlalchemy.db1.') |
|
84 | engine = engine_from_config(conf, 'sqlalchemy.db1.') | |
84 | init_model(engine) |
|
85 | init_model(engine) | |
85 | sa = meta.Session |
|
86 | sa = meta.Session | |
86 | return sa |
|
87 | return sa | |
87 |
|
88 | |||
88 |
|
89 | |||
89 | def create_test_user(force=True): |
|
90 | def create_test_user(force=True): | |
90 | print 'creating test user' |
|
91 | print 'creating test user' | |
91 | sa = get_session() |
|
92 | sa = get_session() | |
92 |
|
93 | |||
93 | user = sa.query(User).filter(User.username == USER).scalar() |
|
94 | user = sa.query(User).filter(User.username == USER).scalar() | |
94 |
|
95 | |||
95 | if force and user is not None: |
|
96 | if force and user is not None: | |
96 | print 'removing current user' |
|
97 | print 'removing current user' | |
97 | for repo in sa.query(Repository).filter(Repository.user == user).all(): |
|
98 | for repo in sa.query(Repository).filter(Repository.user == user).all(): | |
98 | sa.delete(repo) |
|
99 | sa.delete(repo) | |
99 | sa.delete(user) |
|
100 | sa.delete(user) | |
100 | sa.commit() |
|
101 | sa.commit() | |
101 |
|
102 | |||
102 | if user is None or force: |
|
103 | if user is None or force: | |
103 | print 'creating new one' |
|
104 | print 'creating new one' | |
104 | new_usr = User() |
|
105 | new_usr = User() | |
105 | new_usr.username = USER |
|
106 | new_usr.username = USER | |
106 | new_usr.password = get_crypt_password(PASS) |
|
107 | new_usr.password = get_crypt_password(PASS) | |
107 | new_usr.email = 'mail@mail.com' |
|
108 | new_usr.email = 'mail@mail.com' | |
108 | new_usr.name = 'test' |
|
109 | new_usr.name = 'test' | |
109 | new_usr.lastname = 'lasttestname' |
|
110 | new_usr.lastname = 'lasttestname' | |
110 | new_usr.active = True |
|
111 | new_usr.active = True | |
111 | new_usr.admin = True |
|
112 | new_usr.admin = True | |
112 | sa.add(new_usr) |
|
113 | sa.add(new_usr) | |
113 | sa.commit() |
|
114 | sa.commit() | |
114 |
|
115 | |||
115 | print 'done' |
|
116 | print 'done' | |
116 |
|
117 | |||
117 |
|
118 | |||
118 | def create_test_repo(force=True): |
|
119 | def create_test_repo(force=True): | |
119 | print 'creating test repo' |
|
120 | print 'creating test repo' | |
120 | from rhodecode.model.repo import RepoModel |
|
121 | from rhodecode.model.repo import RepoModel | |
121 | sa = get_session() |
|
122 | sa = get_session() | |
122 |
|
123 | |||
123 | user = sa.query(User).filter(User.username == USER).scalar() |
|
124 | user = sa.query(User).filter(User.username == USER).scalar() | |
124 | if user is None: |
|
125 | if user is None: | |
125 | raise Exception('user not found') |
|
126 | raise Exception('user not found') | |
126 |
|
127 | |||
127 |
|
||||
128 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
|
128 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() | |
129 |
|
129 | |||
130 | if repo is None: |
|
130 | if repo is None: | |
131 | print 'repo not found creating' |
|
131 | print 'repo not found creating' | |
132 |
|
132 | |||
133 | form_data = {'repo_name':HG_REPO, |
|
133 | form_data = {'repo_name':HG_REPO, | |
134 | 'repo_type':'hg', |
|
134 | 'repo_type':'hg', | |
135 | 'private':False, |
|
135 | 'private':False, | |
136 | 'clone_uri':'' } |
|
136 | 'clone_uri':'' } | |
137 | rm = RepoModel(sa) |
|
137 | rm = RepoModel(sa) | |
138 | rm.base_path = '/home/hg' |
|
138 | rm.base_path = '/home/hg' | |
139 | rm.create(form_data, user) |
|
139 | rm.create(form_data, user) | |
140 |
|
140 | |||
141 | print 'done' |
|
141 | print 'done' | |
142 |
|
142 | |||
|
143 | ||||
143 | def set_anonymous_access(enable=True): |
|
144 | def set_anonymous_access(enable=True): | |
144 | sa = get_session() |
|
145 | sa = get_session() | |
145 | user = sa.query(User).filter(User.username == 'default').one() |
|
146 | user = sa.query(User).filter(User.username == 'default').one() | |
146 | user.active = enable |
|
147 | user.active = enable | |
147 | sa.add(user) |
|
148 | sa.add(user) | |
148 | sa.commit() |
|
149 | sa.commit() | |
149 |
|
150 | |||
|
151 | ||||
150 | def get_anonymous_access(): |
|
152 | def get_anonymous_access(): | |
151 | sa = get_session() |
|
153 | sa = get_session() | |
152 | return sa.query(User).filter(User.username == 'default').one().active |
|
154 | return sa.query(User).filter(User.username == 'default').one().active | |
153 |
|
155 | |||
154 |
|
156 | |||
155 | #============================================================================== |
|
157 | #============================================================================== | |
156 | # TESTS |
|
158 | # TESTS | |
157 | #============================================================================== |
|
159 | #============================================================================== | |
158 | def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD, |
|
160 | def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD, | |
159 | seq=None): |
|
161 | seq=None): | |
160 | cwd = path = jn(TESTS_TMP_PATH, repo) |
|
162 | cwd = path = jn(TESTS_TMP_PATH, repo) | |
161 |
|
163 | |||
162 | if seq == None: |
|
164 | if seq == None: | |
163 | seq = _RandomNameSequence().next() |
|
165 | seq = _RandomNameSequence().next() | |
164 |
|
166 | |||
165 | try: |
|
167 | try: | |
166 | shutil.rmtree(path, ignore_errors=True) |
|
168 | shutil.rmtree(path, ignore_errors=True) | |
167 | os.makedirs(path) |
|
169 | os.makedirs(path) | |
168 | #print 'made dirs %s' % jn(path) |
|
170 | #print 'made dirs %s' % jn(path) | |
169 | except OSError: |
|
171 | except OSError: | |
170 | raise |
|
172 | raise | |
171 |
|
173 | |||
172 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
|
174 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ | |
173 | {'user':USER, |
|
175 | {'user':USER, | |
174 | 'pass':PASS, |
|
176 | 'pass':PASS, | |
175 | 'host':HOST, |
|
177 | 'host':HOST, | |
176 | 'cloned_repo':repo, } |
|
178 | 'cloned_repo':repo, } | |
177 |
|
179 | |||
178 | dest = path + seq |
|
180 | dest = path + seq | |
179 | if method == 'pull': |
|
181 | if method == 'pull': | |
180 | stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url) |
|
182 | stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url) | |
181 | else: |
|
183 | else: | |
182 | stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest) |
|
184 | stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest) | |
183 |
|
185 | |||
184 | if no_errors is False: |
|
186 | if no_errors is False: | |
185 | assert """adding file changes""" in stdout, 'no messages about cloning' |
|
187 | assert """adding file changes""" in stdout, 'no messages about cloning' | |
186 | assert """abort""" not in stderr , 'got error from clone' |
|
188 | assert """abort""" not in stderr , 'got error from clone' | |
187 |
|
189 | |||
188 | if __name__ == '__main__': |
|
190 | if __name__ == '__main__': | |
189 | try: |
|
191 | try: | |
190 | create_test_user(force=False) |
|
192 | create_test_user(force=False) | |
191 | seq = None |
|
193 | seq = None | |
192 | import time |
|
194 | import time | |
193 |
|
195 | |||
194 | try: |
|
196 | try: | |
195 | METHOD = sys.argv[3] |
|
197 | METHOD = sys.argv[3] | |
196 | except: |
|
198 | except: | |
197 | pass |
|
199 | pass | |
198 |
|
200 | |||
199 | if METHOD == 'pull': |
|
201 | if METHOD == 'pull': | |
200 | seq = _RandomNameSequence().next() |
|
202 | seq = _RandomNameSequence().next() | |
201 | test_clone_with_credentials(repo=sys.argv[1], method='clone', |
|
203 | test_clone_with_credentials(repo=sys.argv[1], method='clone', | |
202 | seq=seq) |
|
204 | seq=seq) | |
203 | s = time.time() |
|
205 | s = time.time() | |
204 | for i in range(1, int(sys.argv[2]) + 1): |
|
206 | for i in range(1, int(sys.argv[2]) + 1): | |
205 | print 'take', i |
|
207 | print 'take', i | |
206 | test_clone_with_credentials(repo=sys.argv[1], method=METHOD, |
|
208 | test_clone_with_credentials(repo=sys.argv[1], method=METHOD, | |
207 | seq=seq) |
|
209 | seq=seq) | |
208 | print 'time taken %.3f' % (time.time() - s) |
|
210 | print 'time taken %.3f' % (time.time() - s) | |
209 | except Exception, e: |
|
211 | except Exception, e: | |
210 | raise |
|
212 | raise | |
211 | sys.exit('stop on %s' % e) |
|
213 | sys.exit('stop on %s' % e) |
1 | NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py |
|
NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py |
@@ -1,401 +1,410 | |||||
1 | # -*- coding: utf-8 -*- |
|
1 | # -*- coding: utf-8 -*- | |
2 | """ |
|
2 | """ | |
3 | rhodecode.tests.test_hg_operations |
|
3 | rhodecode.tests.test_hg_operations | |
4 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
4 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
5 |
|
5 | |||
6 | Test suite for making push/pull operations |
|
6 | Test suite for making push/pull operations | |
7 |
|
7 | |||
8 | :created_on: Dec 30, 2010 |
|
8 | :created_on: Dec 30, 2010 | |
9 | :author: marcink |
|
9 | :author: marcink | |
10 | :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> |
|
10 | :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> | |
11 | :license: GPLv3, see COPYING for more details. |
|
11 | :license: GPLv3, see COPYING for more details. | |
12 | """ |
|
12 | """ | |
13 | # This program is free software: you can redistribute it and/or modify |
|
13 | # This program is free software: you can redistribute it and/or modify | |
14 | # it under the terms of the GNU General Public License as published by |
|
14 | # it under the terms of the GNU General Public License as published by | |
15 | # the Free Software Foundation, either version 3 of the License, or |
|
15 | # the Free Software Foundation, either version 3 of the License, or | |
16 | # (at your option) any later version. |
|
16 | # (at your option) any later version. | |
17 | # |
|
17 | # | |
18 | # This program is distributed in the hope that it will be useful, |
|
18 | # This program is distributed in the hope that it will be useful, | |
19 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
19 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
20 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
20 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
21 | # GNU General Public License for more details. |
|
21 | # GNU General Public License for more details. | |
22 | # |
|
22 | # | |
23 | # You should have received a copy of the GNU General Public License |
|
23 | # You should have received a copy of the GNU General Public License | |
24 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
24 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
25 |
|
25 | |||
26 | import os |
|
26 | import os | |
27 | import time |
|
27 | import time | |
28 | import sys |
|
28 | import sys | |
29 | import shutil |
|
29 | import shutil | |
30 | import logging |
|
30 | import logging | |
31 |
|
31 | |||
32 | from os.path import join as jn |
|
32 | from os.path import join as jn | |
33 | from os.path import dirname as dn |
|
33 | from os.path import dirname as dn | |
34 |
|
34 | |||
35 | from tempfile import _RandomNameSequence |
|
35 | from tempfile import _RandomNameSequence | |
36 | from subprocess import Popen, PIPE |
|
36 | from subprocess import Popen, PIPE | |
37 |
|
37 | |||
38 | from paste.deploy import appconfig |
|
38 | from paste.deploy import appconfig | |
39 | from pylons import config |
|
39 | from pylons import config | |
40 | from sqlalchemy import engine_from_config |
|
40 | from sqlalchemy import engine_from_config | |
41 |
|
41 | |||
42 | from rhodecode.lib.utils import add_cache |
|
42 | from rhodecode.lib.utils import add_cache | |
43 | from rhodecode.model import init_model |
|
43 | from rhodecode.model import init_model | |
44 | from rhodecode.model import meta |
|
44 | from rhodecode.model import meta | |
45 | from rhodecode.model.db import User, Repository, UserLog |
|
45 | from rhodecode.model.db import User, Repository, UserLog | |
46 | from rhodecode.lib.auth import get_crypt_password |
|
46 | from rhodecode.lib.auth import get_crypt_password | |
47 |
|
47 | |||
48 | from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO |
|
48 | from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO | |
49 | from rhodecode.config.environment import load_environment |
|
49 | from rhodecode.config.environment import load_environment | |
50 |
|
50 | |||
51 | rel_path = dn(dn(dn(os.path.abspath(__file__)))) |
|
51 | rel_path = dn(dn(dn(os.path.abspath(__file__)))) | |
52 |
|
52 | |||
53 | conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path) |
|
53 | conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path) | |
54 | load_environment(conf.global_conf, conf.local_conf) |
|
54 | load_environment(conf.global_conf, conf.local_conf) | |
55 |
|
55 | |||
56 | add_cache(conf) |
|
56 | add_cache(conf) | |
57 |
|
57 | |||
58 | USER = 'test_admin' |
|
58 | USER = 'test_admin' | |
59 | PASS = 'test12' |
|
59 | PASS = 'test12' | |
60 | HOST = '127.0.0.1:5000' |
|
60 | HOST = '127.0.0.1:5000' | |
61 | DEBUG = False |
|
61 | DEBUG = False | |
62 | print 'DEBUG:', DEBUG |
|
62 | print 'DEBUG:', DEBUG | |
63 | log = logging.getLogger(__name__) |
|
63 | log = logging.getLogger(__name__) | |
64 |
|
64 | |||
65 | engine = engine_from_config(conf, 'sqlalchemy.db1.') |
|
65 | engine = engine_from_config(conf, 'sqlalchemy.db1.') | |
66 | init_model(engine) |
|
66 | init_model(engine) | |
67 | sa = meta.Session |
|
67 | sa = meta.Session() | |
|
68 | ||||
68 |
|
69 | |||
69 | class Command(object): |
|
70 | class Command(object): | |
70 |
|
71 | |||
71 | def __init__(self, cwd): |
|
72 | def __init__(self, cwd): | |
72 | self.cwd = cwd |
|
73 | self.cwd = cwd | |
73 |
|
74 | |||
74 | def execute(self, cmd, *args): |
|
75 | def execute(self, cmd, *args): | |
75 | """Runs command on the system with given ``args``. |
|
76 | """Runs command on the system with given ``args``. | |
76 | """ |
|
77 | """ | |
77 |
|
78 | |||
78 | command = cmd + ' ' + ' '.join(args) |
|
79 | command = cmd + ' ' + ' '.join(args) | |
79 | log.debug('Executing %s' % command) |
|
80 | log.debug('Executing %s' % command) | |
80 | if DEBUG: |
|
81 | if DEBUG: | |
81 | print command |
|
82 | print command | |
82 | p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) |
|
83 | p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) | |
83 | stdout, stderr = p.communicate() |
|
84 | stdout, stderr = p.communicate() | |
84 | if DEBUG: |
|
85 | if DEBUG: | |
85 | print stdout, stderr |
|
86 | print stdout, stderr | |
86 | return stdout, stderr |
|
87 | return stdout, stderr | |
87 |
|
88 | |||
88 |
|
89 | |||
89 | def test_wrapp(func): |
|
90 | def test_wrapp(func): | |
90 |
|
91 | |||
91 | def __wrapp(*args, **kwargs): |
|
92 | def __wrapp(*args, **kwargs): | |
92 | print '>>>%s' % func.__name__ |
|
93 | print '>>>%s' % func.__name__ | |
93 | try: |
|
94 | try: | |
94 | res = func(*args, **kwargs) |
|
95 | res = func(*args, **kwargs) | |
95 | except Exception, e: |
|
96 | except Exception, e: | |
96 | print ('###############\n-' |
|
97 | print ('###############\n-' | |
97 | '--%s failed %s--\n' |
|
98 | '--%s failed %s--\n' | |
98 | '###############\n' % (func.__name__, e)) |
|
99 | '###############\n' % (func.__name__, e)) | |
99 | sys.exit() |
|
100 | sys.exit() | |
100 | print '++OK++' |
|
101 | print '++OK++' | |
101 | return res |
|
102 | return res | |
102 | return __wrapp |
|
103 | return __wrapp | |
103 |
|
104 | |||
104 |
|
105 | |||
105 | def create_test_user(force=True): |
|
106 | def create_test_user(force=True): | |
106 | print '\tcreating test user' |
|
107 | print '\tcreating test user' | |
107 |
|
108 | |||
108 | user = User.get_by_username(USER) |
|
109 | user = User.get_by_username(USER) | |
109 |
|
110 | |||
110 | if force and user is not None: |
|
111 | if force and user is not None: | |
111 | print '\tremoving current user' |
|
112 | print '\tremoving current user' | |
112 | for repo in Repository.query().filter(Repository.user == user).all(): |
|
113 | for repo in Repository.query().filter(Repository.user == user).all(): | |
113 | sa.delete(repo) |
|
114 | sa.delete(repo) | |
114 | sa.delete(user) |
|
115 | sa.delete(user) | |
115 | sa.commit() |
|
116 | sa.commit() | |
116 |
|
117 | |||
117 | if user is None or force: |
|
118 | if user is None or force: | |
118 | print '\tcreating new one' |
|
119 | print '\tcreating new one' | |
119 | new_usr = User() |
|
120 | new_usr = User() | |
120 | new_usr.username = USER |
|
121 | new_usr.username = USER | |
121 | new_usr.password = get_crypt_password(PASS) |
|
122 | new_usr.password = get_crypt_password(PASS) | |
122 | new_usr.email = 'mail@mail.com' |
|
123 | new_usr.email = 'mail@mail.com' | |
123 | new_usr.name = 'test' |
|
124 | new_usr.name = 'test' | |
124 | new_usr.lastname = 'lasttestname' |
|
125 | new_usr.lastname = 'lasttestname' | |
125 | new_usr.active = True |
|
126 | new_usr.active = True | |
126 | new_usr.admin = True |
|
127 | new_usr.admin = True | |
127 | sa.add(new_usr) |
|
128 | sa.add(new_usr) | |
128 | sa.commit() |
|
129 | sa.commit() | |
129 |
|
130 | |||
130 | print '\tdone' |
|
131 | print '\tdone' | |
131 |
|
132 | |||
132 |
|
133 | |||
133 | def create_test_repo(force=True): |
|
134 | def create_test_repo(force=True): | |
134 | from rhodecode.model.repo import RepoModel |
|
135 | from rhodecode.model.repo import RepoModel | |
135 |
|
136 | |||
136 | user = User.get_by_username(USER) |
|
137 | user = User.get_by_username(USER) | |
137 | if user is None: |
|
138 | if user is None: | |
138 | raise Exception('user not found') |
|
139 | raise Exception('user not found') | |
139 |
|
140 | |||
140 |
|
||||
141 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
|
141 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() | |
142 |
|
142 | |||
143 | if repo is None: |
|
143 | if repo is None: | |
144 | print '\trepo not found creating' |
|
144 | print '\trepo not found creating' | |
145 |
|
145 | |||
146 | form_data = {'repo_name':HG_REPO, |
|
146 | form_data = {'repo_name':HG_REPO, | |
147 | 'repo_type':'hg', |
|
147 | 'repo_type':'hg', | |
148 | 'private':False, |
|
148 | 'private':False, | |
149 | 'clone_uri':'' } |
|
149 | 'clone_uri':'' } | |
150 | rm = RepoModel(sa) |
|
150 | rm = RepoModel(sa) | |
151 | rm.base_path = '/home/hg' |
|
151 | rm.base_path = '/home/hg' | |
152 | rm.create(form_data, user) |
|
152 | rm.create(form_data, user) | |
153 |
|
153 | |||
154 |
|
154 | |||
155 | def set_anonymous_access(enable=True): |
|
155 | def set_anonymous_access(enable=True): | |
156 | user = User.get_by_username('default') |
|
156 | user = User.get_by_username('default') | |
157 | user.active = enable |
|
157 | user.active = enable | |
158 | sa.add(user) |
|
158 | sa.add(user) | |
159 | sa.commit() |
|
159 | sa.commit() | |
160 | print '\tanonymous access is now:', enable |
|
160 | print '\tanonymous access is now:', enable | |
161 | if enable != User.get_by_username('default').active: |
|
161 | if enable != User.get_by_username('default').active: | |
162 | raise Exception('Cannot set anonymous access') |
|
162 | raise Exception('Cannot set anonymous access') | |
163 |
|
163 | |||
|
164 | ||||
164 | def get_anonymous_access(): |
|
165 | def get_anonymous_access(): | |
165 | user = User.get_by_username('default') |
|
166 | user = User.get_by_username('default') | |
166 | return user.active |
|
167 | return user.active | |
167 |
|
168 | |||
168 |
|
169 | |||
169 | #============================================================================== |
|
170 | #============================================================================== | |
170 | # TESTS |
|
171 | # TESTS | |
171 | #============================================================================== |
|
172 | #============================================================================== | |
172 | @test_wrapp |
|
173 | @test_wrapp | |
173 | def test_clone_with_credentials(no_errors=False): |
|
174 | def test_clone_with_credentials(no_errors=False): | |
174 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
175 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
175 |
|
176 | |||
176 | try: |
|
177 | try: | |
177 | shutil.rmtree(path, ignore_errors=True) |
|
178 | shutil.rmtree(path, ignore_errors=True) | |
178 | os.makedirs(path) |
|
179 | os.makedirs(path) | |
179 | #print 'made dirs %s' % jn(path) |
|
180 | #print 'made dirs %s' % jn(path) | |
180 | except OSError: |
|
181 | except OSError: | |
181 | raise |
|
182 | raise | |
182 |
|
183 | |||
183 | print '\tchecking if anonymous access is enabled' |
|
184 | print '\tchecking if anonymous access is enabled' | |
184 | anonymous_access = get_anonymous_access() |
|
185 | anonymous_access = get_anonymous_access() | |
185 | if anonymous_access: |
|
186 | if anonymous_access: | |
186 | print '\tenabled, disabling it ' |
|
187 | print '\tenabled, disabling it ' | |
187 | set_anonymous_access(enable=False) |
|
188 | set_anonymous_access(enable=False) | |
188 |
|
189 | |||
189 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ |
|
190 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ | |
190 | {'user':USER, |
|
191 | {'user':USER, | |
191 | 'pass':PASS, |
|
192 | 'pass':PASS, | |
192 | 'host':HOST, |
|
193 | 'host':HOST, | |
193 | 'cloned_repo':HG_REPO, |
|
194 | 'cloned_repo':HG_REPO, | |
194 | 'dest':path} |
|
195 | 'dest':path} | |
195 |
|
196 | |||
196 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) |
|
197 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) | |
197 |
|
198 | |||
198 | if no_errors is False: |
|
199 | if no_errors is False: | |
199 | assert """adding file changes""" in stdout, 'no messages about cloning' |
|
200 | assert """adding file changes""" in stdout, 'no messages about cloning' | |
200 | assert """abort""" not in stderr , 'got error from clone' |
|
201 | assert """abort""" not in stderr , 'got error from clone' | |
201 |
|
202 | |||
202 |
|
203 | |||
203 | @test_wrapp |
|
204 | @test_wrapp | |
204 | def test_clone_anonymous(): |
|
205 | def test_clone_anonymous(): | |
205 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
206 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
206 |
|
207 | |||
207 | try: |
|
208 | try: | |
208 | shutil.rmtree(path, ignore_errors=True) |
|
209 | shutil.rmtree(path, ignore_errors=True) | |
209 | os.makedirs(path) |
|
210 | os.makedirs(path) | |
210 | #print 'made dirs %s' % jn(path) |
|
211 | #print 'made dirs %s' % jn(path) | |
211 | except OSError: |
|
212 | except OSError: | |
212 | raise |
|
213 | raise | |
213 |
|
214 | |||
214 |
|
215 | |||
215 | print '\tchecking if anonymous access is enabled' |
|
216 | print '\tchecking if anonymous access is enabled' | |
216 | anonymous_access = get_anonymous_access() |
|
217 | anonymous_access = get_anonymous_access() | |
217 | if not anonymous_access: |
|
218 | if not anonymous_access: | |
218 | print '\tnot enabled, enabling it ' |
|
219 | print '\tnot enabled, enabling it ' | |
219 | set_anonymous_access(enable=True) |
|
220 | set_anonymous_access(enable=True) | |
220 |
|
221 | |||
221 | clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ |
|
222 | clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ | |
222 | {'user':USER, |
|
223 | {'user':USER, | |
223 | 'pass':PASS, |
|
224 | 'pass':PASS, | |
224 | 'host':HOST, |
|
225 | 'host':HOST, | |
225 | 'cloned_repo':HG_REPO, |
|
226 | 'cloned_repo':HG_REPO, | |
226 | 'dest':path} |
|
227 | 'dest':path} | |
227 |
|
228 | |||
228 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) |
|
229 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) | |
229 |
|
230 | |||
230 | assert """adding file changes""" in stdout, 'no messages about cloning' |
|
231 | assert """adding file changes""" in stdout, 'no messages about cloning' | |
231 | assert """abort""" not in stderr , 'got error from clone' |
|
232 | assert """abort""" not in stderr , 'got error from clone' | |
232 |
|
233 | |||
233 | #disable if it was enabled |
|
234 | #disable if it was enabled | |
234 | if not anonymous_access: |
|
235 | if not anonymous_access: | |
235 | print '\tdisabling anonymous access' |
|
236 | print '\tdisabling anonymous access' | |
236 | set_anonymous_access(enable=False) |
|
237 | set_anonymous_access(enable=False) | |
237 |
|
238 | |||
|
239 | ||||
238 | @test_wrapp |
|
240 | @test_wrapp | |
239 | def test_clone_wrong_credentials(): |
|
241 | def test_clone_wrong_credentials(): | |
240 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
242 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
241 |
|
243 | |||
242 | try: |
|
244 | try: | |
243 | shutil.rmtree(path, ignore_errors=True) |
|
245 | shutil.rmtree(path, ignore_errors=True) | |
244 | os.makedirs(path) |
|
246 | os.makedirs(path) | |
245 | #print 'made dirs %s' % jn(path) |
|
247 | #print 'made dirs %s' % jn(path) | |
246 | except OSError: |
|
248 | except OSError: | |
247 | raise |
|
249 | raise | |
248 |
|
250 | |||
249 | print '\tchecking if anonymous access is enabled' |
|
251 | print '\tchecking if anonymous access is enabled' | |
250 | anonymous_access = get_anonymous_access() |
|
252 | anonymous_access = get_anonymous_access() | |
251 | if anonymous_access: |
|
253 | if anonymous_access: | |
252 | print '\tenabled, disabling it ' |
|
254 | print '\tenabled, disabling it ' | |
253 | set_anonymous_access(enable=False) |
|
255 | set_anonymous_access(enable=False) | |
254 |
|
256 | |||
255 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ |
|
257 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ | |
256 | {'user':USER + 'error', |
|
258 | {'user':USER + 'error', | |
257 | 'pass':PASS, |
|
259 | 'pass':PASS, | |
258 | 'host':HOST, |
|
260 | 'host':HOST, | |
259 | 'cloned_repo':HG_REPO, |
|
261 | 'cloned_repo':HG_REPO, | |
260 | 'dest':path} |
|
262 | 'dest':path} | |
261 |
|
263 | |||
262 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) |
|
264 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) | |
263 |
|
265 | |||
264 | if not """abort: authorization failed""" in stderr: |
|
266 | if not """abort: authorization failed""" in stderr: | |
265 | raise Exception('Failure') |
|
267 | raise Exception('Failure') | |
266 |
|
268 | |||
|
269 | ||||
267 | @test_wrapp |
|
270 | @test_wrapp | |
268 | def test_pull(): |
|
271 | def test_pull(): | |
269 | pass |
|
272 | pass | |
270 |
|
273 | |||
|
274 | ||||
271 | @test_wrapp |
|
275 | @test_wrapp | |
272 | def test_push_modify_file(f_name='setup.py'): |
|
276 | def test_push_modify_file(f_name='setup.py'): | |
273 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
277 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
274 | modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) |
|
278 | modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) | |
275 | for i in xrange(5): |
|
279 | for i in xrange(5): | |
276 | cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) |
|
280 | cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) | |
277 | Command(cwd).execute(cmd) |
|
281 | Command(cwd).execute(cmd) | |
278 |
|
282 | |||
279 | cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) |
|
283 | cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) | |
280 | Command(cwd).execute(cmd) |
|
284 | Command(cwd).execute(cmd) | |
281 |
|
285 | |||
282 | Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) |
|
286 | Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) | |
283 |
|
287 | |||
|
288 | ||||
284 | @test_wrapp |
|
289 | @test_wrapp | |
285 | def test_push_new_file(commits=15, with_clone=True): |
|
290 | def test_push_new_file(commits=15, with_clone=True): | |
286 |
|
291 | |||
287 | if with_clone: |
|
292 | if with_clone: | |
288 | test_clone_with_credentials(no_errors=True) |
|
293 | test_clone_with_credentials(no_errors=True) | |
289 |
|
294 | |||
290 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
295 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
291 | added_file = jn(path, '%ssetupΔ ΕΌΕΊΔ.py' % _RandomNameSequence().next()) |
|
296 | added_file = jn(path, '%ssetupΔ ΕΌΕΊΔ.py' % _RandomNameSequence().next()) | |
292 |
|
297 | |||
293 | Command(cwd).execute('touch %s' % added_file) |
|
298 | Command(cwd).execute('touch %s' % added_file) | |
294 |
|
299 | |||
295 | Command(cwd).execute('hg add %s' % added_file) |
|
300 | Command(cwd).execute('hg add %s' % added_file) | |
296 |
|
301 | |||
297 | for i in xrange(commits): |
|
302 | for i in xrange(commits): | |
298 | cmd = """echo 'added_line%s' >> %s""" % (i, added_file) |
|
303 | cmd = """echo 'added_line%s' >> %s""" % (i, added_file) | |
299 | Command(cwd).execute(cmd) |
|
304 | Command(cwd).execute(cmd) | |
300 |
|
305 | |||
301 | cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i, |
|
306 | cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i, | |
302 | 'Marcin KuΕΊminski <marcin@python-blog.com>', |
|
307 | 'Marcin KuΕΊminski <marcin@python-blog.com>', | |
303 | added_file) |
|
308 | added_file) | |
304 | Command(cwd).execute(cmd) |
|
309 | Command(cwd).execute(cmd) | |
305 |
|
310 | |||
306 | push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
|
311 | push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ | |
307 | {'user':USER, |
|
312 | {'user':USER, | |
308 | 'pass':PASS, |
|
313 | 'pass':PASS, | |
309 | 'host':HOST, |
|
314 | 'host':HOST, | |
310 | 'cloned_repo':HG_REPO, |
|
315 | 'cloned_repo':HG_REPO, | |
311 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} |
|
316 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} | |
312 |
|
317 | |||
313 | Command(cwd).execute('hg push --verbose --debug %s' % push_url) |
|
318 | Command(cwd).execute('hg push --verbose --debug %s' % push_url) | |
314 |
|
319 | |||
|
320 | ||||
315 | @test_wrapp |
|
321 | @test_wrapp | |
316 | def test_push_wrong_credentials(): |
|
322 | def test_push_wrong_credentials(): | |
317 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
323 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
318 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
|
324 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ | |
319 | {'user':USER + 'xxx', |
|
325 | {'user':USER + 'xxx', | |
320 | 'pass':PASS, |
|
326 | 'pass':PASS, | |
321 | 'host':HOST, |
|
327 | 'host':HOST, | |
322 | 'cloned_repo':HG_REPO, |
|
328 | 'cloned_repo':HG_REPO, | |
323 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} |
|
329 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} | |
324 |
|
330 | |||
325 | modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') |
|
331 | modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') | |
326 | for i in xrange(5): |
|
332 | for i in xrange(5): | |
327 | cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) |
|
333 | cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) | |
328 | Command(cwd).execute(cmd) |
|
334 | Command(cwd).execute(cmd) | |
329 |
|
335 | |||
330 | cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file) |
|
336 | cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file) | |
331 | Command(cwd).execute(cmd) |
|
337 | Command(cwd).execute(cmd) | |
332 |
|
338 | |||
333 | Command(cwd).execute('hg push %s' % clone_url) |
|
339 | Command(cwd).execute('hg push %s' % clone_url) | |
334 |
|
340 | |||
|
341 | ||||
335 | @test_wrapp |
|
342 | @test_wrapp | |
336 | def test_push_wrong_path(): |
|
343 | def test_push_wrong_path(): | |
337 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
344 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) | |
338 | added_file = jn(path, 'somefile.py') |
|
345 | added_file = jn(path, 'somefile.py') | |
339 |
|
346 | |||
340 | try: |
|
347 | try: | |
341 | shutil.rmtree(path, ignore_errors=True) |
|
348 | shutil.rmtree(path, ignore_errors=True) | |
342 | os.makedirs(path) |
|
349 | os.makedirs(path) | |
343 | print '\tmade dirs %s' % jn(path) |
|
350 | print '\tmade dirs %s' % jn(path) | |
344 | except OSError: |
|
351 | except OSError: | |
345 | raise |
|
352 | raise | |
346 |
|
353 | |||
347 | Command(cwd).execute("""echo '' > %s""" % added_file) |
|
354 | Command(cwd).execute("""echo '' > %s""" % added_file) | |
348 | Command(cwd).execute("""hg init %s""" % path) |
|
355 | Command(cwd).execute("""hg init %s""" % path) | |
349 | Command(cwd).execute("""hg add %s""" % added_file) |
|
356 | Command(cwd).execute("""hg add %s""" % added_file) | |
350 |
|
357 | |||
351 | for i in xrange(2): |
|
358 | for i in xrange(2): | |
352 | cmd = """echo 'added_line%s' >> %s""" % (i, added_file) |
|
359 | cmd = """echo 'added_line%s' >> %s""" % (i, added_file) | |
353 | Command(cwd).execute(cmd) |
|
360 | Command(cwd).execute(cmd) | |
354 |
|
361 | |||
355 | cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file) |
|
362 | cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file) | |
356 | Command(cwd).execute(cmd) |
|
363 | Command(cwd).execute(cmd) | |
357 |
|
364 | |||
358 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
|
365 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ | |
359 | {'user':USER, |
|
366 | {'user':USER, | |
360 | 'pass':PASS, |
|
367 | 'pass':PASS, | |
361 | 'host':HOST, |
|
368 | 'host':HOST, | |
362 | 'cloned_repo':HG_REPO + '_error', |
|
369 | 'cloned_repo':HG_REPO + '_error', | |
363 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} |
|
370 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} | |
364 |
|
371 | |||
365 | stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) |
|
372 | stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) | |
366 | if not """abort: HTTP Error 403: Forbidden""" in stderr: |
|
373 | if not """abort: HTTP Error 403: Forbidden""" in stderr: | |
367 | raise Exception('Failure') |
|
374 | raise Exception('Failure') | |
368 |
|
375 | |||
|
376 | ||||
369 | @test_wrapp |
|
377 | @test_wrapp | |
370 | def get_logs(): |
|
378 | def get_logs(): | |
371 | return UserLog.query().all() |
|
379 | return UserLog.query().all() | |
372 |
|
380 | |||
|
381 | ||||
373 | @test_wrapp |
|
382 | @test_wrapp | |
374 | def test_logs(initial): |
|
383 | def test_logs(initial): | |
375 | logs = UserLog.query().all() |
|
384 | logs = UserLog.query().all() | |
376 | operations = 4 |
|
385 | operations = 4 | |
377 | if len(initial) + operations != len(logs): |
|
386 | if len(initial) + operations != len(logs): | |
378 | raise Exception("missing number of logs initial:%s vs current:%s" % \ |
|
387 | raise Exception("missing number of logs initial:%s vs current:%s" % \ | |
379 | (len(initial), len(logs))) |
|
388 | (len(initial), len(logs))) | |
380 |
|
389 | |||
381 |
|
390 | |||
382 | if __name__ == '__main__': |
|
391 | if __name__ == '__main__': | |
383 | create_test_user(force=False) |
|
392 | create_test_user(force=False) | |
384 | create_test_repo() |
|
393 | create_test_repo() | |
385 |
|
394 | |||
386 | initial_logs = get_logs() |
|
395 | initial_logs = get_logs() | |
387 | print 'initial activity logs: %s' % len(initial_logs) |
|
396 | print 'initial activity logs: %s' % len(initial_logs) | |
388 | s = time.time() |
|
397 | s = time.time() | |
389 | #test_push_modify_file() |
|
398 | #test_push_modify_file() | |
390 | test_clone_with_credentials() |
|
399 | test_clone_with_credentials() | |
391 | test_clone_wrong_credentials() |
|
400 | test_clone_wrong_credentials() | |
392 |
|
401 | |||
393 | test_push_new_file(commits=2, with_clone=True) |
|
402 | test_push_new_file(commits=2, with_clone=True) | |
394 |
|
403 | |||
395 | test_clone_anonymous() |
|
404 | test_clone_anonymous() | |
396 | test_push_wrong_path() |
|
405 | test_push_wrong_path() | |
397 |
|
406 | |||
398 | test_push_wrong_credentials() |
|
407 | test_push_wrong_credentials() | |
399 |
|
408 | |||
400 | test_logs(initial_logs) |
|
409 | test_logs(initial_logs) | |
401 | print 'finished ok in %.3f' % (time.time() - s) |
|
410 | print 'finished ok in %.3f' % (time.time() - s) |
@@ -1,56 +1,56 | |||||
1 | """ |
|
1 | """ | |
2 | Unit tests for vcs_ library. |
|
2 | Unit tests for vcs_ library. | |
3 |
|
3 | |||
4 | In order to run tests we need to prepare our environment first. Tests would be |
|
4 | In order to run tests we need to prepare our environment first. Tests would be | |
5 | run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from |
|
5 | run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from | |
6 | ``vcs.backends.BACKENDS``. |
|
6 | ``vcs.backends.BACKENDS``. | |
7 |
|
7 | |||
8 | For each SCM we run tests for, we need some repository. We would use |
|
8 | For each SCM we run tests for, we need some repository. We would use | |
9 | repositories location from system environment variables or test suite defaults |
|
9 | repositories location from system environment variables or test suite defaults | |
10 | - see ``conf`` module for more detail. We simply try to check if repository at |
|
10 | - see ``conf`` module for more detail. We simply try to check if repository at | |
11 | certain location exists, if not we would try to fetch them. At ``test_vcs`` or |
|
11 | certain location exists, if not we would try to fetch them. At ``test_vcs`` or | |
12 | ``test_common`` we run unit tests common for each repository type and for |
|
12 | ``test_common`` we run unit tests common for each repository type and for | |
13 | example specific mercurial tests are located at ``test_hg`` module. |
|
13 | example specific mercurial tests are located at ``test_hg`` module. | |
14 |
|
14 | |||
15 | Oh, and tests are run with ``unittest.collector`` wrapped by ``collector`` |
|
15 | Oh, and tests are run with ``unittest.collector`` wrapped by ``collector`` | |
16 | function at ``tests/__init__.py``. |
|
16 | function at ``tests/__init__.py``. | |
17 |
|
17 | |||
18 | .. _vcs: http://bitbucket.org/marcinkuzminski/vcs |
|
18 | .. _vcs: http://bitbucket.org/marcinkuzminski/vcs | |
19 | .. _unittest: http://pypi.python.org/pypi/unittest |
|
19 | .. _unittest: http://pypi.python.org/pypi/unittest | |
20 |
|
20 | |||
21 | """ |
|
21 | """ | |
22 | import os |
|
22 | import os | |
23 | from rhodecode.lib import vcs |
|
23 | from rhodecode.lib import vcs | |
24 | from rhodecode.lib.vcs.utils.compat import unittest |
|
24 | from rhodecode.lib.vcs.utils.compat import unittest | |
|
25 | from utils import VCSTestError, SCMFetcher | |||
25 | from rhodecode.tests import * |
|
26 | from rhodecode.tests import * | |
26 | from utils import VCSTestError, SCMFetcher |
|
|||
27 |
|
27 | |||
28 |
|
28 | |||
29 | def setup_package(): |
|
29 | def setup_package(): | |
30 | """ |
|
30 | """ | |
31 | Prepares whole package for tests which mainly means it would try to fetch |
|
31 | Prepares whole package for tests which mainly means it would try to fetch | |
32 | test repositories or use already existing ones. |
|
32 | test repositories or use already existing ones. | |
33 | """ |
|
33 | """ | |
34 | fetchers = { |
|
34 | fetchers = { | |
35 | 'hg': { |
|
35 | 'hg': { | |
36 | 'alias': 'hg', |
|
36 | 'alias': 'hg', | |
37 | 'test_repo_path': TEST_HG_REPO, |
|
37 | 'test_repo_path': TEST_HG_REPO, | |
38 | 'remote_repo': HG_REMOTE_REPO, |
|
38 | 'remote_repo': HG_REMOTE_REPO, | |
39 | 'clone_cmd': 'hg clone', |
|
39 | 'clone_cmd': 'hg clone', | |
40 | }, |
|
40 | }, | |
41 | 'git': { |
|
41 | 'git': { | |
42 | 'alias': 'git', |
|
42 | 'alias': 'git', | |
43 | 'test_repo_path': TEST_GIT_REPO, |
|
43 | 'test_repo_path': TEST_GIT_REPO, | |
44 | 'remote_repo': GIT_REMOTE_REPO, |
|
44 | 'remote_repo': GIT_REMOTE_REPO, | |
45 | 'clone_cmd': 'git clone --bare', |
|
45 | 'clone_cmd': 'git clone --bare', | |
46 | }, |
|
46 | }, | |
47 | } |
|
47 | } | |
48 | try: |
|
48 | try: | |
49 | for scm, fetcher_info in fetchers.items(): |
|
49 | for scm, fetcher_info in fetchers.items(): | |
50 | fetcher = SCMFetcher(**fetcher_info) |
|
50 | fetcher = SCMFetcher(**fetcher_info) | |
51 | fetcher.setup() |
|
51 | fetcher.setup() | |
52 | except VCSTestError, err: |
|
52 | except VCSTestError, err: | |
53 | raise RuntimeError(str(err)) |
|
53 | raise RuntimeError(str(err)) | |
54 |
|
54 | |||
55 | #start_dir = os.path.abspath(os.path.dirname(__file__)) |
|
55 | #start_dir = os.path.abspath(os.path.dirname(__file__)) | |
56 | #unittest.defaultTestLoader.discover(start_dir) |
|
56 | #unittest.defaultTestLoader.discover(start_dir) |
General Comments 0
You need to be logged in to leave comments.
Login now