my_account.py
861 lines
| 32.3 KiB
| text/x-python
|
PythonLexer
r5088 | # Copyright (C) 2016-2023 RhodeCode GmbH | |||
r1920 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r5367 | import time | |||
r1920 | import logging | |||
import datetime | ||||
r3424 | import string | |||
r1920 | ||||
import formencode | ||||
r2080 | import formencode.htmlfill | |||
r3424 | import peppercorn | |||
r4316 | from pyramid.httpexceptions import HTTPFound, HTTPNotFound | |||
r1920 | ||||
from rhodecode.apps._base import BaseAppView, DataGridAppView | ||||
from rhodecode import forms | ||||
from rhodecode.lib import helpers as h | ||||
from rhodecode.lib import audit_logger | ||||
r4974 | from rhodecode.lib import ext_json | |||
r4153 | from rhodecode.lib.auth import ( | |||
LoginRequired, NotAnonymous, CSRFRequired, | ||||
HasRepoPermissionAny, HasRepoGroupPermissionAny, AuthUser) | ||||
r2080 | from rhodecode.lib.channelstream import ( | |||
channelstream_request, ChannelstreamException) | ||||
r5065 | from rhodecode.lib.hash_utils import md5_safe | |||
r1920 | from rhodecode.lib.utils2 import safe_int, md5, str2bool | |||
from rhodecode.model.auth_token import AuthTokenModel | ||||
from rhodecode.model.comment import CommentsModel | ||||
from rhodecode.model.db import ( | ||||
r5365 | IntegrityError, or_, in_filter_generator, select, | |||
r3424 | Repository, UserEmailMap, UserApiKeys, UserFollowing, | |||
r4690 | PullRequest, UserBookmark, RepoGroup, ChangesetStatus) | |||
r5367 | from rhodecode.model.forms import TOTPForm | |||
r1920 | from rhodecode.model.meta import Session | |||
from rhodecode.model.pull_request import PullRequestModel | ||||
from rhodecode.model.user import UserModel | ||||
r2496 | from rhodecode.model.user_group import UserGroupModel | |||
r1920 | from rhodecode.model.validation_schema.schemas import user_schema | |||
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class MyAccountView(BaseAppView, DataGridAppView):
    """
    This view has alternative version inside EE, if modified please take a look
    in there as well.
    """

    # Copied into the template context as ``allow_scoped_tokens`` by
    # ``load_default_context``.
    ALLOW_SCOPED_TOKENS = False
def load_default_context(self):
    """
    Build the template context shared by the views of this class.

    Starts from the local template context, stores the object returned by
    ``auth_user.get_instance()`` as ``user`` and exposes
    ``ALLOW_SCOPED_TOKENS`` as ``allow_scoped_tokens``.
    """
    context = self._get_local_tmpl_context()
    current_user = context.auth_user.get_instance()
    context.user = current_user
    context.allow_scoped_tokens = self.ALLOW_SCOPED_TOKENS
    return context
