##// END OF EJS Templates
Aligned expanded sumarry box content better and moved comments as last column
Aligned expanded sumarry box content better and moved comments as last column

File last commit:

r3363:f08e98b1 default
r3639:7ee2e326 new-ui
Show More
channelstream.py
259 lines | 8.3 KiB | text/x-python | PythonLexer
notifications: support real-time notifications with websockets via channelstream
r526 # -*- coding: utf-8 -*-
docs: updated copyrights to 2019
r3363 # Copyright (C) 2016-2019 RhodeCode GmbH
notifications: support real-time notifications with websockets via channelstream
r526 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
channelstream: add simple method of posting message to notification channel.
r1969 import os
channelstream: use sha256 for history filename hashes
r1163 import hashlib
import itsdangerous
notifications: support real-time notifications with websockets via channelstream
r526 import logging
import requests
channelstream: add simple method of posting message to notification channel.
r1969 import datetime
notifications: support real-time notifications with websockets via channelstream
r526 from dogpile.core import ReadWriteMutex
channelstream: add simple method of posting message to notification channel.
r1969 from pyramid.threadlocal import get_current_registry
notifications: support real-time notifications with websockets via channelstream
r526
import rhodecode.lib.helpers as h
from rhodecode.lib.auth import HasRepoPermissionAny
from rhodecode.lib.ext_json import json
from rhodecode.model.db import User
log = logging.getLogger(__name__)
LOCK = ReadWriteMutex()
STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
'icon_link', 'display_name', 'display_link']
class ChannelstreamException(Exception):
pass
channelstream: make main exception be base for all others.
r1273 class ChannelstreamConnectionException(ChannelstreamException):
notifications: support real-time notifications with websockets via channelstream
r526 pass
channelstream: make main exception be base for all others.
r1273 class ChannelstreamPermissionException(ChannelstreamException):
notifications: support real-time notifications with websockets via channelstream
r526 pass
channelstream: nicer handle of connection errors for debug.
r3169 def get_channelstream_server_url(config, endpoint):
return 'http://{}{}'.format(config['server'], endpoint)
notifications: support real-time notifications with websockets via channelstream
r526 def channelstream_request(config, payload, endpoint, raise_exc=True):
signer = itsdangerous.TimestampSigner(config['secret'])
sig_for_server = signer.sign(endpoint)
secret_headers = {'x-channelstream-secret': sig_for_server,
'x-channelstream-endpoint': endpoint,
'Content-Type': 'application/json'}
channelstream: nicer handle of connection errors for debug.
r3169 req_url = get_channelstream_server_url(config, endpoint)
notifications: support real-time notifications with websockets via channelstream
r526 response = None
try:
response = requests.post(req_url, data=json.dumps(payload),
headers=secret_headers).json()
except requests.ConnectionError:
channelstream: nicer handle of connection errors for debug.
r3169 log.exception('ConnectionError occurred for endpoint %s', req_url)
notifications: support real-time notifications with websockets via channelstream
r526 if raise_exc:
channelstream: nicer handle of connection errors for debug.
r3169 raise ChannelstreamConnectionException(req_url)
notifications: support real-time notifications with websockets via channelstream
r526 except Exception:
channelstream: nicer handle of connection errors for debug.
r3169 log.exception('Exception related to Channelstream happened')
notifications: support real-time notifications with websockets via channelstream
r526 if raise_exc:
raise ChannelstreamConnectionException()
return response
def get_user_data(user_id):
user = User.get(user_id)
return {
'id': user.user_id,
'username': user.username,
security: use new safe escaped user attributes across the application....
r1815 'first_name': user.first_name,
'last_name': user.last_name,
ux: better looking chat avatars
r836 'icon_link': h.gravatar_url(user.email, 60),
notifications: support real-time notifications with websockets via channelstream
r526 'display_name': h.person(user, 'username_or_name_or_email'),
'display_link': h.link_to_user(user),
notifications: store notification status in channelstream
r734 'notifications': user.user_data.get('notification_status', True)
notifications: support real-time notifications with websockets via channelstream
r526 }
def broadcast_validator(channel_name):
""" checks if user can access the broadcast channel """
if channel_name == 'broadcast':
return True
def repo_validator(channel_name):
""" checks if user can access the broadcast channel """
channel_prefix = '/repo$'
if channel_name.startswith(channel_prefix):
elements = channel_name[len(channel_prefix):].split('$')
repo_name = elements[0]
can_access = HasRepoPermissionAny(
'repository.read',
'repository.write',
'repository.admin')(repo_name)
dan
channelstream: log debug should be lazily evaulated.
r1973 log.debug(
'permission check for %s channel resulted in %s',
repo_name, can_access)
notifications: support real-time notifications with websockets via channelstream
r526 if can_access:
return True
return False
def check_channel_permissions(channels, plugin_validators, should_raise=True):
valid_channels = []
validators = [broadcast_validator, repo_validator]
if plugin_validators:
validators.extend(plugin_validators)
for channel_name in channels:
is_valid = False
for validator in validators:
if validator(channel_name):
is_valid = True
break
if is_valid:
valid_channels.append(channel_name)
else:
if should_raise:
raise ChannelstreamPermissionException()
return valid_channels
def get_channels_info(self, channels):
payload = {'channels': channels}
# gather persistence info
return channelstream_request(self._config(), payload, '/info')
def parse_channels_info(info_result, include_channel_info=None):
"""
Returns data that contains only secure information that can be
presented to clients
"""
include_channel_info = include_channel_info or []
user_state_dict = {}
for userinfo in info_result['users']:
user_state_dict[userinfo['user']] = {
k: v for k, v in userinfo['state'].items()
if k in STATE_PUBLIC_KEYS
}
channels_info = {}
for c_name, c_info in info_result['channels'].items():
if c_name not in include_channel_info:
continue
connected_list = []
for userinfo in c_info['users']:
connected_list.append({
'user': userinfo['user'],
'state': user_state_dict[userinfo['user']]
})
channels_info[c_name] = {'users': connected_list,
'history': c_info['history']}
return channels_info
def log_filepath(history_location, channel_name):
channelstream: use sha256 for history filename hashes
r1163 hasher = hashlib.sha256()
hasher.update(channel_name.encode('utf8'))
filename = '{}.log'.format(hasher.hexdigest())
notifications: support real-time notifications with websockets via channelstream
r526 filepath = os.path.join(history_location, filename)
return filepath
def read_history(history_location, channel_name):
filepath = log_filepath(history_location, channel_name)
if not os.path.exists(filepath):
return []
history_lines_limit = -100
history = []
with open(filepath, 'rb') as f:
for line in f.readlines()[history_lines_limit:]:
try:
history.append(json.loads(line))
except Exception:
log.exception('Failed to load history')
return history
def update_history_from_logs(config, channels, payload):
history_location = config.get('history.location')
for channel in channels:
history = read_history(history_location, channel)
payload['channels_info'][channel]['history'] = history
def write_history(config, message):
""" writes a messge to a base64encoded filename """
history_location = config.get('history.location')
if not os.path.exists(history_location):
return
try:
LOCK.acquire_write_lock()
filepath = log_filepath(history_location, message['channel'])
with open(filepath, 'ab') as f:
json.dump(message, f)
f.write('\n')
finally:
LOCK.release_write_lock()
def get_connection_validators(registry):
validators = []
for k, config in registry.rhodecode_plugins.iteritems():
validator = config.get('channelstream', {}).get('connect_validator')
if validator:
validators.append(validator)
return validators
channelstream: add simple method of posting message to notification channel.
r1969
def post_message(channel, message, username, registry=None):
if not registry:
registry = get_current_registry()
channelstream: push events with comments on single commits....
r1970 log.debug('Channelstream: sending notification to channel %s', channel)
channelstream: add simple method of posting message to notification channel.
r1969 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
channelstream_config = rhodecode_plugins.get('channelstream', {})
if channelstream_config.get('enabled'):
payload = {
'type': 'message',
'timestamp': datetime.datetime.utcnow(),
'user': 'system',
'exclude_users': [username],
'channel': channel,
'message': {
'message': message,
'level': 'success',
'topic': '/notifications'
}
}
try:
return channelstream_request(
channelstream_config, [payload], '/message',
raise_exc=False)
except ChannelstreamException:
log.exception('Failed to send channelstream data')
raise