##// END OF EJS Templates
channelstream: log debug should be lazily evaulated.
dan -
r1973:5a20d6ac default
parent child Browse files
Show More
@@ -1,254 +1,255 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2017 RhodeCode GmbH
3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import hashlib
22 import hashlib
23 import itsdangerous
23 import itsdangerous
24 import logging
24 import logging
25 import requests
25 import requests
26 import datetime
26 import datetime
27
27
28 from dogpile.core import ReadWriteMutex
28 from dogpile.core import ReadWriteMutex
29 from pyramid.threadlocal import get_current_registry
29 from pyramid.threadlocal import get_current_registry
30
30
31 import rhodecode.lib.helpers as h
31 import rhodecode.lib.helpers as h
32 from rhodecode.lib.auth import HasRepoPermissionAny
32 from rhodecode.lib.auth import HasRepoPermissionAny
33 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.ext_json import json
34 from rhodecode.model.db import User
34 from rhodecode.model.db import User
35
35
36 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
37
37
38 LOCK = ReadWriteMutex()
38 LOCK = ReadWriteMutex()
39
39
40 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
40 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
41 'icon_link', 'display_name', 'display_link']
41 'icon_link', 'display_name', 'display_link']
42
42
43
43
44 class ChannelstreamException(Exception):
44 class ChannelstreamException(Exception):
45 pass
45 pass
46
46
47
47
48 class ChannelstreamConnectionException(ChannelstreamException):
48 class ChannelstreamConnectionException(ChannelstreamException):
49 pass
49 pass
50
50
51
51
52 class ChannelstreamPermissionException(ChannelstreamException):
52 class ChannelstreamPermissionException(ChannelstreamException):
53 pass
53 pass
54
54
55
55
56 def channelstream_request(config, payload, endpoint, raise_exc=True):
56 def channelstream_request(config, payload, endpoint, raise_exc=True):
57 signer = itsdangerous.TimestampSigner(config['secret'])
57 signer = itsdangerous.TimestampSigner(config['secret'])
58 sig_for_server = signer.sign(endpoint)
58 sig_for_server = signer.sign(endpoint)
59 secret_headers = {'x-channelstream-secret': sig_for_server,
59 secret_headers = {'x-channelstream-secret': sig_for_server,
60 'x-channelstream-endpoint': endpoint,
60 'x-channelstream-endpoint': endpoint,
61 'Content-Type': 'application/json'}
61 'Content-Type': 'application/json'}
62 req_url = 'http://{}{}'.format(config['server'], endpoint)
62 req_url = 'http://{}{}'.format(config['server'], endpoint)
63 response = None
63 response = None
64 try:
64 try:
65 response = requests.post(req_url, data=json.dumps(payload),
65 response = requests.post(req_url, data=json.dumps(payload),
66 headers=secret_headers).json()
66 headers=secret_headers).json()
67 except requests.ConnectionError:
67 except requests.ConnectionError:
68 log.exception('ConnectionError happened')
68 log.exception('ConnectionError happened')
69 if raise_exc:
69 if raise_exc:
70 raise ChannelstreamConnectionException()
70 raise ChannelstreamConnectionException()
71 except Exception:
71 except Exception:
72 log.exception('Exception related to channelstream happened')
72 log.exception('Exception related to channelstream happened')
73 if raise_exc:
73 if raise_exc:
74 raise ChannelstreamConnectionException()
74 raise ChannelstreamConnectionException()
75 return response
75 return response
76
76
77
77
78 def get_user_data(user_id):
78 def get_user_data(user_id):
79 user = User.get(user_id)
79 user = User.get(user_id)
80 return {
80 return {
81 'id': user.user_id,
81 'id': user.user_id,
82 'username': user.username,
82 'username': user.username,
83 'first_name': user.first_name,
83 'first_name': user.first_name,
84 'last_name': user.last_name,
84 'last_name': user.last_name,
85 'icon_link': h.gravatar_url(user.email, 60),
85 'icon_link': h.gravatar_url(user.email, 60),
86 'display_name': h.person(user, 'username_or_name_or_email'),
86 'display_name': h.person(user, 'username_or_name_or_email'),
87 'display_link': h.link_to_user(user),
87 'display_link': h.link_to_user(user),
88 'notifications': user.user_data.get('notification_status', True)
88 'notifications': user.user_data.get('notification_status', True)
89 }
89 }
90
90
91
91
92 def broadcast_validator(channel_name):
92 def broadcast_validator(channel_name):
93 """ checks if user can access the broadcast channel """
93 """ checks if user can access the broadcast channel """
94 if channel_name == 'broadcast':
94 if channel_name == 'broadcast':
95 return True
95 return True
96
96
97
97
98 def repo_validator(channel_name):
98 def repo_validator(channel_name):
99 """ checks if user can access the broadcast channel """
99 """ checks if user can access the broadcast channel """
100 channel_prefix = '/repo$'
100 channel_prefix = '/repo$'
101 if channel_name.startswith(channel_prefix):
101 if channel_name.startswith(channel_prefix):
102 elements = channel_name[len(channel_prefix):].split('$')
102 elements = channel_name[len(channel_prefix):].split('$')
103 repo_name = elements[0]
103 repo_name = elements[0]
104 can_access = HasRepoPermissionAny(
104 can_access = HasRepoPermissionAny(
105 'repository.read',
105 'repository.read',
106 'repository.write',
106 'repository.write',
107 'repository.admin')(repo_name)
107 'repository.admin')(repo_name)
108 log.debug('permission check for {} channel '
108 log.debug(
109 'resulted in {}'.format(repo_name, can_access))
109 'permission check for %s channel resulted in %s',
110 repo_name, can_access)
110 if can_access:
111 if can_access:
111 return True
112 return True
112 return False
113 return False
113
114
114
115
115 def check_channel_permissions(channels, plugin_validators, should_raise=True):
116 def check_channel_permissions(channels, plugin_validators, should_raise=True):
116 valid_channels = []
117 valid_channels = []
117
118
118 validators = [broadcast_validator, repo_validator]
119 validators = [broadcast_validator, repo_validator]
119 if plugin_validators:
120 if plugin_validators:
120 validators.extend(plugin_validators)
121 validators.extend(plugin_validators)
121 for channel_name in channels:
122 for channel_name in channels:
122 is_valid = False
123 is_valid = False
123 for validator in validators:
124 for validator in validators:
124 if validator(channel_name):
125 if validator(channel_name):
125 is_valid = True
126 is_valid = True
126 break
127 break
127 if is_valid:
128 if is_valid:
128 valid_channels.append(channel_name)
129 valid_channels.append(channel_name)
129 else:
130 else:
130 if should_raise:
131 if should_raise:
131 raise ChannelstreamPermissionException()
132 raise ChannelstreamPermissionException()
132 return valid_channels
133 return valid_channels
133
134
134
135
135 def get_channels_info(self, channels):
136 def get_channels_info(self, channels):
136 payload = {'channels': channels}
137 payload = {'channels': channels}
137 # gather persistence info
138 # gather persistence info
138 return channelstream_request(self._config(), payload, '/info')
139 return channelstream_request(self._config(), payload, '/info')
139
140
140
141
141 def parse_channels_info(info_result, include_channel_info=None):
142 def parse_channels_info(info_result, include_channel_info=None):
142 """
143 """
143 Returns data that contains only secure information that can be
144 Returns data that contains only secure information that can be
144 presented to clients
145 presented to clients
145 """
146 """
146 include_channel_info = include_channel_info or []
147 include_channel_info = include_channel_info or []
147
148
148 user_state_dict = {}
149 user_state_dict = {}
149 for userinfo in info_result['users']:
150 for userinfo in info_result['users']:
150 user_state_dict[userinfo['user']] = {
151 user_state_dict[userinfo['user']] = {
151 k: v for k, v in userinfo['state'].items()
152 k: v for k, v in userinfo['state'].items()
152 if k in STATE_PUBLIC_KEYS
153 if k in STATE_PUBLIC_KEYS
153 }
154 }
154
155
155 channels_info = {}
156 channels_info = {}
156
157
157 for c_name, c_info in info_result['channels'].items():
158 for c_name, c_info in info_result['channels'].items():
158 if c_name not in include_channel_info:
159 if c_name not in include_channel_info:
159 continue
160 continue
160 connected_list = []
161 connected_list = []
161 for userinfo in c_info['users']:
162 for userinfo in c_info['users']:
162 connected_list.append({
163 connected_list.append({
163 'user': userinfo['user'],
164 'user': userinfo['user'],
164 'state': user_state_dict[userinfo['user']]
165 'state': user_state_dict[userinfo['user']]
165 })
166 })
166 channels_info[c_name] = {'users': connected_list,
167 channels_info[c_name] = {'users': connected_list,
167 'history': c_info['history']}
168 'history': c_info['history']}
168
169
169 return channels_info
170 return channels_info
170
171
171
172
172 def log_filepath(history_location, channel_name):
173 def log_filepath(history_location, channel_name):
173 hasher = hashlib.sha256()
174 hasher = hashlib.sha256()
174 hasher.update(channel_name.encode('utf8'))
175 hasher.update(channel_name.encode('utf8'))
175 filename = '{}.log'.format(hasher.hexdigest())
176 filename = '{}.log'.format(hasher.hexdigest())
176 filepath = os.path.join(history_location, filename)
177 filepath = os.path.join(history_location, filename)
177 return filepath
178 return filepath
178
179
179
180
180 def read_history(history_location, channel_name):
181 def read_history(history_location, channel_name):
181 filepath = log_filepath(history_location, channel_name)
182 filepath = log_filepath(history_location, channel_name)
182 if not os.path.exists(filepath):
183 if not os.path.exists(filepath):
183 return []
184 return []
184 history_lines_limit = -100
185 history_lines_limit = -100
185 history = []
186 history = []
186 with open(filepath, 'rb') as f:
187 with open(filepath, 'rb') as f:
187 for line in f.readlines()[history_lines_limit:]:
188 for line in f.readlines()[history_lines_limit:]:
188 try:
189 try:
189 history.append(json.loads(line))
190 history.append(json.loads(line))
190 except Exception:
191 except Exception:
191 log.exception('Failed to load history')
192 log.exception('Failed to load history')
192 return history
193 return history
193
194
194
195
195 def update_history_from_logs(config, channels, payload):
196 def update_history_from_logs(config, channels, payload):
196 history_location = config.get('history.location')
197 history_location = config.get('history.location')
197 for channel in channels:
198 for channel in channels:
198 history = read_history(history_location, channel)
199 history = read_history(history_location, channel)
199 payload['channels_info'][channel]['history'] = history
200 payload['channels_info'][channel]['history'] = history
200
201
201
202
202 def write_history(config, message):
203 def write_history(config, message):
203 """ writes a messge to a base64encoded filename """
204 """ writes a messge to a base64encoded filename """
204 history_location = config.get('history.location')
205 history_location = config.get('history.location')
205 if not os.path.exists(history_location):
206 if not os.path.exists(history_location):
206 return
207 return
207 try:
208 try:
208 LOCK.acquire_write_lock()
209 LOCK.acquire_write_lock()
209 filepath = log_filepath(history_location, message['channel'])
210 filepath = log_filepath(history_location, message['channel'])
210 with open(filepath, 'ab') as f:
211 with open(filepath, 'ab') as f:
211 json.dump(message, f)
212 json.dump(message, f)
212 f.write('\n')
213 f.write('\n')
213 finally:
214 finally:
214 LOCK.release_write_lock()
215 LOCK.release_write_lock()
215
216
216
217
217 def get_connection_validators(registry):
218 def get_connection_validators(registry):
218 validators = []
219 validators = []
219 for k, config in registry.rhodecode_plugins.iteritems():
220 for k, config in registry.rhodecode_plugins.iteritems():
220 validator = config.get('channelstream', {}).get('connect_validator')
221 validator = config.get('channelstream', {}).get('connect_validator')
221 if validator:
222 if validator:
222 validators.append(validator)
223 validators.append(validator)
223 return validators
224 return validators
224
225
225
226
226 def post_message(channel, message, username, registry=None):
227 def post_message(channel, message, username, registry=None):
227
228
228 if not registry:
229 if not registry:
229 registry = get_current_registry()
230 registry = get_current_registry()
230
231
231 log.debug('Channelstream: sending notification to channel %s', channel)
232 log.debug('Channelstream: sending notification to channel %s', channel)
232 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
233 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
233 channelstream_config = rhodecode_plugins.get('channelstream', {})
234 channelstream_config = rhodecode_plugins.get('channelstream', {})
234 if channelstream_config.get('enabled'):
235 if channelstream_config.get('enabled'):
235 payload = {
236 payload = {
236 'type': 'message',
237 'type': 'message',
237 'timestamp': datetime.datetime.utcnow(),
238 'timestamp': datetime.datetime.utcnow(),
238 'user': 'system',
239 'user': 'system',
239 'exclude_users': [username],
240 'exclude_users': [username],
240 'channel': channel,
241 'channel': channel,
241 'message': {
242 'message': {
242 'message': message,
243 'message': message,
243 'level': 'success',
244 'level': 'success',
244 'topic': '/notifications'
245 'topic': '/notifications'
245 }
246 }
246 }
247 }
247
248
248 try:
249 try:
249 return channelstream_request(
250 return channelstream_request(
250 channelstream_config, [payload], '/message',
251 channelstream_config, [payload], '/message',
251 raise_exc=False)
252 raise_exc=False)
252 except ChannelstreamException:
253 except ChannelstreamException:
253 log.exception('Failed to send channelstream data')
254 log.exception('Failed to send channelstream data')
254 raise
255 raise
General Comments 0
You need to be logged in to leave comments. Login now