##// END OF EJS Templates
channelstream: use sha256 for history filename hashes
ergo -
r1163:ef1cd4f9 default
parent child Browse files
Show More
@@ -1,220 +1,220 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import hashlib
22 import itsdangerous
21 import logging
23 import logging
22 import os
24 import os
23
24 import itsdangerous
25 import requests
25 import requests
26
27 from dogpile.core import ReadWriteMutex
26 from dogpile.core import ReadWriteMutex
28
27
29 import rhodecode.lib.helpers as h
28 import rhodecode.lib.helpers as h
30
31 from rhodecode.lib.auth import HasRepoPermissionAny
29 from rhodecode.lib.auth import HasRepoPermissionAny
32 from rhodecode.lib.ext_json import json
30 from rhodecode.lib.ext_json import json
33 from rhodecode.model.db import User
31 from rhodecode.model.db import User
34
32
35 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
36
34
37 LOCK = ReadWriteMutex()
35 LOCK = ReadWriteMutex()
38
36
39 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
37 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
40 'icon_link', 'display_name', 'display_link']
38 'icon_link', 'display_name', 'display_link']
41
39
42
40
43 class ChannelstreamException(Exception):
41 class ChannelstreamException(Exception):
44 pass
42 pass
45
43
46
44
47 class ChannelstreamConnectionException(Exception):
45 class ChannelstreamConnectionException(Exception):
48 pass
46 pass
49
47
50
48
51 class ChannelstreamPermissionException(Exception):
49 class ChannelstreamPermissionException(Exception):
52 pass
50 pass
53
51
54
52
55 def channelstream_request(config, payload, endpoint, raise_exc=True):
53 def channelstream_request(config, payload, endpoint, raise_exc=True):
56 signer = itsdangerous.TimestampSigner(config['secret'])
54 signer = itsdangerous.TimestampSigner(config['secret'])
57 sig_for_server = signer.sign(endpoint)
55 sig_for_server = signer.sign(endpoint)
58 secret_headers = {'x-channelstream-secret': sig_for_server,
56 secret_headers = {'x-channelstream-secret': sig_for_server,
59 'x-channelstream-endpoint': endpoint,
57 'x-channelstream-endpoint': endpoint,
60 'Content-Type': 'application/json'}
58 'Content-Type': 'application/json'}
61 req_url = 'http://{}{}'.format(config['server'], endpoint)
59 req_url = 'http://{}{}'.format(config['server'], endpoint)
62 response = None
60 response = None
63 try:
61 try:
64 response = requests.post(req_url, data=json.dumps(payload),
62 response = requests.post(req_url, data=json.dumps(payload),
65 headers=secret_headers).json()
63 headers=secret_headers).json()
66 except requests.ConnectionError:
64 except requests.ConnectionError:
67 log.exception('ConnectionError happened')
65 log.exception('ConnectionError happened')
68 if raise_exc:
66 if raise_exc:
69 raise ChannelstreamConnectionException()
67 raise ChannelstreamConnectionException()
70 except Exception:
68 except Exception:
71 log.exception('Exception related to channelstream happened')
69 log.exception('Exception related to channelstream happened')
72 if raise_exc:
70 if raise_exc:
73 raise ChannelstreamConnectionException()
71 raise ChannelstreamConnectionException()
74 return response
72 return response
75
73
76
74
77 def get_user_data(user_id):
75 def get_user_data(user_id):
78 user = User.get(user_id)
76 user = User.get(user_id)
79 return {
77 return {
80 'id': user.user_id,
78 'id': user.user_id,
81 'username': user.username,
79 'username': user.username,
82 'first_name': user.name,
80 'first_name': user.name,
83 'last_name': user.lastname,
81 'last_name': user.lastname,
84 'icon_link': h.gravatar_url(user.email, 60),
82 'icon_link': h.gravatar_url(user.email, 60),
85 'display_name': h.person(user, 'username_or_name_or_email'),
83 'display_name': h.person(user, 'username_or_name_or_email'),
86 'display_link': h.link_to_user(user),
84 'display_link': h.link_to_user(user),
87 'notifications': user.user_data.get('notification_status', True)
85 'notifications': user.user_data.get('notification_status', True)
88 }
86 }
89
87
90
88
91 def broadcast_validator(channel_name):
89 def broadcast_validator(channel_name):
92 """ checks if user can access the broadcast channel """
90 """ checks if user can access the broadcast channel """
93 if channel_name == 'broadcast':
91 if channel_name == 'broadcast':
94 return True
92 return True
95
93
96
94
97 def repo_validator(channel_name):
95 def repo_validator(channel_name):
98 """ checks if user can access the broadcast channel """
96 """ checks if user can access the broadcast channel """
99 channel_prefix = '/repo$'
97 channel_prefix = '/repo$'
100 if channel_name.startswith(channel_prefix):
98 if channel_name.startswith(channel_prefix):
101 elements = channel_name[len(channel_prefix):].split('$')
99 elements = channel_name[len(channel_prefix):].split('$')
102 repo_name = elements[0]
100 repo_name = elements[0]
103 can_access = HasRepoPermissionAny(
101 can_access = HasRepoPermissionAny(
104 'repository.read',
102 'repository.read',
105 'repository.write',
103 'repository.write',
106 'repository.admin')(repo_name)
104 'repository.admin')(repo_name)
107 log.debug('permission check for {} channel '
105 log.debug('permission check for {} channel '
108 'resulted in {}'.format(repo_name, can_access))
106 'resulted in {}'.format(repo_name, can_access))
109 if can_access:
107 if can_access:
110 return True
108 return True
111 return False
109 return False
112
110
113
111
114 def check_channel_permissions(channels, plugin_validators, should_raise=True):
112 def check_channel_permissions(channels, plugin_validators, should_raise=True):
115 valid_channels = []
113 valid_channels = []
116
114
117 validators = [broadcast_validator, repo_validator]
115 validators = [broadcast_validator, repo_validator]
118 if plugin_validators:
116 if plugin_validators:
119 validators.extend(plugin_validators)
117 validators.extend(plugin_validators)
120 for channel_name in channels:
118 for channel_name in channels:
121 is_valid = False
119 is_valid = False
122 for validator in validators:
120 for validator in validators:
123 if validator(channel_name):
121 if validator(channel_name):
124 is_valid = True
122 is_valid = True
125 break
123 break
126 if is_valid:
124 if is_valid:
127 valid_channels.append(channel_name)
125 valid_channels.append(channel_name)
128 else:
126 else:
129 if should_raise:
127 if should_raise:
130 raise ChannelstreamPermissionException()
128 raise ChannelstreamPermissionException()
131 return valid_channels
129 return valid_channels
132
130
133
131
134 def get_channels_info(self, channels):
132 def get_channels_info(self, channels):
135 payload = {'channels': channels}
133 payload = {'channels': channels}
136 # gather persistence info
134 # gather persistence info
137 return channelstream_request(self._config(), payload, '/info')
135 return channelstream_request(self._config(), payload, '/info')
138
136
139
137
140 def parse_channels_info(info_result, include_channel_info=None):
138 def parse_channels_info(info_result, include_channel_info=None):
141 """
139 """
142 Returns data that contains only secure information that can be
140 Returns data that contains only secure information that can be
143 presented to clients
141 presented to clients
144 """
142 """
145 include_channel_info = include_channel_info or []
143 include_channel_info = include_channel_info or []
146
144
147 user_state_dict = {}
145 user_state_dict = {}
148 for userinfo in info_result['users']:
146 for userinfo in info_result['users']:
149 user_state_dict[userinfo['user']] = {
147 user_state_dict[userinfo['user']] = {
150 k: v for k, v in userinfo['state'].items()
148 k: v for k, v in userinfo['state'].items()
151 if k in STATE_PUBLIC_KEYS
149 if k in STATE_PUBLIC_KEYS
152 }
150 }
153
151
154 channels_info = {}
152 channels_info = {}
155
153
156 for c_name, c_info in info_result['channels'].items():
154 for c_name, c_info in info_result['channels'].items():
157 if c_name not in include_channel_info:
155 if c_name not in include_channel_info:
158 continue
156 continue
159 connected_list = []
157 connected_list = []
160 for userinfo in c_info['users']:
158 for userinfo in c_info['users']:
161 connected_list.append({
159 connected_list.append({
162 'user': userinfo['user'],
160 'user': userinfo['user'],
163 'state': user_state_dict[userinfo['user']]
161 'state': user_state_dict[userinfo['user']]
164 })
162 })
165 channels_info[c_name] = {'users': connected_list,
163 channels_info[c_name] = {'users': connected_list,
166 'history': c_info['history']}
164 'history': c_info['history']}
167
165
168 return channels_info
166 return channels_info
169
167
170
168
171 def log_filepath(history_location, channel_name):
169 def log_filepath(history_location, channel_name):
172 filename = '{}.log'.format(channel_name.encode('hex'))
170 hasher = hashlib.sha256()
171 hasher.update(channel_name.encode('utf8'))
172 filename = '{}.log'.format(hasher.hexdigest())
173 filepath = os.path.join(history_location, filename)
173 filepath = os.path.join(history_location, filename)
174 return filepath
174 return filepath
175
175
176
176
177 def read_history(history_location, channel_name):
177 def read_history(history_location, channel_name):
178 filepath = log_filepath(history_location, channel_name)
178 filepath = log_filepath(history_location, channel_name)
179 if not os.path.exists(filepath):
179 if not os.path.exists(filepath):
180 return []
180 return []
181 history_lines_limit = -100
181 history_lines_limit = -100
182 history = []
182 history = []
183 with open(filepath, 'rb') as f:
183 with open(filepath, 'rb') as f:
184 for line in f.readlines()[history_lines_limit:]:
184 for line in f.readlines()[history_lines_limit:]:
185 try:
185 try:
186 history.append(json.loads(line))
186 history.append(json.loads(line))
187 except Exception:
187 except Exception:
188 log.exception('Failed to load history')
188 log.exception('Failed to load history')
189 return history
189 return history
190
190
191
191
192 def update_history_from_logs(config, channels, payload):
192 def update_history_from_logs(config, channels, payload):
193 history_location = config.get('history.location')
193 history_location = config.get('history.location')
194 for channel in channels:
194 for channel in channels:
195 history = read_history(history_location, channel)
195 history = read_history(history_location, channel)
196 payload['channels_info'][channel]['history'] = history
196 payload['channels_info'][channel]['history'] = history
197
197
198
198
199 def write_history(config, message):
199 def write_history(config, message):
200 """ writes a messge to a base64encoded filename """
200 """ writes a messge to a base64encoded filename """
201 history_location = config.get('history.location')
201 history_location = config.get('history.location')
202 if not os.path.exists(history_location):
202 if not os.path.exists(history_location):
203 return
203 return
204 try:
204 try:
205 LOCK.acquire_write_lock()
205 LOCK.acquire_write_lock()
206 filepath = log_filepath(history_location, message['channel'])
206 filepath = log_filepath(history_location, message['channel'])
207 with open(filepath, 'ab') as f:
207 with open(filepath, 'ab') as f:
208 json.dump(message, f)
208 json.dump(message, f)
209 f.write('\n')
209 f.write('\n')
210 finally:
210 finally:
211 LOCK.release_write_lock()
211 LOCK.release_write_lock()
212
212
213
213
214 def get_connection_validators(registry):
214 def get_connection_validators(registry):
215 validators = []
215 validators = []
216 for k, config in registry.rhodecode_plugins.iteritems():
216 for k, config in registry.rhodecode_plugins.iteritems():
217 validator = config.get('channelstream', {}).get('connect_validator')
217 validator = config.get('channelstream', {}).get('connect_validator')
218 if validator:
218 if validator:
219 validators.append(validator)
219 validators.append(validator)
220 return validators
220 return validators
General Comments 0
You need to be logged in to leave comments. Login now