@LoginRequired()
@NotAnonymous()
def my_account_profile(self):
    """Render the read-only profile tab of the current user's account."""
    context = self.load_default_context()
    context.active = 'profile'
    # the template needs to know which auth backend owns the account
    context.extern_type = context.user.extern_type
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
def my_account_edit(self):
    """
    Render the profile edit form, pre-filled with the user's current data.

    The form posts to the ``my_account_update`` route.
    """
    context = self.load_default_context()
    account = context.user
    context.active = 'profile_edit'
    context.extern_type = account.extern_type
    context.extern_name = account.extern_name

    profile_schema = user_schema.UserProfileSchema().bind(
        username=account.username, user_emails=account.emails)

    # initial form values, read straight from the user object
    editable_fields = (
        'username', 'email', 'firstname', 'lastname', 'description')
    initial_values = {
        field: getattr(account, field) for field in editable_fields}

    context.form = forms.RcForm(
        profile_schema,
        appstruct=initial_values,
        action=h.route_path('my_account_update'),
        buttons=(forms.buttons.save, forms.buttons.reset))
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_update(self):
    """
    Validate the posted profile form and store the changes.

    On a validation failure the form is rendered again together with its
    errors. In every other case the request ends with a redirect to the
    ``my_account_profile`` route and a flash message describing the outcome.

    :raises HTTPFound: redirect to the profile page after processing.
    """
    _ = self.request.translate
    c = self.load_default_context()
    c.active = 'profile_edit'
    c.perm_user = c.auth_user
    c.extern_type = c.user.extern_type
    c.extern_name = c.user.extern_name

    schema = user_schema.UserProfileSchema().bind(
        username=c.user.username, user_emails=c.user.emails)
    form = forms.RcForm(
        schema, buttons=(forms.buttons.save, forms.buttons.reset))

    controls = list(self.request.POST.items())

    try:
        valid_data = form.validate(controls)
        skip_attrs = ['admin', 'active', 'extern_type', 'extern_name',
                      'new_password', 'password_confirmation']
        if c.extern_type != "rhodecode":
            # forbid updating username for external accounts
            skip_attrs.append('username')
        old_email = c.user.email
        UserModel().update_user(
            self._rhodecode_user.user_id, skip_attrs=skip_attrs,
            **valid_data)
        if old_email != valid_data['email']:
            # The new primary address may be stored as one of the user's
            # extra e-mails; if so, that row takes over the old primary
            # address so the two are effectively swapped.
            swapped = UserEmailMap.query() \
                .filter(UserEmailMap.user == c.user)\
                .filter(UserEmailMap.email == valid_data['email'])\
                .first()
            # first() yields None when no such row exists; without this
            # guard the whole update failed with an AttributeError.
            if swapped is not None:
                swapped.email = old_email
        h.flash(_('Your account was updated successfully'), category='success')
        Session().commit()
    except forms.ValidationFailure as e:
        c.form = e
        return self._get_template_context(c)
    except Exception:
        log.exception("Exception updating user")
        h.flash(_('Error occurred during update of user'),
                category='error')
    raise HTTPFound(h.route_path('my_account_profile'))
@LoginRequired()
@NotAnonymous()
def my_account_password(self):
    """Render the change-password form of the current user."""
    context = self.load_default_context()
    context.active = 'password'
    context.extern_type = context.user.extern_type

    password_schema = user_schema.ChangePasswordSchema().bind(
        username=context.user.username)

    # the form posts to the password update view
    context.form = forms.Form(
        password_schema,
        action=h.route_path('my_account_password_update'),
        buttons=(forms.buttons.save, forms.buttons.reset))
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_password_update(self):
    """
    Validate the posted password form and store the new password.

    Accounts whose ``extern_type`` is not ``rhodecode`` are redirected
    without any change. A validation failure renders the form again; any
    other outcome redirects to the ``my_account_password`` route.
    """
    _ = self.request.translate
    context = self.load_default_context()
    context.active = 'password'
    context.extern_type = context.user.extern_type

    password_schema = user_schema.ChangePasswordSchema().bind(
        username=context.user.username)
    password_form = forms.Form(
        password_schema,
        buttons=(forms.buttons.save, forms.buttons.reset))

    if context.extern_type != 'rhodecode':
        raise HTTPFound(self.request.route_path('my_account_password'))

    posted = list(self.request.POST.items())
    try:
        cleaned = password_form.validate(posted)
        UserModel().update_user(context.user.user_id, **cleaned)
        context.user.update_userdata(force_password_change=False)
        Session().commit()
    except forms.ValidationFailure as failure:
        context.form = failure
        return self._get_template_context(context)
    except Exception:
        log.exception("Exception updating password")
        h.flash(_('Error occurred during update of user password'),
                category='error')
    else:
        # store a hash of the new password in the session's
        # 'rhodecode_user' entry
        refreshed_user = context.auth_user.get_instance()
        session_user = self.session.setdefault('rhodecode_user', {})
        session_user.update(
            {'password': md5_safe(refreshed_user.password)})
        self.session.save()
        h.flash(_("Successfully updated password"), category='success')

    raise HTTPFound(self.request.route_path('my_account_password'))
