##// END OF EJS Templates
channelstream: nicer handle of connection errors for debug.
marcink -
r3169:c15445d5 default
parent child Browse files
Show More
@@ -1,169 +1,177 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 22 import uuid
23 23
24 24 from pyramid.view import view_config
25 25 from pyramid.httpexceptions import HTTPBadRequest, HTTPForbidden, HTTPBadGateway
26 26
27 27 from rhodecode.apps._base import BaseAppView
28 28 from rhodecode.lib.channelstream import (
29 channelstream_request,
29 channelstream_request, get_channelstream_server_url,
30 30 ChannelstreamConnectionException,
31 31 ChannelstreamPermissionException,
32 32 check_channel_permissions,
33 33 get_connection_validators,
34 34 get_user_data,
35 35 parse_channels_info,
36 36 update_history_from_logs,
37 37 STATE_PUBLIC_KEYS)
38 38
39 39 from rhodecode.lib.auth import NotAnonymous
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 class ChannelstreamView(BaseAppView):
45 45
46 46 def load_default_context(self):
47 47 c = self._get_local_tmpl_context()
48 48 self.channelstream_config = \
49 49 self.request.registry.rhodecode_plugins['channelstream']
50 50 if not self.channelstream_config.get('enabled'):
51 51 log.error('Channelstream plugin is disabled')
52 52 raise HTTPBadRequest()
53 53
54 54 return c
55 55
56 56 @NotAnonymous()
57 57 @view_config(route_name='channelstream_connect', renderer='json_ext')
58 58 def connect(self):
59 """ handle authorization of users trying to connect """
60
59 61 self.load_default_context()
60 """ handle authorization of users trying to connect """
61 62 try:
62 63 json_body = self.request.json_body
63 64 except Exception:
64 65 log.exception('Failed to decode json from request')
65 66 raise HTTPBadRequest()
66 67
67 68 try:
68 69 channels = check_channel_permissions(
69 70 json_body.get('channels'),
70 71 get_connection_validators(self.request.registry))
71 72 except ChannelstreamPermissionException:
72 73 log.error('Incorrect permissions for requested channels')
73 74 raise HTTPForbidden()
74 75
75 76 user = self._rhodecode_user
76 77 if user.user_id:
77 78 user_data = get_user_data(user.user_id)
78 79 else:
79 80 user_data = {
80 81 'id': None,
81 82 'username': None,
82 83 'first_name': None,
83 84 'last_name': None,
84 85 'icon_link': None,
85 86 'display_name': None,
86 87 'display_link': None,
87 88 }
88 89 user_data['permissions'] = self._rhodecode_user.permissions_safe
89 90 payload = {
90 91 'username': user.username,
91 92 'user_state': user_data,
92 93 'conn_id': str(uuid.uuid4()),
93 94 'channels': channels,
94 95 'channel_configs': {},
95 96 'state_public_keys': STATE_PUBLIC_KEYS,
96 97 'info': {
97 98 'exclude_channels': ['broadcast']
98 99 }
99 100 }
100 101 filtered_channels = [channel for channel in channels
101 102 if channel != 'broadcast']
102 103 for channel in filtered_channels:
103 104 payload['channel_configs'][channel] = {
104 105 'notify_presence': True,
105 106 'history_size': 100,
106 107 'store_history': True,
107 108 'broadcast_presence_with_user_lists': True
108 109 }
109 110 # connect user to server
111 channelstream_url = get_channelstream_server_url(
112 self.channelstream_config, '/connect')
110 113 try:
111 connect_result = channelstream_request(self.channelstream_config,
112 payload, '/connect')
114 connect_result = channelstream_request(
115 self.channelstream_config, payload, '/connect')
113 116 except ChannelstreamConnectionException:
114 log.exception('Channelstream service is down')
117 log.exception(
118 'Channelstream service at {} is down'.format(channelstream_url))
115 119 return HTTPBadGateway()
116 120
117 121 connect_result['channels'] = channels
118 122 connect_result['channels_info'] = parse_channels_info(
119 123 connect_result['channels_info'],
120 124 include_channel_info=filtered_channels)
121 125 update_history_from_logs(self.channelstream_config,
122 126 filtered_channels, connect_result)
123 127 return connect_result
124 128
125 129 @NotAnonymous()
126 130 @view_config(route_name='channelstream_subscribe', renderer='json_ext')
127 131 def subscribe(self):
128 132 """ can be used to subscribe specific connection to other channels """
129 133 self.load_default_context()
130 134 try:
131 135 json_body = self.request.json_body
132 136 except Exception:
133 137 log.exception('Failed to decode json from request')
134 138 raise HTTPBadRequest()
135 139 try:
136 140 channels = check_channel_permissions(
137 141 json_body.get('channels'),
138 142 get_connection_validators(self.request.registry))
139 143 except ChannelstreamPermissionException:
140 144 log.error('Incorrect permissions for requested channels')
141 145 raise HTTPForbidden()
142 146 payload = {'conn_id': json_body.get('conn_id', ''),
143 147 'channels': channels,
144 148 'channel_configs': {},
145 149 'info': {
146 150 'exclude_channels': ['broadcast']}
147 151 }
148 152 filtered_channels = [chan for chan in channels if chan != 'broadcast']
149 153 for channel in filtered_channels:
150 154 payload['channel_configs'][channel] = {
151 155 'notify_presence': True,
152 156 'history_size': 100,
153 157 'store_history': True,
154 158 'broadcast_presence_with_user_lists': True
155 159 }
160
161 channelstream_url = get_channelstream_server_url(
162 self.channelstream_config, '/subscribe')
156 163 try:
157 164 connect_result = channelstream_request(
158 165 self.channelstream_config, payload, '/subscribe')
159 166 except ChannelstreamConnectionException:
160 log.exception('Channelstream service is down')
167 log.exception(
168 'Channelstream service at {} is down'.format(channelstream_url))
161 169 return HTTPBadGateway()
162 170 # include_channel_info will limit history only to new channel
163 171 # to not overwrite histories on other channels in client
164 172 connect_result['channels_info'] = parse_channels_info(
165 173 connect_result['channels_info'],
166 174 include_channel_info=filtered_channels)
167 update_history_from_logs(self.channelstream_config,
168 filtered_channels, connect_result)
175 update_history_from_logs(
176 self.channelstream_config, filtered_channels, connect_result)
169 177 return connect_result
@@ -1,255 +1,259 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import hashlib
23 23 import itsdangerous
24 24 import logging
25 25 import requests
26 26 import datetime
27 27
28 28 from dogpile.core import ReadWriteMutex
29 29 from pyramid.threadlocal import get_current_registry
30 30
31 31 import rhodecode.lib.helpers as h
32 32 from rhodecode.lib.auth import HasRepoPermissionAny
33 33 from rhodecode.lib.ext_json import json
34 34 from rhodecode.model.db import User
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 LOCK = ReadWriteMutex()
39 39
40 40 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
41 41 'icon_link', 'display_name', 'display_link']
42 42
43 43
44 44 class ChannelstreamException(Exception):
45 45 pass
46 46
47 47
48 48 class ChannelstreamConnectionException(ChannelstreamException):
49 49 pass
50 50
51 51
52 52 class ChannelstreamPermissionException(ChannelstreamException):
53 53 pass
54 54
55 55
56 def get_channelstream_server_url(config, endpoint):
57 return 'http://{}{}'.format(config['server'], endpoint)
58
59
56 60 def channelstream_request(config, payload, endpoint, raise_exc=True):
57 61 signer = itsdangerous.TimestampSigner(config['secret'])
58 62 sig_for_server = signer.sign(endpoint)
59 63 secret_headers = {'x-channelstream-secret': sig_for_server,
60 64 'x-channelstream-endpoint': endpoint,
61 65 'Content-Type': 'application/json'}
62 req_url = 'http://{}{}'.format(config['server'], endpoint)
66 req_url = get_channelstream_server_url(config, endpoint)
63 67 response = None
64 68 try:
65 69 response = requests.post(req_url, data=json.dumps(payload),
66 70 headers=secret_headers).json()
67 71 except requests.ConnectionError:
68 log.exception('ConnectionError happened')
72 log.exception('ConnectionError occurred for endpoint %s', req_url)
69 73 if raise_exc:
70 raise ChannelstreamConnectionException()
74 raise ChannelstreamConnectionException(req_url)
71 75 except Exception:
72 log.exception('Exception related to channelstream happened')
76 log.exception('Exception related to Channelstream happened')
73 77 if raise_exc:
74 78 raise ChannelstreamConnectionException()
75 79 return response
76 80
77 81
78 82 def get_user_data(user_id):
79 83 user = User.get(user_id)
80 84 return {
81 85 'id': user.user_id,
82 86 'username': user.username,
83 87 'first_name': user.first_name,
84 88 'last_name': user.last_name,
85 89 'icon_link': h.gravatar_url(user.email, 60),
86 90 'display_name': h.person(user, 'username_or_name_or_email'),
87 91 'display_link': h.link_to_user(user),
88 92 'notifications': user.user_data.get('notification_status', True)
89 93 }
90 94
91 95
92 96 def broadcast_validator(channel_name):
93 97 """ checks if user can access the broadcast channel """
94 98 if channel_name == 'broadcast':
95 99 return True
96 100
97 101
98 102 def repo_validator(channel_name):
99 103 """ checks if user can access the broadcast channel """
100 104 channel_prefix = '/repo$'
101 105 if channel_name.startswith(channel_prefix):
102 106 elements = channel_name[len(channel_prefix):].split('$')
103 107 repo_name = elements[0]
104 108 can_access = HasRepoPermissionAny(
105 109 'repository.read',
106 110 'repository.write',
107 111 'repository.admin')(repo_name)
108 112 log.debug(
109 113 'permission check for %s channel resulted in %s',
110 114 repo_name, can_access)
111 115 if can_access:
112 116 return True
113 117 return False
114 118
115 119
116 120 def check_channel_permissions(channels, plugin_validators, should_raise=True):
117 121 valid_channels = []
118 122
119 123 validators = [broadcast_validator, repo_validator]
120 124 if plugin_validators:
121 125 validators.extend(plugin_validators)
122 126 for channel_name in channels:
123 127 is_valid = False
124 128 for validator in validators:
125 129 if validator(channel_name):
126 130 is_valid = True
127 131 break
128 132 if is_valid:
129 133 valid_channels.append(channel_name)
130 134 else:
131 135 if should_raise:
132 136 raise ChannelstreamPermissionException()
133 137 return valid_channels
134 138
135 139
136 140 def get_channels_info(self, channels):
137 141 payload = {'channels': channels}
138 142 # gather persistence info
139 143 return channelstream_request(self._config(), payload, '/info')
140 144
141 145
142 146 def parse_channels_info(info_result, include_channel_info=None):
143 147 """
144 148 Returns data that contains only secure information that can be
145 149 presented to clients
146 150 """
147 151 include_channel_info = include_channel_info or []
148 152
149 153 user_state_dict = {}
150 154 for userinfo in info_result['users']:
151 155 user_state_dict[userinfo['user']] = {
152 156 k: v for k, v in userinfo['state'].items()
153 157 if k in STATE_PUBLIC_KEYS
154 158 }
155 159
156 160 channels_info = {}
157 161
158 162 for c_name, c_info in info_result['channels'].items():
159 163 if c_name not in include_channel_info:
160 164 continue
161 165 connected_list = []
162 166 for userinfo in c_info['users']:
163 167 connected_list.append({
164 168 'user': userinfo['user'],
165 169 'state': user_state_dict[userinfo['user']]
166 170 })
167 171 channels_info[c_name] = {'users': connected_list,
168 172 'history': c_info['history']}
169 173
170 174 return channels_info
171 175
172 176
173 177 def log_filepath(history_location, channel_name):
174 178 hasher = hashlib.sha256()
175 179 hasher.update(channel_name.encode('utf8'))
176 180 filename = '{}.log'.format(hasher.hexdigest())
177 181 filepath = os.path.join(history_location, filename)
178 182 return filepath
179 183
180 184
181 185 def read_history(history_location, channel_name):
182 186 filepath = log_filepath(history_location, channel_name)
183 187 if not os.path.exists(filepath):
184 188 return []
185 189 history_lines_limit = -100
186 190 history = []
187 191 with open(filepath, 'rb') as f:
188 192 for line in f.readlines()[history_lines_limit:]:
189 193 try:
190 194 history.append(json.loads(line))
191 195 except Exception:
192 196 log.exception('Failed to load history')
193 197 return history
194 198
195 199
196 200 def update_history_from_logs(config, channels, payload):
197 201 history_location = config.get('history.location')
198 202 for channel in channels:
199 203 history = read_history(history_location, channel)
200 204 payload['channels_info'][channel]['history'] = history
201 205
202 206
203 207 def write_history(config, message):
204 208 """ writes a messge to a base64encoded filename """
205 209 history_location = config.get('history.location')
206 210 if not os.path.exists(history_location):
207 211 return
208 212 try:
209 213 LOCK.acquire_write_lock()
210 214 filepath = log_filepath(history_location, message['channel'])
211 215 with open(filepath, 'ab') as f:
212 216 json.dump(message, f)
213 217 f.write('\n')
214 218 finally:
215 219 LOCK.release_write_lock()
216 220
217 221
218 222 def get_connection_validators(registry):
219 223 validators = []
220 224 for k, config in registry.rhodecode_plugins.iteritems():
221 225 validator = config.get('channelstream', {}).get('connect_validator')
222 226 if validator:
223 227 validators.append(validator)
224 228 return validators
225 229
226 230
227 231 def post_message(channel, message, username, registry=None):
228 232
229 233 if not registry:
230 234 registry = get_current_registry()
231 235
232 236 log.debug('Channelstream: sending notification to channel %s', channel)
233 237 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
234 238 channelstream_config = rhodecode_plugins.get('channelstream', {})
235 239 if channelstream_config.get('enabled'):
236 240 payload = {
237 241 'type': 'message',
238 242 'timestamp': datetime.datetime.utcnow(),
239 243 'user': 'system',
240 244 'exclude_users': [username],
241 245 'channel': channel,
242 246 'message': {
243 247 'message': message,
244 248 'level': 'success',
245 249 'topic': '/notifications'
246 250 }
247 251 }
248 252
249 253 try:
250 254 return channelstream_request(
251 255 channelstream_config, [payload], '/message',
252 256 raise_exc=False)
253 257 except ChannelstreamException:
254 258 log.exception('Failed to send channelstream data')
255 259 raise
General Comments 0
You need to be logged in to leave comments. Login now