##// END OF EJS Templates
channelstream: add simple method of posting message to notification channel.
marcink -
r1969:27a92529 default
parent child Browse files
Show More
@@ -1,220 +1,253 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import os
21 22 import hashlib
22 23 import itsdangerous
23 24 import logging
24 import os
25 25 import requests
26 import datetime
27
26 28 from dogpile.core import ReadWriteMutex
29 from pyramid.threadlocal import get_current_registry
27 30
28 31 import rhodecode.lib.helpers as h
29 32 from rhodecode.lib.auth import HasRepoPermissionAny
30 33 from rhodecode.lib.ext_json import json
31 34 from rhodecode.model.db import User
32 35
33 36 log = logging.getLogger(__name__)
34 37
35 38 LOCK = ReadWriteMutex()
36 39
37 40 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
38 41 'icon_link', 'display_name', 'display_link']
39 42
40 43
41 44 class ChannelstreamException(Exception):
42 45 pass
43 46
44 47
45 48 class ChannelstreamConnectionException(ChannelstreamException):
46 49 pass
47 50
48 51
49 52 class ChannelstreamPermissionException(ChannelstreamException):
50 53 pass
51 54
52 55
53 56 def channelstream_request(config, payload, endpoint, raise_exc=True):
54 57 signer = itsdangerous.TimestampSigner(config['secret'])
55 58 sig_for_server = signer.sign(endpoint)
56 59 secret_headers = {'x-channelstream-secret': sig_for_server,
57 60 'x-channelstream-endpoint': endpoint,
58 61 'Content-Type': 'application/json'}
59 62 req_url = 'http://{}{}'.format(config['server'], endpoint)
60 63 response = None
61 64 try:
62 65 response = requests.post(req_url, data=json.dumps(payload),
63 66 headers=secret_headers).json()
64 67 except requests.ConnectionError:
65 68 log.exception('ConnectionError happened')
66 69 if raise_exc:
67 70 raise ChannelstreamConnectionException()
68 71 except Exception:
69 72 log.exception('Exception related to channelstream happened')
70 73 if raise_exc:
71 74 raise ChannelstreamConnectionException()
72 75 return response
73 76
74 77
75 78 def get_user_data(user_id):
76 79 user = User.get(user_id)
77 80 return {
78 81 'id': user.user_id,
79 82 'username': user.username,
80 83 'first_name': user.first_name,
81 84 'last_name': user.last_name,
82 85 'icon_link': h.gravatar_url(user.email, 60),
83 86 'display_name': h.person(user, 'username_or_name_or_email'),
84 87 'display_link': h.link_to_user(user),
85 88 'notifications': user.user_data.get('notification_status', True)
86 89 }
87 90
88 91
89 92 def broadcast_validator(channel_name):
90 93 """ checks if user can access the broadcast channel """
91 94 if channel_name == 'broadcast':
92 95 return True
93 96
94 97
95 98 def repo_validator(channel_name):
96 99 """ checks if user can access the broadcast channel """
97 100 channel_prefix = '/repo$'
98 101 if channel_name.startswith(channel_prefix):
99 102 elements = channel_name[len(channel_prefix):].split('$')
100 103 repo_name = elements[0]
101 104 can_access = HasRepoPermissionAny(
102 105 'repository.read',
103 106 'repository.write',
104 107 'repository.admin')(repo_name)
105 108 log.debug('permission check for {} channel '
106 109 'resulted in {}'.format(repo_name, can_access))
107 110 if can_access:
108 111 return True
109 112 return False
110 113
111 114
112 115 def check_channel_permissions(channels, plugin_validators, should_raise=True):
113 116 valid_channels = []
114 117
115 118 validators = [broadcast_validator, repo_validator]
116 119 if plugin_validators:
117 120 validators.extend(plugin_validators)
118 121 for channel_name in channels:
119 122 is_valid = False
120 123 for validator in validators:
121 124 if validator(channel_name):
122 125 is_valid = True
123 126 break
124 127 if is_valid:
125 128 valid_channels.append(channel_name)
126 129 else:
127 130 if should_raise:
128 131 raise ChannelstreamPermissionException()
129 132 return valid_channels
130 133
131 134
132 135 def get_channels_info(self, channels):
133 136 payload = {'channels': channels}
134 137 # gather persistence info
135 138 return channelstream_request(self._config(), payload, '/info')
136 139
137 140
138 141 def parse_channels_info(info_result, include_channel_info=None):
139 142 """
140 143 Returns data that contains only secure information that can be
141 144 presented to clients
142 145 """
143 146 include_channel_info = include_channel_info or []
144 147
145 148 user_state_dict = {}
146 149 for userinfo in info_result['users']:
147 150 user_state_dict[userinfo['user']] = {
148 151 k: v for k, v in userinfo['state'].items()
149 152 if k in STATE_PUBLIC_KEYS
150 153 }
151 154
152 155 channels_info = {}
153 156
154 157 for c_name, c_info in info_result['channels'].items():
155 158 if c_name not in include_channel_info:
156 159 continue
157 160 connected_list = []
158 161 for userinfo in c_info['users']:
159 162 connected_list.append({
160 163 'user': userinfo['user'],
161 164 'state': user_state_dict[userinfo['user']]
162 165 })
163 166 channels_info[c_name] = {'users': connected_list,
164 167 'history': c_info['history']}
165 168
166 169 return channels_info
167 170
168 171
169 172 def log_filepath(history_location, channel_name):
170 173 hasher = hashlib.sha256()
171 174 hasher.update(channel_name.encode('utf8'))
172 175 filename = '{}.log'.format(hasher.hexdigest())
173 176 filepath = os.path.join(history_location, filename)
174 177 return filepath
175 178
176 179
177 180 def read_history(history_location, channel_name):
178 181 filepath = log_filepath(history_location, channel_name)
179 182 if not os.path.exists(filepath):
180 183 return []
181 184 history_lines_limit = -100
182 185 history = []
183 186 with open(filepath, 'rb') as f:
184 187 for line in f.readlines()[history_lines_limit:]:
185 188 try:
186 189 history.append(json.loads(line))
187 190 except Exception:
188 191 log.exception('Failed to load history')
189 192 return history
190 193
191 194
192 195 def update_history_from_logs(config, channels, payload):
193 196 history_location = config.get('history.location')
194 197 for channel in channels:
195 198 history = read_history(history_location, channel)
196 199 payload['channels_info'][channel]['history'] = history
197 200
198 201
199 202 def write_history(config, message):
200 203 """ writes a messge to a base64encoded filename """
201 204 history_location = config.get('history.location')
202 205 if not os.path.exists(history_location):
203 206 return
204 207 try:
205 208 LOCK.acquire_write_lock()
206 209 filepath = log_filepath(history_location, message['channel'])
207 210 with open(filepath, 'ab') as f:
208 211 json.dump(message, f)
209 212 f.write('\n')
210 213 finally:
211 214 LOCK.release_write_lock()
212 215
213 216
214 217 def get_connection_validators(registry):
215 218 validators = []
216 219 for k, config in registry.rhodecode_plugins.iteritems():
217 220 validator = config.get('channelstream', {}).get('connect_validator')
218 221 if validator:
219 222 validators.append(validator)
220 223 return validators
224
225
226 def post_message(channel, message, username, registry=None):
227
228 if not registry:
229 registry = get_current_registry()
230
231 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
232 channelstream_config = rhodecode_plugins.get('channelstream', {})
233 if channelstream_config.get('enabled'):
234 payload = {
235 'type': 'message',
236 'timestamp': datetime.datetime.utcnow(),
237 'user': 'system',
238 'exclude_users': [username],
239 'channel': channel,
240 'message': {
241 'message': message,
242 'level': 'success',
243 'topic': '/notifications'
244 }
245 }
246
247 try:
248 return channelstream_request(
249 channelstream_config, [payload], '/message',
250 raise_exc=False)
251 except ChannelstreamException:
252 log.exception('Failed to send channelstream data')
253 raise
General Comments 0
You need to be logged in to leave comments. Login now