##// END OF EJS Templates
channelstream: make main exception be base for all others.
marcink -
r1273:27a65416 default
parent child Browse files
Show More
@@ -1,220 +1,220 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2017 RhodeCode GmbH
3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import hashlib
21 import hashlib
22 import itsdangerous
22 import itsdangerous
23 import logging
23 import logging
24 import os
24 import os
25 import requests
25 import requests
26 from dogpile.core import ReadWriteMutex
26 from dogpile.core import ReadWriteMutex
27
27
28 import rhodecode.lib.helpers as h
28 import rhodecode.lib.helpers as h
29 from rhodecode.lib.auth import HasRepoPermissionAny
29 from rhodecode.lib.auth import HasRepoPermissionAny
30 from rhodecode.lib.ext_json import json
30 from rhodecode.lib.ext_json import json
31 from rhodecode.model.db import User
31 from rhodecode.model.db import User
32
32
33 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
34
34
35 LOCK = ReadWriteMutex()
35 LOCK = ReadWriteMutex()
36
36
37 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
37 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
38 'icon_link', 'display_name', 'display_link']
38 'icon_link', 'display_name', 'display_link']
39
39
40
40
41 class ChannelstreamException(Exception):
41 class ChannelstreamException(Exception):
42 pass
42 pass
43
43
44
44
45 class ChannelstreamConnectionException(Exception):
45 class ChannelstreamConnectionException(ChannelstreamException):
46 pass
46 pass
47
47
48
48
49 class ChannelstreamPermissionException(Exception):
49 class ChannelstreamPermissionException(ChannelstreamException):
50 pass
50 pass
51
51
52
52
53 def channelstream_request(config, payload, endpoint, raise_exc=True):
53 def channelstream_request(config, payload, endpoint, raise_exc=True):
54 signer = itsdangerous.TimestampSigner(config['secret'])
54 signer = itsdangerous.TimestampSigner(config['secret'])
55 sig_for_server = signer.sign(endpoint)
55 sig_for_server = signer.sign(endpoint)
56 secret_headers = {'x-channelstream-secret': sig_for_server,
56 secret_headers = {'x-channelstream-secret': sig_for_server,
57 'x-channelstream-endpoint': endpoint,
57 'x-channelstream-endpoint': endpoint,
58 'Content-Type': 'application/json'}
58 'Content-Type': 'application/json'}
59 req_url = 'http://{}{}'.format(config['server'], endpoint)
59 req_url = 'http://{}{}'.format(config['server'], endpoint)
60 response = None
60 response = None
61 try:
61 try:
62 response = requests.post(req_url, data=json.dumps(payload),
62 response = requests.post(req_url, data=json.dumps(payload),
63 headers=secret_headers).json()
63 headers=secret_headers).json()
64 except requests.ConnectionError:
64 except requests.ConnectionError:
65 log.exception('ConnectionError happened')
65 log.exception('ConnectionError happened')
66 if raise_exc:
66 if raise_exc:
67 raise ChannelstreamConnectionException()
67 raise ChannelstreamConnectionException()
68 except Exception:
68 except Exception:
69 log.exception('Exception related to channelstream happened')
69 log.exception('Exception related to channelstream happened')
70 if raise_exc:
70 if raise_exc:
71 raise ChannelstreamConnectionException()
71 raise ChannelstreamConnectionException()
72 return response
72 return response
73
73
74
74
75 def get_user_data(user_id):
75 def get_user_data(user_id):
76 user = User.get(user_id)
76 user = User.get(user_id)
77 return {
77 return {
78 'id': user.user_id,
78 'id': user.user_id,
79 'username': user.username,
79 'username': user.username,
80 'first_name': user.name,
80 'first_name': user.name,
81 'last_name': user.lastname,
81 'last_name': user.lastname,
82 'icon_link': h.gravatar_url(user.email, 60),
82 'icon_link': h.gravatar_url(user.email, 60),
83 'display_name': h.person(user, 'username_or_name_or_email'),
83 'display_name': h.person(user, 'username_or_name_or_email'),
84 'display_link': h.link_to_user(user),
84 'display_link': h.link_to_user(user),
85 'notifications': user.user_data.get('notification_status', True)
85 'notifications': user.user_data.get('notification_status', True)
86 }
86 }
87
87
88
88
89 def broadcast_validator(channel_name):
89 def broadcast_validator(channel_name):
90 """ checks if user can access the broadcast channel """
90 """ checks if user can access the broadcast channel """
91 if channel_name == 'broadcast':
91 if channel_name == 'broadcast':
92 return True
92 return True
93
93
94
94
95 def repo_validator(channel_name):
95 def repo_validator(channel_name):
96 """ checks if user can access the broadcast channel """
96 """ checks if user can access the broadcast channel """
97 channel_prefix = '/repo$'
97 channel_prefix = '/repo$'
98 if channel_name.startswith(channel_prefix):
98 if channel_name.startswith(channel_prefix):
99 elements = channel_name[len(channel_prefix):].split('$')
99 elements = channel_name[len(channel_prefix):].split('$')
100 repo_name = elements[0]
100 repo_name = elements[0]
101 can_access = HasRepoPermissionAny(
101 can_access = HasRepoPermissionAny(
102 'repository.read',
102 'repository.read',
103 'repository.write',
103 'repository.write',
104 'repository.admin')(repo_name)
104 'repository.admin')(repo_name)
105 log.debug('permission check for {} channel '
105 log.debug('permission check for {} channel '
106 'resulted in {}'.format(repo_name, can_access))
106 'resulted in {}'.format(repo_name, can_access))
107 if can_access:
107 if can_access:
108 return True
108 return True
109 return False
109 return False
110
110
111
111
112 def check_channel_permissions(channels, plugin_validators, should_raise=True):
112 def check_channel_permissions(channels, plugin_validators, should_raise=True):
113 valid_channels = []
113 valid_channels = []
114
114
115 validators = [broadcast_validator, repo_validator]
115 validators = [broadcast_validator, repo_validator]
116 if plugin_validators:
116 if plugin_validators:
117 validators.extend(plugin_validators)
117 validators.extend(plugin_validators)
118 for channel_name in channels:
118 for channel_name in channels:
119 is_valid = False
119 is_valid = False
120 for validator in validators:
120 for validator in validators:
121 if validator(channel_name):
121 if validator(channel_name):
122 is_valid = True
122 is_valid = True
123 break
123 break
124 if is_valid:
124 if is_valid:
125 valid_channels.append(channel_name)
125 valid_channels.append(channel_name)
126 else:
126 else:
127 if should_raise:
127 if should_raise:
128 raise ChannelstreamPermissionException()
128 raise ChannelstreamPermissionException()
129 return valid_channels
129 return valid_channels
130
130
131
131
132 def get_channels_info(self, channels):
132 def get_channels_info(self, channels):
133 payload = {'channels': channels}
133 payload = {'channels': channels}
134 # gather persistence info
134 # gather persistence info
135 return channelstream_request(self._config(), payload, '/info')
135 return channelstream_request(self._config(), payload, '/info')
136
136
137
137
138 def parse_channels_info(info_result, include_channel_info=None):
138 def parse_channels_info(info_result, include_channel_info=None):
139 """
139 """
140 Returns data that contains only secure information that can be
140 Returns data that contains only secure information that can be
141 presented to clients
141 presented to clients
142 """
142 """
143 include_channel_info = include_channel_info or []
143 include_channel_info = include_channel_info or []
144
144
145 user_state_dict = {}
145 user_state_dict = {}
146 for userinfo in info_result['users']:
146 for userinfo in info_result['users']:
147 user_state_dict[userinfo['user']] = {
147 user_state_dict[userinfo['user']] = {
148 k: v for k, v in userinfo['state'].items()
148 k: v for k, v in userinfo['state'].items()
149 if k in STATE_PUBLIC_KEYS
149 if k in STATE_PUBLIC_KEYS
150 }
150 }
151
151
152 channels_info = {}
152 channels_info = {}
153
153
154 for c_name, c_info in info_result['channels'].items():
154 for c_name, c_info in info_result['channels'].items():
155 if c_name not in include_channel_info:
155 if c_name not in include_channel_info:
156 continue
156 continue
157 connected_list = []
157 connected_list = []
158 for userinfo in c_info['users']:
158 for userinfo in c_info['users']:
159 connected_list.append({
159 connected_list.append({
160 'user': userinfo['user'],
160 'user': userinfo['user'],
161 'state': user_state_dict[userinfo['user']]
161 'state': user_state_dict[userinfo['user']]
162 })
162 })
163 channels_info[c_name] = {'users': connected_list,
163 channels_info[c_name] = {'users': connected_list,
164 'history': c_info['history']}
164 'history': c_info['history']}
165
165
166 return channels_info
166 return channels_info
167
167
168
168
169 def log_filepath(history_location, channel_name):
169 def log_filepath(history_location, channel_name):
170 hasher = hashlib.sha256()
170 hasher = hashlib.sha256()
171 hasher.update(channel_name.encode('utf8'))
171 hasher.update(channel_name.encode('utf8'))
172 filename = '{}.log'.format(hasher.hexdigest())
172 filename = '{}.log'.format(hasher.hexdigest())
173 filepath = os.path.join(history_location, filename)
173 filepath = os.path.join(history_location, filename)
174 return filepath
174 return filepath
175
175
176
176
177 def read_history(history_location, channel_name):
177 def read_history(history_location, channel_name):
178 filepath = log_filepath(history_location, channel_name)
178 filepath = log_filepath(history_location, channel_name)
179 if not os.path.exists(filepath):
179 if not os.path.exists(filepath):
180 return []
180 return []
181 history_lines_limit = -100
181 history_lines_limit = -100
182 history = []
182 history = []
183 with open(filepath, 'rb') as f:
183 with open(filepath, 'rb') as f:
184 for line in f.readlines()[history_lines_limit:]:
184 for line in f.readlines()[history_lines_limit:]:
185 try:
185 try:
186 history.append(json.loads(line))
186 history.append(json.loads(line))
187 except Exception:
187 except Exception:
188 log.exception('Failed to load history')
188 log.exception('Failed to load history')
189 return history
189 return history
190
190
191
191
192 def update_history_from_logs(config, channels, payload):
192 def update_history_from_logs(config, channels, payload):
193 history_location = config.get('history.location')
193 history_location = config.get('history.location')
194 for channel in channels:
194 for channel in channels:
195 history = read_history(history_location, channel)
195 history = read_history(history_location, channel)
196 payload['channels_info'][channel]['history'] = history
196 payload['channels_info'][channel]['history'] = history
197
197
198
198
199 def write_history(config, message):
199 def write_history(config, message):
200 """ writes a messge to a base64encoded filename """
200 """ writes a messge to a base64encoded filename """
201 history_location = config.get('history.location')
201 history_location = config.get('history.location')
202 if not os.path.exists(history_location):
202 if not os.path.exists(history_location):
203 return
203 return
204 try:
204 try:
205 LOCK.acquire_write_lock()
205 LOCK.acquire_write_lock()
206 filepath = log_filepath(history_location, message['channel'])
206 filepath = log_filepath(history_location, message['channel'])
207 with open(filepath, 'ab') as f:
207 with open(filepath, 'ab') as f:
208 json.dump(message, f)
208 json.dump(message, f)
209 f.write('\n')
209 f.write('\n')
210 finally:
210 finally:
211 LOCK.release_write_lock()
211 LOCK.release_write_lock()
212
212
213
213
214 def get_connection_validators(registry):
214 def get_connection_validators(registry):
215 validators = []
215 validators = []
216 for k, config in registry.rhodecode_plugins.iteritems():
216 for k, config in registry.rhodecode_plugins.iteritems():
217 validator = config.get('channelstream', {}).get('connect_validator')
217 validator = config.get('channelstream', {}).get('connect_validator')
218 if validator:
218 if validator:
219 validators.append(validator)
219 validators.append(validator)
220 return validators
220 return validators
General Comments 0
You need to be logged in to leave comments. Login now