##// END OF EJS Templates
channelstream: python3 fixes
super-admin -
r5003:c1b8a600 default
parent child Browse files
Show More
@@ -1,371 +1,372 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import hashlib
23 import itsdangerous
22 import itsdangerous
24 import logging
23 import logging
25 import requests
24 import requests
26 import datetime
25 import datetime
27
26
28 from dogpile.util.readwrite_lock import ReadWriteMutex
27 from dogpile.util.readwrite_lock import ReadWriteMutex
29 from pyramid.threadlocal import get_current_registry
28 from pyramid.threadlocal import get_current_registry
30
29
31 import rhodecode.lib.helpers as h
30 import rhodecode.lib.helpers as h
32 from rhodecode.lib.auth import HasRepoPermissionAny
31 from rhodecode.lib.auth import HasRepoPermissionAny
33 from rhodecode.lib.ext_json import json
32 from rhodecode.lib.ext_json import json
34 from rhodecode.model.db import User
33 from rhodecode.model.db import User
34 from rhodecode.lib.str_utils import ascii_str
35 from rhodecode.lib.hash_utils import sha1_safe
35
36
36 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
37
38
38 LOCK = ReadWriteMutex()
39 LOCK = ReadWriteMutex()
39
40
40 USER_STATE_PUBLIC_KEYS = [
41 USER_STATE_PUBLIC_KEYS = [
41 'id', 'username', 'first_name', 'last_name',
42 'id', 'username', 'first_name', 'last_name',
42 'icon_link', 'display_name', 'display_link']
43 'icon_link', 'display_name', 'display_link']
43
44
44
45
45 class ChannelstreamException(Exception):
46 class ChannelstreamException(Exception):
46 pass
47 pass
47
48
48
49
49 class ChannelstreamConnectionException(ChannelstreamException):
50 class ChannelstreamConnectionException(ChannelstreamException):
50 pass
51 pass
51
52
52
53
53 class ChannelstreamPermissionException(ChannelstreamException):
54 class ChannelstreamPermissionException(ChannelstreamException):
54 pass
55 pass
55
56
56
57
57 def get_channelstream_server_url(config, endpoint):
58 def get_channelstream_server_url(config, endpoint):
58 return 'http://{}{}'.format(config['server'], endpoint)
59 return 'http://{}{}'.format(config['server'], endpoint)
59
60
60
61
61 def channelstream_request(config, payload, endpoint, raise_exc=True):
62 def channelstream_request(config, payload, endpoint, raise_exc=True):
62 signer = itsdangerous.TimestampSigner(config['secret'])
63 signer = itsdangerous.TimestampSigner(config['secret'])
63 sig_for_server = signer.sign(endpoint)
64 sig_for_server = signer.sign(endpoint)
64 secret_headers = {'x-channelstream-secret': sig_for_server,
65 secret_headers = {'x-channelstream-secret': sig_for_server,
65 'x-channelstream-endpoint': endpoint,
66 'x-channelstream-endpoint': endpoint,
66 'Content-Type': 'application/json'}
67 'Content-Type': 'application/json'}
67 req_url = get_channelstream_server_url(config, endpoint)
68 req_url = get_channelstream_server_url(config, endpoint)
68
69
69 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
70 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
70 response = None
71 response = None
71 try:
72 try:
72 response = requests.post(req_url, data=json.dumps(payload),
73 response = requests.post(req_url, data=json.dumps(payload),
73 headers=secret_headers).json()
74 headers=secret_headers).json()
74 except requests.ConnectionError:
75 except requests.ConnectionError:
75 log.exception('ConnectionError occurred for endpoint %s', req_url)
76 log.exception('ConnectionError occurred for endpoint %s', req_url)
76 if raise_exc:
77 if raise_exc:
77 raise ChannelstreamConnectionException(req_url)
78 raise ChannelstreamConnectionException(req_url)
78 except Exception:
79 except Exception:
79 log.exception('Exception related to Channelstream happened')
80 log.exception('Exception related to Channelstream happened')
80 if raise_exc:
81 if raise_exc:
81 raise ChannelstreamConnectionException()
82 raise ChannelstreamConnectionException()
82 log.debug('Got channelstream response: %s', response)
83 log.debug('Got channelstream response: %s', response)
83 return response
84 return response
84
85
85
86
86 def get_user_data(user_id):
87 def get_user_data(user_id):
87 user = User.get(user_id)
88 user = User.get(user_id)
88 return {
89 return {
89 'id': user.user_id,
90 'id': user.user_id,
90 'username': user.username,
91 'username': user.username,
91 'first_name': user.first_name,
92 'first_name': user.first_name,
92 'last_name': user.last_name,
93 'last_name': user.last_name,
93 'icon_link': h.gravatar_url(user.email, 60),
94 'icon_link': h.gravatar_url(user.email, 60),
94 'display_name': h.person(user, 'username_or_name_or_email'),
95 'display_name': h.person(user, 'username_or_name_or_email'),
95 'display_link': h.link_to_user(user),
96 'display_link': h.link_to_user(user),
96 'notifications': user.user_data.get('notification_status', True)
97 'notifications': user.user_data.get('notification_status', True)
97 }
98 }
98
99
99
100
100 def broadcast_validator(channel_name):
101 def broadcast_validator(channel_name):
101 """ checks if user can access the broadcast channel """
102 """ checks if user can access the broadcast channel """
102 if channel_name == 'broadcast':
103 if channel_name == 'broadcast':
103 return True
104 return True
104
105
105
106
106 def repo_validator(channel_name):
107 def repo_validator(channel_name):
107 """ checks if user can access the broadcast channel """
108 """ checks if user can access the broadcast channel """
108 channel_prefix = '/repo$'
109 channel_prefix = '/repo$'
109 if channel_name.startswith(channel_prefix):
110 if channel_name.startswith(channel_prefix):
110 elements = channel_name[len(channel_prefix):].split('$')
111 elements = channel_name[len(channel_prefix):].split('$')
111 repo_name = elements[0]
112 repo_name = elements[0]
112 can_access = HasRepoPermissionAny(
113 can_access = HasRepoPermissionAny(
113 'repository.read',
114 'repository.read',
114 'repository.write',
115 'repository.write',
115 'repository.admin')(repo_name)
116 'repository.admin')(repo_name)
116 log.debug(
117 log.debug(
117 'permission check for %s channel resulted in %s',
118 'permission check for %s channel resulted in %s',
118 repo_name, can_access)
119 repo_name, can_access)
119 if can_access:
120 if can_access:
120 return True
121 return True
121 return False
122 return False
122
123
123
124
124 def check_channel_permissions(channels, plugin_validators, should_raise=True):
125 def check_channel_permissions(channels, plugin_validators, should_raise=True):
125 valid_channels = []
126 valid_channels = []
126
127
127 validators = [broadcast_validator, repo_validator]
128 validators = [broadcast_validator, repo_validator]
128 if plugin_validators:
129 if plugin_validators:
129 validators.extend(plugin_validators)
130 validators.extend(plugin_validators)
130 for channel_name in channels:
131 for channel_name in channels:
131 is_valid = False
132 is_valid = False
132 for validator in validators:
133 for validator in validators:
133 if validator(channel_name):
134 if validator(channel_name):
134 is_valid = True
135 is_valid = True
135 break
136 break
136 if is_valid:
137 if is_valid:
137 valid_channels.append(channel_name)
138 valid_channels.append(channel_name)
138 else:
139 else:
139 if should_raise:
140 if should_raise:
140 raise ChannelstreamPermissionException()
141 raise ChannelstreamPermissionException()
141 return valid_channels
142 return valid_channels
142
143
143
144
144 def get_channels_info(self, channels):
145 def get_channels_info(self, channels):
145 payload = {'channels': channels}
146 payload = {'channels': channels}
146 # gather persistence info
147 # gather persistence info
147 return channelstream_request(self._config(), payload, '/info')
148 return channelstream_request(self._config(), payload, '/info')
148
149
149
150
150 def parse_channels_info(info_result, include_channel_info=None):
151 def parse_channels_info(info_result, include_channel_info=None):
151 """
152 """
152 Returns data that contains only secure information that can be
153 Returns data that contains only secure information that can be
153 presented to clients
154 presented to clients
154 """
155 """
155 include_channel_info = include_channel_info or []
156 include_channel_info = include_channel_info or []
156
157
157 user_state_dict = {}
158 user_state_dict = {}
158 for userinfo in info_result['users']:
159 for userinfo in info_result['users']:
159 user_state_dict[userinfo['user']] = {
160 user_state_dict[userinfo['user']] = {
160 k: v for k, v in userinfo['state'].items()
161 k: v for k, v in list(userinfo['state'].items())
161 if k in USER_STATE_PUBLIC_KEYS
162 if k in USER_STATE_PUBLIC_KEYS
162 }
163 }
163
164
164 channels_info = {}
165 channels_info = {}
165
166
166 for c_name, c_info in info_result['channels'].items():
167 for c_name, c_info in list(info_result['channels'].items()):
167 if c_name not in include_channel_info:
168 if c_name not in include_channel_info:
168 continue
169 continue
169 connected_list = []
170 connected_list = []
170 for username in c_info['users']:
171 for username in c_info['users']:
171 connected_list.append({
172 connected_list.append({
172 'user': username,
173 'user': username,
173 'state': user_state_dict[username]
174 'state': user_state_dict[username]
174 })
175 })
175 channels_info[c_name] = {'users': connected_list,
176 channels_info[c_name] = {'users': connected_list,
176 'history': c_info['history']}
177 'history': c_info['history']}
177
178
178 return channels_info
179 return channels_info
179
180
180
181
181 def log_filepath(history_location, channel_name):
182 def log_filepath(history_location, channel_name):
182 hasher = hashlib.sha256()
183
183 hasher.update(channel_name.encode('utf8'))
184 channel_hash = ascii_str(sha1_safe(channel_name))
184 filename = '{}.log'.format(hasher.hexdigest())
185 filename = f'{channel_hash}.log'
185 filepath = os.path.join(history_location, filename)
186 filepath = os.path.join(history_location, filename)
186 return filepath
187 return filepath
187
188
188
189
189 def read_history(history_location, channel_name):
190 def read_history(history_location, channel_name):
190 filepath = log_filepath(history_location, channel_name)
191 filepath = log_filepath(history_location, channel_name)
191 if not os.path.exists(filepath):
192 if not os.path.exists(filepath):
192 return []
193 return []
193 history_lines_limit = -100
194 history_lines_limit = -100
194 history = []
195 history = []
195 with open(filepath, 'rb') as f:
196 with open(filepath, 'rb') as f:
196 for line in f.readlines()[history_lines_limit:]:
197 for line in f.readlines()[history_lines_limit:]:
197 try:
198 try:
198 history.append(json.loads(line))
199 history.append(json.loads(line))
199 except Exception:
200 except Exception:
200 log.exception('Failed to load history')
201 log.exception('Failed to load history')
201 return history
202 return history
202
203
203
204
204 def update_history_from_logs(config, channels, payload):
205 def update_history_from_logs(config, channels, payload):
205 history_location = config.get('history.location')
206 history_location = config.get('history.location')
206 for channel in channels:
207 for channel in channels:
207 history = read_history(history_location, channel)
208 history = read_history(history_location, channel)
208 payload['channels_info'][channel]['history'] = history
209 payload['channels_info'][channel]['history'] = history
209
210
210
211
211 def write_history(config, message):
212 def write_history(config, message):
212 """ writes a message to a base64encoded filename """
213 """ writes a message to a base64encoded filename """
213 history_location = config.get('history.location')
214 history_location = config.get('history.location')
214 if not os.path.exists(history_location):
215 if not os.path.exists(history_location):
215 return
216 return
216 try:
217 try:
217 LOCK.acquire_write_lock()
218 LOCK.acquire_write_lock()
218 filepath = log_filepath(history_location, message['channel'])
219 filepath = log_filepath(history_location, message['channel'])
219 json_message = json.dumps(message)
220 json_message = json.dumps(message)
220 with open(filepath, 'ab') as f:
221 with open(filepath, 'ab') as f:
221 f.write(json_message)
222 f.write(json_message)
222 f.write('\n')
223 f.write('\n')
223 finally:
224 finally:
224 LOCK.release_write_lock()
225 LOCK.release_write_lock()
225
226
226
227
227 def get_connection_validators(registry):
228 def get_connection_validators(registry):
228 validators = []
229 validators = []
229 for k, config in registry.rhodecode_plugins.items():
230 for k, config in list(registry.rhodecode_plugins.items()):
230 validator = config.get('channelstream', {}).get('connect_validator')
231 validator = config.get('channelstream', {}).get('connect_validator')
231 if validator:
232 if validator:
232 validators.append(validator)
233 validators.append(validator)
233 return validators
234 return validators
234
235
235
236
236 def get_channelstream_config(registry=None):
237 def get_channelstream_config(registry=None):
237 if not registry:
238 if not registry:
238 registry = get_current_registry()
239 registry = get_current_registry()
239
240
240 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
241 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
241 channelstream_config = rhodecode_plugins.get('channelstream', {})
242 channelstream_config = rhodecode_plugins.get('channelstream', {})
242 return channelstream_config
243 return channelstream_config
243
244
244
245
245 def post_message(channel, message, username, registry=None):
246 def post_message(channel, message, username, registry=None):
246 channelstream_config = get_channelstream_config(registry)
247 channelstream_config = get_channelstream_config(registry)
247 if not channelstream_config.get('enabled'):
248 if not channelstream_config.get('enabled'):
248 return
249 return
249
250
250 message_obj = message
251 message_obj = message
251 if isinstance(message, str):
252 if isinstance(message, str):
252 message_obj = {
253 message_obj = {
253 'message': message,
254 'message': message,
254 'level': 'success',
255 'level': 'success',
255 'topic': '/notifications'
256 'topic': '/notifications'
256 }
257 }
257
258
258 log.debug('Channelstream: sending notification to channel %s', channel)
259 log.debug('Channelstream: sending notification to channel %s', channel)
259 payload = {
260 payload = {
260 'type': 'message',
261 'type': 'message',
261 'timestamp': datetime.datetime.utcnow(),
262 'timestamp': datetime.datetime.utcnow(),
262 'user': 'system',
263 'user': 'system',
263 'exclude_users': [username],
264 'exclude_users': [username],
264 'channel': channel,
265 'channel': channel,
265 'message': message_obj
266 'message': message_obj
266 }
267 }
267
268
268 try:
269 try:
269 return channelstream_request(
270 return channelstream_request(
270 channelstream_config, [payload], '/message',
271 channelstream_config, [payload], '/message',
271 raise_exc=False)
272 raise_exc=False)
272 except ChannelstreamException:
273 except ChannelstreamException:
273 log.exception('Failed to send channelstream data')
274 log.exception('Failed to send channelstream data')
274 raise
275 raise
275
276
276
277
277 def _reload_link(label):
278 def _reload_link(label):
278 return (
279 return (
279 '<a onclick="window.location.reload()">'
280 '<a onclick="window.location.reload()">'
280 '<strong>{}</strong>'
281 '<strong>{}</strong>'
281 '</a>'.format(label)
282 '</a>'.format(label)
282 )
283 )
283
284
284
285
285 def pr_channel(pull_request):
286 def pr_channel(pull_request):
286 repo_name = pull_request.target_repo.repo_name
287 repo_name = pull_request.target_repo.repo_name
287 pull_request_id = pull_request.pull_request_id
288 pull_request_id = pull_request.pull_request_id
288 channel = '/repo${}$/pr/{}'.format(repo_name, pull_request_id)
289 channel = '/repo${}$/pr/{}'.format(repo_name, pull_request_id)
289 log.debug('Getting pull-request channelstream broadcast channel: %s', channel)
290 log.debug('Getting pull-request channelstream broadcast channel: %s', channel)
290 return channel
291 return channel
291
292
292
293
293 def comment_channel(repo_name, commit_obj=None, pull_request_obj=None):
294 def comment_channel(repo_name, commit_obj=None, pull_request_obj=None):
294 channel = None
295 channel = None
295 if commit_obj:
296 if commit_obj:
296 channel = '/repo${}$/commit/{}'.format(
297 channel = '/repo${}$/commit/{}'.format(
297 repo_name, commit_obj.raw_id
298 repo_name, commit_obj.raw_id
298 )
299 )
299 elif pull_request_obj:
300 elif pull_request_obj:
300 channel = '/repo${}$/pr/{}'.format(
301 channel = '/repo${}$/pr/{}'.format(
301 repo_name, pull_request_obj.pull_request_id
302 repo_name, pull_request_obj.pull_request_id
302 )
303 )
303 log.debug('Getting comment channelstream broadcast channel: %s', channel)
304 log.debug('Getting comment channelstream broadcast channel: %s', channel)
304
305
305 return channel
306 return channel
306
307
307
308
308 def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs):
309 def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs):
309 """
310 """
310 Channel push on pull request update
311 Channel push on pull request update
311 """
312 """
312 if not pr_broadcast_channel:
313 if not pr_broadcast_channel:
313 return
314 return
314
315
315 _ = request.translate
316 _ = request.translate
316
317
317 message = '{} {}'.format(
318 message = '{} {}'.format(
318 msg,
319 msg,
319 _reload_link(_(' Reload page to load changes')))
320 _reload_link(_(' Reload page to load changes')))
320
321
321 message_obj = {
322 message_obj = {
322 'message': message,
323 'message': message,
323 'level': 'success',
324 'level': 'success',
324 'topic': '/notifications'
325 'topic': '/notifications'
325 }
326 }
326
327
327 post_message(
328 post_message(
328 pr_broadcast_channel, message_obj, user.username,
329 pr_broadcast_channel, message_obj, user.username,
329 registry=request.registry)
330 registry=request.registry)
330
331
331
332
332 def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs):
333 def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs):
333 """
334 """
334 Channelstream push on comment action, on commit, or pull-request
335 Channelstream push on comment action, on commit, or pull-request
335 """
336 """
336 if not comment_broadcast_channel:
337 if not comment_broadcast_channel:
337 return
338 return
338
339
339 _ = request.translate
340 _ = request.translate
340
341
341 comment_data = kwargs.pop('comment_data', {})
342 comment_data = kwargs.pop('comment_data', {})
342 user_data = kwargs.pop('user_data', {})
343 user_data = kwargs.pop('user_data', {})
343 comment_id = comment_data.keys()[0] if comment_data else ''
344 comment_id = list(comment_data.keys())[0] if comment_data else ''
344
345
345 message = '<strong>{}</strong> {} #{}'.format(
346 message = '<strong>{}</strong> {} #{}'.format(
346 user.username,
347 user.username,
347 msg,
348 msg,
348 comment_id,
349 comment_id,
349 )
350 )
350
351
351 message_obj = {
352 message_obj = {
352 'message': message,
353 'message': message,
353 'level': 'success',
354 'level': 'success',
354 'topic': '/notifications'
355 'topic': '/notifications'
355 }
356 }
356
357
357 post_message(
358 post_message(
358 comment_broadcast_channel, message_obj, user.username,
359 comment_broadcast_channel, message_obj, user.username,
359 registry=request.registry)
360 registry=request.registry)
360
361
361 message_obj = {
362 message_obj = {
362 'message': None,
363 'message': None,
363 'user': user.username,
364 'user': user.username,
364 'comment_id': comment_id,
365 'comment_id': comment_id,
365 'comment_data': comment_data,
366 'comment_data': comment_data,
366 'user_data': user_data,
367 'user_data': user_data,
367 'topic': '/comment'
368 'topic': '/comment'
368 }
369 }
369 post_message(
370 post_message(
370 comment_broadcast_channel, message_obj, user.username,
371 comment_broadcast_channel, message_obj, user.username,
371 registry=request.registry)
372 registry=request.registry)
General Comments 0
You need to be logged in to leave comments. Login now