##// END OF EJS Templates
channelstream: make main exception be base for all others.
marcink -
r1273:27a65416 default
parent child Browse files
Show More
@@ -1,220 +1,220 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import hashlib
22 22 import itsdangerous
23 23 import logging
24 24 import os
25 25 import requests
26 26 from dogpile.core import ReadWriteMutex
27 27
28 28 import rhodecode.lib.helpers as h
29 29 from rhodecode.lib.auth import HasRepoPermissionAny
30 30 from rhodecode.lib.ext_json import json
31 31 from rhodecode.model.db import User
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35 LOCK = ReadWriteMutex()
36 36
37 37 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
38 38 'icon_link', 'display_name', 'display_link']
39 39
40 40
41 41 class ChannelstreamException(Exception):
42 42 pass
43 43
44 44
45 class ChannelstreamConnectionException(Exception):
45 class ChannelstreamConnectionException(ChannelstreamException):
46 46 pass
47 47
48 48
49 class ChannelstreamPermissionException(Exception):
49 class ChannelstreamPermissionException(ChannelstreamException):
50 50 pass
51 51
52 52
53 53 def channelstream_request(config, payload, endpoint, raise_exc=True):
54 54 signer = itsdangerous.TimestampSigner(config['secret'])
55 55 sig_for_server = signer.sign(endpoint)
56 56 secret_headers = {'x-channelstream-secret': sig_for_server,
57 57 'x-channelstream-endpoint': endpoint,
58 58 'Content-Type': 'application/json'}
59 59 req_url = 'http://{}{}'.format(config['server'], endpoint)
60 60 response = None
61 61 try:
62 62 response = requests.post(req_url, data=json.dumps(payload),
63 63 headers=secret_headers).json()
64 64 except requests.ConnectionError:
65 65 log.exception('ConnectionError happened')
66 66 if raise_exc:
67 67 raise ChannelstreamConnectionException()
68 68 except Exception:
69 69 log.exception('Exception related to channelstream happened')
70 70 if raise_exc:
71 71 raise ChannelstreamConnectionException()
72 72 return response
73 73
74 74
75 75 def get_user_data(user_id):
76 76 user = User.get(user_id)
77 77 return {
78 78 'id': user.user_id,
79 79 'username': user.username,
80 80 'first_name': user.name,
81 81 'last_name': user.lastname,
82 82 'icon_link': h.gravatar_url(user.email, 60),
83 83 'display_name': h.person(user, 'username_or_name_or_email'),
84 84 'display_link': h.link_to_user(user),
85 85 'notifications': user.user_data.get('notification_status', True)
86 86 }
87 87
88 88
89 89 def broadcast_validator(channel_name):
90 90 """ checks if user can access the broadcast channel """
91 91 if channel_name == 'broadcast':
92 92 return True
93 93
94 94
95 95 def repo_validator(channel_name):
96 96 """ checks if user can access the broadcast channel """
97 97 channel_prefix = '/repo$'
98 98 if channel_name.startswith(channel_prefix):
99 99 elements = channel_name[len(channel_prefix):].split('$')
100 100 repo_name = elements[0]
101 101 can_access = HasRepoPermissionAny(
102 102 'repository.read',
103 103 'repository.write',
104 104 'repository.admin')(repo_name)
105 105 log.debug('permission check for {} channel '
106 106 'resulted in {}'.format(repo_name, can_access))
107 107 if can_access:
108 108 return True
109 109 return False
110 110
111 111
112 112 def check_channel_permissions(channels, plugin_validators, should_raise=True):
113 113 valid_channels = []
114 114
115 115 validators = [broadcast_validator, repo_validator]
116 116 if plugin_validators:
117 117 validators.extend(plugin_validators)
118 118 for channel_name in channels:
119 119 is_valid = False
120 120 for validator in validators:
121 121 if validator(channel_name):
122 122 is_valid = True
123 123 break
124 124 if is_valid:
125 125 valid_channels.append(channel_name)
126 126 else:
127 127 if should_raise:
128 128 raise ChannelstreamPermissionException()
129 129 return valid_channels
130 130
131 131
132 132 def get_channels_info(self, channels):
133 133 payload = {'channels': channels}
134 134 # gather persistence info
135 135 return channelstream_request(self._config(), payload, '/info')
136 136
137 137
138 138 def parse_channels_info(info_result, include_channel_info=None):
139 139 """
140 140 Returns data that contains only secure information that can be
141 141 presented to clients
142 142 """
143 143 include_channel_info = include_channel_info or []
144 144
145 145 user_state_dict = {}
146 146 for userinfo in info_result['users']:
147 147 user_state_dict[userinfo['user']] = {
148 148 k: v for k, v in userinfo['state'].items()
149 149 if k in STATE_PUBLIC_KEYS
150 150 }
151 151
152 152 channels_info = {}
153 153
154 154 for c_name, c_info in info_result['channels'].items():
155 155 if c_name not in include_channel_info:
156 156 continue
157 157 connected_list = []
158 158 for userinfo in c_info['users']:
159 159 connected_list.append({
160 160 'user': userinfo['user'],
161 161 'state': user_state_dict[userinfo['user']]
162 162 })
163 163 channels_info[c_name] = {'users': connected_list,
164 164 'history': c_info['history']}
165 165
166 166 return channels_info
167 167
168 168
169 169 def log_filepath(history_location, channel_name):
170 170 hasher = hashlib.sha256()
171 171 hasher.update(channel_name.encode('utf8'))
172 172 filename = '{}.log'.format(hasher.hexdigest())
173 173 filepath = os.path.join(history_location, filename)
174 174 return filepath
175 175
176 176
177 177 def read_history(history_location, channel_name):
178 178 filepath = log_filepath(history_location, channel_name)
179 179 if not os.path.exists(filepath):
180 180 return []
181 181 history_lines_limit = -100
182 182 history = []
183 183 with open(filepath, 'rb') as f:
184 184 for line in f.readlines()[history_lines_limit:]:
185 185 try:
186 186 history.append(json.loads(line))
187 187 except Exception:
188 188 log.exception('Failed to load history')
189 189 return history
190 190
191 191
192 192 def update_history_from_logs(config, channels, payload):
193 193 history_location = config.get('history.location')
194 194 for channel in channels:
195 195 history = read_history(history_location, channel)
196 196 payload['channels_info'][channel]['history'] = history
197 197
198 198
199 199 def write_history(config, message):
200 200 """ writes a messge to a base64encoded filename """
201 201 history_location = config.get('history.location')
202 202 if not os.path.exists(history_location):
203 203 return
204 204 try:
205 205 LOCK.acquire_write_lock()
206 206 filepath = log_filepath(history_location, message['channel'])
207 207 with open(filepath, 'ab') as f:
208 208 json.dump(message, f)
209 209 f.write('\n')
210 210 finally:
211 211 LOCK.release_write_lock()
212 212
213 213
214 214 def get_connection_validators(registry):
215 215 validators = []
216 216 for k, config in registry.rhodecode_plugins.iteritems():
217 217 validator = config.get('channelstream', {}).get('connect_validator')
218 218 if validator:
219 219 validators.append(validator)
220 220 return validators
General Comments 0
You need to be logged in to leave comments. Login now