##// END OF EJS Templates
channelstream: fixed registered users info.
marcink -
r4480:5c39b207 default
parent child Browse files
Show More
@@ -1,267 +1,267 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2020 RhodeCode GmbH
3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import hashlib
22 import hashlib
23 import itsdangerous
23 import itsdangerous
24 import logging
24 import logging
25 import requests
25 import requests
26 import datetime
26 import datetime
27
27
28 from dogpile.core import ReadWriteMutex
28 from dogpile.core import ReadWriteMutex
29 from pyramid.threadlocal import get_current_registry
29 from pyramid.threadlocal import get_current_registry
30
30
31 import rhodecode.lib.helpers as h
31 import rhodecode.lib.helpers as h
32 from rhodecode.lib.auth import HasRepoPermissionAny
32 from rhodecode.lib.auth import HasRepoPermissionAny
33 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.ext_json import json
34 from rhodecode.model.db import User
34 from rhodecode.model.db import User
35
35
36 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
37
37
38 LOCK = ReadWriteMutex()
38 LOCK = ReadWriteMutex()
39
39
40 USER_STATE_PUBLIC_KEYS = [
40 USER_STATE_PUBLIC_KEYS = [
41 'id', 'username', 'first_name', 'last_name',
41 'id', 'username', 'first_name', 'last_name',
42 'icon_link', 'display_name', 'display_link']
42 'icon_link', 'display_name', 'display_link']
43
43
44
44
45 class ChannelstreamException(Exception):
45 class ChannelstreamException(Exception):
46 pass
46 pass
47
47
48
48
49 class ChannelstreamConnectionException(ChannelstreamException):
49 class ChannelstreamConnectionException(ChannelstreamException):
50 pass
50 pass
51
51
52
52
53 class ChannelstreamPermissionException(ChannelstreamException):
53 class ChannelstreamPermissionException(ChannelstreamException):
54 pass
54 pass
55
55
56
56
57 def get_channelstream_server_url(config, endpoint):
57 def get_channelstream_server_url(config, endpoint):
58 return 'http://{}{}'.format(config['server'], endpoint)
58 return 'http://{}{}'.format(config['server'], endpoint)
59
59
60
60
61 def channelstream_request(config, payload, endpoint, raise_exc=True):
61 def channelstream_request(config, payload, endpoint, raise_exc=True):
62 signer = itsdangerous.TimestampSigner(config['secret'])
62 signer = itsdangerous.TimestampSigner(config['secret'])
63 sig_for_server = signer.sign(endpoint)
63 sig_for_server = signer.sign(endpoint)
64 secret_headers = {'x-channelstream-secret': sig_for_server,
64 secret_headers = {'x-channelstream-secret': sig_for_server,
65 'x-channelstream-endpoint': endpoint,
65 'x-channelstream-endpoint': endpoint,
66 'Content-Type': 'application/json'}
66 'Content-Type': 'application/json'}
67 req_url = get_channelstream_server_url(config, endpoint)
67 req_url = get_channelstream_server_url(config, endpoint)
68
68
69 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
69 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
70 response = None
70 response = None
71 try:
71 try:
72 response = requests.post(req_url, data=json.dumps(payload),
72 response = requests.post(req_url, data=json.dumps(payload),
73 headers=secret_headers).json()
73 headers=secret_headers).json()
74 except requests.ConnectionError:
74 except requests.ConnectionError:
75 log.exception('ConnectionError occurred for endpoint %s', req_url)
75 log.exception('ConnectionError occurred for endpoint %s', req_url)
76 if raise_exc:
76 if raise_exc:
77 raise ChannelstreamConnectionException(req_url)
77 raise ChannelstreamConnectionException(req_url)
78 except Exception:
78 except Exception:
79 log.exception('Exception related to Channelstream happened')
79 log.exception('Exception related to Channelstream happened')
80 if raise_exc:
80 if raise_exc:
81 raise ChannelstreamConnectionException()
81 raise ChannelstreamConnectionException()
82 log.debug('Got channelstream response: %s', response)
82 log.debug('Got channelstream response: %s', response)
83 return response
83 return response
84
84
85
85
86 def get_user_data(user_id):
86 def get_user_data(user_id):
87 user = User.get(user_id)
87 user = User.get(user_id)
88 return {
88 return {
89 'id': user.user_id,
89 'id': user.user_id,
90 'username': user.username,
90 'username': user.username,
91 'first_name': user.first_name,
91 'first_name': user.first_name,
92 'last_name': user.last_name,
92 'last_name': user.last_name,
93 'icon_link': h.gravatar_url(user.email, 60),
93 'icon_link': h.gravatar_url(user.email, 60),
94 'display_name': h.person(user, 'username_or_name_or_email'),
94 'display_name': h.person(user, 'username_or_name_or_email'),
95 'display_link': h.link_to_user(user),
95 'display_link': h.link_to_user(user),
96 'notifications': user.user_data.get('notification_status', True)
96 'notifications': user.user_data.get('notification_status', True)
97 }
97 }
98
98
99
99
100 def broadcast_validator(channel_name):
100 def broadcast_validator(channel_name):
101 """ checks if user can access the broadcast channel """
101 """ checks if user can access the broadcast channel """
102 if channel_name == 'broadcast':
102 if channel_name == 'broadcast':
103 return True
103 return True
104
104
105
105
106 def repo_validator(channel_name):
106 def repo_validator(channel_name):
107 """ checks if user can access the broadcast channel """
107 """ checks if user can access the broadcast channel """
108 channel_prefix = '/repo$'
108 channel_prefix = '/repo$'
109 if channel_name.startswith(channel_prefix):
109 if channel_name.startswith(channel_prefix):
110 elements = channel_name[len(channel_prefix):].split('$')
110 elements = channel_name[len(channel_prefix):].split('$')
111 repo_name = elements[0]
111 repo_name = elements[0]
112 can_access = HasRepoPermissionAny(
112 can_access = HasRepoPermissionAny(
113 'repository.read',
113 'repository.read',
114 'repository.write',
114 'repository.write',
115 'repository.admin')(repo_name)
115 'repository.admin')(repo_name)
116 log.debug(
116 log.debug(
117 'permission check for %s channel resulted in %s',
117 'permission check for %s channel resulted in %s',
118 repo_name, can_access)
118 repo_name, can_access)
119 if can_access:
119 if can_access:
120 return True
120 return True
121 return False
121 return False
122
122
123
123
124 def check_channel_permissions(channels, plugin_validators, should_raise=True):
124 def check_channel_permissions(channels, plugin_validators, should_raise=True):
125 valid_channels = []
125 valid_channels = []
126
126
127 validators = [broadcast_validator, repo_validator]
127 validators = [broadcast_validator, repo_validator]
128 if plugin_validators:
128 if plugin_validators:
129 validators.extend(plugin_validators)
129 validators.extend(plugin_validators)
130 for channel_name in channels:
130 for channel_name in channels:
131 is_valid = False
131 is_valid = False
132 for validator in validators:
132 for validator in validators:
133 if validator(channel_name):
133 if validator(channel_name):
134 is_valid = True
134 is_valid = True
135 break
135 break
136 if is_valid:
136 if is_valid:
137 valid_channels.append(channel_name)
137 valid_channels.append(channel_name)
138 else:
138 else:
139 if should_raise:
139 if should_raise:
140 raise ChannelstreamPermissionException()
140 raise ChannelstreamPermissionException()
141 return valid_channels
141 return valid_channels
142
142
143
143
144 def get_channels_info(self, channels):
144 def get_channels_info(self, channels):
145 payload = {'channels': channels}
145 payload = {'channels': channels}
146 # gather persistence info
146 # gather persistence info
147 return channelstream_request(self._config(), payload, '/info')
147 return channelstream_request(self._config(), payload, '/info')
148
148
149
149
150 def parse_channels_info(info_result, include_channel_info=None):
150 def parse_channels_info(info_result, include_channel_info=None):
151 """
151 """
152 Returns data that contains only secure information that can be
152 Returns data that contains only secure information that can be
153 presented to clients
153 presented to clients
154 """
154 """
155 include_channel_info = include_channel_info or []
155 include_channel_info = include_channel_info or []
156
156
157 user_state_dict = {}
157 user_state_dict = {}
158 for userinfo in info_result['users']:
158 for userinfo in info_result['users']:
159 user_state_dict[userinfo['user']] = {
159 user_state_dict[userinfo['user']] = {
160 k: v for k, v in userinfo['state'].items()
160 k: v for k, v in userinfo['state'].items()
161 if k in USER_STATE_PUBLIC_KEYS
161 if k in USER_STATE_PUBLIC_KEYS
162 }
162 }
163
163
164 channels_info = {}
164 channels_info = {}
165
165
166 for c_name, c_info in info_result['channels'].items():
166 for c_name, c_info in info_result['channels'].items():
167 if c_name not in include_channel_info:
167 if c_name not in include_channel_info:
168 continue
168 continue
169 connected_list = []
169 connected_list = []
170 for username in c_info['users']:
170 for username in c_info['users']:
171 connected_list.append({
171 connected_list.append({
172 'user': username,
172 'user': username,
173 'state': user_state_dict[userinfo['user']]
173 'state': user_state_dict[username]
174 })
174 })
175 channels_info[c_name] = {'users': connected_list,
175 channels_info[c_name] = {'users': connected_list,
176 'history': c_info['history']}
176 'history': c_info['history']}
177
177
178 return channels_info
178 return channels_info
179
179
180
180
181 def log_filepath(history_location, channel_name):
181 def log_filepath(history_location, channel_name):
182 hasher = hashlib.sha256()
182 hasher = hashlib.sha256()
183 hasher.update(channel_name.encode('utf8'))
183 hasher.update(channel_name.encode('utf8'))
184 filename = '{}.log'.format(hasher.hexdigest())
184 filename = '{}.log'.format(hasher.hexdigest())
185 filepath = os.path.join(history_location, filename)
185 filepath = os.path.join(history_location, filename)
186 return filepath
186 return filepath
187
187
188
188
189 def read_history(history_location, channel_name):
189 def read_history(history_location, channel_name):
190 filepath = log_filepath(history_location, channel_name)
190 filepath = log_filepath(history_location, channel_name)
191 if not os.path.exists(filepath):
191 if not os.path.exists(filepath):
192 return []
192 return []
193 history_lines_limit = -100
193 history_lines_limit = -100
194 history = []
194 history = []
195 with open(filepath, 'rb') as f:
195 with open(filepath, 'rb') as f:
196 for line in f.readlines()[history_lines_limit:]:
196 for line in f.readlines()[history_lines_limit:]:
197 try:
197 try:
198 history.append(json.loads(line))
198 history.append(json.loads(line))
199 except Exception:
199 except Exception:
200 log.exception('Failed to load history')
200 log.exception('Failed to load history')
201 return history
201 return history
202
202
203
203
204 def update_history_from_logs(config, channels, payload):
204 def update_history_from_logs(config, channels, payload):
205 history_location = config.get('history.location')
205 history_location = config.get('history.location')
206 for channel in channels:
206 for channel in channels:
207 history = read_history(history_location, channel)
207 history = read_history(history_location, channel)
208 payload['channels_info'][channel]['history'] = history
208 payload['channels_info'][channel]['history'] = history
209
209
210
210
211 def write_history(config, message):
211 def write_history(config, message):
212 """ writes a messge to a base64encoded filename """
212 """ writes a messge to a base64encoded filename """
213 history_location = config.get('history.location')
213 history_location = config.get('history.location')
214 if not os.path.exists(history_location):
214 if not os.path.exists(history_location):
215 return
215 return
216 try:
216 try:
217 LOCK.acquire_write_lock()
217 LOCK.acquire_write_lock()
218 filepath = log_filepath(history_location, message['channel'])
218 filepath = log_filepath(history_location, message['channel'])
219 with open(filepath, 'ab') as f:
219 with open(filepath, 'ab') as f:
220 json.dump(message, f)
220 json.dump(message, f)
221 f.write('\n')
221 f.write('\n')
222 finally:
222 finally:
223 LOCK.release_write_lock()
223 LOCK.release_write_lock()
224
224
225
225
226 def get_connection_validators(registry):
226 def get_connection_validators(registry):
227 validators = []
227 validators = []
228 for k, config in registry.rhodecode_plugins.iteritems():
228 for k, config in registry.rhodecode_plugins.iteritems():
229 validator = config.get('channelstream', {}).get('connect_validator')
229 validator = config.get('channelstream', {}).get('connect_validator')
230 if validator:
230 if validator:
231 validators.append(validator)
231 validators.append(validator)
232 return validators
232 return validators
233
233
234
234
235 def post_message(channel, message, username, registry=None):
235 def post_message(channel, message, username, registry=None):
236
236
237 message_obj = message
237 message_obj = message
238 if isinstance(message, basestring):
238 if isinstance(message, basestring):
239 message_obj = {
239 message_obj = {
240 'message': message,
240 'message': message,
241 'level': 'success',
241 'level': 'success',
242 'topic': '/notifications'
242 'topic': '/notifications'
243 }
243 }
244
244
245 if not registry:
245 if not registry:
246 registry = get_current_registry()
246 registry = get_current_registry()
247
247
248 log.debug('Channelstream: sending notification to channel %s', channel)
248 log.debug('Channelstream: sending notification to channel %s', channel)
249 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
249 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
250 channelstream_config = rhodecode_plugins.get('channelstream', {})
250 channelstream_config = rhodecode_plugins.get('channelstream', {})
251 if channelstream_config.get('enabled'):
251 if channelstream_config.get('enabled'):
252 payload = {
252 payload = {
253 'type': 'message',
253 'type': 'message',
254 'timestamp': datetime.datetime.utcnow(),
254 'timestamp': datetime.datetime.utcnow(),
255 'user': 'system',
255 'user': 'system',
256 'exclude_users': [username],
256 'exclude_users': [username],
257 'channel': channel,
257 'channel': channel,
258 'message': message_obj
258 'message': message_obj
259 }
259 }
260
260
261 try:
261 try:
262 return channelstream_request(
262 return channelstream_request(
263 channelstream_config, [payload], '/message',
263 channelstream_config, [payload], '/message',
264 raise_exc=False)
264 raise_exc=False)
265 except ChannelstreamException:
265 except ChannelstreamException:
266 log.exception('Failed to send channelstream data')
266 log.exception('Failed to send channelstream data')
267 raise
267 raise
General Comments 0
You need to be logged in to leave comments. Login now