@LoginRequired()
@NotAnonymous()
def my_account_2fa(self):
    """Render the 2FA tab with the current 2FA flags of the user."""
    _ = self.request.translate
    context = self.load_default_context()
    context.active = '2fa'

    account = context.auth_user.get_instance()
    forced_by_admin = account.has_forced_2fa
    context.state_of_2fa = account.has_enabled_2fa
    context.user_seen_2fa_recovery_codes = account.has_seen_2fa_codes
    context.locked_2fa = str2bool(forced_by_admin)
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_2fa_update(self):
    """
    Enable or disable 2FA according to the posted ``2fa_status`` value.

    2FA is enabled only when the posted value equals ``'1'``. The request
    always ends with a redirect to the ``my_account_configure_2fa`` route.
    """
    _ = self.request.translate
    context = self.load_default_context()
    context.active = '2fa'

    account = context.auth_user.get_instance()
    enable = self.request.POST.get('2fa_status') == '1'
    account.has_enabled_2fa = enable
    # remember when the 2FA state was last changed
    account.update_userdata(update_2fa=time.time())
    Session().commit()

    message = (
        _("2FA has been successfully enabled") if enable
        else _("2FA has been successfully disabled"))
    h.flash(message, category='success')
    raise HTTPFound(self.request.route_path('my_account_configure_2fa'))
r5367 | ||||
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_2fa_show_recovery_codes(self):
    """
    Return the user's 2FA recovery codes and record that they were seen.

    :return: dict with a single ``recovery_codes`` key.
    """
    context = self.load_default_context()
    account = context.auth_user.get_instance()
    # persist the "seen" flag before handing the codes out
    account.has_seen_2fa_codes = True
    Session().commit()
    codes = account.get_2fa_recovery_codes()
    return {'recovery_codes': codes}
r5360 | ||||
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_2fa_regenerate_recovery_codes(self):
    """
    Regenerate the 2FA recovery codes after validating the posted TOTP form.

    Any failure, whether a validation error or something else, is flashed
    and redirects back to the 2FA configuration page. On success the
    redirect carries ``show-recovery-codes=1`` in its query string.
    """
    _ = self.request.translate
    context = self.load_default_context()
    account = context.auth_user.get_instance()

    form = TOTPForm(_, account, allow_recovery_code_use=True)()
    form_data = dict(self.request.POST)
    # NOTE: inject secret, as it's a post configured saved item.
    form_data['secret_totp'] = account.secret_2fa

    configure_2fa = 'my_account_configure_2fa'
    try:
        form.to_python(form_data)
        account.regenerate_2fa_recovery_codes()
        Session().commit()
    except Exception as error:
        # formencode.Invalid is handled the same way as any other error
        h.flash(_("Failed to generate new recovery codes: {}").format(error), category='error')
        raise HTTPFound(self.request.route_path(configure_2fa))

    raise HTTPFound(self.request.route_path(
        configure_2fa, _query={'show-recovery-codes': 1}))
r5360 | ||||
@LoginRequired()
@NotAnonymous()
def my_account_auth_tokens(self):
    """
    Render the auth tokens tab.

    Lists the user's tokens, expired ones included, together with the
    lifetime and role choices offered when creating a new token.
    """
    _ = self.request.translate
    context = self.load_default_context()
    context.active = 'auth_tokens'

    token_cls = AuthTokenModel.cls
    context.lifetime_values = AuthTokenModel.get_lifetime_values(translator=_)
    roles = []
    for role in token_cls.ROLES:
        roles.append((role, token_cls._get_role_name(role)))
    context.role_values = roles
    context.role_options = [(context.role_values, _("Role"))]
    context.user_auth_tokens = AuthTokenModel().get_auth_tokens(
        context.user.user_id, show_expired=True)
    context.role_vcs = token_cls.ROLE_VCS
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_auth_tokens_view(self):
    """
    Return the value of one of the current user's auth tokens.

    The token is selected by the posted ``auth_token_id``.

    :return: dict with a single ``auth_token`` key holding the API key.
    :raises HTTPNotFound: when no ``auth_token_id`` was posted, or the
        token belongs to a different user.
    """
    c = self.load_default_context()

    auth_token_id = self.request.POST.get('auth_token_id')
    if not auth_token_id:
        # previously this fell through and failed on an unbound ``token``
        raise HTTPNotFound()

    token = UserApiKeys.get_or_404(auth_token_id)
    # never reveal a token owned by somebody else
    if token.user.user_id != c.user.user_id:
        raise HTTPNotFound()

    return {
        'auth_token': token.api_key
    }
