##// END OF EJS Templates
channelstream: fixed registered users info.
marcink -
r4480:5c39b207 default
parent child Browse files
Show More
@@ -1,267 +1,267 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import hashlib
23 23 import itsdangerous
24 24 import logging
25 25 import requests
26 26 import datetime
27 27
28 28 from dogpile.core import ReadWriteMutex
29 29 from pyramid.threadlocal import get_current_registry
30 30
31 31 import rhodecode.lib.helpers as h
32 32 from rhodecode.lib.auth import HasRepoPermissionAny
33 33 from rhodecode.lib.ext_json import json
34 34 from rhodecode.model.db import User
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 LOCK = ReadWriteMutex()
39 39
40 40 USER_STATE_PUBLIC_KEYS = [
41 41 'id', 'username', 'first_name', 'last_name',
42 42 'icon_link', 'display_name', 'display_link']
43 43
44 44
45 45 class ChannelstreamException(Exception):
46 46 pass
47 47
48 48
49 49 class ChannelstreamConnectionException(ChannelstreamException):
50 50 pass
51 51
52 52
53 53 class ChannelstreamPermissionException(ChannelstreamException):
54 54 pass
55 55
56 56
57 57 def get_channelstream_server_url(config, endpoint):
58 58 return 'http://{}{}'.format(config['server'], endpoint)
59 59
60 60
61 61 def channelstream_request(config, payload, endpoint, raise_exc=True):
62 62 signer = itsdangerous.TimestampSigner(config['secret'])
63 63 sig_for_server = signer.sign(endpoint)
64 64 secret_headers = {'x-channelstream-secret': sig_for_server,
65 65 'x-channelstream-endpoint': endpoint,
66 66 'Content-Type': 'application/json'}
67 67 req_url = get_channelstream_server_url(config, endpoint)
68 68
69 69 log.debug('Sending a channelstream request to endpoint: `%s`', req_url)
70 70 response = None
71 71 try:
72 72 response = requests.post(req_url, data=json.dumps(payload),
73 73 headers=secret_headers).json()
74 74 except requests.ConnectionError:
75 75 log.exception('ConnectionError occurred for endpoint %s', req_url)
76 76 if raise_exc:
77 77 raise ChannelstreamConnectionException(req_url)
78 78 except Exception:
79 79 log.exception('Exception related to Channelstream happened')
80 80 if raise_exc:
81 81 raise ChannelstreamConnectionException()
82 82 log.debug('Got channelstream response: %s', response)
83 83 return response
84 84
85 85
86 86 def get_user_data(user_id):
87 87 user = User.get(user_id)
88 88 return {
89 89 'id': user.user_id,
90 90 'username': user.username,
91 91 'first_name': user.first_name,
92 92 'last_name': user.last_name,
93 93 'icon_link': h.gravatar_url(user.email, 60),
94 94 'display_name': h.person(user, 'username_or_name_or_email'),
95 95 'display_link': h.link_to_user(user),
96 96 'notifications': user.user_data.get('notification_status', True)
97 97 }
98 98
99 99
100 100 def broadcast_validator(channel_name):
101 101 """ checks if user can access the broadcast channel """
102 102 if channel_name == 'broadcast':
103 103 return True
104 104
105 105
106 106 def repo_validator(channel_name):
107 107 """ checks if user can access the broadcast channel """
108 108 channel_prefix = '/repo$'
109 109 if channel_name.startswith(channel_prefix):
110 110 elements = channel_name[len(channel_prefix):].split('$')
111 111 repo_name = elements[0]
112 112 can_access = HasRepoPermissionAny(
113 113 'repository.read',
114 114 'repository.write',
115 115 'repository.admin')(repo_name)
116 116 log.debug(
117 117 'permission check for %s channel resulted in %s',
118 118 repo_name, can_access)
119 119 if can_access:
120 120 return True
121 121 return False
122 122
123 123
124 124 def check_channel_permissions(channels, plugin_validators, should_raise=True):
125 125 valid_channels = []
126 126
127 127 validators = [broadcast_validator, repo_validator]
128 128 if plugin_validators:
129 129 validators.extend(plugin_validators)
130 130 for channel_name in channels:
131 131 is_valid = False
132 132 for validator in validators:
133 133 if validator(channel_name):
134 134 is_valid = True
135 135 break
136 136 if is_valid:
137 137 valid_channels.append(channel_name)
138 138 else:
139 139 if should_raise:
140 140 raise ChannelstreamPermissionException()
141 141 return valid_channels
142 142
143 143
144 144 def get_channels_info(self, channels):
145 145 payload = {'channels': channels}
146 146 # gather persistence info
147 147 return channelstream_request(self._config(), payload, '/info')
148 148
149 149
150 150 def parse_channels_info(info_result, include_channel_info=None):
151 151 """
152 152 Returns data that contains only secure information that can be
153 153 presented to clients
154 154 """
155 155 include_channel_info = include_channel_info or []
156 156
157 157 user_state_dict = {}
158 158 for userinfo in info_result['users']:
159 159 user_state_dict[userinfo['user']] = {
160 160 k: v for k, v in userinfo['state'].items()
161 161 if k in USER_STATE_PUBLIC_KEYS
162 162 }
163 163
164 164 channels_info = {}
165 165
166 166 for c_name, c_info in info_result['channels'].items():
167 167 if c_name not in include_channel_info:
168 168 continue
169 169 connected_list = []
170 170 for username in c_info['users']:
171 171 connected_list.append({
172 172 'user': username,
173 'state': user_state_dict[userinfo['user']]
173 'state': user_state_dict[username]
174 174 })
175 175 channels_info[c_name] = {'users': connected_list,
176 176 'history': c_info['history']}
177 177
178 178 return channels_info
179 179
180 180
181 181 def log_filepath(history_location, channel_name):
182 182 hasher = hashlib.sha256()
183 183 hasher.update(channel_name.encode('utf8'))
184 184 filename = '{}.log'.format(hasher.hexdigest())
185 185 filepath = os.path.join(history_location, filename)
186 186 return filepath
187 187
188 188
189 189 def read_history(history_location, channel_name):
190 190 filepath = log_filepath(history_location, channel_name)
191 191 if not os.path.exists(filepath):
192 192 return []
193 193 history_lines_limit = -100
194 194 history = []
195 195 with open(filepath, 'rb') as f:
196 196 for line in f.readlines()[history_lines_limit:]:
197 197 try:
198 198 history.append(json.loads(line))
199 199 except Exception:
200 200 log.exception('Failed to load history')
201 201 return history
202 202
203 203
204 204 def update_history_from_logs(config, channels, payload):
205 205 history_location = config.get('history.location')
206 206 for channel in channels:
207 207 history = read_history(history_location, channel)
208 208 payload['channels_info'][channel]['history'] = history
209 209
210 210
211 211 def write_history(config, message):
212 212 """ writes a messge to a base64encoded filename """
213 213 history_location = config.get('history.location')
214 214 if not os.path.exists(history_location):
215 215 return
216 216 try:
217 217 LOCK.acquire_write_lock()
218 218 filepath = log_filepath(history_location, message['channel'])
219 219 with open(filepath, 'ab') as f:
220 220 json.dump(message, f)
221 221 f.write('\n')
222 222 finally:
223 223 LOCK.release_write_lock()
224 224
225 225
226 226 def get_connection_validators(registry):
227 227 validators = []
228 228 for k, config in registry.rhodecode_plugins.iteritems():
229 229 validator = config.get('channelstream', {}).get('connect_validator')
230 230 if validator:
231 231 validators.append(validator)
232 232 return validators
233 233
234 234
235 235 def post_message(channel, message, username, registry=None):
236 236
237 237 message_obj = message
238 238 if isinstance(message, basestring):
239 239 message_obj = {
240 240 'message': message,
241 241 'level': 'success',
242 242 'topic': '/notifications'
243 243 }
244 244
245 245 if not registry:
246 246 registry = get_current_registry()
247 247
248 248 log.debug('Channelstream: sending notification to channel %s', channel)
249 249 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
250 250 channelstream_config = rhodecode_plugins.get('channelstream', {})
251 251 if channelstream_config.get('enabled'):
252 252 payload = {
253 253 'type': 'message',
254 254 'timestamp': datetime.datetime.utcnow(),
255 255 'user': 'system',
256 256 'exclude_users': [username],
257 257 'channel': channel,
258 258 'message': message_obj
259 259 }
260 260
261 261 try:
262 262 return channelstream_request(
263 263 channelstream_config, [payload], '/message',
264 264 raise_exc=False)
265 265 except ChannelstreamException:
266 266 log.exception('Failed to send channelstream data')
267 267 raise
General Comments 0
You need to be logged in to leave comments. Login now