##// END OF EJS Templates
channelstream: add simple method of posting message to notification channel.
marcink -
r1969:27a92529 default
parent child Browse files
Show More
@@ -1,220 +1,253 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2017 RhodeCode GmbH
3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import hashlib
22 import hashlib
22 import itsdangerous
23 import itsdangerous
23 import logging
24 import logging
24 import os
25 import requests
25 import requests
26 import datetime
27
26 from dogpile.core import ReadWriteMutex
28 from dogpile.core import ReadWriteMutex
29 from pyramid.threadlocal import get_current_registry
27
30
28 import rhodecode.lib.helpers as h
31 import rhodecode.lib.helpers as h
29 from rhodecode.lib.auth import HasRepoPermissionAny
32 from rhodecode.lib.auth import HasRepoPermissionAny
30 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.ext_json import json
31 from rhodecode.model.db import User
34 from rhodecode.model.db import User
32
35
33 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
34
37
35 LOCK = ReadWriteMutex()
38 LOCK = ReadWriteMutex()
36
39
37 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
40 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
38 'icon_link', 'display_name', 'display_link']
41 'icon_link', 'display_name', 'display_link']
39
42
40
43
41 class ChannelstreamException(Exception):
44 class ChannelstreamException(Exception):
42 pass
45 pass
43
46
44
47
45 class ChannelstreamConnectionException(ChannelstreamException):
48 class ChannelstreamConnectionException(ChannelstreamException):
46 pass
49 pass
47
50
48
51
49 class ChannelstreamPermissionException(ChannelstreamException):
52 class ChannelstreamPermissionException(ChannelstreamException):
50 pass
53 pass
51
54
52
55
53 def channelstream_request(config, payload, endpoint, raise_exc=True):
56 def channelstream_request(config, payload, endpoint, raise_exc=True):
54 signer = itsdangerous.TimestampSigner(config['secret'])
57 signer = itsdangerous.TimestampSigner(config['secret'])
55 sig_for_server = signer.sign(endpoint)
58 sig_for_server = signer.sign(endpoint)
56 secret_headers = {'x-channelstream-secret': sig_for_server,
59 secret_headers = {'x-channelstream-secret': sig_for_server,
57 'x-channelstream-endpoint': endpoint,
60 'x-channelstream-endpoint': endpoint,
58 'Content-Type': 'application/json'}
61 'Content-Type': 'application/json'}
59 req_url = 'http://{}{}'.format(config['server'], endpoint)
62 req_url = 'http://{}{}'.format(config['server'], endpoint)
60 response = None
63 response = None
61 try:
64 try:
62 response = requests.post(req_url, data=json.dumps(payload),
65 response = requests.post(req_url, data=json.dumps(payload),
63 headers=secret_headers).json()
66 headers=secret_headers).json()
64 except requests.ConnectionError:
67 except requests.ConnectionError:
65 log.exception('ConnectionError happened')
68 log.exception('ConnectionError happened')
66 if raise_exc:
69 if raise_exc:
67 raise ChannelstreamConnectionException()
70 raise ChannelstreamConnectionException()
68 except Exception:
71 except Exception:
69 log.exception('Exception related to channelstream happened')
72 log.exception('Exception related to channelstream happened')
70 if raise_exc:
73 if raise_exc:
71 raise ChannelstreamConnectionException()
74 raise ChannelstreamConnectionException()
72 return response
75 return response
73
76
74
77
75 def get_user_data(user_id):
78 def get_user_data(user_id):
76 user = User.get(user_id)
79 user = User.get(user_id)
77 return {
80 return {
78 'id': user.user_id,
81 'id': user.user_id,
79 'username': user.username,
82 'username': user.username,
80 'first_name': user.first_name,
83 'first_name': user.first_name,
81 'last_name': user.last_name,
84 'last_name': user.last_name,
82 'icon_link': h.gravatar_url(user.email, 60),
85 'icon_link': h.gravatar_url(user.email, 60),
83 'display_name': h.person(user, 'username_or_name_or_email'),
86 'display_name': h.person(user, 'username_or_name_or_email'),
84 'display_link': h.link_to_user(user),
87 'display_link': h.link_to_user(user),
85 'notifications': user.user_data.get('notification_status', True)
88 'notifications': user.user_data.get('notification_status', True)
86 }
89 }
87
90
88
91
89 def broadcast_validator(channel_name):
92 def broadcast_validator(channel_name):
90 """ checks if user can access the broadcast channel """
93 """ checks if user can access the broadcast channel """
91 if channel_name == 'broadcast':
94 if channel_name == 'broadcast':
92 return True
95 return True
93
96
94
97
95 def repo_validator(channel_name):
98 def repo_validator(channel_name):
96 """ checks if user can access the broadcast channel """
99 """ checks if user can access the broadcast channel """
97 channel_prefix = '/repo$'
100 channel_prefix = '/repo$'
98 if channel_name.startswith(channel_prefix):
101 if channel_name.startswith(channel_prefix):
99 elements = channel_name[len(channel_prefix):].split('$')
102 elements = channel_name[len(channel_prefix):].split('$')
100 repo_name = elements[0]
103 repo_name = elements[0]
101 can_access = HasRepoPermissionAny(
104 can_access = HasRepoPermissionAny(
102 'repository.read',
105 'repository.read',
103 'repository.write',
106 'repository.write',
104 'repository.admin')(repo_name)
107 'repository.admin')(repo_name)
105 log.debug('permission check for {} channel '
108 log.debug('permission check for {} channel '
106 'resulted in {}'.format(repo_name, can_access))
109 'resulted in {}'.format(repo_name, can_access))
107 if can_access:
110 if can_access:
108 return True
111 return True
109 return False
112 return False
110
113
111
114
112 def check_channel_permissions(channels, plugin_validators, should_raise=True):
115 def check_channel_permissions(channels, plugin_validators, should_raise=True):
113 valid_channels = []
116 valid_channels = []
114
117
115 validators = [broadcast_validator, repo_validator]
118 validators = [broadcast_validator, repo_validator]
116 if plugin_validators:
119 if plugin_validators:
117 validators.extend(plugin_validators)
120 validators.extend(plugin_validators)
118 for channel_name in channels:
121 for channel_name in channels:
119 is_valid = False
122 is_valid = False
120 for validator in validators:
123 for validator in validators:
121 if validator(channel_name):
124 if validator(channel_name):
122 is_valid = True
125 is_valid = True
123 break
126 break
124 if is_valid:
127 if is_valid:
125 valid_channels.append(channel_name)
128 valid_channels.append(channel_name)
126 else:
129 else:
127 if should_raise:
130 if should_raise:
128 raise ChannelstreamPermissionException()
131 raise ChannelstreamPermissionException()
129 return valid_channels
132 return valid_channels
130
133
131
134
132 def get_channels_info(self, channels):
135 def get_channels_info(self, channels):
133 payload = {'channels': channels}
136 payload = {'channels': channels}
134 # gather persistence info
137 # gather persistence info
135 return channelstream_request(self._config(), payload, '/info')
138 return channelstream_request(self._config(), payload, '/info')
136
139
137
140
138 def parse_channels_info(info_result, include_channel_info=None):
141 def parse_channels_info(info_result, include_channel_info=None):
139 """
142 """
140 Returns data that contains only secure information that can be
143 Returns data that contains only secure information that can be
141 presented to clients
144 presented to clients
142 """
145 """
143 include_channel_info = include_channel_info or []
146 include_channel_info = include_channel_info or []
144
147
145 user_state_dict = {}
148 user_state_dict = {}
146 for userinfo in info_result['users']:
149 for userinfo in info_result['users']:
147 user_state_dict[userinfo['user']] = {
150 user_state_dict[userinfo['user']] = {
148 k: v for k, v in userinfo['state'].items()
151 k: v for k, v in userinfo['state'].items()
149 if k in STATE_PUBLIC_KEYS
152 if k in STATE_PUBLIC_KEYS
150 }
153 }
151
154
152 channels_info = {}
155 channels_info = {}
153
156
154 for c_name, c_info in info_result['channels'].items():
157 for c_name, c_info in info_result['channels'].items():
155 if c_name not in include_channel_info:
158 if c_name not in include_channel_info:
156 continue
159 continue
157 connected_list = []
160 connected_list = []
158 for userinfo in c_info['users']:
161 for userinfo in c_info['users']:
159 connected_list.append({
162 connected_list.append({
160 'user': userinfo['user'],
163 'user': userinfo['user'],
161 'state': user_state_dict[userinfo['user']]
164 'state': user_state_dict[userinfo['user']]
162 })
165 })
163 channels_info[c_name] = {'users': connected_list,
166 channels_info[c_name] = {'users': connected_list,
164 'history': c_info['history']}
167 'history': c_info['history']}
165
168
166 return channels_info
169 return channels_info
167
170
168
171
169 def log_filepath(history_location, channel_name):
172 def log_filepath(history_location, channel_name):
170 hasher = hashlib.sha256()
173 hasher = hashlib.sha256()
171 hasher.update(channel_name.encode('utf8'))
174 hasher.update(channel_name.encode('utf8'))
172 filename = '{}.log'.format(hasher.hexdigest())
175 filename = '{}.log'.format(hasher.hexdigest())
173 filepath = os.path.join(history_location, filename)
176 filepath = os.path.join(history_location, filename)
174 return filepath
177 return filepath
175
178
176
179
177 def read_history(history_location, channel_name):
180 def read_history(history_location, channel_name):
178 filepath = log_filepath(history_location, channel_name)
181 filepath = log_filepath(history_location, channel_name)
179 if not os.path.exists(filepath):
182 if not os.path.exists(filepath):
180 return []
183 return []
181 history_lines_limit = -100
184 history_lines_limit = -100
182 history = []
185 history = []
183 with open(filepath, 'rb') as f:
186 with open(filepath, 'rb') as f:
184 for line in f.readlines()[history_lines_limit:]:
187 for line in f.readlines()[history_lines_limit:]:
185 try:
188 try:
186 history.append(json.loads(line))
189 history.append(json.loads(line))
187 except Exception:
190 except Exception:
188 log.exception('Failed to load history')
191 log.exception('Failed to load history')
189 return history
192 return history
190
193
191
194
192 def update_history_from_logs(config, channels, payload):
195 def update_history_from_logs(config, channels, payload):
193 history_location = config.get('history.location')
196 history_location = config.get('history.location')
194 for channel in channels:
197 for channel in channels:
195 history = read_history(history_location, channel)
198 history = read_history(history_location, channel)
196 payload['channels_info'][channel]['history'] = history
199 payload['channels_info'][channel]['history'] = history
197
200
198
201
199 def write_history(config, message):
202 def write_history(config, message):
200 """ writes a messge to a base64encoded filename """
203 """ writes a messge to a base64encoded filename """
201 history_location = config.get('history.location')
204 history_location = config.get('history.location')
202 if not os.path.exists(history_location):
205 if not os.path.exists(history_location):
203 return
206 return
204 try:
207 try:
205 LOCK.acquire_write_lock()
208 LOCK.acquire_write_lock()
206 filepath = log_filepath(history_location, message['channel'])
209 filepath = log_filepath(history_location, message['channel'])
207 with open(filepath, 'ab') as f:
210 with open(filepath, 'ab') as f:
208 json.dump(message, f)
211 json.dump(message, f)
209 f.write('\n')
212 f.write('\n')
210 finally:
213 finally:
211 LOCK.release_write_lock()
214 LOCK.release_write_lock()
212
215
213
216
214 def get_connection_validators(registry):
217 def get_connection_validators(registry):
215 validators = []
218 validators = []
216 for k, config in registry.rhodecode_plugins.iteritems():
219 for k, config in registry.rhodecode_plugins.iteritems():
217 validator = config.get('channelstream', {}).get('connect_validator')
220 validator = config.get('channelstream', {}).get('connect_validator')
218 if validator:
221 if validator:
219 validators.append(validator)
222 validators.append(validator)
220 return validators
223 return validators
224
225
226 def post_message(channel, message, username, registry=None):
227
228 if not registry:
229 registry = get_current_registry()
230
231 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
232 channelstream_config = rhodecode_plugins.get('channelstream', {})
233 if channelstream_config.get('enabled'):
234 payload = {
235 'type': 'message',
236 'timestamp': datetime.datetime.utcnow(),
237 'user': 'system',
238 'exclude_users': [username],
239 'channel': channel,
240 'message': {
241 'message': message,
242 'level': 'success',
243 'topic': '/notifications'
244 }
245 }
246
247 try:
248 return channelstream_request(
249 channelstream_config, [payload], '/message',
250 raise_exc=False)
251 except ChannelstreamException:
252 log.exception('Failed to send channelstream data')
253 raise
General Comments 0
You need to be logged in to leave comments. Login now