def maybe_attach_token_scope(self, token):
    """No-op hook passed as ``scope_callback``; implemented in EE edition."""
    return None
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_auth_tokens_add(self):
    """
    Create a new auth token for the current user from the posted data.

    Reads ``lifetime`` (falls back to -1), ``description`` and ``role``
    from the POST body, writes an audit log entry and redirects to the
    auth tokens page.
    """
    _ = self.request.translate
    context = self.load_default_context()

    posted = self.request.POST
    lifetime = safe_int(posted.get('lifetime'), -1)
    description = posted.get('description')
    role = posted.get('role')

    new_token = UserModel().add_auth_token(
        user=context.user.user_id,
        lifetime_minutes=lifetime,
        role=role,
        description=description,
        scope_callback=self.maybe_attach_token_scope)

    audit_payload = {
        'data': {'token': new_token.get_api_data(), 'user': 'self'}}
    audit_logger.store_web(
        'user.edit.token.add', action_data=audit_payload,
        user=self._rhodecode_user)
    Session().commit()

    h.flash(_("Auth token successfully created"), category='success')
    return HTTPFound(h.route_path('my_account_auth_tokens'))
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_auth_tokens_delete(self):
    """
    Delete one of the current user's auth tokens.

    The token is selected by the posted ``del_auth_token``. When nothing
    was posted the view only redirects to the auth tokens page.

    :raises HTTPNotFound: when the token belongs to a different user.
    """
    _ = self.request.translate
    c = self.load_default_context()

    del_auth_token = self.request.POST.get('del_auth_token')

    if del_auth_token:
        token = UserApiKeys.get_or_404(del_auth_token)
        # Same ownership rule as ``my_account_auth_tokens_view``: without
        # it another user's token data ended up in the audit log and a
        # success message was shown.
        if token.user.user_id != c.user.user_id:
            raise HTTPNotFound()
        # snapshot the token data before the row is gone
        token_data = token.get_api_data()

        AuthTokenModel().delete(del_auth_token, c.user.user_id)
        audit_logger.store_web(
            'user.edit.token.delete', action_data={
                'data': {'token': token_data, 'user': 'self'}},
            user=self._rhodecode_user,)
        Session().commit()
        h.flash(_("Auth token successfully deleted"), category='success')

    return HTTPFound(h.route_path('my_account_auth_tokens'))
@LoginRequired()
@NotAnonymous()
def my_account_emails(self):
    """Render the e-mails tab: the user's e-mail map rows plus the add form."""
    _ = self.request.translate
    context = self.load_default_context()
    context.active = 'emails'

    email_query = UserEmailMap.query().filter(
        UserEmailMap.user == context.user)
    context.user_email_map = email_query.all()

    add_email_schema = user_schema.AddEmailSchema().bind(
        username=context.user.username, user_emails=context.user.emails)
    context.form = forms.RcForm(
        add_email_schema,
        action=h.route_path('my_account_emails_add'),
        buttons=(forms.buttons.save, forms.buttons.reset))
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_emails_add(self):
    """
    Validate the posted form and add an extra e-mail for the current user.

    A ``forms.ValidationFailure`` renders the e-mails page again with the
    failed form; every other outcome flashes a message and redirects to
    the ``my_account_emails`` route.
    """
    _ = self.request.translate
    context = self.load_default_context()
    context.active = 'emails'

    add_email_schema = user_schema.AddEmailSchema().bind(
        username=context.user.username, user_emails=context.user.emails)
    email_form = forms.RcForm(
        add_email_schema,
        action=h.route_path('my_account_emails_add'),
        buttons=(forms.buttons.save, forms.buttons.reset))

    posted = list(self.request.POST.items())
    try:
        cleaned = email_form.validate(posted)
        new_email = cleaned['email']
        UserModel().add_extra_email(context.user.user_id, new_email)
        audit_payload = {'data': {'email': cleaned['email'], 'user': 'self'}}
        audit_logger.store_web(
            'user.edit.email.add', action_data=audit_payload,
            user=self._rhodecode_user)
        Session().commit()
    except formencode.Invalid as invalid:
        h.flash(h.escape(invalid.error_dict['email']), category='error')
    except forms.ValidationFailure as failure:
        # the page lists existing e-mails next to the failed form
        context.user_email_map = UserEmailMap.query().filter(
            UserEmailMap.user == context.user).all()
        context.form = failure
        return self._get_template_context(context)
    except Exception:
        log.exception("Exception adding email")
        h.flash(_('Error occurred during adding email'),
                category='error')
    else:
        h.flash(_("Successfully added email"), category='success')

    raise HTTPFound(self.request.route_path('my_account_emails'))
