##// END OF EJS Templates
vcs: Only allow 'pull' actions on shadow repositories....
vcs: Only allow 'pull' actions on shadow repositories. We are exposing the shadow repositories of pull requests to allow easy CI integration or users to access the pull request shadow repo for investigating on it. But we don't want someone/something to push changes to a shadow repository.

File last commit:

r836:ae0d550a default
r891:910a0be0 default
Show More
channelstream.py
220 lines | 6.9 KiB | text/x-python | PythonLexer
notifications: support real-time notifications with websockets via channelstream
r526 # -*- coding: utf-8 -*-
# Copyright (C) 2016-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import logging
import os
import itsdangerous
import requests
from dogpile.core import ReadWriteMutex
import rhodecode.lib.helpers as h
from rhodecode.lib.auth import HasRepoPermissionAny
from rhodecode.lib.ext_json import json
from rhodecode.model.db import User
log = logging.getLogger(__name__)
LOCK = ReadWriteMutex()
STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
'icon_link', 'display_name', 'display_link']
class ChannelstreamException(Exception):
pass
class ChannelstreamConnectionException(Exception):
pass
class ChannelstreamPermissionException(Exception):
pass
def channelstream_request(config, payload, endpoint, raise_exc=True):
signer = itsdangerous.TimestampSigner(config['secret'])
sig_for_server = signer.sign(endpoint)
secret_headers = {'x-channelstream-secret': sig_for_server,
'x-channelstream-endpoint': endpoint,
'Content-Type': 'application/json'}
req_url = 'http://{}{}'.format(config['server'], endpoint)
response = None
try:
response = requests.post(req_url, data=json.dumps(payload),
headers=secret_headers).json()
except requests.ConnectionError:
log.exception('ConnectionError happened')
if raise_exc:
raise ChannelstreamConnectionException()
except Exception:
log.exception('Exception related to channelstream happened')
if raise_exc:
raise ChannelstreamConnectionException()
return response
def get_user_data(user_id):
user = User.get(user_id)
return {
'id': user.user_id,
'username': user.username,
'first_name': user.name,
'last_name': user.lastname,
ux: better looking chat avatars
r836 'icon_link': h.gravatar_url(user.email, 60),
notifications: support real-time notifications with websockets via channelstream
r526 'display_name': h.person(user, 'username_or_name_or_email'),
'display_link': h.link_to_user(user),
notifications: store notification status in channelstream
r734 'notifications': user.user_data.get('notification_status', True)
notifications: support real-time notifications with websockets via channelstream
r526 }
def broadcast_validator(channel_name):
""" checks if user can access the broadcast channel """
if channel_name == 'broadcast':
return True
def repo_validator(channel_name):
""" checks if user can access the broadcast channel """
channel_prefix = '/repo$'
if channel_name.startswith(channel_prefix):
elements = channel_name[len(channel_prefix):].split('$')
repo_name = elements[0]
can_access = HasRepoPermissionAny(
'repository.read',
'repository.write',
'repository.admin')(repo_name)
log.debug('permission check for {} channel '
'resulted in {}'.format(repo_name, can_access))
if can_access:
return True
return False
def check_channel_permissions(channels, plugin_validators, should_raise=True):
valid_channels = []
validators = [broadcast_validator, repo_validator]
if plugin_validators:
validators.extend(plugin_validators)
for channel_name in channels:
is_valid = False
for validator in validators:
if validator(channel_name):
is_valid = True
break
if is_valid:
valid_channels.append(channel_name)
else:
if should_raise:
raise ChannelstreamPermissionException()
return valid_channels
def get_channels_info(self, channels):
payload = {'channels': channels}
# gather persistence info
return channelstream_request(self._config(), payload, '/info')
def parse_channels_info(info_result, include_channel_info=None):
"""
Returns data that contains only secure information that can be
presented to clients
"""
include_channel_info = include_channel_info or []
user_state_dict = {}
for userinfo in info_result['users']:
user_state_dict[userinfo['user']] = {
k: v for k, v in userinfo['state'].items()
if k in STATE_PUBLIC_KEYS
}
channels_info = {}
for c_name, c_info in info_result['channels'].items():
if c_name not in include_channel_info:
continue
connected_list = []
for userinfo in c_info['users']:
connected_list.append({
'user': userinfo['user'],
'state': user_state_dict[userinfo['user']]
})
channels_info[c_name] = {'users': connected_list,
'history': c_info['history']}
return channels_info
def log_filepath(history_location, channel_name):
filename = '{}.log'.format(channel_name.encode('hex'))
filepath = os.path.join(history_location, filename)
return filepath
def read_history(history_location, channel_name):
filepath = log_filepath(history_location, channel_name)
if not os.path.exists(filepath):
return []
history_lines_limit = -100
history = []
with open(filepath, 'rb') as f:
for line in f.readlines()[history_lines_limit:]:
try:
history.append(json.loads(line))
except Exception:
log.exception('Failed to load history')
return history
def update_history_from_logs(config, channels, payload):
history_location = config.get('history.location')
for channel in channels:
history = read_history(history_location, channel)
payload['channels_info'][channel]['history'] = history
def write_history(config, message):
""" writes a messge to a base64encoded filename """
history_location = config.get('history.location')
if not os.path.exists(history_location):
return
try:
LOCK.acquire_write_lock()
filepath = log_filepath(history_location, message['channel'])
with open(filepath, 'ab') as f:
json.dump(message, f)
f.write('\n')
finally:
LOCK.release_write_lock()
def get_connection_validators(registry):
validators = []
for k, config in registry.rhodecode_plugins.iteritems():
validator = config.get('channelstream', {}).get('connect_validator')
if validator:
validators.append(validator)
return validators