Show More
@@ -1,569 +1,579 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2010-2019 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | import urlparse |
|
22 | 22 | |
|
23 | 23 | import mock |
|
24 | 24 | import pytest |
|
25 | 25 | |
|
26 | 26 | from rhodecode.tests import ( |
|
27 | 27 | assert_session_flash, HG_REPO, TEST_USER_ADMIN_LOGIN, |
|
28 | 28 | no_newline_id_generator) |
|
29 | 29 | from rhodecode.tests.fixture import Fixture |
|
30 | 30 | from rhodecode.lib.auth import check_password |
|
31 | 31 | from rhodecode.lib import helpers as h |
|
32 | 32 | from rhodecode.model.auth_token import AuthTokenModel |
|
33 | 33 | from rhodecode.model.db import User, Notification, UserApiKeys |
|
34 | 34 | from rhodecode.model.meta import Session |
|
35 | 35 | |
|
36 | 36 | fixture = Fixture() |
|
37 | 37 | |
|
38 | 38 | whitelist_view = ['RepoCommitsView:repo_commit_raw'] |
|
39 | 39 | |
|
40 | 40 | |
|
41 | 41 | def route_path(name, params=None, **kwargs): |
|
42 | 42 | import urllib |
|
43 | 43 | from rhodecode.apps._base import ADMIN_PREFIX |
|
44 | 44 | |
|
45 | 45 | base_url = { |
|
46 | 46 | 'login': ADMIN_PREFIX + '/login', |
|
47 | 47 | 'logout': ADMIN_PREFIX + '/logout', |
|
48 | 48 | 'register': ADMIN_PREFIX + '/register', |
|
49 | 49 | 'reset_password': |
|
50 | 50 | ADMIN_PREFIX + '/password_reset', |
|
51 | 51 | 'reset_password_confirmation': |
|
52 | 52 | ADMIN_PREFIX + '/password_reset_confirmation', |
|
53 | 53 | |
|
54 | 54 | 'admin_permissions_application': |
|
55 | 55 | ADMIN_PREFIX + '/permissions/application', |
|
56 | 56 | 'admin_permissions_application_update': |
|
57 | 57 | ADMIN_PREFIX + '/permissions/application/update', |
|
58 | 58 | |
|
59 | 59 | 'repo_commit_raw': '/{repo_name}/raw-changeset/{commit_id}' |
|
60 | 60 | |
|
61 | 61 | }[name].format(**kwargs) |
|
62 | 62 | |
|
63 | 63 | if params: |
|
64 | 64 | base_url = '{}?{}'.format(base_url, urllib.urlencode(params)) |
|
65 | 65 | return base_url |
|
66 | 66 | |
|
67 | 67 | |
|
68 | 68 | @pytest.mark.usefixtures('app') |
|
69 | 69 | class TestLoginController(object): |
|
70 | 70 | destroy_users = set() |
|
71 | 71 | |
|
72 | 72 | @classmethod |
|
73 | 73 | def teardown_class(cls): |
|
74 | 74 | fixture.destroy_users(cls.destroy_users) |
|
75 | 75 | |
|
76 | 76 | def teardown_method(self, method): |
|
77 | 77 | for n in Notification.query().all(): |
|
78 | 78 | Session().delete(n) |
|
79 | 79 | |
|
80 | 80 | Session().commit() |
|
81 | 81 | assert Notification.query().all() == [] |
|
82 | 82 | |
|
83 | 83 | def test_index(self): |
|
84 | 84 | response = self.app.get(route_path('login')) |
|
85 | 85 | assert response.status == '200 OK' |
|
86 | 86 | # Test response... |
|
87 | 87 | |
|
88 | 88 | def test_login_admin_ok(self): |
|
89 | 89 | response = self.app.post(route_path('login'), |
|
90 | 90 | {'username': 'test_admin', |
|
91 | 91 | 'password': 'test12'}, status=302) |
|
92 | 92 | response = response.follow() |
|
93 | 93 | session = response.get_session_from_response() |
|
94 | 94 | username = session['rhodecode_user'].get('username') |
|
95 | 95 | assert username == 'test_admin' |
|
96 | 96 | response.mustcontain('/%s' % HG_REPO) |
|
97 | 97 | |
|
98 | 98 | def test_login_regular_ok(self): |
|
99 | 99 | response = self.app.post(route_path('login'), |
|
100 | 100 | {'username': 'test_regular', |
|
101 | 101 | 'password': 'test12'}, status=302) |
|
102 | 102 | |
|
103 | 103 | response = response.follow() |
|
104 | 104 | session = response.get_session_from_response() |
|
105 | 105 | username = session['rhodecode_user'].get('username') |
|
106 | 106 | assert username == 'test_regular' |
|
107 | 107 | |
|
108 | 108 | response.mustcontain('/%s' % HG_REPO) |
|
109 | 109 | |
|
110 | 110 | def test_login_regular_forbidden_when_super_admin_restriction(self): |
|
111 | 111 | from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin |
|
112 |
with fixture. |
|
|
112 | with fixture.auth_restriction(RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN): | |
|
113 | response = self.app.post(route_path('login'), | |
|
114 | {'username': 'test_regular', | |
|
115 | 'password': 'test12'}) | |
|
116 | ||
|
117 | response.mustcontain('invalid user name') | |
|
118 | response.mustcontain('invalid password') | |
|
119 | ||
|
120 | def test_login_regular_forbidden_when_scope_restriction(self): | |
|
121 | from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin | |
|
122 | with fixture.scope_restriction(RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS): | |
|
113 | 123 | response = self.app.post(route_path('login'), |
|
114 | 124 | {'username': 'test_regular', |
|
115 | 125 | 'password': 'test12'}) |
|
116 | 126 | |
|
117 | 127 | response.mustcontain('invalid user name') |
|
118 | 128 | response.mustcontain('invalid password') |
|
119 | 129 | |
|
120 | 130 | def test_login_ok_came_from(self): |
|
121 | 131 | test_came_from = '/_admin/users?branch=stable' |
|
122 | 132 | _url = '{}?came_from={}'.format(route_path('login'), test_came_from) |
|
123 | 133 | response = self.app.post( |
|
124 | 134 | _url, {'username': 'test_admin', 'password': 'test12'}, status=302) |
|
125 | 135 | |
|
126 | 136 | assert 'branch=stable' in response.location |
|
127 | 137 | response = response.follow() |
|
128 | 138 | |
|
129 | 139 | assert response.status == '200 OK' |
|
130 | 140 | response.mustcontain('Users administration') |
|
131 | 141 | |
|
132 | 142 | def test_redirect_to_login_with_get_args(self): |
|
133 | 143 | with fixture.anon_access(False): |
|
134 | 144 | kwargs = {'branch': 'stable'} |
|
135 | 145 | response = self.app.get( |
|
136 | 146 | h.route_path('repo_summary', repo_name=HG_REPO, _query=kwargs), |
|
137 | 147 | status=302) |
|
138 | 148 | |
|
139 | 149 | response_query = urlparse.parse_qsl(response.location) |
|
140 | 150 | assert 'branch=stable' in response_query[0][1] |
|
141 | 151 | |
|
142 | 152 | def test_login_form_with_get_args(self): |
|
143 | 153 | _url = '{}?came_from=/_admin/users,branch=stable'.format(route_path('login')) |
|
144 | 154 | response = self.app.get(_url) |
|
145 | 155 | assert 'branch%3Dstable' in response.form.action |
|
146 | 156 | |
|
147 | 157 | @pytest.mark.parametrize("url_came_from", [ |
|
148 | 158 | 'data:text/html,<script>window.alert("xss")</script>', |
|
149 | 159 | 'mailto:test@rhodecode.org', |
|
150 | 160 | 'file:///etc/passwd', |
|
151 | 161 | 'ftp://some.ftp.server', |
|
152 | 162 | 'http://other.domain', |
|
153 | 163 | '/\r\nX-Forwarded-Host: http://example.org', |
|
154 | 164 | ], ids=no_newline_id_generator) |
|
155 | 165 | def test_login_bad_came_froms(self, url_came_from): |
|
156 | 166 | _url = '{}?came_from={}'.format(route_path('login'), url_came_from) |
|
157 | 167 | response = self.app.post( |
|
158 | 168 | _url, |
|
159 | 169 | {'username': 'test_admin', 'password': 'test12'}) |
|
160 | 170 | assert response.status == '302 Found' |
|
161 | 171 | response = response.follow() |
|
162 | 172 | assert response.status == '200 OK' |
|
163 | 173 | assert response.request.path == '/' |
|
164 | 174 | |
|
165 | 175 | def test_login_short_password(self): |
|
166 | 176 | response = self.app.post(route_path('login'), |
|
167 | 177 | {'username': 'test_admin', |
|
168 | 178 | 'password': 'as'}) |
|
169 | 179 | assert response.status == '200 OK' |
|
170 | 180 | |
|
171 | 181 | response.mustcontain('Enter 3 characters or more') |
|
172 | 182 | |
|
173 | 183 | def test_login_wrong_non_ascii_password(self, user_regular): |
|
174 | 184 | response = self.app.post( |
|
175 | 185 | route_path('login'), |
|
176 | 186 | {'username': user_regular.username, |
|
177 | 187 | 'password': u'invalid-non-asci\xe4'.encode('utf8')}) |
|
178 | 188 | |
|
179 | 189 | response.mustcontain('invalid user name') |
|
180 | 190 | response.mustcontain('invalid password') |
|
181 | 191 | |
|
182 | 192 | def test_login_with_non_ascii_password(self, user_util): |
|
183 | 193 | password = u'valid-non-ascii\xe4' |
|
184 | 194 | user = user_util.create_user(password=password) |
|
185 | 195 | response = self.app.post( |
|
186 | 196 | route_path('login'), |
|
187 | 197 | {'username': user.username, |
|
188 | 198 | 'password': password.encode('utf-8')}) |
|
189 | 199 | assert response.status_code == 302 |
|
190 | 200 | |
|
191 | 201 | def test_login_wrong_username_password(self): |
|
192 | 202 | response = self.app.post(route_path('login'), |
|
193 | 203 | {'username': 'error', |
|
194 | 204 | 'password': 'test12'}) |
|
195 | 205 | |
|
196 | 206 | response.mustcontain('invalid user name') |
|
197 | 207 | response.mustcontain('invalid password') |
|
198 | 208 | |
|
199 | 209 | def test_login_admin_ok_password_migration(self, real_crypto_backend): |
|
200 | 210 | from rhodecode.lib import auth |
|
201 | 211 | |
|
202 | 212 | # create new user, with sha256 password |
|
203 | 213 | temp_user = 'test_admin_sha256' |
|
204 | 214 | user = fixture.create_user(temp_user) |
|
205 | 215 | user.password = auth._RhodeCodeCryptoSha256().hash_create( |
|
206 | 216 | b'test123') |
|
207 | 217 | Session().add(user) |
|
208 | 218 | Session().commit() |
|
209 | 219 | self.destroy_users.add(temp_user) |
|
210 | 220 | response = self.app.post(route_path('login'), |
|
211 | 221 | {'username': temp_user, |
|
212 | 222 | 'password': 'test123'}, status=302) |
|
213 | 223 | |
|
214 | 224 | response = response.follow() |
|
215 | 225 | session = response.get_session_from_response() |
|
216 | 226 | username = session['rhodecode_user'].get('username') |
|
217 | 227 | assert username == temp_user |
|
218 | 228 | response.mustcontain('/%s' % HG_REPO) |
|
219 | 229 | |
|
220 | 230 | # new password should be bcrypted, after log-in and transfer |
|
221 | 231 | user = User.get_by_username(temp_user) |
|
222 | 232 | assert user.password.startswith('$') |
|
223 | 233 | |
|
224 | 234 | # REGISTRATIONS |
|
225 | 235 | def test_register(self): |
|
226 | 236 | response = self.app.get(route_path('register')) |
|
227 | 237 | response.mustcontain('Create an Account') |
|
228 | 238 | |
|
229 | 239 | def test_register_err_same_username(self): |
|
230 | 240 | uname = 'test_admin' |
|
231 | 241 | response = self.app.post( |
|
232 | 242 | route_path('register'), |
|
233 | 243 | { |
|
234 | 244 | 'username': uname, |
|
235 | 245 | 'password': 'test12', |
|
236 | 246 | 'password_confirmation': 'test12', |
|
237 | 247 | 'email': 'goodmail@domain.com', |
|
238 | 248 | 'firstname': 'test', |
|
239 | 249 | 'lastname': 'test' |
|
240 | 250 | } |
|
241 | 251 | ) |
|
242 | 252 | |
|
243 | 253 | assertr = response.assert_response() |
|
244 | 254 | msg = 'Username "%(username)s" already exists' |
|
245 | 255 | msg = msg % {'username': uname} |
|
246 | 256 | assertr.element_contains('#username+.error-message', msg) |
|
247 | 257 | |
|
248 | 258 | def test_register_err_same_email(self): |
|
249 | 259 | response = self.app.post( |
|
250 | 260 | route_path('register'), |
|
251 | 261 | { |
|
252 | 262 | 'username': 'test_admin_0', |
|
253 | 263 | 'password': 'test12', |
|
254 | 264 | 'password_confirmation': 'test12', |
|
255 | 265 | 'email': 'test_admin@mail.com', |
|
256 | 266 | 'firstname': 'test', |
|
257 | 267 | 'lastname': 'test' |
|
258 | 268 | } |
|
259 | 269 | ) |
|
260 | 270 | |
|
261 | 271 | assertr = response.assert_response() |
|
262 | 272 | msg = u'This e-mail address is already taken' |
|
263 | 273 | assertr.element_contains('#email+.error-message', msg) |
|
264 | 274 | |
|
265 | 275 | def test_register_err_same_email_case_sensitive(self): |
|
266 | 276 | response = self.app.post( |
|
267 | 277 | route_path('register'), |
|
268 | 278 | { |
|
269 | 279 | 'username': 'test_admin_1', |
|
270 | 280 | 'password': 'test12', |
|
271 | 281 | 'password_confirmation': 'test12', |
|
272 | 282 | 'email': 'TesT_Admin@mail.COM', |
|
273 | 283 | 'firstname': 'test', |
|
274 | 284 | 'lastname': 'test' |
|
275 | 285 | } |
|
276 | 286 | ) |
|
277 | 287 | assertr = response.assert_response() |
|
278 | 288 | msg = u'This e-mail address is already taken' |
|
279 | 289 | assertr.element_contains('#email+.error-message', msg) |
|
280 | 290 | |
|
281 | 291 | def test_register_err_wrong_data(self): |
|
282 | 292 | response = self.app.post( |
|
283 | 293 | route_path('register'), |
|
284 | 294 | { |
|
285 | 295 | 'username': 'xs', |
|
286 | 296 | 'password': 'test', |
|
287 | 297 | 'password_confirmation': 'test', |
|
288 | 298 | 'email': 'goodmailm', |
|
289 | 299 | 'firstname': 'test', |
|
290 | 300 | 'lastname': 'test' |
|
291 | 301 | } |
|
292 | 302 | ) |
|
293 | 303 | assert response.status == '200 OK' |
|
294 | 304 | response.mustcontain('An email address must contain a single @') |
|
295 | 305 | response.mustcontain('Enter a value 6 characters long or more') |
|
296 | 306 | |
|
297 | 307 | def test_register_err_username(self): |
|
298 | 308 | response = self.app.post( |
|
299 | 309 | route_path('register'), |
|
300 | 310 | { |
|
301 | 311 | 'username': 'error user', |
|
302 | 312 | 'password': 'test12', |
|
303 | 313 | 'password_confirmation': 'test12', |
|
304 | 314 | 'email': 'goodmailm', |
|
305 | 315 | 'firstname': 'test', |
|
306 | 316 | 'lastname': 'test' |
|
307 | 317 | } |
|
308 | 318 | ) |
|
309 | 319 | |
|
310 | 320 | response.mustcontain('An email address must contain a single @') |
|
311 | 321 | response.mustcontain( |
|
312 | 322 | 'Username may only contain ' |
|
313 | 323 | 'alphanumeric characters underscores, ' |
|
314 | 324 | 'periods or dashes and must begin with ' |
|
315 | 325 | 'alphanumeric character') |
|
316 | 326 | |
|
317 | 327 | def test_register_err_case_sensitive(self): |
|
318 | 328 | usr = 'Test_Admin' |
|
319 | 329 | response = self.app.post( |
|
320 | 330 | route_path('register'), |
|
321 | 331 | { |
|
322 | 332 | 'username': usr, |
|
323 | 333 | 'password': 'test12', |
|
324 | 334 | 'password_confirmation': 'test12', |
|
325 | 335 | 'email': 'goodmailm', |
|
326 | 336 | 'firstname': 'test', |
|
327 | 337 | 'lastname': 'test' |
|
328 | 338 | } |
|
329 | 339 | ) |
|
330 | 340 | |
|
331 | 341 | assertr = response.assert_response() |
|
332 | 342 | msg = u'Username "%(username)s" already exists' |
|
333 | 343 | msg = msg % {'username': usr} |
|
334 | 344 | assertr.element_contains('#username+.error-message', msg) |
|
335 | 345 | |
|
336 | 346 | def test_register_special_chars(self): |
|
337 | 347 | response = self.app.post( |
|
338 | 348 | route_path('register'), |
|
339 | 349 | { |
|
340 | 350 | 'username': 'xxxaxn', |
|
341 | 351 | 'password': 'Δ ΔΕΊΕΌΔ ΕΕΕΕ', |
|
342 | 352 | 'password_confirmation': 'Δ ΔΕΊΕΌΔ ΕΕΕΕ', |
|
343 | 353 | 'email': 'goodmailm@test.plx', |
|
344 | 354 | 'firstname': 'test', |
|
345 | 355 | 'lastname': 'test' |
|
346 | 356 | } |
|
347 | 357 | ) |
|
348 | 358 | |
|
349 | 359 | msg = u'Invalid characters (non-ascii) in password' |
|
350 | 360 | response.mustcontain(msg) |
|
351 | 361 | |
|
352 | 362 | def test_register_password_mismatch(self): |
|
353 | 363 | response = self.app.post( |
|
354 | 364 | route_path('register'), |
|
355 | 365 | { |
|
356 | 366 | 'username': 'xs', |
|
357 | 367 | 'password': '123qwe', |
|
358 | 368 | 'password_confirmation': 'qwe123', |
|
359 | 369 | 'email': 'goodmailm@test.plxa', |
|
360 | 370 | 'firstname': 'test', |
|
361 | 371 | 'lastname': 'test' |
|
362 | 372 | } |
|
363 | 373 | ) |
|
364 | 374 | msg = u'Passwords do not match' |
|
365 | 375 | response.mustcontain(msg) |
|
366 | 376 | |
|
367 | 377 | def test_register_ok(self): |
|
368 | 378 | username = 'test_regular4' |
|
369 | 379 | password = 'qweqwe' |
|
370 | 380 | email = 'marcin@test.com' |
|
371 | 381 | name = 'testname' |
|
372 | 382 | lastname = 'testlastname' |
|
373 | 383 | |
|
374 | 384 | # this initializes a session |
|
375 | 385 | response = self.app.get(route_path('register')) |
|
376 | 386 | response.mustcontain('Create an Account') |
|
377 | 387 | |
|
378 | 388 | |
|
379 | 389 | response = self.app.post( |
|
380 | 390 | route_path('register'), |
|
381 | 391 | { |
|
382 | 392 | 'username': username, |
|
383 | 393 | 'password': password, |
|
384 | 394 | 'password_confirmation': password, |
|
385 | 395 | 'email': email, |
|
386 | 396 | 'firstname': name, |
|
387 | 397 | 'lastname': lastname, |
|
388 | 398 | 'admin': True |
|
389 | 399 | }, |
|
390 | 400 | status=302 |
|
391 | 401 | ) # This should be overridden |
|
392 | 402 | |
|
393 | 403 | assert_session_flash( |
|
394 | 404 | response, 'You have successfully registered with RhodeCode') |
|
395 | 405 | |
|
396 | 406 | ret = Session().query(User).filter( |
|
397 | 407 | User.username == 'test_regular4').one() |
|
398 | 408 | assert ret.username == username |
|
399 | 409 | assert check_password(password, ret.password) |
|
400 | 410 | assert ret.email == email |
|
401 | 411 | assert ret.name == name |
|
402 | 412 | assert ret.lastname == lastname |
|
403 | 413 | assert ret.auth_tokens is not None |
|
404 | 414 | assert not ret.admin |
|
405 | 415 | |
|
406 | 416 | def test_forgot_password_wrong_mail(self): |
|
407 | 417 | bad_email = 'marcin@wrongmail.org' |
|
408 | 418 | # this initializes a session |
|
409 | 419 | self.app.get(route_path('reset_password')) |
|
410 | 420 | |
|
411 | 421 | response = self.app.post( |
|
412 | 422 | route_path('reset_password'), {'email': bad_email, } |
|
413 | 423 | ) |
|
414 | 424 | assert_session_flash(response, |
|
415 | 425 | 'If such email exists, a password reset link was sent to it.') |
|
416 | 426 | |
|
417 | 427 | def test_forgot_password(self, user_util): |
|
418 | 428 | # this initializes a session |
|
419 | 429 | self.app.get(route_path('reset_password')) |
|
420 | 430 | |
|
421 | 431 | user = user_util.create_user() |
|
422 | 432 | user_id = user.user_id |
|
423 | 433 | email = user.email |
|
424 | 434 | |
|
425 | 435 | response = self.app.post(route_path('reset_password'), {'email': email, }) |
|
426 | 436 | |
|
427 | 437 | assert_session_flash(response, |
|
428 | 438 | 'If such email exists, a password reset link was sent to it.') |
|
429 | 439 | |
|
430 | 440 | # BAD KEY |
|
431 | 441 | confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), 'badkey') |
|
432 | 442 | response = self.app.get(confirm_url, status=302) |
|
433 | 443 | assert response.location.endswith(route_path('reset_password')) |
|
434 | 444 | assert_session_flash(response, 'Given reset token is invalid') |
|
435 | 445 | |
|
436 | 446 | response.follow() # cleanup flash |
|
437 | 447 | |
|
438 | 448 | # GOOD KEY |
|
439 | 449 | key = UserApiKeys.query()\ |
|
440 | 450 | .filter(UserApiKeys.user_id == user_id)\ |
|
441 | 451 | .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\ |
|
442 | 452 | .first() |
|
443 | 453 | |
|
444 | 454 | assert key |
|
445 | 455 | |
|
446 | 456 | confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), key.api_key) |
|
447 | 457 | response = self.app.get(confirm_url) |
|
448 | 458 | assert response.status == '302 Found' |
|
449 | 459 | assert response.location.endswith(route_path('login')) |
|
450 | 460 | |
|
451 | 461 | assert_session_flash( |
|
452 | 462 | response, |
|
453 | 463 | 'Your password reset was successful, ' |
|
454 | 464 | 'a new password has been sent to your email') |
|
455 | 465 | |
|
456 | 466 | response.follow() |
|
457 | 467 | |
|
458 | 468 | def _get_api_whitelist(self, values=None): |
|
459 | 469 | config = {'api_access_controllers_whitelist': values or []} |
|
460 | 470 | return config |
|
461 | 471 | |
|
462 | 472 | @pytest.mark.parametrize("test_name, auth_token", [ |
|
463 | 473 | ('none', None), |
|
464 | 474 | ('empty_string', ''), |
|
465 | 475 | ('fake_number', '123456'), |
|
466 | 476 | ('proper_auth_token', None) |
|
467 | 477 | ]) |
|
468 | 478 | def test_access_not_whitelisted_page_via_auth_token( |
|
469 | 479 | self, test_name, auth_token, user_admin): |
|
470 | 480 | |
|
471 | 481 | whitelist = self._get_api_whitelist([]) |
|
472 | 482 | with mock.patch.dict('rhodecode.CONFIG', whitelist): |
|
473 | 483 | assert [] == whitelist['api_access_controllers_whitelist'] |
|
474 | 484 | if test_name == 'proper_auth_token': |
|
475 | 485 | # use builtin if api_key is None |
|
476 | 486 | auth_token = user_admin.api_key |
|
477 | 487 | |
|
478 | 488 | with fixture.anon_access(False): |
|
479 | 489 | self.app.get( |
|
480 | 490 | route_path('repo_commit_raw', |
|
481 | 491 | repo_name=HG_REPO, commit_id='tip', |
|
482 | 492 | params=dict(api_key=auth_token)), |
|
483 | 493 | status=302) |
|
484 | 494 | |
|
485 | 495 | @pytest.mark.parametrize("test_name, auth_token, code", [ |
|
486 | 496 | ('none', None, 302), |
|
487 | 497 | ('empty_string', '', 302), |
|
488 | 498 | ('fake_number', '123456', 302), |
|
489 | 499 | ('proper_auth_token', None, 200) |
|
490 | 500 | ]) |
|
491 | 501 | def test_access_whitelisted_page_via_auth_token( |
|
492 | 502 | self, test_name, auth_token, code, user_admin): |
|
493 | 503 | |
|
494 | 504 | whitelist = self._get_api_whitelist(whitelist_view) |
|
495 | 505 | |
|
496 | 506 | with mock.patch.dict('rhodecode.CONFIG', whitelist): |
|
497 | 507 | assert whitelist_view == whitelist['api_access_controllers_whitelist'] |
|
498 | 508 | |
|
499 | 509 | if test_name == 'proper_auth_token': |
|
500 | 510 | auth_token = user_admin.api_key |
|
501 | 511 | assert auth_token |
|
502 | 512 | |
|
503 | 513 | with fixture.anon_access(False): |
|
504 | 514 | self.app.get( |
|
505 | 515 | route_path('repo_commit_raw', |
|
506 | 516 | repo_name=HG_REPO, commit_id='tip', |
|
507 | 517 | params=dict(api_key=auth_token)), |
|
508 | 518 | status=code) |
|
509 | 519 | |
|
510 | 520 | @pytest.mark.parametrize("test_name, auth_token, code", [ |
|
511 | 521 | ('proper_auth_token', None, 200), |
|
512 | 522 | ('wrong_auth_token', '123456', 302), |
|
513 | 523 | ]) |
|
514 | 524 | def test_access_whitelisted_page_via_auth_token_bound_to_token( |
|
515 | 525 | self, test_name, auth_token, code, user_admin): |
|
516 | 526 | |
|
517 | 527 | expected_token = auth_token |
|
518 | 528 | if test_name == 'proper_auth_token': |
|
519 | 529 | auth_token = user_admin.api_key |
|
520 | 530 | expected_token = auth_token |
|
521 | 531 | assert auth_token |
|
522 | 532 | |
|
523 | 533 | whitelist = self._get_api_whitelist([ |
|
524 | 534 | 'RepoCommitsView:repo_commit_raw@{}'.format(expected_token)]) |
|
525 | 535 | |
|
526 | 536 | with mock.patch.dict('rhodecode.CONFIG', whitelist): |
|
527 | 537 | |
|
528 | 538 | with fixture.anon_access(False): |
|
529 | 539 | self.app.get( |
|
530 | 540 | route_path('repo_commit_raw', |
|
531 | 541 | repo_name=HG_REPO, commit_id='tip', |
|
532 | 542 | params=dict(api_key=auth_token)), |
|
533 | 543 | status=code) |
|
534 | 544 | |
|
535 | 545 | def test_access_page_via_extra_auth_token(self): |
|
536 | 546 | whitelist = self._get_api_whitelist(whitelist_view) |
|
537 | 547 | with mock.patch.dict('rhodecode.CONFIG', whitelist): |
|
538 | 548 | assert whitelist_view == \ |
|
539 | 549 | whitelist['api_access_controllers_whitelist'] |
|
540 | 550 | |
|
541 | 551 | new_auth_token = AuthTokenModel().create( |
|
542 | 552 | TEST_USER_ADMIN_LOGIN, 'test') |
|
543 | 553 | Session().commit() |
|
544 | 554 | with fixture.anon_access(False): |
|
545 | 555 | self.app.get( |
|
546 | 556 | route_path('repo_commit_raw', |
|
547 | 557 | repo_name=HG_REPO, commit_id='tip', |
|
548 | 558 | params=dict(api_key=new_auth_token.api_key)), |
|
549 | 559 | status=200) |
|
550 | 560 | |
|
551 | 561 | def test_access_page_via_expired_auth_token(self): |
|
552 | 562 | whitelist = self._get_api_whitelist(whitelist_view) |
|
553 | 563 | with mock.patch.dict('rhodecode.CONFIG', whitelist): |
|
554 | 564 | assert whitelist_view == \ |
|
555 | 565 | whitelist['api_access_controllers_whitelist'] |
|
556 | 566 | |
|
557 | 567 | new_auth_token = AuthTokenModel().create( |
|
558 | 568 | TEST_USER_ADMIN_LOGIN, 'test') |
|
559 | 569 | Session().commit() |
|
560 | 570 | # patch the api key and make it expired |
|
561 | 571 | new_auth_token.expires = 0 |
|
562 | 572 | Session().add(new_auth_token) |
|
563 | 573 | Session().commit() |
|
564 | 574 | with fixture.anon_access(False): |
|
565 | 575 | self.app.get( |
|
566 | 576 | route_path('repo_commit_raw', |
|
567 | 577 | repo_name=HG_REPO, commit_id='tip', |
|
568 | 578 | params=dict(api_key=new_auth_token.api_key)), |
|
569 | 579 | status=302) |
@@ -1,186 +1,220 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2012-2019 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | """ |
|
22 | 22 | RhodeCode authentication plugin for built in internal auth |
|
23 | 23 | """ |
|
24 | 24 | |
|
25 | 25 | import logging |
|
26 | 26 | |
|
27 | 27 | import colander |
|
28 | 28 | |
|
29 | from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase | |
|
30 | 29 | from rhodecode.translation import _ |
|
31 | ||
|
32 | from rhodecode.authentication.base import RhodeCodeAuthPluginBase, hybrid_property | |
|
33 | from rhodecode.authentication.routes import AuthnPluginResourceBase | |
|
34 | 30 | from rhodecode.lib.utils2 import safe_str |
|
35 | 31 | from rhodecode.model.db import User |
|
32 | from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase | |
|
33 | from rhodecode.authentication.base import ( | |
|
34 | RhodeCodeAuthPluginBase, hybrid_property, HTTP_TYPE, VCS_TYPE) | |
|
35 | from rhodecode.authentication.routes import AuthnPluginResourceBase | |
|
36 | 36 | |
|
37 | 37 | log = logging.getLogger(__name__) |
|
38 | 38 | |
|
39 | 39 | |
|
40 | 40 | def plugin_factory(plugin_id, *args, **kwargs): |
|
41 | 41 | plugin = RhodeCodeAuthPlugin(plugin_id) |
|
42 | 42 | return plugin |
|
43 | 43 | |
|
44 | 44 | |
|
45 | 45 | class RhodecodeAuthnResource(AuthnPluginResourceBase): |
|
46 | 46 | pass |
|
47 | 47 | |
|
48 | 48 | |
|
49 | 49 | class RhodeCodeAuthPlugin(RhodeCodeAuthPluginBase): |
|
50 | 50 | uid = 'rhodecode' |
|
51 |
|
|
|
52 |
|
|
|
51 | AUTH_RESTRICTION_NONE = 'user_all' | |
|
52 | AUTH_RESTRICTION_SUPER_ADMIN = 'user_super_admin' | |
|
53 | AUTH_RESTRICTION_SCOPE_ALL = 'scope_all' | |
|
54 | AUTH_RESTRICTION_SCOPE_HTTP = 'scope_http' | |
|
55 | AUTH_RESTRICTION_SCOPE_VCS = 'scope_vcs' | |
|
53 | 56 | |
|
54 | 57 | def includeme(self, config): |
|
55 | 58 | config.add_authn_plugin(self) |
|
56 | 59 | config.add_authn_resource(self.get_id(), RhodecodeAuthnResource(self)) |
|
57 | 60 | config.add_view( |
|
58 | 61 | 'rhodecode.authentication.views.AuthnPluginViewBase', |
|
59 | 62 | attr='settings_get', |
|
60 | 63 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', |
|
61 | 64 | request_method='GET', |
|
62 | 65 | route_name='auth_home', |
|
63 | 66 | context=RhodecodeAuthnResource) |
|
64 | 67 | config.add_view( |
|
65 | 68 | 'rhodecode.authentication.views.AuthnPluginViewBase', |
|
66 | 69 | attr='settings_post', |
|
67 | 70 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', |
|
68 | 71 | request_method='POST', |
|
69 | 72 | route_name='auth_home', |
|
70 | 73 | context=RhodecodeAuthnResource) |
|
71 | 74 | |
|
72 | 75 | def get_settings_schema(self): |
|
73 | 76 | return RhodeCodeSettingsSchema() |
|
74 | 77 | |
|
75 | 78 | def get_display_name(self): |
|
76 | 79 | return _('RhodeCode Internal') |
|
77 | 80 | |
|
78 | 81 | @classmethod |
|
79 | 82 | def docs(cls): |
|
80 | 83 | return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth.html" |
|
81 | 84 | |
|
82 | 85 | @hybrid_property |
|
83 | 86 | def name(self): |
|
84 | 87 | return u"rhodecode" |
|
85 | 88 | |
|
86 | 89 | def user_activation_state(self): |
|
87 | 90 | def_user_perms = User.get_default_user().AuthUser().permissions['global'] |
|
88 | 91 | return 'hg.register.auto_activate' in def_user_perms |
|
89 | 92 | |
|
90 | 93 | def allows_authentication_from( |
|
91 | 94 | self, user, allows_non_existing_user=True, |
|
92 | 95 | allowed_auth_plugins=None, allowed_auth_sources=None): |
|
93 | 96 | """ |
|
94 | 97 | Custom method for this auth that doesn't accept non existing users. |
|
95 | 98 | We know that user exists in our database. |
|
96 | 99 | """ |
|
97 | 100 | allows_non_existing_user = False |
|
98 | 101 | return super(RhodeCodeAuthPlugin, self).allows_authentication_from( |
|
99 | 102 | user, allows_non_existing_user=allows_non_existing_user) |
|
100 | 103 | |
|
101 | 104 | def auth(self, userobj, username, password, settings, **kwargs): |
|
102 | 105 | if not userobj: |
|
103 | 106 | log.debug('userobj was:%s skipping', userobj) |
|
104 | 107 | return None |
|
105 | 108 | |
|
106 | 109 | if userobj.extern_type != self.name: |
|
107 | log.warning( | |
|
108 | "userobj:%s extern_type mismatch got:`%s` expected:`%s`", | |
|
110 | log.warning("userobj:%s extern_type mismatch got:`%s` expected:`%s`", | |
|
109 | 111 | userobj, userobj.extern_type, self.name) |
|
110 | 112 | return None |
|
111 | 113 | |
|
112 | login_restriction = settings.get('login_restriction', '') | |
|
113 | if login_restriction == self.LOGIN_RESTRICTION_SUPER_ADMIN and userobj.admin is False: | |
|
114 | log.info( | |
|
115 | "userobj:%s is not super-admin and login restriction is set to %s", | |
|
116 | userobj, login_restriction) | |
|
114 | # check scope of auth | |
|
115 | scope_restriction = settings.get('scope_restriction', '') | |
|
116 | ||
|
117 | if scope_restriction == self.AUTH_RESTRICTION_SCOPE_HTTP \ | |
|
118 | and self.auth_type != HTTP_TYPE: | |
|
119 | log.warning("userobj:%s tried scope type %s and scope restriction is set to %s", | |
|
120 | userobj, self.auth_type, scope_restriction) | |
|
121 | return None | |
|
122 | ||
|
123 | if scope_restriction == self.AUTH_RESTRICTION_SCOPE_VCS \ | |
|
124 | and self.auth_type != VCS_TYPE: | |
|
125 | log.warning("userobj:%s tried scope type %s and scope restriction is set to %s", | |
|
126 | userobj, self.auth_type, scope_restriction) | |
|
127 | return None | |
|
128 | ||
|
129 | # check super-admin restriction | |
|
130 | auth_restriction = settings.get('auth_restriction', '') | |
|
131 | ||
|
132 | if auth_restriction == self.AUTH_RESTRICTION_SUPER_ADMIN \ | |
|
133 | and userobj.admin is False: | |
|
134 | log.warning("userobj:%s is not super-admin and auth restriction is set to %s", | |
|
135 | userobj, auth_restriction) | |
|
117 | 136 | return None |
|
118 | 137 | |
|
119 | 138 | user_attrs = { |
|
120 | 139 | "username": userobj.username, |
|
121 | 140 | "firstname": userobj.firstname, |
|
122 | 141 | "lastname": userobj.lastname, |
|
123 | 142 | "groups": [], |
|
124 | 143 | 'user_group_sync': False, |
|
125 | 144 | "email": userobj.email, |
|
126 | 145 | "admin": userobj.admin, |
|
127 | 146 | "active": userobj.active, |
|
128 | 147 | "active_from_extern": userobj.active, |
|
129 | 148 | "extern_name": userobj.user_id, |
|
130 | 149 | "extern_type": userobj.extern_type, |
|
131 | 150 | } |
|
132 | 151 | |
|
133 | 152 | log.debug("User attributes:%s", user_attrs) |
|
134 | 153 | if userobj.active: |
|
135 | 154 | from rhodecode.lib import auth |
|
136 | 155 | crypto_backend = auth.crypto_backend() |
|
137 | 156 | password_encoded = safe_str(password) |
|
138 | 157 | password_match, new_hash = crypto_backend.hash_check_with_upgrade( |
|
139 | 158 | password_encoded, userobj.password or '') |
|
140 | 159 | |
|
141 | 160 | if password_match and new_hash: |
|
142 | 161 | log.debug('user %s properly authenticated, but ' |
|
143 | 162 | 'requires hash change to bcrypt', userobj) |
|
144 | 163 | # if password match, and we use OLD deprecated hash, |
|
145 | 164 | # we should migrate this user hash password to the new hash |
|
146 | 165 | # we store the new returned by hash_check_with_upgrade function |
|
147 | 166 | user_attrs['_hash_migrate'] = new_hash |
|
148 | 167 | |
|
149 | 168 | if userobj.username == User.DEFAULT_USER and userobj.active: |
|
150 | 169 | log.info('user `%s` authenticated correctly as anonymous user', |
|
151 | 170 | userobj.username) |
|
152 | 171 | return user_attrs |
|
153 | 172 | |
|
154 | 173 | elif userobj.username == username and password_match: |
|
155 | 174 | log.info('user `%s` authenticated correctly', userobj.username) |
|
156 | 175 | return user_attrs |
|
157 | log.warn("user `%s` used a wrong password when " | |
|
176 | log.warning("user `%s` used a wrong password when " | |
|
158 | 177 | "authenticating on this plugin", userobj.username) |
|
159 | 178 | return None |
|
160 | 179 | else: |
|
161 | log.warning( | |
|
162 | 'user `%s` failed to authenticate via %s, reason: account not ' | |
|
180 | log.warning('user `%s` failed to authenticate via %s, reason: account not ' | |
|
163 | 181 | 'active.', username, self.name) |
|
164 | 182 | return None |
|
165 | 183 | |
|
166 | 184 | |
|
167 | 185 | class RhodeCodeSettingsSchema(AuthnPluginSettingsSchemaBase): |
|
168 | login_restriction_choices = [ | |
|
169 | (RhodeCodeAuthPlugin.LOGIN_RESTRICTION_NONE, 'All users'), | |
|
170 |
(RhodeCodeAuthPlugin. |
|
|
186 | ||
|
187 | auth_restriction_choices = [ | |
|
188 | (RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE, 'All users'), | |
|
189 | (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN, 'Super admins only'), | |
|
190 | ] | |
|
191 | ||
|
192 | auth_scope_choices = [ | |
|
193 | (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL, 'HTTP and VCS'), | |
|
194 | (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_HTTP, 'HTTP only'), | |
|
171 | 195 | ] |
|
172 | 196 | |
|
173 |
|
|
|
197 | auth_restriction = colander.SchemaNode( | |
|
174 | 198 | colander.String(), |
|
175 |
default= |
|
|
176 |
description=_(' |
|
|
177 |
title=_(' |
|
|
178 |
validator=colander.OneOf([x[0] for x in |
|
|
199 | default=auth_restriction_choices[0], | |
|
200 | description=_('Allowed user types for authentication using this plugin.'), | |
|
201 | title=_('User restriction'), | |
|
202 | validator=colander.OneOf([x[0] for x in auth_restriction_choices]), | |
|
179 | 203 | widget='select_with_labels', |
|
180 |
choices= |
|
|
204 | choices=auth_restriction_choices | |
|
205 | ) | |
|
206 | scope_restriction = colander.SchemaNode( | |
|
207 | colander.String(), | |
|
208 | default=auth_scope_choices[0], | |
|
209 | description=_('Allowed protocols for authentication using this plugin. ' | |
|
210 | 'VCS means GIT/HG/SVN. HTTP is web based login.'), | |
|
211 | title=_('Scope restriction'), | |
|
212 | validator=colander.OneOf([x[0] for x in auth_scope_choices]), | |
|
213 | widget='select_with_labels', | |
|
214 | choices=auth_scope_choices | |
|
181 | 215 | ) |
|
182 | 216 | |
|
183 | 217 | |
|
184 | 218 | def includeme(config): |
|
185 | 219 | plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid) |
|
186 | 220 | plugin_factory(plugin_id).includeme(config) |
@@ -1,157 +1,177 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2016-2019 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | """ |
|
22 | 22 | RhodeCode authentication token plugin for built in internal auth |
|
23 | 23 | """ |
|
24 | 24 | |
|
25 | 25 | import logging |
|
26 | import colander | |
|
26 | 27 | |
|
28 | from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase | |
|
27 | 29 | from rhodecode.translation import _ |
|
28 | 30 | from rhodecode.authentication.base import ( |
|
29 | 31 | RhodeCodeAuthPluginBase, VCS_TYPE, hybrid_property) |
|
30 | 32 | from rhodecode.authentication.routes import AuthnPluginResourceBase |
|
31 | 33 | from rhodecode.model.db import User, UserApiKeys, Repository |
|
32 | 34 | |
|
33 | 35 | |
|
34 | 36 | log = logging.getLogger(__name__) |
|
35 | 37 | |
|
36 | 38 | |
|
37 | 39 | def plugin_factory(plugin_id, *args, **kwargs): |
|
38 | 40 | plugin = RhodeCodeAuthPlugin(plugin_id) |
|
39 | 41 | return plugin |
|
40 | 42 | |
|
41 | 43 | |
|
42 | 44 | class RhodecodeAuthnResource(AuthnPluginResourceBase): |
|
43 | 45 | pass |
|
44 | 46 | |
|
45 | 47 | |
|
46 | 48 | class RhodeCodeAuthPlugin(RhodeCodeAuthPluginBase): |
|
47 | 49 | """ |
|
48 | 50 | Enables usage of authentication tokens for vcs operations. |
|
49 | 51 | """ |
|
50 | 52 | uid = 'token' |
|
53 | AUTH_RESTRICTION_SCOPE_VCS = 'scope_vcs' | |
|
51 | 54 | |
|
52 | 55 | def includeme(self, config): |
|
53 | 56 | config.add_authn_plugin(self) |
|
54 | 57 | config.add_authn_resource(self.get_id(), RhodecodeAuthnResource(self)) |
|
55 | 58 | config.add_view( |
|
56 | 59 | 'rhodecode.authentication.views.AuthnPluginViewBase', |
|
57 | 60 | attr='settings_get', |
|
58 | 61 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', |
|
59 | 62 | request_method='GET', |
|
60 | 63 | route_name='auth_home', |
|
61 | 64 | context=RhodecodeAuthnResource) |
|
62 | 65 | config.add_view( |
|
63 | 66 | 'rhodecode.authentication.views.AuthnPluginViewBase', |
|
64 | 67 | attr='settings_post', |
|
65 | 68 | renderer='rhodecode:templates/admin/auth/plugin_settings.mako', |
|
66 | 69 | request_method='POST', |
|
67 | 70 | route_name='auth_home', |
|
68 | 71 | context=RhodecodeAuthnResource) |
|
69 | 72 | |
|
73 | def get_settings_schema(self): | |
|
74 | return RhodeCodeSettingsSchema() | |
|
75 | ||
|
70 | 76 | def get_display_name(self): |
|
71 | 77 | return _('Rhodecode Token') |
|
72 | 78 | |
|
73 | 79 | @classmethod |
|
74 | 80 | def docs(cls): |
|
75 | 81 | return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-token.html" |
|
76 | 82 | |
|
77 | 83 | @hybrid_property |
|
78 | 84 | def name(self): |
|
79 | 85 | return u"authtoken" |
|
80 | 86 | |
|
81 | 87 | def user_activation_state(self): |
|
82 | 88 | def_user_perms = User.get_default_user().AuthUser().permissions['global'] |
|
83 | 89 | return 'hg.register.auto_activate' in def_user_perms |
|
84 | 90 | |
|
85 | 91 | def allows_authentication_from( |
|
86 | 92 | self, user, allows_non_existing_user=True, |
|
87 | 93 | allowed_auth_plugins=None, allowed_auth_sources=None): |
|
88 | 94 | """ |
|
89 | 95 | Custom method for this auth that doesn't accept empty users. And also |
|
90 | 96 | allows users from all other active plugins to use it and also |
|
91 | 97 | authenticate against it. But only via vcs mode |
|
92 | 98 | """ |
|
93 | 99 | from rhodecode.authentication.base import get_authn_registry |
|
94 | 100 | authn_registry = get_authn_registry() |
|
95 | 101 | |
|
96 | 102 | active_plugins = set( |
|
97 | 103 | [x.name for x in authn_registry.get_plugins_for_authentication()]) |
|
98 | 104 | active_plugins.discard(self.name) |
|
99 | 105 | |
|
100 | 106 | allowed_auth_plugins = [self.name] + list(active_plugins) |
|
101 | 107 | # only for vcs operations |
|
102 | 108 | allowed_auth_sources = [VCS_TYPE] |
|
103 | 109 | |
|
104 | 110 | return super(RhodeCodeAuthPlugin, self).allows_authentication_from( |
|
105 | 111 | user, allows_non_existing_user=False, |
|
106 | 112 | allowed_auth_plugins=allowed_auth_plugins, |
|
107 | 113 | allowed_auth_sources=allowed_auth_sources) |
|
108 | 114 | |
|
109 | 115 | def auth(self, userobj, username, password, settings, **kwargs): |
|
110 | 116 | if not userobj: |
|
111 | 117 | log.debug('userobj was:%s skipping', userobj) |
|
112 | 118 | return None |
|
113 | 119 | |
|
114 | 120 | user_attrs = { |
|
115 | 121 | "username": userobj.username, |
|
116 | 122 | "firstname": userobj.firstname, |
|
117 | 123 | "lastname": userobj.lastname, |
|
118 | 124 | "groups": [], |
|
119 | 125 | 'user_group_sync': False, |
|
120 | 126 | "email": userobj.email, |
|
121 | 127 | "admin": userobj.admin, |
|
122 | 128 | "active": userobj.active, |
|
123 | 129 | "active_from_extern": userobj.active, |
|
124 | 130 | "extern_name": userobj.user_id, |
|
125 | 131 | "extern_type": userobj.extern_type, |
|
126 | 132 | } |
|
127 | 133 | |
|
128 | 134 | log.debug('Authenticating user with args %s', user_attrs) |
|
129 | 135 | if userobj.active: |
|
130 | 136 | # calling context repo for token scopes |
|
131 | 137 | scope_repo_id = None |
|
132 | 138 | if self.acl_repo_name: |
|
133 | 139 | repo = Repository.get_by_repo_name(self.acl_repo_name) |
|
134 | 140 | scope_repo_id = repo.repo_id if repo else None |
|
135 | 141 | |
|
136 | 142 | token_match = userobj.authenticate_by_token( |
|
137 | 143 | password, roles=[UserApiKeys.ROLE_VCS], |
|
138 | 144 | scope_repo_id=scope_repo_id) |
|
139 | 145 | |
|
140 | 146 | if userobj.username == username and token_match: |
|
141 | 147 | log.info( |
|
142 | 148 | 'user `%s` successfully authenticated via %s', |
|
143 | 149 | user_attrs['username'], self.name) |
|
144 | 150 | return user_attrs |
|
145 | log.warn( | |
|
146 | 'user `%s` failed to authenticate via %s, reason: bad or ' | |
|
151 | log.warning('user `%s` failed to authenticate via %s, reason: bad or ' | |
|
147 | 152 | 'inactive token.', username, self.name) |
|
148 | 153 | else: |
|
149 | log.warning( | |
|
150 | 'user `%s` failed to authenticate via %s, reason: account not ' | |
|
154 | log.warning('user `%s` failed to authenticate via %s, reason: account not ' | |
|
151 | 155 | 'active.', username, self.name) |
|
152 | 156 | return None |
|
153 | 157 | |
|
154 | 158 | |
|
155 | 159 | def includeme(config): |
|
156 | 160 | plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid) |
|
157 | 161 | plugin_factory(plugin_id).includeme(config) |
|
162 | ||
|
163 | ||
|
164 | class RhodeCodeSettingsSchema(AuthnPluginSettingsSchemaBase): | |
|
165 | auth_scope_choices = [ | |
|
166 | (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS, 'VCS only'), | |
|
167 | ] | |
|
168 | ||
|
169 | scope_restriction = colander.SchemaNode( | |
|
170 | colander.String(), | |
|
171 | default=auth_scope_choices[0], | |
|
172 | description=_('Choose operation scope restriction when authenticating.'), | |
|
173 | title=_('Scope restriction'), | |
|
174 | validator=colander.OneOf([x[0] for x in auth_scope_choices]), | |
|
175 | widget='select_with_labels', | |
|
176 | choices=auth_scope_choices | |
|
177 | ) |
@@ -1,383 +1,415 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | |
|
3 | 3 | # Copyright (C) 2010-2019 RhodeCode GmbH |
|
4 | 4 | # |
|
5 | 5 | # This program is free software: you can redistribute it and/or modify |
|
6 | 6 | # it under the terms of the GNU Affero General Public License, version 3 |
|
7 | 7 | # (only), as published by the Free Software Foundation. |
|
8 | 8 | # |
|
9 | 9 | # This program is distributed in the hope that it will be useful, |
|
10 | 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 | 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 | 12 | # GNU General Public License for more details. |
|
13 | 13 | # |
|
14 | 14 | # You should have received a copy of the GNU Affero General Public License |
|
15 | 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | 16 | # |
|
17 | 17 | # This program is dual-licensed. If you wish to learn more about the |
|
18 | 18 | # RhodeCode Enterprise Edition, including its added features, Support services, |
|
19 | 19 | # and proprietary license terms, please see https://rhodecode.com/licenses/ |
|
20 | 20 | |
|
21 | 21 | """ |
|
22 | 22 | Helpers for fixture generation |
|
23 | 23 | """ |
|
24 | 24 | |
|
25 | 25 | import os |
|
26 | 26 | import time |
|
27 | 27 | import tempfile |
|
28 | 28 | import shutil |
|
29 | 29 | |
|
30 | 30 | import configobj |
|
31 | 31 | |
|
32 | 32 | from rhodecode.tests import * |
|
33 | 33 | from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist, UserEmailMap |
|
34 | 34 | from rhodecode.model.meta import Session |
|
35 | 35 | from rhodecode.model.repo import RepoModel |
|
36 | 36 | from rhodecode.model.user import UserModel |
|
37 | 37 | from rhodecode.model.repo_group import RepoGroupModel |
|
38 | 38 | from rhodecode.model.user_group import UserGroupModel |
|
39 | 39 | from rhodecode.model.gist import GistModel |
|
40 | 40 | from rhodecode.model.auth_token import AuthTokenModel |
|
41 | 41 | from rhodecode.authentication.plugins.auth_rhodecode import \ |
|
42 | 42 | RhodeCodeAuthPlugin |
|
43 | 43 | |
|
44 | 44 | dn = os.path.dirname |
|
45 | 45 | FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures') |
|
46 | 46 | |
|
47 | 47 | |
|
48 | 48 | def error_function(*args, **kwargs): |
|
49 | 49 | raise Exception('Total Crash !') |
|
50 | 50 | |
|
51 | 51 | |
|
52 | 52 | class TestINI(object): |
|
53 | 53 | """ |
|
54 | 54 | Allows to create a new test.ini file as a copy of existing one with edited |
|
55 | 55 | data. Example usage:: |
|
56 | 56 | |
|
57 | 57 | with TestINI('test.ini', [{'section':{'key':val'}]) as new_test_ini_path: |
|
58 | 58 | print('paster server %s' % new_test_ini) |
|
59 | 59 | """ |
|
60 | 60 | |
|
61 | 61 | def __init__(self, ini_file_path, ini_params, new_file_prefix='DEFAULT', |
|
62 | 62 | destroy=True, dir=None): |
|
63 | 63 | self.ini_file_path = ini_file_path |
|
64 | 64 | self.ini_params = ini_params |
|
65 | 65 | self.new_path = None |
|
66 | 66 | self.new_path_prefix = new_file_prefix |
|
67 | 67 | self._destroy = destroy |
|
68 | 68 | self._dir = dir |
|
69 | 69 | |
|
70 | 70 | def __enter__(self): |
|
71 | 71 | return self.create() |
|
72 | 72 | |
|
73 | 73 | def __exit__(self, exc_type, exc_val, exc_tb): |
|
74 | 74 | self.destroy() |
|
75 | 75 | |
|
76 | 76 | def create(self): |
|
77 | 77 | config = configobj.ConfigObj( |
|
78 | 78 | self.ini_file_path, file_error=True, write_empty_values=True) |
|
79 | 79 | |
|
80 | 80 | for data in self.ini_params: |
|
81 | 81 | section, ini_params = data.items()[0] |
|
82 | 82 | for key, val in ini_params.items(): |
|
83 | 83 | config[section][key] = val |
|
84 | 84 | with tempfile.NamedTemporaryFile( |
|
85 | 85 | prefix=self.new_path_prefix, suffix='.ini', dir=self._dir, |
|
86 | 86 | delete=False) as new_ini_file: |
|
87 | 87 | config.write(new_ini_file) |
|
88 | 88 | self.new_path = new_ini_file.name |
|
89 | 89 | |
|
90 | 90 | return self.new_path |
|
91 | 91 | |
|
92 | 92 | def destroy(self): |
|
93 | 93 | if self._destroy: |
|
94 | 94 | os.remove(self.new_path) |
|
95 | 95 | |
|
96 | 96 | |
|
97 | 97 | class Fixture(object): |
|
98 | 98 | |
|
99 | 99 | def anon_access(self, status): |
|
100 | 100 | """ |
|
101 | 101 | Context process for disabling anonymous access. use like: |
|
102 | 102 | fixture = Fixture() |
|
103 | 103 | with fixture.anon_access(False): |
|
104 | 104 | #tests |
|
105 | 105 | |
|
106 | 106 | after this block anon access will be set to `not status` |
|
107 | 107 | """ |
|
108 | 108 | |
|
109 | 109 | class context(object): |
|
110 | 110 | def __enter__(self): |
|
111 | 111 | anon = User.get_default_user() |
|
112 | 112 | anon.active = status |
|
113 | 113 | Session().add(anon) |
|
114 | 114 | Session().commit() |
|
115 | 115 | time.sleep(1.5) # must sleep for cache (1s to expire) |
|
116 | 116 | |
|
117 | 117 | def __exit__(self, exc_type, exc_val, exc_tb): |
|
118 | 118 | anon = User.get_default_user() |
|
119 | 119 | anon.active = not status |
|
120 | 120 | Session().add(anon) |
|
121 | 121 | Session().commit() |
|
122 | 122 | |
|
123 | 123 | return context() |
|
124 | 124 | |
|
125 |
def |
|
|
125 | def auth_restriction(self, auth_restriction): | |
|
126 | 126 | """ |
|
127 |
Context process for changing the builtin rhodecode plugin |
|
|
127 | Context process for changing the builtin rhodecode plugin auth restrictions. | |
|
128 | 128 | Use like: |
|
129 | 129 | fixture = Fixture() |
|
130 |
with fixture. |
|
|
130 | with fixture.auth_restriction('super_admin'): | |
|
131 | 131 | #tests |
|
132 | 132 | |
|
133 |
after this block |
|
|
133 | after this block auth restriction will be taken off | |
|
134 | 134 | """ |
|
135 | 135 | |
|
136 | 136 | class context(object): |
|
137 | 137 | def _get_pluing(self): |
|
138 | 138 | plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format( |
|
139 | 139 | RhodeCodeAuthPlugin.uid) |
|
140 | 140 | plugin = RhodeCodeAuthPlugin(plugin_id) |
|
141 | 141 | return plugin |
|
142 | 142 | |
|
143 | 143 | def __enter__(self): |
|
144 | 144 | plugin = self._get_pluing() |
|
145 | 145 | plugin.create_or_update_setting( |
|
146 |
' |
|
|
146 | 'auth_restriction', auth_restriction) | |
|
147 | 147 | Session().commit() |
|
148 | 148 | |
|
149 | 149 | def __exit__(self, exc_type, exc_val, exc_tb): |
|
150 | 150 | plugin = self._get_pluing() |
|
151 | 151 | plugin.create_or_update_setting( |
|
152 |
' |
|
|
152 | 'auth_restriction', RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE) | |
|
153 | Session().commit() | |
|
154 | ||
|
155 | return context() | |
|
156 | ||
|
157 | def scope_restriction(self, scope_restriction): | |
|
158 | """ | |
|
159 | Context process for changing the builtin rhodecode plugin scope restrictions. | |
|
160 | Use like: | |
|
161 | fixture = Fixture() | |
|
162 | with fixture.scope_restriction('scope_http'): | |
|
163 | #tests | |
|
164 | ||
|
165 | after this block scope restriction will be taken off | |
|
166 | """ | |
|
167 | ||
|
168 | class context(object): | |
|
169 | def _get_pluing(self): | |
|
170 | plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format( | |
|
171 | RhodeCodeAuthPlugin.uid) | |
|
172 | plugin = RhodeCodeAuthPlugin(plugin_id) | |
|
173 | return plugin | |
|
174 | ||
|
175 | def __enter__(self): | |
|
176 | plugin = self._get_pluing() | |
|
177 | plugin.create_or_update_setting( | |
|
178 | 'scope_restriction', scope_restriction) | |
|
179 | Session().commit() | |
|
180 | ||
|
181 | def __exit__(self, exc_type, exc_val, exc_tb): | |
|
182 | plugin = self._get_pluing() | |
|
183 | plugin.create_or_update_setting( | |
|
184 | 'scope_restriction', RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL) | |
|
153 | 185 | Session().commit() |
|
154 | 186 | |
|
155 | 187 | return context() |
|
156 | 188 | |
|
157 | 189 | def _get_repo_create_params(self, **custom): |
|
158 | 190 | defs = { |
|
159 | 191 | 'repo_name': None, |
|
160 | 192 | 'repo_type': 'hg', |
|
161 | 193 | 'clone_uri': '', |
|
162 | 194 | 'push_uri': '', |
|
163 | 195 | 'repo_group': '-1', |
|
164 | 196 | 'repo_description': 'DESC', |
|
165 | 197 | 'repo_private': False, |
|
166 | 198 | 'repo_landing_rev': 'rev:tip', |
|
167 | 199 | 'repo_copy_permissions': False, |
|
168 | 200 | 'repo_state': Repository.STATE_CREATED, |
|
169 | 201 | } |
|
170 | 202 | defs.update(custom) |
|
171 | 203 | if 'repo_name_full' not in custom: |
|
172 | 204 | defs.update({'repo_name_full': defs['repo_name']}) |
|
173 | 205 | |
|
174 | 206 | # fix the repo name if passed as repo_name_full |
|
175 | 207 | if defs['repo_name']: |
|
176 | 208 | defs['repo_name'] = defs['repo_name'].split('/')[-1] |
|
177 | 209 | |
|
178 | 210 | return defs |
|
179 | 211 | |
|
180 | 212 | def _get_group_create_params(self, **custom): |
|
181 | 213 | defs = { |
|
182 | 214 | 'group_name': None, |
|
183 | 215 | 'group_description': 'DESC', |
|
184 | 216 | 'perm_updates': [], |
|
185 | 217 | 'perm_additions': [], |
|
186 | 218 | 'perm_deletions': [], |
|
187 | 219 | 'group_parent_id': -1, |
|
188 | 220 | 'enable_locking': False, |
|
189 | 221 | 'recursive': False, |
|
190 | 222 | } |
|
191 | 223 | defs.update(custom) |
|
192 | 224 | |
|
193 | 225 | return defs |
|
194 | 226 | |
|
195 | 227 | def _get_user_create_params(self, name, **custom): |
|
196 | 228 | defs = { |
|
197 | 229 | 'username': name, |
|
198 | 230 | 'password': 'qweqwe', |
|
199 | 231 | 'email': '%s+test@rhodecode.org' % name, |
|
200 | 232 | 'firstname': 'TestUser', |
|
201 | 233 | 'lastname': 'Test', |
|
202 | 234 | 'active': True, |
|
203 | 235 | 'admin': False, |
|
204 | 236 | 'extern_type': 'rhodecode', |
|
205 | 237 | 'extern_name': None, |
|
206 | 238 | } |
|
207 | 239 | defs.update(custom) |
|
208 | 240 | |
|
209 | 241 | return defs |
|
210 | 242 | |
|
211 | 243 | def _get_user_group_create_params(self, name, **custom): |
|
212 | 244 | defs = { |
|
213 | 245 | 'users_group_name': name, |
|
214 | 246 | 'user_group_description': 'DESC', |
|
215 | 247 | 'users_group_active': True, |
|
216 | 248 | 'user_group_data': {}, |
|
217 | 249 | } |
|
218 | 250 | defs.update(custom) |
|
219 | 251 | |
|
220 | 252 | return defs |
|
221 | 253 | |
|
222 | 254 | def create_repo(self, name, **kwargs): |
|
223 | 255 | repo_group = kwargs.get('repo_group') |
|
224 | 256 | if isinstance(repo_group, RepoGroup): |
|
225 | 257 | kwargs['repo_group'] = repo_group.group_id |
|
226 | 258 | name = name.split(Repository.NAME_SEP)[-1] |
|
227 | 259 | name = Repository.NAME_SEP.join((repo_group.group_name, name)) |
|
228 | 260 | |
|
229 | 261 | if 'skip_if_exists' in kwargs: |
|
230 | 262 | del kwargs['skip_if_exists'] |
|
231 | 263 | r = Repository.get_by_repo_name(name) |
|
232 | 264 | if r: |
|
233 | 265 | return r |
|
234 | 266 | |
|
235 | 267 | form_data = self._get_repo_create_params(repo_name=name, **kwargs) |
|
236 | 268 | cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) |
|
237 | 269 | RepoModel().create(form_data, cur_user) |
|
238 | 270 | Session().commit() |
|
239 | 271 | repo = Repository.get_by_repo_name(name) |
|
240 | 272 | assert repo |
|
241 | 273 | return repo |
|
242 | 274 | |
|
243 | 275 | def create_fork(self, repo_to_fork, fork_name, **kwargs): |
|
244 | 276 | repo_to_fork = Repository.get_by_repo_name(repo_to_fork) |
|
245 | 277 | |
|
246 | 278 | form_data = self._get_repo_create_params(repo_name=fork_name, |
|
247 | 279 | fork_parent_id=repo_to_fork.repo_id, |
|
248 | 280 | repo_type=repo_to_fork.repo_type, |
|
249 | 281 | **kwargs) |
|
250 | 282 | #TODO: fix it !! |
|
251 | 283 | form_data['description'] = form_data['repo_description'] |
|
252 | 284 | form_data['private'] = form_data['repo_private'] |
|
253 | 285 | form_data['landing_rev'] = form_data['repo_landing_rev'] |
|
254 | 286 | |
|
255 | 287 | owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) |
|
256 | 288 | RepoModel().create_fork(form_data, cur_user=owner) |
|
257 | 289 | Session().commit() |
|
258 | 290 | r = Repository.get_by_repo_name(fork_name) |
|
259 | 291 | assert r |
|
260 | 292 | return r |
|
261 | 293 | |
|
262 | 294 | def destroy_repo(self, repo_name, **kwargs): |
|
263 | 295 | RepoModel().delete(repo_name, pull_requests='delete', **kwargs) |
|
264 | 296 | Session().commit() |
|
265 | 297 | |
|
266 | 298 | def destroy_repo_on_filesystem(self, repo_name): |
|
267 | 299 | rm_path = os.path.join(RepoModel().repos_path, repo_name) |
|
268 | 300 | if os.path.isdir(rm_path): |
|
269 | 301 | shutil.rmtree(rm_path) |
|
270 | 302 | |
|
271 | 303 | def create_repo_group(self, name, **kwargs): |
|
272 | 304 | if 'skip_if_exists' in kwargs: |
|
273 | 305 | del kwargs['skip_if_exists'] |
|
274 | 306 | gr = RepoGroup.get_by_group_name(group_name=name) |
|
275 | 307 | if gr: |
|
276 | 308 | return gr |
|
277 | 309 | form_data = self._get_group_create_params(group_name=name, **kwargs) |
|
278 | 310 | owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) |
|
279 | 311 | gr = RepoGroupModel().create( |
|
280 | 312 | group_name=form_data['group_name'], |
|
281 | 313 | group_description=form_data['group_name'], |
|
282 | 314 | owner=owner) |
|
283 | 315 | Session().commit() |
|
284 | 316 | gr = RepoGroup.get_by_group_name(gr.group_name) |
|
285 | 317 | return gr |
|
286 | 318 | |
|
287 | 319 | def destroy_repo_group(self, repogroupid): |
|
288 | 320 | RepoGroupModel().delete(repogroupid) |
|
289 | 321 | Session().commit() |
|
290 | 322 | |
|
291 | 323 | def create_user(self, name, **kwargs): |
|
292 | 324 | if 'skip_if_exists' in kwargs: |
|
293 | 325 | del kwargs['skip_if_exists'] |
|
294 | 326 | user = User.get_by_username(name) |
|
295 | 327 | if user: |
|
296 | 328 | return user |
|
297 | 329 | form_data = self._get_user_create_params(name, **kwargs) |
|
298 | 330 | user = UserModel().create(form_data) |
|
299 | 331 | |
|
300 | 332 | # create token for user |
|
301 | 333 | AuthTokenModel().create( |
|
302 | 334 | user=user, description=u'TEST_USER_TOKEN') |
|
303 | 335 | |
|
304 | 336 | Session().commit() |
|
305 | 337 | user = User.get_by_username(user.username) |
|
306 | 338 | return user |
|
307 | 339 | |
|
308 | 340 | def destroy_user(self, userid): |
|
309 | 341 | UserModel().delete(userid) |
|
310 | 342 | Session().commit() |
|
311 | 343 | |
|
312 | 344 | def create_additional_user_email(self, user, email): |
|
313 | 345 | uem = UserEmailMap() |
|
314 | 346 | uem.user = user |
|
315 | 347 | uem.email = email |
|
316 | 348 | Session().add(uem) |
|
317 | 349 | return uem |
|
318 | 350 | |
|
319 | 351 | def destroy_users(self, userid_iter): |
|
320 | 352 | for user_id in userid_iter: |
|
321 | 353 | if User.get_by_username(user_id): |
|
322 | 354 | UserModel().delete(user_id) |
|
323 | 355 | Session().commit() |
|
324 | 356 | |
|
325 | 357 | def create_user_group(self, name, **kwargs): |
|
326 | 358 | if 'skip_if_exists' in kwargs: |
|
327 | 359 | del kwargs['skip_if_exists'] |
|
328 | 360 | gr = UserGroup.get_by_group_name(group_name=name) |
|
329 | 361 | if gr: |
|
330 | 362 | return gr |
|
331 | 363 | # map active flag to the real attribute. For API consistency of fixtures |
|
332 | 364 | if 'active' in kwargs: |
|
333 | 365 | kwargs['users_group_active'] = kwargs['active'] |
|
334 | 366 | del kwargs['active'] |
|
335 | 367 | form_data = self._get_user_group_create_params(name, **kwargs) |
|
336 | 368 | owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) |
|
337 | 369 | user_group = UserGroupModel().create( |
|
338 | 370 | name=form_data['users_group_name'], |
|
339 | 371 | description=form_data['user_group_description'], |
|
340 | 372 | owner=owner, active=form_data['users_group_active'], |
|
341 | 373 | group_data=form_data['user_group_data']) |
|
342 | 374 | Session().commit() |
|
343 | 375 | user_group = UserGroup.get_by_group_name(user_group.users_group_name) |
|
344 | 376 | return user_group |
|
345 | 377 | |
|
346 | 378 | def destroy_user_group(self, usergroupid): |
|
347 | 379 | UserGroupModel().delete(user_group=usergroupid, force=True) |
|
348 | 380 | Session().commit() |
|
349 | 381 | |
|
350 | 382 | def create_gist(self, **kwargs): |
|
351 | 383 | form_data = { |
|
352 | 384 | 'description': 'new-gist', |
|
353 | 385 | 'owner': TEST_USER_ADMIN_LOGIN, |
|
354 | 386 | 'gist_type': GistModel.cls.GIST_PUBLIC, |
|
355 | 387 | 'lifetime': -1, |
|
356 | 388 | 'acl_level': Gist.ACL_LEVEL_PUBLIC, |
|
357 | 389 | 'gist_mapping': {'filename1.txt': {'content': 'hello world'},} |
|
358 | 390 | } |
|
359 | 391 | form_data.update(kwargs) |
|
360 | 392 | gist = GistModel().create( |
|
361 | 393 | description=form_data['description'], owner=form_data['owner'], |
|
362 | 394 | gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'], |
|
363 | 395 | lifetime=form_data['lifetime'], gist_acl_level=form_data['acl_level'] |
|
364 | 396 | ) |
|
365 | 397 | Session().commit() |
|
366 | 398 | return gist |
|
367 | 399 | |
|
368 | 400 | def destroy_gists(self, gistid=None): |
|
369 | 401 | for g in GistModel.cls.get_all(): |
|
370 | 402 | if gistid: |
|
371 | 403 | if gistid == g.gist_access_id: |
|
372 | 404 | GistModel().delete(g) |
|
373 | 405 | else: |
|
374 | 406 | GistModel().delete(g) |
|
375 | 407 | Session().commit() |
|
376 | 408 | |
|
377 | 409 | def load_resource(self, resource_name, strip=False): |
|
378 | 410 | with open(os.path.join(FIXTURES, resource_name)) as f: |
|
379 | 411 | source = f.read() |
|
380 | 412 | if strip: |
|
381 | 413 | source = source.strip() |
|
382 | 414 | |
|
383 | 415 | return source |
General Comments 0
You need to be logged in to leave comments.
Login now