##// END OF EJS Templates
channelstream: log debug should be lazily evaulated.
dan -
r1973:5a20d6ac default
parent child Browse files
Show More
@@ -1,254 +1,255 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import hashlib
23 23 import itsdangerous
24 24 import logging
25 25 import requests
26 26 import datetime
27 27
28 28 from dogpile.core import ReadWriteMutex
29 29 from pyramid.threadlocal import get_current_registry
30 30
31 31 import rhodecode.lib.helpers as h
32 32 from rhodecode.lib.auth import HasRepoPermissionAny
33 33 from rhodecode.lib.ext_json import json
34 34 from rhodecode.model.db import User
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 LOCK = ReadWriteMutex()
39 39
40 40 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
41 41 'icon_link', 'display_name', 'display_link']
42 42
43 43
44 44 class ChannelstreamException(Exception):
45 45 pass
46 46
47 47
48 48 class ChannelstreamConnectionException(ChannelstreamException):
49 49 pass
50 50
51 51
52 52 class ChannelstreamPermissionException(ChannelstreamException):
53 53 pass
54 54
55 55
56 56 def channelstream_request(config, payload, endpoint, raise_exc=True):
57 57 signer = itsdangerous.TimestampSigner(config['secret'])
58 58 sig_for_server = signer.sign(endpoint)
59 59 secret_headers = {'x-channelstream-secret': sig_for_server,
60 60 'x-channelstream-endpoint': endpoint,
61 61 'Content-Type': 'application/json'}
62 62 req_url = 'http://{}{}'.format(config['server'], endpoint)
63 63 response = None
64 64 try:
65 65 response = requests.post(req_url, data=json.dumps(payload),
66 66 headers=secret_headers).json()
67 67 except requests.ConnectionError:
68 68 log.exception('ConnectionError happened')
69 69 if raise_exc:
70 70 raise ChannelstreamConnectionException()
71 71 except Exception:
72 72 log.exception('Exception related to channelstream happened')
73 73 if raise_exc:
74 74 raise ChannelstreamConnectionException()
75 75 return response
76 76
77 77
78 78 def get_user_data(user_id):
79 79 user = User.get(user_id)
80 80 return {
81 81 'id': user.user_id,
82 82 'username': user.username,
83 83 'first_name': user.first_name,
84 84 'last_name': user.last_name,
85 85 'icon_link': h.gravatar_url(user.email, 60),
86 86 'display_name': h.person(user, 'username_or_name_or_email'),
87 87 'display_link': h.link_to_user(user),
88 88 'notifications': user.user_data.get('notification_status', True)
89 89 }
90 90
91 91
92 92 def broadcast_validator(channel_name):
93 93 """ checks if user can access the broadcast channel """
94 94 if channel_name == 'broadcast':
95 95 return True
96 96
97 97
98 98 def repo_validator(channel_name):
99 99 """ checks if user can access the broadcast channel """
100 100 channel_prefix = '/repo$'
101 101 if channel_name.startswith(channel_prefix):
102 102 elements = channel_name[len(channel_prefix):].split('$')
103 103 repo_name = elements[0]
104 104 can_access = HasRepoPermissionAny(
105 105 'repository.read',
106 106 'repository.write',
107 107 'repository.admin')(repo_name)
108 log.debug('permission check for {} channel '
109 'resulted in {}'.format(repo_name, can_access))
108 log.debug(
109 'permission check for %s channel resulted in %s',
110 repo_name, can_access)
110 111 if can_access:
111 112 return True
112 113 return False
113 114
114 115
115 116 def check_channel_permissions(channels, plugin_validators, should_raise=True):
116 117 valid_channels = []
117 118
118 119 validators = [broadcast_validator, repo_validator]
119 120 if plugin_validators:
120 121 validators.extend(plugin_validators)
121 122 for channel_name in channels:
122 123 is_valid = False
123 124 for validator in validators:
124 125 if validator(channel_name):
125 126 is_valid = True
126 127 break
127 128 if is_valid:
128 129 valid_channels.append(channel_name)
129 130 else:
130 131 if should_raise:
131 132 raise ChannelstreamPermissionException()
132 133 return valid_channels
133 134
134 135
135 136 def get_channels_info(self, channels):
136 137 payload = {'channels': channels}
137 138 # gather persistence info
138 139 return channelstream_request(self._config(), payload, '/info')
139 140
140 141
141 142 def parse_channels_info(info_result, include_channel_info=None):
142 143 """
143 144 Returns data that contains only secure information that can be
144 145 presented to clients
145 146 """
146 147 include_channel_info = include_channel_info or []
147 148
148 149 user_state_dict = {}
149 150 for userinfo in info_result['users']:
150 151 user_state_dict[userinfo['user']] = {
151 152 k: v for k, v in userinfo['state'].items()
152 153 if k in STATE_PUBLIC_KEYS
153 154 }
154 155
155 156 channels_info = {}
156 157
157 158 for c_name, c_info in info_result['channels'].items():
158 159 if c_name not in include_channel_info:
159 160 continue
160 161 connected_list = []
161 162 for userinfo in c_info['users']:
162 163 connected_list.append({
163 164 'user': userinfo['user'],
164 165 'state': user_state_dict[userinfo['user']]
165 166 })
166 167 channels_info[c_name] = {'users': connected_list,
167 168 'history': c_info['history']}
168 169
169 170 return channels_info
170 171
171 172
172 173 def log_filepath(history_location, channel_name):
173 174 hasher = hashlib.sha256()
174 175 hasher.update(channel_name.encode('utf8'))
175 176 filename = '{}.log'.format(hasher.hexdigest())
176 177 filepath = os.path.join(history_location, filename)
177 178 return filepath
178 179
179 180
180 181 def read_history(history_location, channel_name):
181 182 filepath = log_filepath(history_location, channel_name)
182 183 if not os.path.exists(filepath):
183 184 return []
184 185 history_lines_limit = -100
185 186 history = []
186 187 with open(filepath, 'rb') as f:
187 188 for line in f.readlines()[history_lines_limit:]:
188 189 try:
189 190 history.append(json.loads(line))
190 191 except Exception:
191 192 log.exception('Failed to load history')
192 193 return history
193 194
194 195
195 196 def update_history_from_logs(config, channels, payload):
196 197 history_location = config.get('history.location')
197 198 for channel in channels:
198 199 history = read_history(history_location, channel)
199 200 payload['channels_info'][channel]['history'] = history
200 201
201 202
202 203 def write_history(config, message):
203 204 """ writes a messge to a base64encoded filename """
204 205 history_location = config.get('history.location')
205 206 if not os.path.exists(history_location):
206 207 return
207 208 try:
208 209 LOCK.acquire_write_lock()
209 210 filepath = log_filepath(history_location, message['channel'])
210 211 with open(filepath, 'ab') as f:
211 212 json.dump(message, f)
212 213 f.write('\n')
213 214 finally:
214 215 LOCK.release_write_lock()
215 216
216 217
217 218 def get_connection_validators(registry):
218 219 validators = []
219 220 for k, config in registry.rhodecode_plugins.iteritems():
220 221 validator = config.get('channelstream', {}).get('connect_validator')
221 222 if validator:
222 223 validators.append(validator)
223 224 return validators
224 225
225 226
226 227 def post_message(channel, message, username, registry=None):
227 228
228 229 if not registry:
229 230 registry = get_current_registry()
230 231
231 232 log.debug('Channelstream: sending notification to channel %s', channel)
232 233 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
233 234 channelstream_config = rhodecode_plugins.get('channelstream', {})
234 235 if channelstream_config.get('enabled'):
235 236 payload = {
236 237 'type': 'message',
237 238 'timestamp': datetime.datetime.utcnow(),
238 239 'user': 'system',
239 240 'exclude_users': [username],
240 241 'channel': channel,
241 242 'message': {
242 243 'message': message,
243 244 'level': 'success',
244 245 'topic': '/notifications'
245 246 }
246 247 }
247 248
248 249 try:
249 250 return channelstream_request(
250 251 channelstream_config, [payload], '/message',
251 252 raise_exc=False)
252 253 except ChannelstreamException:
253 254 log.exception('Failed to send channelstream data')
254 255 raise
General Comments 0
You need to be logged in to leave comments. Login now