##// END OF EJS Templates
channelstream: use sha256 for history filename hashes
ergo -
r1163:ef1cd4f9 default
parent child Browse files
Show More
@@ -1,220 +1,220 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import hashlib
22 import itsdangerous
21 23 import logging
22 24 import os
23
24 import itsdangerous
25 25 import requests
26
27 26 from dogpile.core import ReadWriteMutex
28 27
29 28 import rhodecode.lib.helpers as h
30
31 29 from rhodecode.lib.auth import HasRepoPermissionAny
32 30 from rhodecode.lib.ext_json import json
33 31 from rhodecode.model.db import User
34 32
35 33 log = logging.getLogger(__name__)
36 34
37 35 LOCK = ReadWriteMutex()
38 36
39 37 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
40 38 'icon_link', 'display_name', 'display_link']
41 39
42 40
43 41 class ChannelstreamException(Exception):
44 42 pass
45 43
46 44
47 45 class ChannelstreamConnectionException(Exception):
48 46 pass
49 47
50 48
51 49 class ChannelstreamPermissionException(Exception):
52 50 pass
53 51
54 52
55 53 def channelstream_request(config, payload, endpoint, raise_exc=True):
56 54 signer = itsdangerous.TimestampSigner(config['secret'])
57 55 sig_for_server = signer.sign(endpoint)
58 56 secret_headers = {'x-channelstream-secret': sig_for_server,
59 57 'x-channelstream-endpoint': endpoint,
60 58 'Content-Type': 'application/json'}
61 59 req_url = 'http://{}{}'.format(config['server'], endpoint)
62 60 response = None
63 61 try:
64 62 response = requests.post(req_url, data=json.dumps(payload),
65 63 headers=secret_headers).json()
66 64 except requests.ConnectionError:
67 65 log.exception('ConnectionError happened')
68 66 if raise_exc:
69 67 raise ChannelstreamConnectionException()
70 68 except Exception:
71 69 log.exception('Exception related to channelstream happened')
72 70 if raise_exc:
73 71 raise ChannelstreamConnectionException()
74 72 return response
75 73
76 74
77 75 def get_user_data(user_id):
78 76 user = User.get(user_id)
79 77 return {
80 78 'id': user.user_id,
81 79 'username': user.username,
82 80 'first_name': user.name,
83 81 'last_name': user.lastname,
84 82 'icon_link': h.gravatar_url(user.email, 60),
85 83 'display_name': h.person(user, 'username_or_name_or_email'),
86 84 'display_link': h.link_to_user(user),
87 85 'notifications': user.user_data.get('notification_status', True)
88 86 }
89 87
90 88
91 89 def broadcast_validator(channel_name):
92 90 """ checks if user can access the broadcast channel """
93 91 if channel_name == 'broadcast':
94 92 return True
95 93
96 94
97 95 def repo_validator(channel_name):
98 96 """ checks if user can access the broadcast channel """
99 97 channel_prefix = '/repo$'
100 98 if channel_name.startswith(channel_prefix):
101 99 elements = channel_name[len(channel_prefix):].split('$')
102 100 repo_name = elements[0]
103 101 can_access = HasRepoPermissionAny(
104 102 'repository.read',
105 103 'repository.write',
106 104 'repository.admin')(repo_name)
107 105 log.debug('permission check for {} channel '
108 106 'resulted in {}'.format(repo_name, can_access))
109 107 if can_access:
110 108 return True
111 109 return False
112 110
113 111
114 112 def check_channel_permissions(channels, plugin_validators, should_raise=True):
115 113 valid_channels = []
116 114
117 115 validators = [broadcast_validator, repo_validator]
118 116 if plugin_validators:
119 117 validators.extend(plugin_validators)
120 118 for channel_name in channels:
121 119 is_valid = False
122 120 for validator in validators:
123 121 if validator(channel_name):
124 122 is_valid = True
125 123 break
126 124 if is_valid:
127 125 valid_channels.append(channel_name)
128 126 else:
129 127 if should_raise:
130 128 raise ChannelstreamPermissionException()
131 129 return valid_channels
132 130
133 131
134 132 def get_channels_info(self, channels):
135 133 payload = {'channels': channels}
136 134 # gather persistence info
137 135 return channelstream_request(self._config(), payload, '/info')
138 136
139 137
140 138 def parse_channels_info(info_result, include_channel_info=None):
141 139 """
142 140 Returns data that contains only secure information that can be
143 141 presented to clients
144 142 """
145 143 include_channel_info = include_channel_info or []
146 144
147 145 user_state_dict = {}
148 146 for userinfo in info_result['users']:
149 147 user_state_dict[userinfo['user']] = {
150 148 k: v for k, v in userinfo['state'].items()
151 149 if k in STATE_PUBLIC_KEYS
152 150 }
153 151
154 152 channels_info = {}
155 153
156 154 for c_name, c_info in info_result['channels'].items():
157 155 if c_name not in include_channel_info:
158 156 continue
159 157 connected_list = []
160 158 for userinfo in c_info['users']:
161 159 connected_list.append({
162 160 'user': userinfo['user'],
163 161 'state': user_state_dict[userinfo['user']]
164 162 })
165 163 channels_info[c_name] = {'users': connected_list,
166 164 'history': c_info['history']}
167 165
168 166 return channels_info
169 167
170 168
171 169 def log_filepath(history_location, channel_name):
172 filename = '{}.log'.format(channel_name.encode('hex'))
170 hasher = hashlib.sha256()
171 hasher.update(channel_name.encode('utf8'))
172 filename = '{}.log'.format(hasher.hexdigest())
173 173 filepath = os.path.join(history_location, filename)
174 174 return filepath
175 175
176 176
177 177 def read_history(history_location, channel_name):
178 178 filepath = log_filepath(history_location, channel_name)
179 179 if not os.path.exists(filepath):
180 180 return []
181 181 history_lines_limit = -100
182 182 history = []
183 183 with open(filepath, 'rb') as f:
184 184 for line in f.readlines()[history_lines_limit:]:
185 185 try:
186 186 history.append(json.loads(line))
187 187 except Exception:
188 188 log.exception('Failed to load history')
189 189 return history
190 190
191 191
192 192 def update_history_from_logs(config, channels, payload):
193 193 history_location = config.get('history.location')
194 194 for channel in channels:
195 195 history = read_history(history_location, channel)
196 196 payload['channels_info'][channel]['history'] = history
197 197
198 198
199 199 def write_history(config, message):
200 200 """ writes a messge to a base64encoded filename """
201 201 history_location = config.get('history.location')
202 202 if not os.path.exists(history_location):
203 203 return
204 204 try:
205 205 LOCK.acquire_write_lock()
206 206 filepath = log_filepath(history_location, message['channel'])
207 207 with open(filepath, 'ab') as f:
208 208 json.dump(message, f)
209 209 f.write('\n')
210 210 finally:
211 211 LOCK.release_write_lock()
212 212
213 213
214 214 def get_connection_validators(registry):
215 215 validators = []
216 216 for k, config in registry.rhodecode_plugins.iteritems():
217 217 validator = config.get('channelstream', {}).get('connect_validator')
218 218 if validator:
219 219 validators.append(validator)
220 220 return validators
General Comments 0
You need to be logged in to leave comments. Login now