r1920 | ||||
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_emails_delete(self):
    """
    Delete the extra e-mail selected by the posted ``del_email_id``.

    Without a posted id nothing is deleted; the view always redirects to
    the ``my_account_emails`` route.
    """
    _ = self.request.translate
    context = self.load_default_context()

    email_id = self.request.POST.get('del_email_id')
    if email_id:
        # read the address first, the audit entry needs it after deletion
        removed_address = UserEmailMap.get_or_404(email_id).email
        UserModel().delete_extra_email(context.user.user_id, email_id)
        audit_payload = {'data': {'email': removed_address, 'user': 'self'}}
        audit_logger.store_web(
            'user.edit.email.delete', action_data=audit_payload,
            user=self._rhodecode_user)
        Session().commit()
        h.flash(_("Email successfully deleted"),
                category='success')

    return HTTPFound(h.route_path('my_account_emails'))
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_notifications_test_channelstream(self):
    """
    Send a test message to the current user through Channelstream.

    :return: dict with a ``response`` text; on a ``ChannelstreamException``
        the text names the exception class instead.
    """
    username = self._rhodecode_user.username
    text = 'Test message sent via Channelstream by user: {}, on {}'.format(
        username, datetime.datetime.now())
    notification = {
        'message': text,
        'level': 'info',
        'topic': '/notifications'
    }
    payload = {
        # 'channel': 'broadcast',
        'type': 'message',
        'timestamp': datetime.datetime.utcnow(),
        'user': 'system',
        'pm_users': [self._rhodecode_user.username],
        'message': notification,
    }

    # the channelstream settings are read from the registry's plugin map
    plugins = getattr(self.request.registry, 'rhodecode_plugins', {})
    channelstream_config = plugins.get('channelstream', {})

    try:
        channelstream_request(channelstream_config, [payload], '/message')
    except ChannelstreamException as exc:
        log.exception('Failed to send channelstream data')
        return {"response": f'ERROR: {exc.__class__.__name__}'}
    return {"response": 'Channelstream data sent. '
                        'You should see a new live message now.'}
def _load_my_repos_data(self, watched=False):
    """
    Build the JSON used to render the repositories grid.

    :param watched: when true list the repositories the user follows,
        otherwise the repositories the user owns.
    :return: JSON string with one ``name`` / ``name_raw`` row per repository.
    """
    user = self._rhodecode_user
    # NOTE(review): the leading -1 presumably keeps the id list non-empty
    # for the IN filter below; confirm.
    readable_ids = [-1] + user.repo_acl_ids_from_stack(AuthUser.repo_read_perms)

    repo_query = Session().query(Repository)
    if watched:
        # repos user watch
        repo_query = repo_query \
            .join((UserFollowing,
                   UserFollowing.follows_repo_id == Repository.repo_id)) \
            .filter(UserFollowing.user_id == user.user_id)
    else:
        # repos user is owner of
        repo_query = repo_query.filter(Repository.user_id == user.user_id)

    # generate multiple IN to fix limitation problems
    acl_filter = or_(*in_filter_generator(Repository.repo_id, readable_ids))
    repositories = repo_query \
        .filter(acl_filter) \
        .order_by(Repository.repo_name) \
        .all()

    render = self.request.get_partial_renderer(
        'rhodecode:templates/data_table/_dt_elements.mako')

    rows = [
        {
            "name": render(
                'repo_name', repo.repo_name, repo.repo_type,
                repo.repo_state, repo.private, repo.archived, repo.fork,
                short_name=False, admin=False),
            "name_raw": repo.repo_name.lower(),
        }
        for repo in repositories
    ]

    # json used to render the grid
    return ext_json.str_json(rows)
