##// END OF EJS Templates
pytest: use hmac.new instead of hmac.HMAC...
Mads Kiilerich -
r8381:11ab74b7 default
parent child Browse files
Show More
@@ -1,508 +1,508 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.model.user
16 16 ~~~~~~~~~~~~~~~~~~~~
17 17
18 18 users model for Kallithea
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Apr 9, 2010
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28
29 29 import hashlib
30 30 import hmac
31 31 import logging
32 32 import time
33 33 import traceback
34 34
35 35 from sqlalchemy.exc import DatabaseError
36 36 from tg import config
37 37 from tg.i18n import ugettext as _
38 38
39 39 from kallithea.lib.exceptions import DefaultUserException, UserOwnsReposException
40 40 from kallithea.lib.utils2 import generate_api_key, get_current_authuser
41 41 from kallithea.model.db import Permission, User, UserEmailMap, UserIpMap, UserToPerm
42 42 from kallithea.model.meta import Session
43 43
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 class UserModel(object):
49 49 password_reset_token_lifetime = 86400 # 24 hours
50 50
51 51 def get(self, user_id):
52 52 user = User.query()
53 53 return user.get(user_id)
54 54
55 55 def get_user(self, user):
56 56 return User.guess_instance(user)
57 57
58 58 def create(self, form_data, cur_user=None):
59 59 if not cur_user:
60 60 cur_user = getattr(get_current_authuser(), 'username', None)
61 61
62 62 from kallithea.lib.hooks import check_allowed_create_user, log_create_user
63 63 _fd = form_data
64 64 user_data = {
65 65 'username': _fd['username'],
66 66 'password': _fd['password'],
67 67 'email': _fd['email'],
68 68 'firstname': _fd['firstname'],
69 69 'lastname': _fd['lastname'],
70 70 'active': _fd['active'],
71 71 'admin': False
72 72 }
73 73 # raises UserCreationError if it's not allowed
74 74 check_allowed_create_user(user_data, cur_user)
75 75 from kallithea.lib.auth import get_crypt_password
76 76
77 77 new_user = User()
78 78 for k, v in form_data.items():
79 79 if k == 'password':
80 80 v = get_crypt_password(v)
81 81 if k == 'firstname':
82 82 k = 'name'
83 83 setattr(new_user, k, v)
84 84
85 85 new_user.api_key = generate_api_key()
86 86 Session().add(new_user)
87 87 Session().flush() # make database assign new_user.user_id
88 88
89 89 log_create_user(new_user.get_dict(), cur_user)
90 90 return new_user
91 91
92 92 def create_or_update(self, username, password, email, firstname='',
93 93 lastname='', active=True, admin=False,
94 94 extern_type=None, extern_name=None, cur_user=None):
95 95 """
96 96 Creates a new instance if not found, or updates current one
97 97
98 98 :param username:
99 99 :param password:
100 100 :param email:
101 101 :param active:
102 102 :param firstname:
103 103 :param lastname:
104 104 :param active:
105 105 :param admin:
106 106 :param extern_name:
107 107 :param extern_type:
108 108 :param cur_user:
109 109 """
110 110 if not cur_user:
111 111 cur_user = getattr(get_current_authuser(), 'username', None)
112 112
113 113 from kallithea.lib.auth import check_password, get_crypt_password
114 114 from kallithea.lib.hooks import check_allowed_create_user, log_create_user
115 115 user_data = {
116 116 'username': username, 'password': password,
117 117 'email': email, 'firstname': firstname, 'lastname': lastname,
118 118 'active': active, 'admin': admin
119 119 }
120 120 # raises UserCreationError if it's not allowed
121 121 check_allowed_create_user(user_data, cur_user)
122 122
123 123 log.debug('Checking for %s account in Kallithea database', username)
124 124 user = User.get_by_username(username, case_insensitive=True)
125 125 if user is None:
126 126 log.debug('creating new user %s', username)
127 127 new_user = User()
128 128 edit = False
129 129 else:
130 130 log.debug('updating user %s', username)
131 131 new_user = user
132 132 edit = True
133 133
134 134 try:
135 135 new_user.username = username
136 136 new_user.admin = admin
137 137 new_user.email = email
138 138 new_user.active = active
139 139 new_user.extern_name = extern_name
140 140 new_user.extern_type = extern_type
141 141 new_user.name = firstname
142 142 new_user.lastname = lastname
143 143
144 144 if not edit:
145 145 new_user.api_key = generate_api_key()
146 146
147 147 # set password only if creating an user or password is changed
148 148 password_change = new_user.password and \
149 149 not check_password(password, new_user.password)
150 150 if not edit or password_change:
151 151 reason = 'new password' if edit else 'new user'
152 152 log.debug('Updating password reason=>%s', reason)
153 153 new_user.password = get_crypt_password(password) \
154 154 if password else ''
155 155
156 156 if user is None:
157 157 Session().add(new_user)
158 158 Session().flush() # make database assign new_user.user_id
159 159
160 160 if not edit:
161 161 log_create_user(new_user.get_dict(), cur_user)
162 162
163 163 return new_user
164 164 except (DatabaseError,):
165 165 log.error(traceback.format_exc())
166 166 raise
167 167
168 168 def create_registration(self, form_data):
169 169 import kallithea.lib.helpers as h
170 170 from kallithea.model.notification import NotificationModel
171 171
172 172 form_data['admin'] = False
173 173 form_data['extern_type'] = User.DEFAULT_AUTH_TYPE
174 174 form_data['extern_name'] = ''
175 175 new_user = self.create(form_data)
176 176
177 177 # notification to admins
178 178 subject = _('New user registration')
179 179 body = (
180 180 'New user registration\n'
181 181 '---------------------\n'
182 182 '- Username: {user.username}\n'
183 183 '- Full Name: {user.full_name}\n'
184 184 '- Email: {user.email}\n'
185 185 ).format(user=new_user)
186 186 edit_url = h.canonical_url('edit_user', id=new_user.user_id)
187 187 email_kwargs = {
188 188 'registered_user_url': edit_url,
189 189 'new_username': new_user.username,
190 190 'new_email': new_user.email,
191 191 'new_full_name': new_user.full_name}
192 192 NotificationModel().create(created_by=new_user, subject=subject,
193 193 body=body, recipients=None,
194 194 type_=NotificationModel.TYPE_REGISTRATION,
195 195 email_kwargs=email_kwargs)
196 196
197 197 def update(self, user_id, form_data, skip_attrs=None):
198 198 from kallithea.lib.auth import get_crypt_password
199 199 skip_attrs = skip_attrs or []
200 200 user = self.get(user_id)
201 201 if user.is_default_user:
202 202 raise DefaultUserException(
203 203 _("You can't edit this user since it's "
204 204 "crucial for entire application"))
205 205
206 206 for k, v in form_data.items():
207 207 if k in skip_attrs:
208 208 continue
209 209 if k == 'new_password' and v:
210 210 user.password = get_crypt_password(v)
211 211 else:
212 212 # old legacy thing orm models store firstname as name,
213 213 # need proper refactor to username
214 214 if k == 'firstname':
215 215 k = 'name'
216 216 setattr(user, k, v)
217 217
218 218 def update_user(self, user, **kwargs):
219 219 from kallithea.lib.auth import get_crypt_password
220 220
221 221 user = User.guess_instance(user)
222 222 if user.is_default_user:
223 223 raise DefaultUserException(
224 224 _("You can't edit this user since it's"
225 225 " crucial for entire application")
226 226 )
227 227
228 228 for k, v in kwargs.items():
229 229 if k == 'password' and v:
230 230 v = get_crypt_password(v)
231 231
232 232 setattr(user, k, v)
233 233 return user
234 234
235 235 def delete(self, user, cur_user=None):
236 236 if cur_user is None:
237 237 cur_user = getattr(get_current_authuser(), 'username', None)
238 238 user = User.guess_instance(user)
239 239
240 240 if user.is_default_user:
241 241 raise DefaultUserException(
242 242 _("You can't remove this user since it is"
243 243 " crucial for the entire application"))
244 244 if user.repositories:
245 245 repos = [x.repo_name for x in user.repositories]
246 246 raise UserOwnsReposException(
247 247 _('User "%s" still owns %s repositories and cannot be '
248 248 'removed. Switch owners or remove those repositories: %s')
249 249 % (user.username, len(repos), ', '.join(repos)))
250 250 if user.repo_groups:
251 251 repogroups = [x.group_name for x in user.repo_groups]
252 252 raise UserOwnsReposException(_(
253 253 'User "%s" still owns %s repository groups and cannot be '
254 254 'removed. Switch owners or remove those repository groups: %s')
255 255 % (user.username, len(repogroups), ', '.join(repogroups)))
256 256 if user.user_groups:
257 257 usergroups = [x.users_group_name for x in user.user_groups]
258 258 raise UserOwnsReposException(
259 259 _('User "%s" still owns %s user groups and cannot be '
260 260 'removed. Switch owners or remove those user groups: %s')
261 261 % (user.username, len(usergroups), ', '.join(usergroups)))
262 262 Session().delete(user)
263 263
264 264 from kallithea.lib.hooks import log_delete_user
265 265 log_delete_user(user.get_dict(), cur_user)
266 266
267 267 def can_change_password(self, user):
268 268 from kallithea.lib import auth_modules
269 269 managed_fields = auth_modules.get_managed_fields(user)
270 270 return 'password' not in managed_fields
271 271
272 272 def get_reset_password_token(self, user, timestamp, session_id):
273 273 """
274 274 The token is a 40-digit hexstring, calculated as a HMAC-SHA1.
275 275
276 276 In a traditional HMAC scenario, an attacker is unable to know or
277 277 influence the secret key, but can know or influence the message
278 278 and token. This scenario is slightly different (in particular
279 279 since the message sender is also the message recipient), but
280 280 sufficiently similar to use an HMAC. Benefits compared to a plain
281 281 SHA1 hash includes resistance against a length extension attack.
282 282
283 283 The HMAC key consists of the following values (known only to the
284 284 server and authorized users):
285 285
286 286 * per-application secret (the `app_instance_uuid` setting), without
287 287 which an attacker cannot counterfeit tokens
288 288 * hashed user password, invalidating the token upon password change
289 289
290 290 The HMAC message consists of the following values (potentially known
291 291 to an attacker):
292 292
293 293 * session ID (the anti-CSRF token), requiring an attacker to have
294 294 access to the browser session in which the token was created
295 295 * numeric user ID, limiting the token to a specific user (yet allowing
296 296 users to be renamed)
297 297 * user email address
298 298 * time of token issue (a Unix timestamp, to enable token expiration)
299 299
300 300 The key and message values are separated by NUL characters, which are
301 301 guaranteed not to occur in any of the values.
302 302 """
303 303 app_secret = config.get('app_instance_uuid')
304 return hmac.HMAC(
305 key='\0'.join([app_secret, user.password]).encode('utf-8'),
304 return hmac.new(
305 '\0'.join([app_secret, user.password]).encode('utf-8'),
306 306 msg='\0'.join([session_id, str(user.user_id), user.email, str(timestamp)]).encode('utf-8'),
307 307 digestmod=hashlib.sha1,
308 308 ).hexdigest()
309 309
310 310 def send_reset_password_email(self, data):
311 311 """
312 312 Sends email with a password reset token and link to the password
313 313 reset confirmation page with all information (including the token)
314 314 pre-filled. Also returns URL of that page, only without the token,
315 315 allowing users to copy-paste or manually enter the token from the
316 316 email.
317 317 """
318 318 import kallithea.lib.helpers as h
319 319 from kallithea.lib.celerylib import tasks
320 320 from kallithea.model.notification import EmailNotificationModel
321 321
322 322 user_email = data['email']
323 323 user = User.get_by_email(user_email)
324 324 timestamp = int(time.time())
325 325 if user is not None:
326 326 if self.can_change_password(user):
327 327 log.debug('password reset user %s found', user)
328 328 token = self.get_reset_password_token(user,
329 329 timestamp,
330 330 h.session_csrf_secret_token())
331 331 # URL must be fully qualified; but since the token is locked to
332 332 # the current browser session, we must provide a URL with the
333 333 # current scheme and hostname, rather than the canonical_url.
334 334 link = h.url('reset_password_confirmation', qualified=True,
335 335 email=user_email,
336 336 timestamp=timestamp,
337 337 token=token)
338 338 else:
339 339 log.debug('password reset user %s found but was managed', user)
340 340 token = link = None
341 341 reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
342 342 body = EmailNotificationModel().get_email_tmpl(
343 343 reg_type, 'txt',
344 344 user=user.short_contact,
345 345 reset_token=token,
346 346 reset_url=link)
347 347 html_body = EmailNotificationModel().get_email_tmpl(
348 348 reg_type, 'html',
349 349 user=user.short_contact,
350 350 reset_token=token,
351 351 reset_url=link)
352 352 log.debug('sending email')
353 353 tasks.send_email([user_email], _("Password reset link"), body, html_body)
354 354 log.info('send new password mail to %s', user_email)
355 355 else:
356 356 log.debug("password reset email %s not found", user_email)
357 357
358 358 return h.url('reset_password_confirmation',
359 359 email=user_email,
360 360 timestamp=timestamp)
361 361
362 362 def verify_reset_password_token(self, email, timestamp, token):
363 363 import kallithea.lib.helpers as h
364 364 user = User.get_by_email(email)
365 365 if user is None:
366 366 log.debug("user with email %s not found", email)
367 367 return False
368 368
369 369 token_age = int(time.time()) - int(timestamp)
370 370
371 371 if token_age < 0:
372 372 log.debug('timestamp is from the future')
373 373 return False
374 374
375 375 if token_age > UserModel.password_reset_token_lifetime:
376 376 log.debug('password reset token expired')
377 377 return False
378 378
379 379 expected_token = self.get_reset_password_token(user,
380 380 timestamp,
381 381 h.session_csrf_secret_token())
382 382 log.debug('computed password reset token: %s', expected_token)
383 383 log.debug('received password reset token: %s', token)
384 384 return expected_token == token
385 385
386 386 def reset_password(self, user_email, new_passwd):
387 387 from kallithea.lib import auth
388 388 from kallithea.lib.celerylib import tasks
389 389 user = User.get_by_email(user_email)
390 390 if user is not None:
391 391 if not self.can_change_password(user):
392 392 raise Exception('trying to change password for external user')
393 393 user.password = auth.get_crypt_password(new_passwd)
394 394 Session().commit()
395 395 log.info('change password for %s', user_email)
396 396 if new_passwd is None:
397 397 raise Exception('unable to set new password')
398 398
399 399 tasks.send_email([user_email],
400 400 _('Password reset notification'),
401 401 _('The password to your account %s has been changed using password reset form.') % (user.username,))
402 402 log.info('send password reset mail to %s', user_email)
403 403
404 404 return True
405 405
406 406 def has_perm(self, user, perm):
407 407 perm = Permission.guess_instance(perm)
408 408 user = User.guess_instance(user)
409 409
410 410 return UserToPerm.query().filter(UserToPerm.user == user) \
411 411 .filter(UserToPerm.permission == perm).scalar() is not None
412 412
413 413 def grant_perm(self, user, perm):
414 414 """
415 415 Grant user global permissions
416 416
417 417 :param user:
418 418 :param perm:
419 419 """
420 420 user = User.guess_instance(user)
421 421 perm = Permission.guess_instance(perm)
422 422 # if this permission is already granted skip it
423 423 _perm = UserToPerm.query() \
424 424 .filter(UserToPerm.user == user) \
425 425 .filter(UserToPerm.permission == perm) \
426 426 .scalar()
427 427 if _perm:
428 428 return
429 429 new = UserToPerm()
430 430 new.user = user
431 431 new.permission = perm
432 432 Session().add(new)
433 433 return new
434 434
435 435 def revoke_perm(self, user, perm):
436 436 """
437 437 Revoke users global permissions
438 438
439 439 :param user:
440 440 :param perm:
441 441 """
442 442 user = User.guess_instance(user)
443 443 perm = Permission.guess_instance(perm)
444 444
445 445 UserToPerm.query().filter(
446 446 UserToPerm.user == user,
447 447 UserToPerm.permission == perm,
448 448 ).delete()
449 449
450 450 def add_extra_email(self, user, email):
451 451 """
452 452 Adds email address to UserEmailMap
453 453
454 454 :param user:
455 455 :param email:
456 456 """
457 457 from kallithea.model import forms
458 458 form = forms.UserExtraEmailForm()()
459 459 data = form.to_python(dict(email=email))
460 460 user = User.guess_instance(user)
461 461
462 462 obj = UserEmailMap()
463 463 obj.user = user
464 464 obj.email = data['email']
465 465 Session().add(obj)
466 466 return obj
467 467
468 468 def delete_extra_email(self, user, email_id):
469 469 """
470 470 Removes email address from UserEmailMap
471 471
472 472 :param user:
473 473 :param email_id:
474 474 """
475 475 user = User.guess_instance(user)
476 476 obj = UserEmailMap.query().get(email_id)
477 477 if obj is not None:
478 478 Session().delete(obj)
479 479
480 480 def add_extra_ip(self, user, ip):
481 481 """
482 482 Adds IP address to UserIpMap
483 483
484 484 :param user:
485 485 :param ip:
486 486 """
487 487 from kallithea.model import forms
488 488 form = forms.UserExtraIpForm()()
489 489 data = form.to_python(dict(ip=ip))
490 490 user = User.guess_instance(user)
491 491
492 492 obj = UserIpMap()
493 493 obj.user = user
494 494 obj.ip_addr = data['ip']
495 495 Session().add(obj)
496 496 return obj
497 497
498 498 def delete_extra_ip(self, user, ip_id):
499 499 """
500 500 Removes IP address from UserIpMap
501 501
502 502 :param user:
503 503 :param ip_id:
504 504 """
505 505 user = User.guess_instance(user)
506 506 obj = UserIpMap.query().get(ip_id)
507 507 if obj:
508 508 Session().delete(obj)
General Comments 0
You need to be logged in to leave comments. Login now