user.py
841 lines
| 24.8 KiB
| text/x-python
|
PythonLexer
r0 | # -*- coding: utf-8 -*- | |||
r112 | # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors | |||
r0 | # | |||
r112 | # Licensed under the Apache License, Version 2.0 (the "License"); | |||
# you may not use this file except in compliance with the License. | ||||
# You may obtain a copy of the License at | ||||
r0 | # | |||
r112 | # http://www.apache.org/licenses/LICENSE-2.0 | |||
r0 | # | |||
r112 | # Unless required by applicable law or agreed to in writing, software | |||
# distributed under the License is distributed on an "AS IS" BASIS, | ||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
# See the License for the specific language governing permissions and | ||||
# limitations under the License. | ||||
r0 | ||||
import colander | ||||
import datetime | ||||
import json | ||||
import logging | ||||
import uuid | ||||
import pyramid.security as security | ||||
import appenlight.lib.helpers as h | ||||
from authomatic.adapters import WebObAdapter | ||||
from pyramid.view import view_config | ||||
from pyramid.httpexceptions import HTTPFound, HTTPUnprocessableEntity | ||||
from pyramid.httpexceptions import HTTPNotFound, HTTPBadRequest | ||||
from pyramid.security import NO_PERMISSION_REQUIRED | ||||
r153 | from ziggurat_foundations.models.services.external_identity import ( | |||
ExternalIdentityService, | ||||
) | ||||
r135 | from ziggurat_foundations.models.services.user import UserService | |||
r0 | ||||
from appenlight.lib import generate_random_string | ||||
from appenlight.lib.social import handle_social_data | ||||
r153 | from appenlight.lib.utils import ( | |||
channelstream_request, | ||||
add_cors_headers, | ||||
permission_tuple_to_dict, | ||||
) | ||||
r0 | from appenlight.models import DBSession | |||
from appenlight.models.alert_channels.email import EmailAlertChannel | ||||
from appenlight.models.alert_channel_action import AlertChannelAction | ||||
from appenlight.models.services.alert_channel import AlertChannelService | ||||
r153 | from appenlight.models.services.alert_channel_action import AlertChannelActionService | |||
r0 | from appenlight.models.auth_token import AuthToken | |||
from appenlight.models.report import REPORT_TYPE_MATRIX | ||||
from appenlight.models.user import User | ||||
from appenlight.models.services.user import UserService | ||||
from appenlight.subscribers import _ | ||||
from appenlight.validators import build_rule_schema | ||||
from appenlight import forms | ||||
from webob.multidict import MultiDict | ||||
log = logging.getLogger(__name__) | ||||
r153 | @view_config( | |||
route_name="users_no_id", | ||||
renderer="json", | ||||
request_method="GET", | ||||
permission="root_administration", | ||||
) | ||||
r0 | def users_list(request): | |||
""" | ||||
Returns users list | ||||
""" | ||||
r153 | props = [ | |||
"user_name", | ||||
"id", | ||||
"first_name", | ||||
"last_name", | ||||
"email", | ||||
"last_login_date", | ||||
"status", | ||||
] | ||||
r0 | users = UserService.all() | |||
users_dicts = [] | ||||
for user in users: | ||||
u_dict = user.get_dict(include_keys=props) | ||||
r153 | u_dict["gravatar_url"] = UserService.gravatar_url(user, s=20) | |||
r0 | users_dicts.append(u_dict) | |||
return users_dicts | ||||
r153 | @view_config( | |||
route_name="users_no_id", | ||||
renderer="json", | ||||
request_method="POST", | ||||
permission="root_administration", | ||||
) | ||||
r0 | def users_create(request): | |||
""" | ||||
Returns users list | ||||
""" | ||||
r153 | form = forms.UserCreateForm( | |||
MultiDict(request.safe_json_body or {}), csrf_context=request | ||||
) | ||||
r0 | if form.validate(): | |||
r153 | log.info("registering user") | |||
r128 | # probably not needed in the future since this requires root anyways | |||
# lets keep this here in case we lower view permission in the future | ||||
# if request.registry.settings['appenlight.disable_registration']: | ||||
# return HTTPUnprocessableEntity(body={'error': 'Registration is currently disabled.'}) | ||||
r0 | user = User() | |||
# insert new user here | ||||
DBSession.add(user) | ||||
form.populate_obj(user) | ||||
r135 | UserService.regenerate_security_code(user) | |||
UserService.set_password(user, user.user_password) | ||||
r0 | user.status = 1 if form.status.data else 0 | |||
r153 | request.session.flash(_("User created")) | |||
r0 | DBSession.flush() | |||
r153 | return user.get_dict( | |||
exclude_keys=[ | ||||
"security_code_date", | ||||
"notes", | ||||
"security_code", | ||||
"user_password", | ||||
] | ||||
) | ||||
r0 | else: | |||
return HTTPUnprocessableEntity(body=form.errors_json) | ||||
r153 | @view_config( | |||
route_name="users", | ||||
renderer="json", | ||||
request_method="GET", | ||||
permission="root_administration", | ||||
) | ||||
@view_config( | ||||
route_name="users", | ||||
renderer="json", | ||||
request_method="PATCH", | ||||
permission="root_administration", | ||||
) | ||||
r0 | def users_update(request): | |||
""" | ||||
Updates user object | ||||
""" | ||||
r153 | user = UserService.by_id(request.matchdict.get("user_id")) | |||
r0 | if not user: | |||
return HTTPNotFound() | ||||
post_data = request.safe_json_body or {} | ||||
r153 | if request.method == "PATCH": | |||
form = forms.UserUpdateForm(MultiDict(post_data), csrf_context=request) | ||||
r0 | if form.validate(): | |||
form.populate_obj(user, ignore_none=True) | ||||
if form.user_password.data: | ||||
r135 | UserService.set_password(user, user.user_password) | |||
r0 | if form.status.data: | |||
user.status = 1 | ||||
else: | ||||
user.status = 0 | ||||
else: | ||||
return HTTPUnprocessableEntity(body=form.errors_json) | ||||
r153 | return user.get_dict( | |||
exclude_keys=["security_code_date", "notes", "security_code", "user_password"] | ||||
) | ||||
r0 | ||||
r153 | @view_config( | |||
route_name="users_property", | ||||
match_param="key=resource_permissions", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def users_resource_permissions_list(request): | |||
""" | ||||
Get list of permissions assigned to specific resources | ||||
""" | ||||
r153 | user = UserService.by_id(request.matchdict.get("user_id")) | |||
r0 | if not user: | |||
return HTTPNotFound() | ||||
r153 | return [ | |||
permission_tuple_to_dict(perm) | ||||
for perm in UserService.resources_with_possible_perms(user) | ||||
] | ||||
@view_config( | ||||
route_name="users", | ||||
renderer="json", | ||||
request_method="DELETE", | ||||
permission="root_administration", | ||||
) | ||||
r0 | def users_DELETE(request): | |||
""" | ||||
Removes a user permanently from db - makes a check to see if after the | ||||
operation there will be at least one admin left | ||||
""" | ||||
r153 | msg = _("There needs to be at least one administrator in the system") | |||
user = UserService.by_id(request.matchdict.get("user_id")) | ||||
r0 | if user: | |||
r153 | users = UserService.users_for_perms(["root_administration"]).all() | |||
r0 | if len(users) < 2 and user.id == users[0].id: | |||
r153 | request.session.flash(msg, "warning") | |||
r0 | else: | |||
DBSession.delete(user) | ||||
r153 | request.session.flash(_("User removed")) | |||
r0 | return True | |||
request.response.status = 422 | ||||
return False | ||||
r153 | @view_config( | |||
route_name="users_self", | ||||
renderer="json", | ||||
request_method="GET", | ||||
permission="authenticated", | ||||
) | ||||
@view_config( | ||||
route_name="users_self", | ||||
renderer="json", | ||||
request_method="PATCH", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def users_self(request): | |||
""" | ||||
Updates user personal information | ||||
""" | ||||
r153 | if request.method == "PATCH": | |||
r0 | form = forms.gen_user_profile_form()( | |||
r153 | MultiDict(request.unsafe_json_body), csrf_context=request | |||
) | ||||
r0 | if form.validate(): | |||
form.populate_obj(request.user) | ||||
r153 | request.session.flash(_("Your profile got updated.")) | |||
r0 | else: | |||
return HTTPUnprocessableEntity(body=form.errors_json) | ||||
r62 | return request.user.get_dict( | |||
r153 | exclude_keys=["security_code_date", "notes", "security_code", "user_password"], | |||
extended_info=True, | ||||
) | ||||
@view_config( | ||||
route_name="users_self_property", | ||||
match_param="key=external_identities", | ||||
renderer="json", | ||||
request_method="GET", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def users_external_identies(request): | |||
user = request.user | ||||
r153 | identities = [ | |||
{"provider": ident.provider_name, "id": ident.external_user_name} | ||||
for ident in user.external_identities.all() | ||||
] | ||||
r0 | return identities | |||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
match_param="key=external_identities", | ||||
renderer="json", | ||||
request_method="DELETE", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def users_external_identies_DELETE(request): | |||
""" | ||||
Unbinds external identities(google,twitter etc.) from user account | ||||
""" | ||||
user = request.user | ||||
for identity in user.external_identities.all(): | ||||
r153 | log.info("found identity %s" % identity) | |||
if identity.provider_name == request.params.get( | ||||
"provider" | ||||
) and identity.external_user_name == request.params.get("id"): | ||||
log.info("remove identity %s" % identity) | ||||
r0 | DBSession.delete(identity) | |||
return True | ||||
return False | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
match_param="key=password", | ||||
renderer="json", | ||||
request_method="PATCH", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def users_password(request): | |||
""" | ||||
Sets new password for user account | ||||
""" | ||||
user = request.user | ||||
r153 | form = forms.ChangePasswordForm( | |||
MultiDict(request.unsafe_json_body), csrf_context=request | ||||
) | ||||
r0 | form.old_password.user = user | |||
if form.validate(): | ||||
r135 | UserService.regenerate_security_code(user) | |||
UserService.set_password(user, form.new_password.data) | ||||
r153 | msg = ( | |||
"Your password got updated. " "Next time log in with your new credentials." | ||||
) | ||||
r0 | request.session.flash(_(msg)) | |||
return True | ||||
else: | ||||
return HTTPUnprocessableEntity(body=form.errors_json) | ||||
return False | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
match_param="key=websocket", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def users_websocket(request): | |||
""" | ||||
Handle authorization of users trying to connect | ||||
""" | ||||
# handle preflight request | ||||
user = request.user | ||||
r153 | if request.method == "OPTIONS": | |||
res = request.response.body("OK") | ||||
r0 | add_cors_headers(res) | |||
return res | ||||
r153 | applications = UserService.resources_with_perms( | |||
user, ["view"], resource_types=["application"] | ||||
) | ||||
channels = ["app_%s" % app.resource_id for app in applications] | ||||
payload = { | ||||
"username": user.user_name, | ||||
"conn_id": str(uuid.uuid4()), | ||||
"channels": channels, | ||||
} | ||||
r0 | settings = request.registry.settings | |||
r64 | response = channelstream_request( | |||
r153 | settings["cometd.secret"], | |||
"/connect", | ||||
payload, | ||||
servers=[request.registry.settings["cometd_servers"]], | ||||
throw_exceptions=True, | ||||
) | ||||
r0 | return payload | |||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
request_method="GET", | ||||
match_param="key=alert_channels", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def alert_channels(request): | |||
""" | ||||
Lists all available alert channels | ||||
""" | ||||
user = request.user | ||||
return [c.get_dict(extended_info=True) for c in user.alert_channels] | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
match_param="key=alert_actions", | ||||
request_method="GET", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def alert_actions(request): | |||
""" | ||||
Lists all available alert channels | ||||
""" | ||||
user = request.user | ||||
return [r.get_dict(extended_info=True) for r in user.alert_actions] | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
renderer="json", | ||||
match_param="key=alert_channels_rules", | ||||
request_method="POST", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def alert_channels_rule_POST(request): | |||
""" | ||||
Creates new notification rule for specific alert channel | ||||
""" | ||||
user = request.user | ||||
r153 | alert_action = AlertChannelAction(owner_id=request.user.id, type="report") | |||
r0 | DBSession.add(alert_action) | |||
DBSession.flush() | ||||
return alert_action.get_dict() | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
permission="authenticated", | ||||
match_param="key=alert_channels_rules", | ||||
renderer="json", | ||||
request_method="DELETE", | ||||
) | ||||
r0 | def alert_channels_rule_DELETE(request): | |||
""" | ||||
Removes specific alert channel rule | ||||
""" | ||||
user = request.user | ||||
rule_action = AlertChannelActionService.by_owner_id_and_pkey( | ||||
r153 | user.id, request.GET.get("pkey") | |||
) | ||||
r0 | if rule_action: | |||
DBSession.delete(rule_action) | ||||
return True | ||||
return HTTPNotFound() | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
permission="authenticated", | ||||
match_param="key=alert_channels_rules", | ||||
renderer="json", | ||||
request_method="PATCH", | ||||
) | ||||
r0 | def alert_channels_rule_PATCH(request): | |||
""" | ||||
Removes specific alert channel rule | ||||
""" | ||||
user = request.user | ||||
json_body = request.unsafe_json_body | ||||
r153 | schema = build_rule_schema(json_body["rule"], REPORT_TYPE_MATRIX) | |||
r0 | try: | |||
r153 | schema.deserialize(json_body["rule"]) | |||
r0 | except colander.Invalid as exc: | |||
return HTTPUnprocessableEntity(body=json.dumps(exc.asdict())) | ||||
rule_action = AlertChannelActionService.by_owner_id_and_pkey( | ||||
r153 | user.id, request.GET.get("pkey") | |||
) | ||||
r0 | ||||
if rule_action: | ||||
r153 | rule_action.rule = json_body["rule"] | |||
rule_action.resource_id = json_body["resource_id"] | ||||
rule_action.action = json_body["action"] | ||||
r0 | return rule_action.get_dict() | |||
return HTTPNotFound() | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
permission="authenticated", | ||||
match_param="key=alert_channels", | ||||
renderer="json", | ||||
request_method="PATCH", | ||||
) | ||||
r0 | def alert_channels_PATCH(request): | |||
user = request.user | ||||
r153 | channel_name = request.GET.get("channel_name") | |||
channel_value = request.GET.get("channel_value") | ||||
r0 | # iterate over channels | |||
channel = None | ||||
for channel in user.alert_channels: | ||||
r153 | if ( | |||
channel.channel_name == channel_name | ||||
and channel.channel_value == channel_value | ||||
): | ||||
r0 | break | |||
if not channel: | ||||
return HTTPNotFound() | ||||
r153 | allowed_keys = ["daily_digest", "send_alerts"] | |||
r0 | for k, v in request.unsafe_json_body.items(): | |||
if k in allowed_keys: | ||||
setattr(channel, k, v) | ||||
else: | ||||
return HTTPBadRequest() | ||||
return channel.get_dict() | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
permission="authenticated", | ||||
match_param="key=alert_channels", | ||||
request_method="POST", | ||||
renderer="json", | ||||
) | ||||
r0 | def alert_channels_POST(request): | |||
""" | ||||
Creates a new email alert channel for user, sends a validation email | ||||
""" | ||||
user = request.user | ||||
r153 | form = forms.EmailChannelCreateForm( | |||
MultiDict(request.unsafe_json_body), csrf_context=request | ||||
) | ||||
r0 | if not form.validate(): | |||
return HTTPUnprocessableEntity(body=form.errors_json) | ||||
email = form.email.data.strip() | ||||
channel = EmailAlertChannel() | ||||
r153 | channel.channel_name = "email" | |||
r0 | channel.channel_value = email | |||
security_code = generate_random_string(10) | ||||
r153 | channel.channel_json_conf = {"security_code": security_code} | |||
r0 | user.alert_channels.append(channel) | |||
r153 | email_vars = { | |||
"user": user, | ||||
"email": email, | ||||
"request": request, | ||||
"security_code": security_code, | ||||
"email_title": "AppEnlight :: " "Please authorize your email", | ||||
} | ||||
UserService.send_email( | ||||
request, | ||||
recipients=[email], | ||||
variables=email_vars, | ||||
template="/email_templates/authorize_email.jinja2", | ||||
) | ||||
request.session.flash(_("Your alert channel was " "added to the system.")) | ||||
r0 | request.session.flash( | |||
r153 | _( | |||
"You need to authorize your email channel, a message was " | ||||
"sent containing necessary information." | ||||
), | ||||
"warning", | ||||
) | ||||
r0 | DBSession.flush() | |||
channel.get_dict() | ||||
r153 | @view_config( | |||
route_name="section_view", | ||||
match_param=["section=user_section", "view=alert_channels_authorize"], | ||||
renderer="string", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def alert_channels_authorize(request): | |||
""" | ||||
Performs alert channel authorization based on auth code sent in email | ||||
""" | ||||
user = request.user | ||||
for channel in user.alert_channels: | ||||
r153 | security_code = request.params.get("security_code", "") | |||
if channel.channel_json_conf["security_code"] == security_code: | ||||
r0 | channel.channel_validated = True | |||
r153 | request.session.flash(_("Your email was authorized.")) | |||
return HTTPFound(location=request.route_url("/")) | ||||
r0 | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
request_method="DELETE", | ||||
match_param="key=alert_channels", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def alert_channel_DELETE(request): | |||
""" | ||||
Removes alert channel from users channel | ||||
""" | ||||
user = request.user | ||||
channel = None | ||||
for chan in user.alert_channels: | ||||
r153 | if chan.channel_name == request.params.get( | |||
"channel_name" | ||||
) and chan.channel_value == request.params.get("channel_value"): | ||||
r0 | channel = chan | |||
break | ||||
if channel: | ||||
user.alert_channels.remove(channel) | ||||
r153 | request.session.flash(_("Your channel was removed.")) | |||
r0 | return True | |||
return False | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
permission="authenticated", | ||||
match_param="key=alert_channels_actions_binds", | ||||
renderer="json", | ||||
request_method="POST", | ||||
) | ||||
r0 | def alert_channels_actions_binds_POST(request): | |||
""" | ||||
Adds alert action to users channels | ||||
""" | ||||
user = request.user | ||||
json_body = request.unsafe_json_body | ||||
channel = AlertChannelService.by_owner_id_and_pkey( | ||||
r153 | user.id, json_body.get("channel_pkey") | |||
) | ||||
r0 | ||||
rule_action = AlertChannelActionService.by_owner_id_and_pkey( | ||||
r153 | user.id, json_body.get("action_pkey") | |||
) | ||||
r0 | ||||
if channel and rule_action: | ||||
if channel.pkey not in [c.pkey for c in rule_action.channels]: | ||||
rule_action.channels.append(channel) | ||||
return rule_action.get_dict(extended_info=True) | ||||
return HTTPUnprocessableEntity() | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
request_method="DELETE", | ||||
match_param="key=alert_channels_actions_binds", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def alert_channels_actions_binds_DELETE(request): | |||
""" | ||||
Removes alert action from users channels | ||||
""" | ||||
user = request.user | ||||
channel = AlertChannelService.by_owner_id_and_pkey( | ||||
r153 | user.id, request.GET.get("channel_pkey") | |||
) | ||||
r0 | ||||
rule_action = AlertChannelActionService.by_owner_id_and_pkey( | ||||
r153 | user.id, request.GET.get("action_pkey") | |||
) | ||||
r0 | ||||
if channel and rule_action: | ||||
if channel.pkey in [c.pkey for c in rule_action.channels]: | ||||
rule_action.channels.remove(channel) | ||||
return rule_action.get_dict(extended_info=True) | ||||
return HTTPUnprocessableEntity() | ||||
r153 | @view_config( | |||
route_name="social_auth_abort", renderer="string", permission=NO_PERMISSION_REQUIRED | ||||
) | ||||
r0 | def oauth_abort(request): | |||
""" | ||||
Handles problems with authorization via velruse | ||||
""" | ||||
r153 | @view_config(route_name="social_auth", permission=NO_PERMISSION_REQUIRED) | |||
r0 | def social_auth(request): | |||
# Get the internal provider name URL variable. | ||||
r153 | provider_name = request.matchdict.get("provider") | |||
r0 | ||||
# Start the login procedure. | ||||
adapter = WebObAdapter(request, request.response) | ||||
r10 | result = request.authomatic.login(adapter, provider_name) | |||
r0 | if result: | |||
if result.error: | ||||
return handle_auth_error(request, result) | ||||
elif result.user: | ||||
return handle_auth_success(request, result) | ||||
return request.response | ||||
def handle_auth_error(request, result): | ||||
# Login procedure finished with an error. | ||||
r153 | request.session.pop("zigg.social_auth", None) | |||
request.session.flash( | ||||
_( | ||||
"Something went wrong when we tried to " | ||||
"authorize you via external provider. " | ||||
"Please try again." | ||||
), | ||||
"warning", | ||||
) | ||||
r0 | ||||
r153 | return HTTPFound(location=request.route_url("/")) | |||
r0 | ||||
def handle_auth_success(request, result): | ||||
# Hooray, we have the user! | ||||
# OAuth 2.0 and OAuth 1.0a provide only limited user data on login, | ||||
# We need to update the user to get more info. | ||||
if result.user: | ||||
result.user.update() | ||||
social_data = { | ||||
r153 | "user": {"data": result.user.data}, | |||
"credentials": result.user.credentials, | ||||
r0 | } | |||
# normalize data | ||||
r153 | social_data["user"]["id"] = result.user.id | |||
user_name = result.user.username or "" | ||||
r0 | # use email name as username for google | |||
r153 | if social_data["credentials"].provider_name == "google" and result.user.email: | |||
r0 | user_name = result.user.email | |||
r153 | social_data["user"]["user_name"] = user_name | |||
social_data["user"]["email"] = result.user.email or "" | ||||
r0 | ||||
r153 | request.session["zigg.social_auth"] = social_data | |||
r0 | # user is logged so bind his external identity with account | |||
if request.user: | ||||
handle_social_data(request, request.user, social_data) | ||||
r153 | request.session.pop("zigg.social_auth", None) | |||
return HTTPFound(location=request.route_url("/")) | ||||
r0 | else: | |||
user = ExternalIdentityService.user_by_external_id_and_provider( | ||||
r153 | social_data["user"]["id"], social_data["credentials"].provider_name | |||
r0 | ) | |||
# fix legacy accounts with wrong google ID | ||||
r153 | if not user and social_data["credentials"].provider_name == "google": | |||
r0 | user = ExternalIdentityService.user_by_external_id_and_provider( | |||
r153 | social_data["user"]["email"], social_data["credentials"].provider_name | |||
) | ||||
r0 | ||||
# user tokens are already found in our db | ||||
if user: | ||||
handle_social_data(request, user, social_data) | ||||
headers = security.remember(request, user.id) | ||||
r153 | request.session.pop("zigg.social_auth", None) | |||
return HTTPFound(location=request.route_url("/"), headers=headers) | ||||
r0 | else: | |||
r153 | msg = ( | |||
"You need to finish registration " | ||||
"process to bind your external identity to your account " | ||||
"or sign in to existing account" | ||||
) | ||||
r0 | request.session.flash(msg) | |||
r153 | return HTTPFound(location=request.route_url("register")) | |||
r0 | ||||
r153 | @view_config( | |||
route_name="section_view", | ||||
permission="authenticated", | ||||
match_param=["section=users_section", "view=search_users"], | ||||
renderer="json", | ||||
) | ||||
r0 | def search_users(request): | |||
""" | ||||
Returns a list of users for autocomplete | ||||
""" | ||||
user = request.user | ||||
items_returned = [] | ||||
r153 | like_condition = request.params.get("user_name", "") + "%" | |||
r0 | # first append used if email is passed | |||
r153 | found_user = UserService.by_email(request.params.get("user_name", "")) | |||
r0 | if found_user: | |||
r153 | name = "{} {}".format(found_user.first_name, found_user.last_name) | |||
items_returned.append({"user": found_user.user_name, "name": name}) | ||||
r135 | for found_user in UserService.user_names_like(like_condition).limit(20): | |||
r153 | name = "{} {}".format(found_user.first_name, found_user.last_name) | |||
items_returned.append({"user": found_user.user_name, "name": name}) | ||||
r0 | return items_returned | |||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
match_param="key=auth_tokens", | ||||
request_method="GET", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
@view_config( | ||||
route_name="users_property", | ||||
match_param="key=auth_tokens", | ||||
request_method="GET", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def auth_tokens_list(request): | |||
""" | ||||
Lists all available alert channels | ||||
""" | ||||
r153 | if request.matched_route.name == "users_self_property": | |||
r0 | user = request.user | |||
else: | ||||
r153 | user = UserService.by_id(request.matchdict.get("user_id")) | |||
r0 | if not user: | |||
return HTTPNotFound() | ||||
return [c.get_dict() for c in user.auth_tokens] | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
match_param="key=auth_tokens", | ||||
request_method="POST", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
@view_config( | ||||
route_name="users_property", | ||||
match_param="key=auth_tokens", | ||||
request_method="POST", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def auth_tokens_POST(request): | |||
""" | ||||
Lists all available alert channels | ||||
""" | ||||
r153 | if request.matched_route.name == "users_self_property": | |||
r0 | user = request.user | |||
else: | ||||
r153 | user = UserService.by_id(request.matchdict.get("user_id")) | |||
r0 | if not user: | |||
return HTTPNotFound() | ||||
req_data = request.safe_json_body or {} | ||||
r153 | if not req_data.get("expires"): | |||
req_data.pop("expires", None) | ||||
r0 | form = forms.AuthTokenCreateForm(MultiDict(req_data), csrf_context=request) | |||
if not form.validate(): | ||||
return HTTPUnprocessableEntity(body=form.errors_json) | ||||
token = AuthToken() | ||||
form.populate_obj(token) | ||||
if token.expires: | ||||
r153 | interval = h.time_deltas.get(token.expires)["delta"] | |||
r0 | token.expires = datetime.datetime.utcnow() + interval | |||
user.auth_tokens.append(token) | ||||
DBSession.flush() | ||||
return token.get_dict() | ||||
r153 | @view_config( | |||
route_name="users_self_property", | ||||
match_param="key=auth_tokens", | ||||
request_method="DELETE", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
@view_config( | ||||
route_name="users_property", | ||||
match_param="key=auth_tokens", | ||||
request_method="DELETE", | ||||
renderer="json", | ||||
permission="authenticated", | ||||
) | ||||
r0 | def auth_tokens_DELETE(request): | |||
""" | ||||
Lists all available alert channels | ||||
""" | ||||
r153 | if request.matched_route.name == "users_self_property": | |||
r0 | user = request.user | |||
else: | ||||
r153 | user = UserService.by_id(request.matchdict.get("user_id")) | |||
r0 | if not user: | |||
return HTTPNotFound() | ||||
for token in user.auth_tokens: | ||||
r153 | if token.token == request.params.get("token"): | |||
r0 | user.auth_tokens.remove(token) | |||
return True | ||||
return False | ||||