r1920 | ||||
@LoginRequired()
@NotAnonymous()
def my_account_repos(self):
    """Render the tab listing repositories owned by the current user."""
    context = self.load_default_context()
    context.active = 'repos'
    # json used to render the grid
    grid_json = self._load_my_repos_data()
    context.data = grid_json
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
def my_account_watched(self):
    """Render the tab listing repositories the current user watches."""
    context = self.load_default_context()
    context.active = 'watched'
    # json used to render the grid
    grid_json = self._load_my_repos_data(watched=True)
    context.data = grid_json
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
def my_account_bookmarks(self):
    """
    Render the bookmarks tab.

    Each listed item is a (bookmark, repository, repository group) row,
    ordered by bookmark position; the repository and group come from
    outer joins.
    """
    context = self.load_default_context()
    context.active = 'bookmarks'

    bookmarks_stmt = select(UserBookmark, Repository, RepoGroup)
    bookmarks_stmt = bookmarks_stmt.where(
        UserBookmark.user_id == self._rhodecode_user.user_id)
    bookmarks_stmt = bookmarks_stmt.outerjoin(
        Repository, Repository.repo_id == UserBookmark.bookmark_repo_id)
    bookmarks_stmt = bookmarks_stmt.outerjoin(
        RepoGroup, RepoGroup.group_id == UserBookmark.bookmark_repo_group_id)
    bookmarks_stmt = bookmarks_stmt.order_by(UserBookmark.position.asc())

    context.user_bookmark_items = Session().execute(bookmarks_stmt).all()
    return self._get_template_context(context)
def _process_bookmark_entry(self, entry, user_id):
    """
    Apply one posted bookmark entry to the session (no commit here).

    Depending on the entry an existing bookmark is deleted or updated, or
    a new one is added. Entries without a usable ``position`` are ignored.
    Repository and repository-group targets are stored only when the
    permission check passes.

    :param entry: mapping with the posted bookmark fields.
    :param user_id: id of the user owning the bookmark.
    """
    position = safe_int(entry.get('position'))
    cur_position = safe_int(entry.get('cur_position'))
    if position is None:
        return

    # check if this is an existing entry
    bookmark = UserBookmark().get_by_position_for_user(cur_position, user_id)

    if bookmark and str2bool(entry.get('remove')):
        log.debug('Marked bookmark %s for deletion', bookmark)
        Session().delete(bookmark)
        return

    is_new = not bookmark
    if is_new:
        # new
        bookmark = UserBookmark()

    should_save = False
    default_redirect_url = ''

    repo_ref = entry.get('bookmark_repo')
    group_ref = entry.get('bookmark_repo_group')

    if repo_ref and safe_int(repo_ref):
        # save repo
        repo = Repository.get(repo_ref)
        can_access_repo = HasRepoPermissionAny(
            'repository.read', 'repository.write', 'repository.admin')
        if repo and can_access_repo(repo_name=repo.repo_name):
            bookmark.repository = repo
            should_save = True
            default_redirect_url = '${repo_url}'
    elif group_ref and safe_int(group_ref):
        # save repo group
        repo_group = RepoGroup.get(group_ref)
        can_access_group = HasRepoGroupPermissionAny(
            'group.read', 'group.write', 'group.admin')
        if repo_group and can_access_group(group_name=repo_group.group_name):
            bookmark.repository_group = repo_group
            should_save = True
            default_redirect_url = '${repo_group_url}'
    elif entry.get('title') and entry.get('redirect_url'):
        # save generic info
        should_save = True

    if not should_save:
        return

    # mark user and position
    bookmark.user_id = user_id
    bookmark.position = position
    bookmark.title = entry.get('title')
    bookmark.redirect_url = entry.get('redirect_url') or default_redirect_url
    log.debug('Saving bookmark %s, new:%s', bookmark, is_new)

    Session().add(bookmark)
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_account_bookmarks_update(self):
    """
    Store the posted bookmark entries of the current user.

    The POST body is parsed with peppercorn. Entries without a usable
    position are skipped. A position used twice aborts the update with an
    error message. All paths end with a redirect to the bookmarks page.
    """
    _ = self.request.translate
    c = self.load_default_context()
    c.active = 'bookmarks'

    controls = peppercorn.parse(self.request.POST.items())
    user_id = c.user.user_id

    # validate positions
    positions = {}
    for entry in controls.get('bookmarks', []):
        # ``.get`` matches ``_process_bookmark_entry``: an entry posted
        # without a position is skipped instead of raising a KeyError
        position = safe_int(entry.get('position'))
        if position is None:
            continue

        if position in positions:
            h.flash(_("Position {} is defined twice. "
                      "Please correct this error.").format(position), category='error')
            return HTTPFound(h.route_path('my_account_bookmarks'))

        entry['position'] = position
        entry['cur_position'] = safe_int(entry.get('cur_position'))
        positions[position] = entry

    try:
        for entry in positions.values():
            self._process_bookmark_entry(entry, user_id)

        Session().commit()
        h.flash(_("Update Bookmarks"), category='success')
    except IntegrityError:
        h.flash(_("Failed to update bookmarks. "
                  "Make sure an unique position is used."), category='error')

    return HTTPFound(h.route_path('my_account_bookmarks'))
