##// END OF EJS Templates
fix(channelstream): fixed history writing
super-admin -
r5208:d460d319 default
parent child Browse files
Show More
@@ -1,370 +1,370 b''
1 1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import os
20 20 import itsdangerous
21 21 import logging
22 22 import requests
23 23 import datetime
24 24
25 25 from dogpile.util.readwrite_lock import ReadWriteMutex
26 26
27 27 import rhodecode.lib.helpers as h
28 28 from rhodecode.lib.auth import HasRepoPermissionAny
29 29 from rhodecode.lib.ext_json import json
30 30 from rhodecode.model.db import User
31 31 from rhodecode.lib.str_utils import ascii_str
32 32 from rhodecode.lib.hash_utils import sha1_safe
33 33
34 34 log = logging.getLogger(__name__)
35 35
36 36 LOCK = ReadWriteMutex()
37 37
38 38 USER_STATE_PUBLIC_KEYS = [
39 39 'id', 'username', 'first_name', 'last_name',
40 40 'icon_link', 'display_name', 'display_link']
41 41
42 42
43 43 class ChannelstreamException(Exception):
44 44 pass
45 45
46 46
47 47 class ChannelstreamConnectionException(ChannelstreamException):
48 48 pass
49 49
50 50
51 51 class ChannelstreamPermissionException(ChannelstreamException):
52 52 pass
53 53
54 54
55 55 def get_channelstream_server_url(config, endpoint):
56 56 return 'http://{}{}'.format(config['server'], endpoint)
57 57
58 58
59 59 def channelstream_request(config, payload, endpoint, raise_exc=True):
60 60 signer = itsdangerous.TimestampSigner(config['secret'])
61 61 sig_for_server = signer.sign(endpoint)
62 62 secret_headers = {'x-channelstream-secret': sig_for_server,
63 63 'x-channelstream-endpoint': endpoint,
64 64 'Content-Type': 'application/json'}
65 65 req_url = get_channelstream_server_url(config, endpoint)
66 66
67 67 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
68 68 response = None
69 69 try:
70 70 response = requests.post(req_url, data=json.dumps(payload),
71 71 headers=secret_headers).json()
72 72 except requests.ConnectionError:
73 73 log.exception('ConnectionError occurred for endpoint %s', req_url)
74 74 if raise_exc:
75 75 raise ChannelstreamConnectionException(req_url)
76 76 except Exception:
77 77 log.exception('Exception related to Channelstream happened')
78 78 if raise_exc:
79 79 raise ChannelstreamConnectionException()
80 80 log.debug('Got channelstream response: %s', response)
81 81 return response
82 82
83 83
84 84 def get_user_data(user_id):
85 85 user = User.get(user_id)
86 86 return {
87 87 'id': user.user_id,
88 88 'username': user.username,
89 89 'first_name': user.first_name,
90 90 'last_name': user.last_name,
91 91 'icon_link': h.gravatar_url(user.email, 60),
92 92 'display_name': h.person(user, 'username_or_name_or_email'),
93 93 'display_link': h.link_to_user(user),
94 94 'notifications': user.user_data.get('notification_status', True)
95 95 }
96 96
97 97
98 98 def broadcast_validator(channel_name):
99 99 """ checks if user can access the broadcast channel """
100 100 if channel_name == 'broadcast':
101 101 return True
102 102
103 103
104 104 def repo_validator(channel_name):
105 105 """ checks if user can access the broadcast channel """
106 106 channel_prefix = '/repo$'
107 107 if channel_name.startswith(channel_prefix):
108 108 elements = channel_name[len(channel_prefix):].split('$')
109 109 repo_name = elements[0]
110 110 can_access = HasRepoPermissionAny(
111 111 'repository.read',
112 112 'repository.write',
113 113 'repository.admin')(repo_name)
114 114 log.debug(
115 115 'permission check for %s channel resulted in %s',
116 116 repo_name, can_access)
117 117 if can_access:
118 118 return True
119 119 return False
120 120
121 121
122 122 def check_channel_permissions(channels, plugin_validators, should_raise=True):
123 123 valid_channels = []
124 124
125 125 validators = [broadcast_validator, repo_validator]
126 126 if plugin_validators:
127 127 validators.extend(plugin_validators)
128 128 for channel_name in channels:
129 129 is_valid = False
130 130 for validator in validators:
131 131 if validator(channel_name):
132 132 is_valid = True
133 133 break
134 134 if is_valid:
135 135 valid_channels.append(channel_name)
136 136 else:
137 137 if should_raise:
138 138 raise ChannelstreamPermissionException()
139 139 return valid_channels
140 140
141 141
142 142 def get_channels_info(self, channels):
143 143 payload = {'channels': channels}
144 144 # gather persistence info
145 145 return channelstream_request(self._config(), payload, '/info')
146 146
147 147
148 148 def parse_channels_info(info_result, include_channel_info=None):
149 149 """
150 150 Returns data that contains only secure information that can be
151 151 presented to clients
152 152 """
153 153 include_channel_info = include_channel_info or []
154 154
155 155 user_state_dict = {}
156 156 for userinfo in info_result['users']:
157 157 user_state_dict[userinfo['user']] = {
158 158 k: v for k, v in list(userinfo['state'].items())
159 159 if k in USER_STATE_PUBLIC_KEYS
160 160 }
161 161
162 162 channels_info = {}
163 163
164 164 for c_name, c_info in list(info_result['channels'].items()):
165 165 if c_name not in include_channel_info:
166 166 continue
167 167 connected_list = []
168 168 for username in c_info['users']:
169 169 connected_list.append({
170 170 'user': username,
171 171 'state': user_state_dict[username]
172 172 })
173 173 channels_info[c_name] = {'users': connected_list,
174 174 'history': c_info['history']}
175 175
176 176 return channels_info
177 177
178 178
179 179 def log_filepath(history_location, channel_name):
180 180
181 181 channel_hash = sha1_safe(channel_name, return_type='str')
182 182 filename = f'{channel_hash}.log'
183 183 filepath = os.path.join(history_location, filename)
184 184 return filepath
185 185
186 186
187 187 def read_history(history_location, channel_name):
188 188 filepath = log_filepath(history_location, channel_name)
189 189 if not os.path.exists(filepath):
190 190 return []
191 191 history_lines_limit = -100
192 192 history = []
193 193 with open(filepath, 'rb') as f:
194 194 for line in f.readlines()[history_lines_limit:]:
195 195 try:
196 196 history.append(json.loads(line))
197 197 except Exception:
198 198 log.exception('Failed to load history')
199 199 return history
200 200
201 201
202 202 def update_history_from_logs(config, channels, payload):
203 203 history_location = config.get('history.location')
204 204 for channel in channels:
205 205 history = read_history(history_location, channel)
206 206 payload['channels_info'][channel]['history'] = history
207 207
208 208
209 209 def write_history(config, message):
210 210 """ writes a message to a base64encoded filename """
211 211 history_location = config.get('history.location')
212 212 if not os.path.exists(history_location):
213 213 return
214 214 try:
215 215 LOCK.acquire_write_lock()
216 216 filepath = log_filepath(history_location, message['channel'])
217 217 json_message = json.dumps(message)
218 218 with open(filepath, 'ab') as f:
219 219 f.write(json_message)
220 f.write('\n')
220 f.write(b'\n')
221 221 finally:
222 222 LOCK.release_write_lock()
223 223
224 224
225 225 def get_connection_validators(registry):
226 226 validators = []
227 227 for k, config in list(registry.rhodecode_plugins.items()):
228 228 validator = config.get('channelstream', {}).get('connect_validator')
229 229 if validator:
230 230 validators.append(validator)
231 231 return validators
232 232
233 233
234 234 def get_channelstream_config(registry=None):
235 235 if not registry:
236 236 from pyramid.threadlocal import get_current_registry
237 237 registry = get_current_registry()
238 238
239 239 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
240 240 channelstream_config = rhodecode_plugins.get('channelstream', {})
241 241 return channelstream_config
242 242
243 243
244 244 def post_message(channel, message, username, registry=None):
245 245 channelstream_config = get_channelstream_config(registry)
246 246 if not channelstream_config.get('enabled'):
247 247 return
248 248
249 249 message_obj = message
250 250 if isinstance(message, str):
251 251 message_obj = {
252 252 'message': message,
253 253 'level': 'success',
254 254 'topic': '/notifications'
255 255 }
256 256
257 257 log.debug('Channelstream: sending notification to channel %s', channel)
258 258 payload = {
259 259 'type': 'message',
260 260 'timestamp': datetime.datetime.utcnow(),
261 261 'user': 'system',
262 262 'exclude_users': [username],
263 263 'channel': channel,
264 264 'message': message_obj
265 265 }
266 266
267 267 try:
268 268 return channelstream_request(
269 269 channelstream_config, [payload], '/message',
270 270 raise_exc=False)
271 271 except ChannelstreamException:
272 272 log.exception('Failed to send channelstream data')
273 273 raise
274 274
275 275
276 276 def _reload_link(label):
277 277 return (
278 278 '<a onclick="window.location.reload()">'
279 279 '<strong>{}</strong>'
280 280 '</a>'.format(label)
281 281 )
282 282
283 283
284 284 def pr_channel(pull_request):
285 285 repo_name = pull_request.target_repo.repo_name
286 286 pull_request_id = pull_request.pull_request_id
287 287 channel = f'/repo${repo_name}$/pr/{pull_request_id}'
288 288 log.debug('Getting pull-request channelstream broadcast channel: %s', channel)
289 289 return channel
290 290
291 291
292 292 def comment_channel(repo_name, commit_obj=None, pull_request_obj=None):
293 293 channel = None
294 294 if commit_obj:
295 295 channel = '/repo${}$/commit/{}'.format(
296 296 repo_name, commit_obj.raw_id
297 297 )
298 298 elif pull_request_obj:
299 299 channel = '/repo${}$/pr/{}'.format(
300 300 repo_name, pull_request_obj.pull_request_id
301 301 )
302 302 log.debug('Getting comment channelstream broadcast channel: %s', channel)
303 303
304 304 return channel
305 305
306 306
307 307 def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs):
308 308 """
309 309 Channel push on pull request update
310 310 """
311 311 if not pr_broadcast_channel:
312 312 return
313 313
314 314 _ = request.translate
315 315
316 316 message = '{} {}'.format(
317 317 msg,
318 318 _reload_link(_(' Reload page to load changes')))
319 319
320 320 message_obj = {
321 321 'message': message,
322 322 'level': 'success',
323 323 'topic': '/notifications'
324 324 }
325 325
326 326 post_message(
327 327 pr_broadcast_channel, message_obj, user.username,
328 328 registry=request.registry)
329 329
330 330
331 331 def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs):
332 332 """
333 333 Channelstream push on comment action, on commit, or pull-request
334 334 """
335 335 if not comment_broadcast_channel:
336 336 return
337 337
338 338 _ = request.translate
339 339
340 340 comment_data = kwargs.pop('comment_data', {})
341 341 user_data = kwargs.pop('user_data', {})
342 342 comment_id = list(comment_data.keys())[0] if comment_data else ''
343 343
344 344 message = '<strong>{}</strong> {} #{}'.format(
345 345 user.username,
346 346 msg,
347 347 comment_id,
348 348 )
349 349
350 350 message_obj = {
351 351 'message': message,
352 352 'level': 'success',
353 353 'topic': '/notifications'
354 354 }
355 355
356 356 post_message(
357 357 comment_broadcast_channel, message_obj, user.username,
358 358 registry=request.registry)
359 359
360 360 message_obj = {
361 361 'message': None,
362 362 'user': user.username,
363 363 'comment_id': comment_id,
364 364 'comment_data': comment_data,
365 365 'user_data': user_data,
366 366 'topic': '/comment'
367 367 }
368 368 post_message(
369 369 comment_broadcast_channel, message_obj, user.username,
370 370 registry=request.registry)
General Comments 0
You need to be logged in to leave comments. Login now