##// END OF EJS Templates
channelstream: python3 fixes
super-admin -
r5003:c1b8a600 default
parent child Browse files
Show More
@@ -1,371 +1,372 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 import hashlib
23 22 import itsdangerous
24 23 import logging
25 24 import requests
26 25 import datetime
27 26
28 27 from dogpile.util.readwrite_lock import ReadWriteMutex
29 28 from pyramid.threadlocal import get_current_registry
30 29
31 30 import rhodecode.lib.helpers as h
32 31 from rhodecode.lib.auth import HasRepoPermissionAny
33 32 from rhodecode.lib.ext_json import json
34 33 from rhodecode.model.db import User
34 from rhodecode.lib.str_utils import ascii_str
35 from rhodecode.lib.hash_utils import sha1_safe
35 36
36 37 log = logging.getLogger(__name__)
37 38
38 39 LOCK = ReadWriteMutex()
39 40
40 41 USER_STATE_PUBLIC_KEYS = [
41 42 'id', 'username', 'first_name', 'last_name',
42 43 'icon_link', 'display_name', 'display_link']
43 44
44 45
45 46 class ChannelstreamException(Exception):
46 47 pass
47 48
48 49
49 50 class ChannelstreamConnectionException(ChannelstreamException):
50 51 pass
51 52
52 53
53 54 class ChannelstreamPermissionException(ChannelstreamException):
54 55 pass
55 56
56 57
57 58 def get_channelstream_server_url(config, endpoint):
58 59 return 'http://{}{}'.format(config['server'], endpoint)
59 60
60 61
61 62 def channelstream_request(config, payload, endpoint, raise_exc=True):
62 63 signer = itsdangerous.TimestampSigner(config['secret'])
63 64 sig_for_server = signer.sign(endpoint)
64 65 secret_headers = {'x-channelstream-secret': sig_for_server,
65 66 'x-channelstream-endpoint': endpoint,
66 67 'Content-Type': 'application/json'}
67 68 req_url = get_channelstream_server_url(config, endpoint)
68 69
69 70 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
70 71 response = None
71 72 try:
72 73 response = requests.post(req_url, data=json.dumps(payload),
73 74 headers=secret_headers).json()
74 75 except requests.ConnectionError:
75 76 log.exception('ConnectionError occurred for endpoint %s', req_url)
76 77 if raise_exc:
77 78 raise ChannelstreamConnectionException(req_url)
78 79 except Exception:
79 80 log.exception('Exception related to Channelstream happened')
80 81 if raise_exc:
81 82 raise ChannelstreamConnectionException()
82 83 log.debug('Got channelstream response: %s', response)
83 84 return response
84 85
85 86
86 87 def get_user_data(user_id):
87 88 user = User.get(user_id)
88 89 return {
89 90 'id': user.user_id,
90 91 'username': user.username,
91 92 'first_name': user.first_name,
92 93 'last_name': user.last_name,
93 94 'icon_link': h.gravatar_url(user.email, 60),
94 95 'display_name': h.person(user, 'username_or_name_or_email'),
95 96 'display_link': h.link_to_user(user),
96 97 'notifications': user.user_data.get('notification_status', True)
97 98 }
98 99
99 100
100 101 def broadcast_validator(channel_name):
101 102 """ checks if user can access the broadcast channel """
102 103 if channel_name == 'broadcast':
103 104 return True
104 105
105 106
106 107 def repo_validator(channel_name):
107 108 """ checks if user can access the broadcast channel """
108 109 channel_prefix = '/repo$'
109 110 if channel_name.startswith(channel_prefix):
110 111 elements = channel_name[len(channel_prefix):].split('$')
111 112 repo_name = elements[0]
112 113 can_access = HasRepoPermissionAny(
113 114 'repository.read',
114 115 'repository.write',
115 116 'repository.admin')(repo_name)
116 117 log.debug(
117 118 'permission check for %s channel resulted in %s',
118 119 repo_name, can_access)
119 120 if can_access:
120 121 return True
121 122 return False
122 123
123 124
124 125 def check_channel_permissions(channels, plugin_validators, should_raise=True):
125 126 valid_channels = []
126 127
127 128 validators = [broadcast_validator, repo_validator]
128 129 if plugin_validators:
129 130 validators.extend(plugin_validators)
130 131 for channel_name in channels:
131 132 is_valid = False
132 133 for validator in validators:
133 134 if validator(channel_name):
134 135 is_valid = True
135 136 break
136 137 if is_valid:
137 138 valid_channels.append(channel_name)
138 139 else:
139 140 if should_raise:
140 141 raise ChannelstreamPermissionException()
141 142 return valid_channels
142 143
143 144
144 145 def get_channels_info(self, channels):
145 146 payload = {'channels': channels}
146 147 # gather persistence info
147 148 return channelstream_request(self._config(), payload, '/info')
148 149
149 150
150 151 def parse_channels_info(info_result, include_channel_info=None):
151 152 """
152 153 Returns data that contains only secure information that can be
153 154 presented to clients
154 155 """
155 156 include_channel_info = include_channel_info or []
156 157
157 158 user_state_dict = {}
158 159 for userinfo in info_result['users']:
159 160 user_state_dict[userinfo['user']] = {
160 k: v for k, v in userinfo['state'].items()
161 k: v for k, v in list(userinfo['state'].items())
161 162 if k in USER_STATE_PUBLIC_KEYS
162 163 }
163 164
164 165 channels_info = {}
165 166
166 for c_name, c_info in info_result['channels'].items():
167 for c_name, c_info in list(info_result['channels'].items()):
167 168 if c_name not in include_channel_info:
168 169 continue
169 170 connected_list = []
170 171 for username in c_info['users']:
171 172 connected_list.append({
172 173 'user': username,
173 174 'state': user_state_dict[username]
174 175 })
175 176 channels_info[c_name] = {'users': connected_list,
176 177 'history': c_info['history']}
177 178
178 179 return channels_info
179 180
180 181
181 182 def log_filepath(history_location, channel_name):
182 hasher = hashlib.sha256()
183 hasher.update(channel_name.encode('utf8'))
184 filename = '{}.log'.format(hasher.hexdigest())
183
184 channel_hash = ascii_str(sha1_safe(channel_name))
185 filename = f'{channel_hash}.log'
185 186 filepath = os.path.join(history_location, filename)
186 187 return filepath
187 188
188 189
189 190 def read_history(history_location, channel_name):
190 191 filepath = log_filepath(history_location, channel_name)
191 192 if not os.path.exists(filepath):
192 193 return []
193 194 history_lines_limit = -100
194 195 history = []
195 196 with open(filepath, 'rb') as f:
196 197 for line in f.readlines()[history_lines_limit:]:
197 198 try:
198 199 history.append(json.loads(line))
199 200 except Exception:
200 201 log.exception('Failed to load history')
201 202 return history
202 203
203 204
204 205 def update_history_from_logs(config, channels, payload):
205 206 history_location = config.get('history.location')
206 207 for channel in channels:
207 208 history = read_history(history_location, channel)
208 209 payload['channels_info'][channel]['history'] = history
209 210
210 211
211 212 def write_history(config, message):
212 213 """ writes a message to a base64encoded filename """
213 214 history_location = config.get('history.location')
214 215 if not os.path.exists(history_location):
215 216 return
216 217 try:
217 218 LOCK.acquire_write_lock()
218 219 filepath = log_filepath(history_location, message['channel'])
219 220 json_message = json.dumps(message)
220 221 with open(filepath, 'ab') as f:
221 222 f.write(json_message)
222 223 f.write('\n')
223 224 finally:
224 225 LOCK.release_write_lock()
225 226
226 227
227 228 def get_connection_validators(registry):
228 229 validators = []
229 for k, config in registry.rhodecode_plugins.items():
230 for k, config in list(registry.rhodecode_plugins.items()):
230 231 validator = config.get('channelstream', {}).get('connect_validator')
231 232 if validator:
232 233 validators.append(validator)
233 234 return validators
234 235
235 236
236 237 def get_channelstream_config(registry=None):
237 238 if not registry:
238 239 registry = get_current_registry()
239 240
240 241 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
241 242 channelstream_config = rhodecode_plugins.get('channelstream', {})
242 243 return channelstream_config
243 244
244 245
245 246 def post_message(channel, message, username, registry=None):
246 247 channelstream_config = get_channelstream_config(registry)
247 248 if not channelstream_config.get('enabled'):
248 249 return
249 250
250 251 message_obj = message
251 252 if isinstance(message, str):
252 253 message_obj = {
253 254 'message': message,
254 255 'level': 'success',
255 256 'topic': '/notifications'
256 257 }
257 258
258 259 log.debug('Channelstream: sending notification to channel %s', channel)
259 260 payload = {
260 261 'type': 'message',
261 262 'timestamp': datetime.datetime.utcnow(),
262 263 'user': 'system',
263 264 'exclude_users': [username],
264 265 'channel': channel,
265 266 'message': message_obj
266 267 }
267 268
268 269 try:
269 270 return channelstream_request(
270 271 channelstream_config, [payload], '/message',
271 272 raise_exc=False)
272 273 except ChannelstreamException:
273 274 log.exception('Failed to send channelstream data')
274 275 raise
275 276
276 277
277 278 def _reload_link(label):
278 279 return (
279 280 '<a onclick="window.location.reload()">'
280 281 '<strong>{}</strong>'
281 282 '</a>'.format(label)
282 283 )
283 284
284 285
285 286 def pr_channel(pull_request):
286 287 repo_name = pull_request.target_repo.repo_name
287 288 pull_request_id = pull_request.pull_request_id
288 289 channel = '/repo${}$/pr/{}'.format(repo_name, pull_request_id)
289 290 log.debug('Getting pull-request channelstream broadcast channel: %s', channel)
290 291 return channel
291 292
292 293
293 294 def comment_channel(repo_name, commit_obj=None, pull_request_obj=None):
294 295 channel = None
295 296 if commit_obj:
296 297 channel = '/repo${}$/commit/{}'.format(
297 298 repo_name, commit_obj.raw_id
298 299 )
299 300 elif pull_request_obj:
300 301 channel = '/repo${}$/pr/{}'.format(
301 302 repo_name, pull_request_obj.pull_request_id
302 303 )
303 304 log.debug('Getting comment channelstream broadcast channel: %s', channel)
304 305
305 306 return channel
306 307
307 308
308 309 def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs):
309 310 """
310 311 Channel push on pull request update
311 312 """
312 313 if not pr_broadcast_channel:
313 314 return
314 315
315 316 _ = request.translate
316 317
317 318 message = '{} {}'.format(
318 319 msg,
319 320 _reload_link(_(' Reload page to load changes')))
320 321
321 322 message_obj = {
322 323 'message': message,
323 324 'level': 'success',
324 325 'topic': '/notifications'
325 326 }
326 327
327 328 post_message(
328 329 pr_broadcast_channel, message_obj, user.username,
329 330 registry=request.registry)
330 331
331 332
332 333 def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs):
333 334 """
334 335 Channelstream push on comment action, on commit, or pull-request
335 336 """
336 337 if not comment_broadcast_channel:
337 338 return
338 339
339 340 _ = request.translate
340 341
341 342 comment_data = kwargs.pop('comment_data', {})
342 343 user_data = kwargs.pop('user_data', {})
343 comment_id = comment_data.keys()[0] if comment_data else ''
344 comment_id = list(comment_data.keys())[0] if comment_data else ''
344 345
345 346 message = '<strong>{}</strong> {} #{}'.format(
346 347 user.username,
347 348 msg,
348 349 comment_id,
349 350 )
350 351
351 352 message_obj = {
352 353 'message': message,
353 354 'level': 'success',
354 355 'topic': '/notifications'
355 356 }
356 357
357 358 post_message(
358 359 comment_broadcast_channel, message_obj, user.username,
359 360 registry=request.registry)
360 361
361 362 message_obj = {
362 363 'message': None,
363 364 'user': user.username,
364 365 'comment_id': comment_id,
365 366 'comment_data': comment_data,
366 367 'user_data': user_data,
367 368 'topic': '/comment'
368 369 }
369 370 post_message(
370 371 comment_broadcast_channel, message_obj, user.username,
371 372 registry=request.registry)
General Comments 0
You need to be logged in to leave comments. Login now