##// END OF EJS Templates
fix(channelstream): fixed history writing
super-admin -
r5208:d460d319 default
parent child Browse files
Show More
@@ -1,370 +1,370 b''
1 # Copyright (C) 2016-2023 RhodeCode GmbH
1 # Copyright (C) 2016-2023 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
18
19 import os
19 import os
20 import itsdangerous
20 import itsdangerous
21 import logging
21 import logging
22 import requests
22 import requests
23 import datetime
23 import datetime
24
24
25 from dogpile.util.readwrite_lock import ReadWriteMutex
25 from dogpile.util.readwrite_lock import ReadWriteMutex
26
26
27 import rhodecode.lib.helpers as h
27 import rhodecode.lib.helpers as h
28 from rhodecode.lib.auth import HasRepoPermissionAny
28 from rhodecode.lib.auth import HasRepoPermissionAny
29 from rhodecode.lib.ext_json import json
29 from rhodecode.lib.ext_json import json
30 from rhodecode.model.db import User
30 from rhodecode.model.db import User
31 from rhodecode.lib.str_utils import ascii_str
31 from rhodecode.lib.str_utils import ascii_str
32 from rhodecode.lib.hash_utils import sha1_safe
32 from rhodecode.lib.hash_utils import sha1_safe
33
33
34 log = logging.getLogger(__name__)
34 log = logging.getLogger(__name__)
35
35
36 LOCK = ReadWriteMutex()
36 LOCK = ReadWriteMutex()
37
37
38 USER_STATE_PUBLIC_KEYS = [
38 USER_STATE_PUBLIC_KEYS = [
39 'id', 'username', 'first_name', 'last_name',
39 'id', 'username', 'first_name', 'last_name',
40 'icon_link', 'display_name', 'display_link']
40 'icon_link', 'display_name', 'display_link']
41
41
42
42
43 class ChannelstreamException(Exception):
43 class ChannelstreamException(Exception):
44 pass
44 pass
45
45
46
46
47 class ChannelstreamConnectionException(ChannelstreamException):
47 class ChannelstreamConnectionException(ChannelstreamException):
48 pass
48 pass
49
49
50
50
51 class ChannelstreamPermissionException(ChannelstreamException):
51 class ChannelstreamPermissionException(ChannelstreamException):
52 pass
52 pass
53
53
54
54
55 def get_channelstream_server_url(config, endpoint):
55 def get_channelstream_server_url(config, endpoint):
56 return 'http://{}{}'.format(config['server'], endpoint)
56 return 'http://{}{}'.format(config['server'], endpoint)
57
57
58
58
59 def channelstream_request(config, payload, endpoint, raise_exc=True):
59 def channelstream_request(config, payload, endpoint, raise_exc=True):
60 signer = itsdangerous.TimestampSigner(config['secret'])
60 signer = itsdangerous.TimestampSigner(config['secret'])
61 sig_for_server = signer.sign(endpoint)
61 sig_for_server = signer.sign(endpoint)
62 secret_headers = {'x-channelstream-secret': sig_for_server,
62 secret_headers = {'x-channelstream-secret': sig_for_server,
63 'x-channelstream-endpoint': endpoint,
63 'x-channelstream-endpoint': endpoint,
64 'Content-Type': 'application/json'}
64 'Content-Type': 'application/json'}
65 req_url = get_channelstream_server_url(config, endpoint)
65 req_url = get_channelstream_server_url(config, endpoint)
66
66
67 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
67 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
68 response = None
68 response = None
69 try:
69 try:
70 response = requests.post(req_url, data=json.dumps(payload),
70 response = requests.post(req_url, data=json.dumps(payload),
71 headers=secret_headers).json()
71 headers=secret_headers).json()
72 except requests.ConnectionError:
72 except requests.ConnectionError:
73 log.exception('ConnectionError occurred for endpoint %s', req_url)
73 log.exception('ConnectionError occurred for endpoint %s', req_url)
74 if raise_exc:
74 if raise_exc:
75 raise ChannelstreamConnectionException(req_url)
75 raise ChannelstreamConnectionException(req_url)
76 except Exception:
76 except Exception:
77 log.exception('Exception related to Channelstream happened')
77 log.exception('Exception related to Channelstream happened')
78 if raise_exc:
78 if raise_exc:
79 raise ChannelstreamConnectionException()
79 raise ChannelstreamConnectionException()
80 log.debug('Got channelstream response: %s', response)
80 log.debug('Got channelstream response: %s', response)
81 return response
81 return response
82
82
83
83
84 def get_user_data(user_id):
84 def get_user_data(user_id):
85 user = User.get(user_id)
85 user = User.get(user_id)
86 return {
86 return {
87 'id': user.user_id,
87 'id': user.user_id,
88 'username': user.username,
88 'username': user.username,
89 'first_name': user.first_name,
89 'first_name': user.first_name,
90 'last_name': user.last_name,
90 'last_name': user.last_name,
91 'icon_link': h.gravatar_url(user.email, 60),
91 'icon_link': h.gravatar_url(user.email, 60),
92 'display_name': h.person(user, 'username_or_name_or_email'),
92 'display_name': h.person(user, 'username_or_name_or_email'),
93 'display_link': h.link_to_user(user),
93 'display_link': h.link_to_user(user),
94 'notifications': user.user_data.get('notification_status', True)
94 'notifications': user.user_data.get('notification_status', True)
95 }
95 }
96
96
97
97
98 def broadcast_validator(channel_name):
98 def broadcast_validator(channel_name):
99 """ checks if user can access the broadcast channel """
99 """ checks if user can access the broadcast channel """
100 if channel_name == 'broadcast':
100 if channel_name == 'broadcast':
101 return True
101 return True
102
102
103
103
104 def repo_validator(channel_name):
104 def repo_validator(channel_name):
105 """ checks if user can access the broadcast channel """
105 """ checks if user can access the broadcast channel """
106 channel_prefix = '/repo$'
106 channel_prefix = '/repo$'
107 if channel_name.startswith(channel_prefix):
107 if channel_name.startswith(channel_prefix):
108 elements = channel_name[len(channel_prefix):].split('$')
108 elements = channel_name[len(channel_prefix):].split('$')
109 repo_name = elements[0]
109 repo_name = elements[0]
110 can_access = HasRepoPermissionAny(
110 can_access = HasRepoPermissionAny(
111 'repository.read',
111 'repository.read',
112 'repository.write',
112 'repository.write',
113 'repository.admin')(repo_name)
113 'repository.admin')(repo_name)
114 log.debug(
114 log.debug(
115 'permission check for %s channel resulted in %s',
115 'permission check for %s channel resulted in %s',
116 repo_name, can_access)
116 repo_name, can_access)
117 if can_access:
117 if can_access:
118 return True
118 return True
119 return False
119 return False
120
120
121
121
122 def check_channel_permissions(channels, plugin_validators, should_raise=True):
122 def check_channel_permissions(channels, plugin_validators, should_raise=True):
123 valid_channels = []
123 valid_channels = []
124
124
125 validators = [broadcast_validator, repo_validator]
125 validators = [broadcast_validator, repo_validator]
126 if plugin_validators:
126 if plugin_validators:
127 validators.extend(plugin_validators)
127 validators.extend(plugin_validators)
128 for channel_name in channels:
128 for channel_name in channels:
129 is_valid = False
129 is_valid = False
130 for validator in validators:
130 for validator in validators:
131 if validator(channel_name):
131 if validator(channel_name):
132 is_valid = True
132 is_valid = True
133 break
133 break
134 if is_valid:
134 if is_valid:
135 valid_channels.append(channel_name)
135 valid_channels.append(channel_name)
136 else:
136 else:
137 if should_raise:
137 if should_raise:
138 raise ChannelstreamPermissionException()
138 raise ChannelstreamPermissionException()
139 return valid_channels
139 return valid_channels
140
140
141
141
142 def get_channels_info(self, channels):
142 def get_channels_info(self, channels):
143 payload = {'channels': channels}
143 payload = {'channels': channels}
144 # gather persistence info
144 # gather persistence info
145 return channelstream_request(self._config(), payload, '/info')
145 return channelstream_request(self._config(), payload, '/info')
146
146
147
147
148 def parse_channels_info(info_result, include_channel_info=None):
148 def parse_channels_info(info_result, include_channel_info=None):
149 """
149 """
150 Returns data that contains only secure information that can be
150 Returns data that contains only secure information that can be
151 presented to clients
151 presented to clients
152 """
152 """
153 include_channel_info = include_channel_info or []
153 include_channel_info = include_channel_info or []
154
154
155 user_state_dict = {}
155 user_state_dict = {}
156 for userinfo in info_result['users']:
156 for userinfo in info_result['users']:
157 user_state_dict[userinfo['user']] = {
157 user_state_dict[userinfo['user']] = {
158 k: v for k, v in list(userinfo['state'].items())
158 k: v for k, v in list(userinfo['state'].items())
159 if k in USER_STATE_PUBLIC_KEYS
159 if k in USER_STATE_PUBLIC_KEYS
160 }
160 }
161
161
162 channels_info = {}
162 channels_info = {}
163
163
164 for c_name, c_info in list(info_result['channels'].items()):
164 for c_name, c_info in list(info_result['channels'].items()):
165 if c_name not in include_channel_info:
165 if c_name not in include_channel_info:
166 continue
166 continue
167 connected_list = []
167 connected_list = []
168 for username in c_info['users']:
168 for username in c_info['users']:
169 connected_list.append({
169 connected_list.append({
170 'user': username,
170 'user': username,
171 'state': user_state_dict[username]
171 'state': user_state_dict[username]
172 })
172 })
173 channels_info[c_name] = {'users': connected_list,
173 channels_info[c_name] = {'users': connected_list,
174 'history': c_info['history']}
174 'history': c_info['history']}
175
175
176 return channels_info
176 return channels_info
177
177
178
178
179 def log_filepath(history_location, channel_name):
179 def log_filepath(history_location, channel_name):
180
180
181 channel_hash = sha1_safe(channel_name, return_type='str')
181 channel_hash = sha1_safe(channel_name, return_type='str')
182 filename = f'{channel_hash}.log'
182 filename = f'{channel_hash}.log'
183 filepath = os.path.join(history_location, filename)
183 filepath = os.path.join(history_location, filename)
184 return filepath
184 return filepath
185
185
186
186
187 def read_history(history_location, channel_name):
187 def read_history(history_location, channel_name):
188 filepath = log_filepath(history_location, channel_name)
188 filepath = log_filepath(history_location, channel_name)
189 if not os.path.exists(filepath):
189 if not os.path.exists(filepath):
190 return []
190 return []
191 history_lines_limit = -100
191 history_lines_limit = -100
192 history = []
192 history = []
193 with open(filepath, 'rb') as f:
193 with open(filepath, 'rb') as f:
194 for line in f.readlines()[history_lines_limit:]:
194 for line in f.readlines()[history_lines_limit:]:
195 try:
195 try:
196 history.append(json.loads(line))
196 history.append(json.loads(line))
197 except Exception:
197 except Exception:
198 log.exception('Failed to load history')
198 log.exception('Failed to load history')
199 return history
199 return history
200
200
201
201
202 def update_history_from_logs(config, channels, payload):
202 def update_history_from_logs(config, channels, payload):
203 history_location = config.get('history.location')
203 history_location = config.get('history.location')
204 for channel in channels:
204 for channel in channels:
205 history = read_history(history_location, channel)
205 history = read_history(history_location, channel)
206 payload['channels_info'][channel]['history'] = history
206 payload['channels_info'][channel]['history'] = history
207
207
208
208
209 def write_history(config, message):
209 def write_history(config, message):
210 """ writes a message to a base64encoded filename """
210 """ writes a message to a base64encoded filename """
211 history_location = config.get('history.location')
211 history_location = config.get('history.location')
212 if not os.path.exists(history_location):
212 if not os.path.exists(history_location):
213 return
213 return
214 try:
214 try:
215 LOCK.acquire_write_lock()
215 LOCK.acquire_write_lock()
216 filepath = log_filepath(history_location, message['channel'])
216 filepath = log_filepath(history_location, message['channel'])
217 json_message = json.dumps(message)
217 json_message = json.dumps(message)
218 with open(filepath, 'ab') as f:
218 with open(filepath, 'ab') as f:
219 f.write(json_message)
219 f.write(json_message)
220 f.write('\n')
220 f.write(b'\n')
221 finally:
221 finally:
222 LOCK.release_write_lock()
222 LOCK.release_write_lock()
223
223
224
224
225 def get_connection_validators(registry):
225 def get_connection_validators(registry):
226 validators = []
226 validators = []
227 for k, config in list(registry.rhodecode_plugins.items()):
227 for k, config in list(registry.rhodecode_plugins.items()):
228 validator = config.get('channelstream', {}).get('connect_validator')
228 validator = config.get('channelstream', {}).get('connect_validator')
229 if validator:
229 if validator:
230 validators.append(validator)
230 validators.append(validator)
231 return validators
231 return validators
232
232
233
233
234 def get_channelstream_config(registry=None):
234 def get_channelstream_config(registry=None):
235 if not registry:
235 if not registry:
236 from pyramid.threadlocal import get_current_registry
236 from pyramid.threadlocal import get_current_registry
237 registry = get_current_registry()
237 registry = get_current_registry()
238
238
239 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
239 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
240 channelstream_config = rhodecode_plugins.get('channelstream', {})
240 channelstream_config = rhodecode_plugins.get('channelstream', {})
241 return channelstream_config
241 return channelstream_config
242
242
243
243
244 def post_message(channel, message, username, registry=None):
244 def post_message(channel, message, username, registry=None):
245 channelstream_config = get_channelstream_config(registry)
245 channelstream_config = get_channelstream_config(registry)
246 if not channelstream_config.get('enabled'):
246 if not channelstream_config.get('enabled'):
247 return
247 return
248
248
249 message_obj = message
249 message_obj = message
250 if isinstance(message, str):
250 if isinstance(message, str):
251 message_obj = {
251 message_obj = {
252 'message': message,
252 'message': message,
253 'level': 'success',
253 'level': 'success',
254 'topic': '/notifications'
254 'topic': '/notifications'
255 }
255 }
256
256
257 log.debug('Channelstream: sending notification to channel %s', channel)
257 log.debug('Channelstream: sending notification to channel %s', channel)
258 payload = {
258 payload = {
259 'type': 'message',
259 'type': 'message',
260 'timestamp': datetime.datetime.utcnow(),
260 'timestamp': datetime.datetime.utcnow(),
261 'user': 'system',
261 'user': 'system',
262 'exclude_users': [username],
262 'exclude_users': [username],
263 'channel': channel,
263 'channel': channel,
264 'message': message_obj
264 'message': message_obj
265 }
265 }
266
266
267 try:
267 try:
268 return channelstream_request(
268 return channelstream_request(
269 channelstream_config, [payload], '/message',
269 channelstream_config, [payload], '/message',
270 raise_exc=False)
270 raise_exc=False)
271 except ChannelstreamException:
271 except ChannelstreamException:
272 log.exception('Failed to send channelstream data')
272 log.exception('Failed to send channelstream data')
273 raise
273 raise
274
274
275
275
276 def _reload_link(label):
276 def _reload_link(label):
277 return (
277 return (
278 '<a onclick="window.location.reload()">'
278 '<a onclick="window.location.reload()">'
279 '<strong>{}</strong>'
279 '<strong>{}</strong>'
280 '</a>'.format(label)
280 '</a>'.format(label)
281 )
281 )
282
282
283
283
284 def pr_channel(pull_request):
284 def pr_channel(pull_request):
285 repo_name = pull_request.target_repo.repo_name
285 repo_name = pull_request.target_repo.repo_name
286 pull_request_id = pull_request.pull_request_id
286 pull_request_id = pull_request.pull_request_id
287 channel = f'/repo${repo_name}$/pr/{pull_request_id}'
287 channel = f'/repo${repo_name}$/pr/{pull_request_id}'
288 log.debug('Getting pull-request channelstream broadcast channel: %s', channel)
288 log.debug('Getting pull-request channelstream broadcast channel: %s', channel)
289 return channel
289 return channel
290
290
291
291
292 def comment_channel(repo_name, commit_obj=None, pull_request_obj=None):
292 def comment_channel(repo_name, commit_obj=None, pull_request_obj=None):
293 channel = None
293 channel = None
294 if commit_obj:
294 if commit_obj:
295 channel = '/repo${}$/commit/{}'.format(
295 channel = '/repo${}$/commit/{}'.format(
296 repo_name, commit_obj.raw_id
296 repo_name, commit_obj.raw_id
297 )
297 )
298 elif pull_request_obj:
298 elif pull_request_obj:
299 channel = '/repo${}$/pr/{}'.format(
299 channel = '/repo${}$/pr/{}'.format(
300 repo_name, pull_request_obj.pull_request_id
300 repo_name, pull_request_obj.pull_request_id
301 )
301 )
302 log.debug('Getting comment channelstream broadcast channel: %s', channel)
302 log.debug('Getting comment channelstream broadcast channel: %s', channel)
303
303
304 return channel
304 return channel
305
305
306
306
307 def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs):
307 def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs):
308 """
308 """
309 Channel push on pull request update
309 Channel push on pull request update
310 """
310 """
311 if not pr_broadcast_channel:
311 if not pr_broadcast_channel:
312 return
312 return
313
313
314 _ = request.translate
314 _ = request.translate
315
315
316 message = '{} {}'.format(
316 message = '{} {}'.format(
317 msg,
317 msg,
318 _reload_link(_(' Reload page to load changes')))
318 _reload_link(_(' Reload page to load changes')))
319
319
320 message_obj = {
320 message_obj = {
321 'message': message,
321 'message': message,
322 'level': 'success',
322 'level': 'success',
323 'topic': '/notifications'
323 'topic': '/notifications'
324 }
324 }
325
325
326 post_message(
326 post_message(
327 pr_broadcast_channel, message_obj, user.username,
327 pr_broadcast_channel, message_obj, user.username,
328 registry=request.registry)
328 registry=request.registry)
329
329
330
330
331 def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs):
331 def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs):
332 """
332 """
333 Channelstream push on comment action, on commit, or pull-request
333 Channelstream push on comment action, on commit, or pull-request
334 """
334 """
335 if not comment_broadcast_channel:
335 if not comment_broadcast_channel:
336 return
336 return
337
337
338 _ = request.translate
338 _ = request.translate
339
339
340 comment_data = kwargs.pop('comment_data', {})
340 comment_data = kwargs.pop('comment_data', {})
341 user_data = kwargs.pop('user_data', {})
341 user_data = kwargs.pop('user_data', {})
342 comment_id = list(comment_data.keys())[0] if comment_data else ''
342 comment_id = list(comment_data.keys())[0] if comment_data else ''
343
343
344 message = '<strong>{}</strong> {} #{}'.format(
344 message = '<strong>{}</strong> {} #{}'.format(
345 user.username,
345 user.username,
346 msg,
346 msg,
347 comment_id,
347 comment_id,
348 )
348 )
349
349
350 message_obj = {
350 message_obj = {
351 'message': message,
351 'message': message,
352 'level': 'success',
352 'level': 'success',
353 'topic': '/notifications'
353 'topic': '/notifications'
354 }
354 }
355
355
356 post_message(
356 post_message(
357 comment_broadcast_channel, message_obj, user.username,
357 comment_broadcast_channel, message_obj, user.username,
358 registry=request.registry)
358 registry=request.registry)
359
359
360 message_obj = {
360 message_obj = {
361 'message': None,
361 'message': None,
362 'user': user.username,
362 'user': user.username,
363 'comment_id': comment_id,
363 'comment_id': comment_id,
364 'comment_data': comment_data,
364 'comment_data': comment_data,
365 'user_data': user_data,
365 'user_data': user_data,
366 'topic': '/comment'
366 'topic': '/comment'
367 }
367 }
368 post_message(
368 post_message(
369 comment_broadcast_channel, message_obj, user.username,
369 comment_broadcast_channel, message_obj, user.username,
370 registry=request.registry)
370 registry=request.registry)
General Comments 0
You need to be logged in to leave comments. Login now