@LoginRequired()
@NotAnonymous()
def my_account_goto_bookmark(self):
    """
    Redirect to the target of the bookmark stored at the requested position.

    The target is, in order of precedence, the bookmarked repository, the
    bookmarked repository group or the custom redirect URL. A stored
    redirect URL may contain ``${repo_url}``, ``${repo_group_url}`` or
    ``${server_url}`` placeholders. Unknown bookmarks redirect to the
    bookmarks page.

    :raises HTTPFound: always, carrying the computed target.
    """
    requested_position = self.request.matchdict['bookmark_id']
    bookmark = UserBookmark().query()\
        .filter(UserBookmark.user_id == self.request.user.user_id) \
        .filter(UserBookmark.position == requested_position).scalar()

    target_url = h.route_path('my_account_bookmarks')
    if not bookmark:
        raise HTTPFound(target_url)

    custom_url = bookmark.redirect_url

    if bookmark.repository:
        # repository set
        repo_url = h.route_path(
            'repo_summary', repo_name=bookmark.repository.repo_name)
        target_url = repo_url
        if custom_url and '${repo_url}' in custom_url:
            target_url = string.Template(custom_url)\
                .safe_substitute({'repo_url': repo_url})
    elif bookmark.repository_group:
        # repository group set
        group_url = h.route_path(
            'repo_group_home',
            repo_group_name=bookmark.repository_group.group_name)
        target_url = group_url
        if custom_url and '${repo_group_url}' in custom_url:
            target_url = string.Template(custom_url)\
                .safe_substitute({'repo_group_url': group_url})
    elif custom_url:
        # custom URL set
        server_url = h.route_url('home').rstrip('/')
        target_url = string.Template(custom_url) \
            .safe_substitute({'server_url': server_url})

    log.debug('Redirecting bookmark %s to %s', bookmark, target_url)
    raise HTTPFound(target_url)
@LoginRequired()
@NotAnonymous()
def my_account_perms(self):
    """Render the permissions tab for the current user."""
    context = self.load_default_context()
    # the template reads the permissions from ``perm_user``
    context.perm_user = context.auth_user
    context.active = 'perms'
    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
def my_notifications(self):
    """Render the notifications tab of the current user's account."""
    context = self.load_default_context()
    context.active = 'notifications'
    template_context = self._get_template_context(context)
    return template_context
@LoginRequired()
@NotAnonymous()
@CSRFRequired()
def my_notifications_toggle_visibility(self):
    """
    Flip the user's ``notification_status`` flag and return the stored value.

    A missing flag counts as ``True``, so the first toggle stores ``False``.
    """
    db_user = self._rhodecode_db_user
    currently_enabled = db_user.user_data.get('notification_status', True)
    db_user.update_userdata(notification_status=not currently_enabled)
    Session().commit()
    return db_user.user_data['notification_status']
