channelstream.py
371 lines
| 11.1 KiB
| text/x-python
|
PythonLexer
r526 | # -*- coding: utf-8 -*- | |||
r4306 | # Copyright (C) 2016-2020 RhodeCode GmbH | |||
r526 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r1969 | import os | |||
r1163 | import hashlib | |||
import itsdangerous | ||||
r526 | import logging | |||
import requests | ||||
r1969 | import datetime | |||
r4799 | from dogpile.util.readwrite_lock import ReadWriteMutex | |||
r1969 | from pyramid.threadlocal import get_current_registry | |||
r526 | ||||
import rhodecode.lib.helpers as h | ||||
from rhodecode.lib.auth import HasRepoPermissionAny | ||||
from rhodecode.lib.ext_json import json | ||||
from rhodecode.model.db import User | ||||
log = logging.getLogger(__name__) | ||||
LOCK = ReadWriteMutex() | ||||
r4479 | USER_STATE_PUBLIC_KEYS = [ | |||
'id', 'username', 'first_name', 'last_name', | ||||
'icon_link', 'display_name', 'display_link'] | ||||
r526 | ||||
class ChannelstreamException(Exception): | ||||
pass | ||||
r1273 | class ChannelstreamConnectionException(ChannelstreamException): | |||
r526 | pass | |||
r1273 | class ChannelstreamPermissionException(ChannelstreamException): | |||
r526 | pass | |||
r3169 | def get_channelstream_server_url(config, endpoint): | |||
return 'http://{}{}'.format(config['server'], endpoint) | ||||
r526 | def channelstream_request(config, payload, endpoint, raise_exc=True): | |||
signer = itsdangerous.TimestampSigner(config['secret']) | ||||
sig_for_server = signer.sign(endpoint) | ||||
secret_headers = {'x-channelstream-secret': sig_for_server, | ||||
'x-channelstream-endpoint': endpoint, | ||||
'Content-Type': 'application/json'} | ||||
r3169 | req_url = get_channelstream_server_url(config, endpoint) | |||
r4479 | ||||
log.debug('Sending a channelstream request to endpoint: `%s`', req_url) | ||||
r526 | response = None | |||
try: | ||||
response = requests.post(req_url, data=json.dumps(payload), | ||||
headers=secret_headers).json() | ||||
except requests.ConnectionError: | ||||
r3169 | log.exception('ConnectionError occurred for endpoint %s', req_url) | |||
r526 | if raise_exc: | |||
r3169 | raise ChannelstreamConnectionException(req_url) | |||
r526 | except Exception: | |||
r3169 | log.exception('Exception related to Channelstream happened') | |||
r526 | if raise_exc: | |||
raise ChannelstreamConnectionException() | ||||
r4479 | log.debug('Got channelstream response: %s', response) | |||
r526 | return response | |||
def get_user_data(user_id): | ||||
user = User.get(user_id) | ||||
return { | ||||
'id': user.user_id, | ||||
'username': user.username, | ||||
r1815 | 'first_name': user.first_name, | |||
'last_name': user.last_name, | ||||
r836 | 'icon_link': h.gravatar_url(user.email, 60), | |||
r526 | 'display_name': h.person(user, 'username_or_name_or_email'), | |||
'display_link': h.link_to_user(user), | ||||
r734 | 'notifications': user.user_data.get('notification_status', True) | |||
r526 | } | |||
def broadcast_validator(channel_name): | ||||
""" checks if user can access the broadcast channel """ | ||||
if channel_name == 'broadcast': | ||||
return True | ||||
def repo_validator(channel_name): | ||||
""" checks if user can access the broadcast channel """ | ||||
channel_prefix = '/repo$' | ||||
if channel_name.startswith(channel_prefix): | ||||
elements = channel_name[len(channel_prefix):].split('$') | ||||
repo_name = elements[0] | ||||
can_access = HasRepoPermissionAny( | ||||
'repository.read', | ||||
'repository.write', | ||||
'repository.admin')(repo_name) | ||||
r1973 | log.debug( | |||
'permission check for %s channel resulted in %s', | ||||
repo_name, can_access) | ||||
r526 | if can_access: | |||
return True | ||||
return False | ||||
def check_channel_permissions(channels, plugin_validators, should_raise=True): | ||||
valid_channels = [] | ||||
validators = [broadcast_validator, repo_validator] | ||||
if plugin_validators: | ||||
validators.extend(plugin_validators) | ||||
for channel_name in channels: | ||||
is_valid = False | ||||
for validator in validators: | ||||
if validator(channel_name): | ||||
is_valid = True | ||||
break | ||||
if is_valid: | ||||
valid_channels.append(channel_name) | ||||
else: | ||||
if should_raise: | ||||
raise ChannelstreamPermissionException() | ||||
return valid_channels | ||||
def get_channels_info(self, channels): | ||||
payload = {'channels': channels} | ||||
# gather persistence info | ||||
return channelstream_request(self._config(), payload, '/info') | ||||
def parse_channels_info(info_result, include_channel_info=None): | ||||
""" | ||||
Returns data that contains only secure information that can be | ||||
presented to clients | ||||
""" | ||||
include_channel_info = include_channel_info or [] | ||||
user_state_dict = {} | ||||
for userinfo in info_result['users']: | ||||
user_state_dict[userinfo['user']] = { | ||||
k: v for k, v in userinfo['state'].items() | ||||
r4479 | if k in USER_STATE_PUBLIC_KEYS | |||
r526 | } | |||
channels_info = {} | ||||
for c_name, c_info in info_result['channels'].items(): | ||||
if c_name not in include_channel_info: | ||||
continue | ||||
connected_list = [] | ||||
r4479 | for username in c_info['users']: | |||
r526 | connected_list.append({ | |||
r4479 | 'user': username, | |||
r4480 | 'state': user_state_dict[username] | |||
r526 | }) | |||
channels_info[c_name] = {'users': connected_list, | ||||
'history': c_info['history']} | ||||
return channels_info | ||||
def log_filepath(history_location, channel_name): | ||||
r1163 | hasher = hashlib.sha256() | |||
hasher.update(channel_name.encode('utf8')) | ||||
filename = '{}.log'.format(hasher.hexdigest()) | ||||
r526 | filepath = os.path.join(history_location, filename) | |||
return filepath | ||||
def read_history(history_location, channel_name): | ||||
filepath = log_filepath(history_location, channel_name) | ||||
if not os.path.exists(filepath): | ||||
return [] | ||||
history_lines_limit = -100 | ||||
history = [] | ||||
with open(filepath, 'rb') as f: | ||||
for line in f.readlines()[history_lines_limit:]: | ||||
try: | ||||
history.append(json.loads(line)) | ||||
except Exception: | ||||
log.exception('Failed to load history') | ||||
return history | ||||
def update_history_from_logs(config, channels, payload): | ||||
history_location = config.get('history.location') | ||||
for channel in channels: | ||||
history = read_history(history_location, channel) | ||||
payload['channels_info'][channel]['history'] = history | ||||
def write_history(config, message): | ||||
r4969 | """ writes a message to a base64encoded filename """ | |||
r526 | history_location = config.get('history.location') | |||
if not os.path.exists(history_location): | ||||
return | ||||
try: | ||||
LOCK.acquire_write_lock() | ||||
filepath = log_filepath(history_location, message['channel']) | ||||
r4969 | json_message = json.dumps(message) | |||
r526 | with open(filepath, 'ab') as f: | |||
r4969 | f.write(json_message) | |||
r526 | f.write('\n') | |||
finally: | ||||
LOCK.release_write_lock() | ||||
def get_connection_validators(registry): | ||||
validators = [] | ||||
r4505 | for k, config in registry.rhodecode_plugins.items(): | |||
r526 | validator = config.get('channelstream', {}).get('connect_validator') | |||
if validator: | ||||
validators.append(validator) | ||||
return validators | ||||
r1969 | ||||
r4505 | def get_channelstream_config(registry=None): | |||
if not registry: | ||||
registry = get_current_registry() | ||||
rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {}) | ||||
channelstream_config = rhodecode_plugins.get('channelstream', {}) | ||||
return channelstream_config | ||||
r1969 | def post_message(channel, message, username, registry=None): | |||
r4505 | channelstream_config = get_channelstream_config(registry) | |||
if not channelstream_config.get('enabled'): | ||||
return | ||||
r1969 | ||||
r4479 | message_obj = message | |||
r4917 | if isinstance(message, str): | |||
r4479 | message_obj = { | |||
'message': message, | ||||
'level': 'success', | ||||
'topic': '/notifications' | ||||
} | ||||
r4505 | log.debug('Channelstream: sending notification to channel %s', channel) | |||
payload = { | ||||
'type': 'message', | ||||
'timestamp': datetime.datetime.utcnow(), | ||||
'user': 'system', | ||||
'exclude_users': [username], | ||||
'channel': channel, | ||||
'message': message_obj | ||||
} | ||||
try: | ||||
return channelstream_request( | ||||
channelstream_config, [payload], '/message', | ||||
raise_exc=False) | ||||
except ChannelstreamException: | ||||
log.exception('Failed to send channelstream data') | ||||
raise | ||||
def _reload_link(label): | ||||
return ( | ||||
'<a onclick="window.location.reload()">' | ||||
'<strong>{}</strong>' | ||||
'</a>'.format(label) | ||||
) | ||||
def pr_channel(pull_request): | ||||
repo_name = pull_request.target_repo.repo_name | ||||
pull_request_id = pull_request.pull_request_id | ||||
channel = '/repo${}$/pr/{}'.format(repo_name, pull_request_id) | ||||
log.debug('Getting pull-request channelstream broadcast channel: %s', channel) | ||||
return channel | ||||
def comment_channel(repo_name, commit_obj=None, pull_request_obj=None): | ||||
channel = None | ||||
if commit_obj: | ||||
channel = u'/repo${}$/commit/{}'.format( | ||||
repo_name, commit_obj.raw_id | ||||
) | ||||
elif pull_request_obj: | ||||
channel = u'/repo${}$/pr/{}'.format( | ||||
repo_name, pull_request_obj.pull_request_id | ||||
) | ||||
log.debug('Getting comment channelstream broadcast channel: %s', channel) | ||||
return channel | ||||
def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs): | ||||
""" | ||||
Channel push on pull request update | ||||
""" | ||||
if not pr_broadcast_channel: | ||||
return | ||||
r1969 | ||||
r4505 | _ = request.translate | |||
message = '{} {}'.format( | ||||
msg, | ||||
_reload_link(_(' Reload page to load changes'))) | ||||
message_obj = { | ||||
'message': message, | ||||
'level': 'success', | ||||
'topic': '/notifications' | ||||
} | ||||
post_message( | ||||
pr_broadcast_channel, message_obj, user.username, | ||||
registry=request.registry) | ||||
def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs): | ||||
""" | ||||
Channelstream push on comment action, on commit, or pull-request | ||||
""" | ||||
if not comment_broadcast_channel: | ||||
return | ||||
_ = request.translate | ||||
r1969 | ||||
r4505 | comment_data = kwargs.pop('comment_data', {}) | |||
user_data = kwargs.pop('user_data', {}) | ||||
r4543 | comment_id = comment_data.keys()[0] if comment_data else '' | |||
r4505 | ||||
r4543 | message = '<strong>{}</strong> {} #{}'.format( | |||
r4505 | user.username, | |||
msg, | ||||
comment_id, | ||||
) | ||||
message_obj = { | ||||
'message': message, | ||||
'level': 'success', | ||||
'topic': '/notifications' | ||||
} | ||||
post_message( | ||||
comment_broadcast_channel, message_obj, user.username, | ||||
registry=request.registry) | ||||
message_obj = { | ||||
'message': None, | ||||
'user': user.username, | ||||
'comment_id': comment_id, | ||||
'comment_data': comment_data, | ||||
'user_data': user_data, | ||||
'topic': '/comment' | ||||
} | ||||
post_message( | ||||
comment_broadcast_channel, message_obj, user.username, | ||||
registry=request.registry) | ||||