def _get_pull_requests_list(self, statuses, filter_type=None):
    """
    Build the data-grid payload of pull requests the user participates in.

    Paging, search and ordering are taken from the request.

    :param statuses: pull request statuses to include.
    :param filter_type: ``'awaiting_my_review'`` switches to the
        "for review" model queries; any other value uses the plain
        participation queries.
    :return: dict with ``draw``, ``data``, ``recordsTotal`` and
        ``recordsFiltered`` keys.
    """
    draw, start, limit = self._extract_chunk(self.request)
    search_q, order_by, order_dir = self._extract_ordering(self.request)

    render = self.request.get_partial_renderer(
        'rhodecode:templates/data_table/_dt_elements.mako')

    # pick the list/count query pair matching the requested filter
    if filter_type == 'awaiting_my_review':
        fetch_prs = PullRequestModel().get_im_participating_in_for_review
        count_prs = PullRequestModel().count_im_participating_in_for_review
    else:
        fetch_prs = PullRequestModel().get_im_participating_in
        count_prs = PullRequestModel().count_im_participating_in

    pull_requests = fetch_prs(
        user_id=self._rhodecode_user.user_id,
        statuses=statuses, query=search_q,
        offset=start, length=limit, order_by=order_by,
        order_dir=order_dir)
    total_count = count_prs(
        user_id=self._rhodecode_user.user_id, statuses=statuses, query=search_q)

    rows = []
    comments_model = CommentsModel()
    for pr in pull_requests:
        comments_count = comments_model.get_all_comments(
            pr.target_repo_id, pull_request=pr,
            include_drafts=False, count_only=True)
        owned = pr.user_id == self._rhodecode_user.user_id

        # status of the current user's own review, if there is one
        my_review_status = ChangesetStatus.STATUS_NOT_REVIEWED
        review_statuses = pr.reviewers_statuses(user=self._rhodecode_db_user)
        if review_statuses and review_statuses[4]:
            _review_obj, _user, _reasons, _mandatory, status_entries = review_statuses
            my_review_status = status_entries[0][1].status

        row = {
            'target_repo': render('pullrequest_target_repo',
                                  pr.target_repo.repo_name),
            'name': render('pullrequest_name',
                           pr.pull_request_id, pr.pull_request_state,
                           pr.work_in_progress, pr.target_repo.repo_name,
                           short=True),
            'name_raw': pr.pull_request_id,
            'status': render('pullrequest_status',
                             pr.calculated_review_status()),
            'my_status': render('pullrequest_status',
                                my_review_status),
            'title': render('pullrequest_title', pr.title, pr.description),
            'pr_flow': render('pullrequest_commit_flow', pr),
            'description': h.escape(pr.description),
            'updated_on': render('pullrequest_updated_on',
                                 h.datetime_to_time(pr.updated_on),
                                 pr.versions_count),
            'updated_on_raw': h.datetime_to_time(pr.updated_on),
            'created_on': render('pullrequest_updated_on',
                                 h.datetime_to_time(pr.created_on)),
            'created_on_raw': h.datetime_to_time(pr.created_on),
            'state': pr.pull_request_state,
            'author': render('pullrequest_author',
                             pr.author.full_contact, ),
            'author_raw': pr.author.full_name,
            'comments': render('pullrequest_comments', comments_count),
            'comments_raw': comments_count,
            'closed': pr.is_closed(),
            'owned': owned
        }
        rows.append(row)

    # json used to render the grid
    return {
        'draw': draw,
        'data': rows,
        'recordsTotal': total_count,
        'recordsFiltered': total_count,
    }
@LoginRequired()
@NotAnonymous()
def my_account_pullrequests(self):
    """
    Render the pull requests tab.

    The ``closed`` and ``awaiting_my_review`` query flags select the active
    filter; ``awaiting_my_review`` wins when both are set.
    """
    context = self.load_default_context()
    context.active = 'pullrequests'

    query_args = self.request.GET
    context.closed = str2bool(query_args.get('closed'))
    context.awaiting_my_review = str2bool(query_args.get('awaiting_my_review'))

    if context.awaiting_my_review:
        selected = 'awaiting_my_review'
    elif context.closed:
        selected = 'all_closed'
    else:
        selected = 'all'
    context.selected_filter = selected

    return self._get_template_context(context)
@LoginRequired()
@NotAnonymous()
def my_account_pullrequests_data(self):
    """
    Return the grid data for the pull requests tab.

    New and open pull requests are always included; closed ones only when
    the ``closed`` query flag is set.
    """
    self.load_default_context()
    query_args = self.request.GET

    awaiting_my_review = str2bool(query_args.get('awaiting_my_review'))
    include_closed = str2bool(query_args.get('closed'))

    wanted_statuses = [PullRequest.STATUS_NEW, PullRequest.STATUS_OPEN]
    if include_closed:
        wanted_statuses.append(PullRequest.STATUS_CLOSED)

    filter_type = None
    if awaiting_my_review:
        filter_type = 'awaiting_my_review'

    return self._get_pull_requests_list(
        statuses=wanted_statuses, filter_type=filter_type)
@LoginRequired()
@NotAnonymous()
def my_account_user_group_membership(self):
    """Render the tab listing the user groups the current user belongs to."""
    context = self.load_default_context()
    context.active = 'user_group_membership'

    memberships = self._rhodecode_db_user.group_member
    group_dicts = []
    for membership in memberships:
        group_dicts.append(
            UserGroupModel.get_user_groups_as_dict(membership.users_group))

    # serialized as JSON for the template
    context.user_groups = ext_json.str_json(group_dicts)
    return self._get_template_context(context)