##// END OF EJS Templates
channelstream: nicer handle of connection errors for debug.
marcink -
r3169:c15445d5 default
parent child Browse files
Show More
@@ -1,169 +1,177 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import logging
21 import logging
22 import uuid
22 import uuid
23
23
24 from pyramid.view import view_config
24 from pyramid.view import view_config
25 from pyramid.httpexceptions import HTTPBadRequest, HTTPForbidden, HTTPBadGateway
25 from pyramid.httpexceptions import HTTPBadRequest, HTTPForbidden, HTTPBadGateway
26
26
27 from rhodecode.apps._base import BaseAppView
27 from rhodecode.apps._base import BaseAppView
28 from rhodecode.lib.channelstream import (
28 from rhodecode.lib.channelstream import (
29 channelstream_request,
29 channelstream_request, get_channelstream_server_url,
30 ChannelstreamConnectionException,
30 ChannelstreamConnectionException,
31 ChannelstreamPermissionException,
31 ChannelstreamPermissionException,
32 check_channel_permissions,
32 check_channel_permissions,
33 get_connection_validators,
33 get_connection_validators,
34 get_user_data,
34 get_user_data,
35 parse_channels_info,
35 parse_channels_info,
36 update_history_from_logs,
36 update_history_from_logs,
37 STATE_PUBLIC_KEYS)
37 STATE_PUBLIC_KEYS)
38
38
39 from rhodecode.lib.auth import NotAnonymous
39 from rhodecode.lib.auth import NotAnonymous
40
40
41 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
42
42
43
43
44 class ChannelstreamView(BaseAppView):
44 class ChannelstreamView(BaseAppView):
45
45
46 def load_default_context(self):
46 def load_default_context(self):
47 c = self._get_local_tmpl_context()
47 c = self._get_local_tmpl_context()
48 self.channelstream_config = \
48 self.channelstream_config = \
49 self.request.registry.rhodecode_plugins['channelstream']
49 self.request.registry.rhodecode_plugins['channelstream']
50 if not self.channelstream_config.get('enabled'):
50 if not self.channelstream_config.get('enabled'):
51 log.error('Channelstream plugin is disabled')
51 log.error('Channelstream plugin is disabled')
52 raise HTTPBadRequest()
52 raise HTTPBadRequest()
53
53
54 return c
54 return c
55
55
56 @NotAnonymous()
56 @NotAnonymous()
57 @view_config(route_name='channelstream_connect', renderer='json_ext')
57 @view_config(route_name='channelstream_connect', renderer='json_ext')
58 def connect(self):
58 def connect(self):
59 """ handle authorization of users trying to connect """
60
59 self.load_default_context()
61 self.load_default_context()
60 """ handle authorization of users trying to connect """
61 try:
62 try:
62 json_body = self.request.json_body
63 json_body = self.request.json_body
63 except Exception:
64 except Exception:
64 log.exception('Failed to decode json from request')
65 log.exception('Failed to decode json from request')
65 raise HTTPBadRequest()
66 raise HTTPBadRequest()
66
67
67 try:
68 try:
68 channels = check_channel_permissions(
69 channels = check_channel_permissions(
69 json_body.get('channels'),
70 json_body.get('channels'),
70 get_connection_validators(self.request.registry))
71 get_connection_validators(self.request.registry))
71 except ChannelstreamPermissionException:
72 except ChannelstreamPermissionException:
72 log.error('Incorrect permissions for requested channels')
73 log.error('Incorrect permissions for requested channels')
73 raise HTTPForbidden()
74 raise HTTPForbidden()
74
75
75 user = self._rhodecode_user
76 user = self._rhodecode_user
76 if user.user_id:
77 if user.user_id:
77 user_data = get_user_data(user.user_id)
78 user_data = get_user_data(user.user_id)
78 else:
79 else:
79 user_data = {
80 user_data = {
80 'id': None,
81 'id': None,
81 'username': None,
82 'username': None,
82 'first_name': None,
83 'first_name': None,
83 'last_name': None,
84 'last_name': None,
84 'icon_link': None,
85 'icon_link': None,
85 'display_name': None,
86 'display_name': None,
86 'display_link': None,
87 'display_link': None,
87 }
88 }
88 user_data['permissions'] = self._rhodecode_user.permissions_safe
89 user_data['permissions'] = self._rhodecode_user.permissions_safe
89 payload = {
90 payload = {
90 'username': user.username,
91 'username': user.username,
91 'user_state': user_data,
92 'user_state': user_data,
92 'conn_id': str(uuid.uuid4()),
93 'conn_id': str(uuid.uuid4()),
93 'channels': channels,
94 'channels': channels,
94 'channel_configs': {},
95 'channel_configs': {},
95 'state_public_keys': STATE_PUBLIC_KEYS,
96 'state_public_keys': STATE_PUBLIC_KEYS,
96 'info': {
97 'info': {
97 'exclude_channels': ['broadcast']
98 'exclude_channels': ['broadcast']
98 }
99 }
99 }
100 }
100 filtered_channels = [channel for channel in channels
101 filtered_channels = [channel for channel in channels
101 if channel != 'broadcast']
102 if channel != 'broadcast']
102 for channel in filtered_channels:
103 for channel in filtered_channels:
103 payload['channel_configs'][channel] = {
104 payload['channel_configs'][channel] = {
104 'notify_presence': True,
105 'notify_presence': True,
105 'history_size': 100,
106 'history_size': 100,
106 'store_history': True,
107 'store_history': True,
107 'broadcast_presence_with_user_lists': True
108 'broadcast_presence_with_user_lists': True
108 }
109 }
109 # connect user to server
110 # connect user to server
111 channelstream_url = get_channelstream_server_url(
112 self.channelstream_config, '/connect')
110 try:
113 try:
111 connect_result = channelstream_request(self.channelstream_config,
114 connect_result = channelstream_request(
112 payload, '/connect')
115 self.channelstream_config, payload, '/connect')
113 except ChannelstreamConnectionException:
116 except ChannelstreamConnectionException:
114 log.exception('Channelstream service is down')
117 log.exception(
118 'Channelstream service at {} is down'.format(channelstream_url))
115 return HTTPBadGateway()
119 return HTTPBadGateway()
116
120
117 connect_result['channels'] = channels
121 connect_result['channels'] = channels
118 connect_result['channels_info'] = parse_channels_info(
122 connect_result['channels_info'] = parse_channels_info(
119 connect_result['channels_info'],
123 connect_result['channels_info'],
120 include_channel_info=filtered_channels)
124 include_channel_info=filtered_channels)
121 update_history_from_logs(self.channelstream_config,
125 update_history_from_logs(self.channelstream_config,
122 filtered_channels, connect_result)
126 filtered_channels, connect_result)
123 return connect_result
127 return connect_result
124
128
125 @NotAnonymous()
129 @NotAnonymous()
126 @view_config(route_name='channelstream_subscribe', renderer='json_ext')
130 @view_config(route_name='channelstream_subscribe', renderer='json_ext')
127 def subscribe(self):
131 def subscribe(self):
128 """ can be used to subscribe specific connection to other channels """
132 """ can be used to subscribe specific connection to other channels """
129 self.load_default_context()
133 self.load_default_context()
130 try:
134 try:
131 json_body = self.request.json_body
135 json_body = self.request.json_body
132 except Exception:
136 except Exception:
133 log.exception('Failed to decode json from request')
137 log.exception('Failed to decode json from request')
134 raise HTTPBadRequest()
138 raise HTTPBadRequest()
135 try:
139 try:
136 channels = check_channel_permissions(
140 channels = check_channel_permissions(
137 json_body.get('channels'),
141 json_body.get('channels'),
138 get_connection_validators(self.request.registry))
142 get_connection_validators(self.request.registry))
139 except ChannelstreamPermissionException:
143 except ChannelstreamPermissionException:
140 log.error('Incorrect permissions for requested channels')
144 log.error('Incorrect permissions for requested channels')
141 raise HTTPForbidden()
145 raise HTTPForbidden()
142 payload = {'conn_id': json_body.get('conn_id', ''),
146 payload = {'conn_id': json_body.get('conn_id', ''),
143 'channels': channels,
147 'channels': channels,
144 'channel_configs': {},
148 'channel_configs': {},
145 'info': {
149 'info': {
146 'exclude_channels': ['broadcast']}
150 'exclude_channels': ['broadcast']}
147 }
151 }
148 filtered_channels = [chan for chan in channels if chan != 'broadcast']
152 filtered_channels = [chan for chan in channels if chan != 'broadcast']
149 for channel in filtered_channels:
153 for channel in filtered_channels:
150 payload['channel_configs'][channel] = {
154 payload['channel_configs'][channel] = {
151 'notify_presence': True,
155 'notify_presence': True,
152 'history_size': 100,
156 'history_size': 100,
153 'store_history': True,
157 'store_history': True,
154 'broadcast_presence_with_user_lists': True
158 'broadcast_presence_with_user_lists': True
155 }
159 }
160
161 channelstream_url = get_channelstream_server_url(
162 self.channelstream_config, '/subscribe')
156 try:
163 try:
157 connect_result = channelstream_request(
164 connect_result = channelstream_request(
158 self.channelstream_config, payload, '/subscribe')
165 self.channelstream_config, payload, '/subscribe')
159 except ChannelstreamConnectionException:
166 except ChannelstreamConnectionException:
160 log.exception('Channelstream service is down')
167 log.exception(
168 'Channelstream service at {} is down'.format(channelstream_url))
161 return HTTPBadGateway()
169 return HTTPBadGateway()
162 # include_channel_info will limit history only to new channel
170 # include_channel_info will limit history only to new channel
163 # to not overwrite histories on other channels in client
171 # to not overwrite histories on other channels in client
164 connect_result['channels_info'] = parse_channels_info(
172 connect_result['channels_info'] = parse_channels_info(
165 connect_result['channels_info'],
173 connect_result['channels_info'],
166 include_channel_info=filtered_channels)
174 include_channel_info=filtered_channels)
167 update_history_from_logs(self.channelstream_config,
175 update_history_from_logs(
168 filtered_channels, connect_result)
176 self.channelstream_config, filtered_channels, connect_result)
169 return connect_result
177 return connect_result
@@ -1,255 +1,259 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2018 RhodeCode GmbH
3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import hashlib
22 import hashlib
23 import itsdangerous
23 import itsdangerous
24 import logging
24 import logging
25 import requests
25 import requests
26 import datetime
26 import datetime
27
27
28 from dogpile.core import ReadWriteMutex
28 from dogpile.core import ReadWriteMutex
29 from pyramid.threadlocal import get_current_registry
29 from pyramid.threadlocal import get_current_registry
30
30
31 import rhodecode.lib.helpers as h
31 import rhodecode.lib.helpers as h
32 from rhodecode.lib.auth import HasRepoPermissionAny
32 from rhodecode.lib.auth import HasRepoPermissionAny
33 from rhodecode.lib.ext_json import json
33 from rhodecode.lib.ext_json import json
34 from rhodecode.model.db import User
34 from rhodecode.model.db import User
35
35
36 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
37
37
38 LOCK = ReadWriteMutex()
38 LOCK = ReadWriteMutex()
39
39
40 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
40 STATE_PUBLIC_KEYS = ['id', 'username', 'first_name', 'last_name',
41 'icon_link', 'display_name', 'display_link']
41 'icon_link', 'display_name', 'display_link']
42
42
43
43
44 class ChannelstreamException(Exception):
44 class ChannelstreamException(Exception):
45 pass
45 pass
46
46
47
47
48 class ChannelstreamConnectionException(ChannelstreamException):
48 class ChannelstreamConnectionException(ChannelstreamException):
49 pass
49 pass
50
50
51
51
52 class ChannelstreamPermissionException(ChannelstreamException):
52 class ChannelstreamPermissionException(ChannelstreamException):
53 pass
53 pass
54
54
55
55
56 def get_channelstream_server_url(config, endpoint):
57 return 'http://{}{}'.format(config['server'], endpoint)
58
59
56 def channelstream_request(config, payload, endpoint, raise_exc=True):
60 def channelstream_request(config, payload, endpoint, raise_exc=True):
57 signer = itsdangerous.TimestampSigner(config['secret'])
61 signer = itsdangerous.TimestampSigner(config['secret'])
58 sig_for_server = signer.sign(endpoint)
62 sig_for_server = signer.sign(endpoint)
59 secret_headers = {'x-channelstream-secret': sig_for_server,
63 secret_headers = {'x-channelstream-secret': sig_for_server,
60 'x-channelstream-endpoint': endpoint,
64 'x-channelstream-endpoint': endpoint,
61 'Content-Type': 'application/json'}
65 'Content-Type': 'application/json'}
62 req_url = 'http://{}{}'.format(config['server'], endpoint)
66 req_url = get_channelstream_server_url(config, endpoint)
63 response = None
67 response = None
64 try:
68 try:
65 response = requests.post(req_url, data=json.dumps(payload),
69 response = requests.post(req_url, data=json.dumps(payload),
66 headers=secret_headers).json()
70 headers=secret_headers).json()
67 except requests.ConnectionError:
71 except requests.ConnectionError:
68 log.exception('ConnectionError happened')
72 log.exception('ConnectionError occurred for endpoint %s', req_url)
69 if raise_exc:
73 if raise_exc:
70 raise ChannelstreamConnectionException()
74 raise ChannelstreamConnectionException(req_url)
71 except Exception:
75 except Exception:
72 log.exception('Exception related to channelstream happened')
76 log.exception('Exception related to Channelstream happened')
73 if raise_exc:
77 if raise_exc:
74 raise ChannelstreamConnectionException()
78 raise ChannelstreamConnectionException()
75 return response
79 return response
76
80
77
81
78 def get_user_data(user_id):
82 def get_user_data(user_id):
79 user = User.get(user_id)
83 user = User.get(user_id)
80 return {
84 return {
81 'id': user.user_id,
85 'id': user.user_id,
82 'username': user.username,
86 'username': user.username,
83 'first_name': user.first_name,
87 'first_name': user.first_name,
84 'last_name': user.last_name,
88 'last_name': user.last_name,
85 'icon_link': h.gravatar_url(user.email, 60),
89 'icon_link': h.gravatar_url(user.email, 60),
86 'display_name': h.person(user, 'username_or_name_or_email'),
90 'display_name': h.person(user, 'username_or_name_or_email'),
87 'display_link': h.link_to_user(user),
91 'display_link': h.link_to_user(user),
88 'notifications': user.user_data.get('notification_status', True)
92 'notifications': user.user_data.get('notification_status', True)
89 }
93 }
90
94
91
95
92 def broadcast_validator(channel_name):
96 def broadcast_validator(channel_name):
93 """ checks if user can access the broadcast channel """
97 """ checks if user can access the broadcast channel """
94 if channel_name == 'broadcast':
98 if channel_name == 'broadcast':
95 return True
99 return True
96
100
97
101
98 def repo_validator(channel_name):
102 def repo_validator(channel_name):
99 """ checks if user can access the broadcast channel """
103 """ checks if user can access the broadcast channel """
100 channel_prefix = '/repo$'
104 channel_prefix = '/repo$'
101 if channel_name.startswith(channel_prefix):
105 if channel_name.startswith(channel_prefix):
102 elements = channel_name[len(channel_prefix):].split('$')
106 elements = channel_name[len(channel_prefix):].split('$')
103 repo_name = elements[0]
107 repo_name = elements[0]
104 can_access = HasRepoPermissionAny(
108 can_access = HasRepoPermissionAny(
105 'repository.read',
109 'repository.read',
106 'repository.write',
110 'repository.write',
107 'repository.admin')(repo_name)
111 'repository.admin')(repo_name)
108 log.debug(
112 log.debug(
109 'permission check for %s channel resulted in %s',
113 'permission check for %s channel resulted in %s',
110 repo_name, can_access)
114 repo_name, can_access)
111 if can_access:
115 if can_access:
112 return True
116 return True
113 return False
117 return False
114
118
115
119
116 def check_channel_permissions(channels, plugin_validators, should_raise=True):
120 def check_channel_permissions(channels, plugin_validators, should_raise=True):
117 valid_channels = []
121 valid_channels = []
118
122
119 validators = [broadcast_validator, repo_validator]
123 validators = [broadcast_validator, repo_validator]
120 if plugin_validators:
124 if plugin_validators:
121 validators.extend(plugin_validators)
125 validators.extend(plugin_validators)
122 for channel_name in channels:
126 for channel_name in channels:
123 is_valid = False
127 is_valid = False
124 for validator in validators:
128 for validator in validators:
125 if validator(channel_name):
129 if validator(channel_name):
126 is_valid = True
130 is_valid = True
127 break
131 break
128 if is_valid:
132 if is_valid:
129 valid_channels.append(channel_name)
133 valid_channels.append(channel_name)
130 else:
134 else:
131 if should_raise:
135 if should_raise:
132 raise ChannelstreamPermissionException()
136 raise ChannelstreamPermissionException()
133 return valid_channels
137 return valid_channels
134
138
135
139
136 def get_channels_info(self, channels):
140 def get_channels_info(self, channels):
137 payload = {'channels': channels}
141 payload = {'channels': channels}
138 # gather persistence info
142 # gather persistence info
139 return channelstream_request(self._config(), payload, '/info')
143 return channelstream_request(self._config(), payload, '/info')
140
144
141
145
142 def parse_channels_info(info_result, include_channel_info=None):
146 def parse_channels_info(info_result, include_channel_info=None):
143 """
147 """
144 Returns data that contains only secure information that can be
148 Returns data that contains only secure information that can be
145 presented to clients
149 presented to clients
146 """
150 """
147 include_channel_info = include_channel_info or []
151 include_channel_info = include_channel_info or []
148
152
149 user_state_dict = {}
153 user_state_dict = {}
150 for userinfo in info_result['users']:
154 for userinfo in info_result['users']:
151 user_state_dict[userinfo['user']] = {
155 user_state_dict[userinfo['user']] = {
152 k: v for k, v in userinfo['state'].items()
156 k: v for k, v in userinfo['state'].items()
153 if k in STATE_PUBLIC_KEYS
157 if k in STATE_PUBLIC_KEYS
154 }
158 }
155
159
156 channels_info = {}
160 channels_info = {}
157
161
158 for c_name, c_info in info_result['channels'].items():
162 for c_name, c_info in info_result['channels'].items():
159 if c_name not in include_channel_info:
163 if c_name not in include_channel_info:
160 continue
164 continue
161 connected_list = []
165 connected_list = []
162 for userinfo in c_info['users']:
166 for userinfo in c_info['users']:
163 connected_list.append({
167 connected_list.append({
164 'user': userinfo['user'],
168 'user': userinfo['user'],
165 'state': user_state_dict[userinfo['user']]
169 'state': user_state_dict[userinfo['user']]
166 })
170 })
167 channels_info[c_name] = {'users': connected_list,
171 channels_info[c_name] = {'users': connected_list,
168 'history': c_info['history']}
172 'history': c_info['history']}
169
173
170 return channels_info
174 return channels_info
171
175
172
176
173 def log_filepath(history_location, channel_name):
177 def log_filepath(history_location, channel_name):
174 hasher = hashlib.sha256()
178 hasher = hashlib.sha256()
175 hasher.update(channel_name.encode('utf8'))
179 hasher.update(channel_name.encode('utf8'))
176 filename = '{}.log'.format(hasher.hexdigest())
180 filename = '{}.log'.format(hasher.hexdigest())
177 filepath = os.path.join(history_location, filename)
181 filepath = os.path.join(history_location, filename)
178 return filepath
182 return filepath
179
183
180
184
181 def read_history(history_location, channel_name):
185 def read_history(history_location, channel_name):
182 filepath = log_filepath(history_location, channel_name)
186 filepath = log_filepath(history_location, channel_name)
183 if not os.path.exists(filepath):
187 if not os.path.exists(filepath):
184 return []
188 return []
185 history_lines_limit = -100
189 history_lines_limit = -100
186 history = []
190 history = []
187 with open(filepath, 'rb') as f:
191 with open(filepath, 'rb') as f:
188 for line in f.readlines()[history_lines_limit:]:
192 for line in f.readlines()[history_lines_limit:]:
189 try:
193 try:
190 history.append(json.loads(line))
194 history.append(json.loads(line))
191 except Exception:
195 except Exception:
192 log.exception('Failed to load history')
196 log.exception('Failed to load history')
193 return history
197 return history
194
198
195
199
196 def update_history_from_logs(config, channels, payload):
200 def update_history_from_logs(config, channels, payload):
197 history_location = config.get('history.location')
201 history_location = config.get('history.location')
198 for channel in channels:
202 for channel in channels:
199 history = read_history(history_location, channel)
203 history = read_history(history_location, channel)
200 payload['channels_info'][channel]['history'] = history
204 payload['channels_info'][channel]['history'] = history
201
205
202
206
203 def write_history(config, message):
207 def write_history(config, message):
204 """ writes a messge to a base64encoded filename """
208 """ writes a messge to a base64encoded filename """
205 history_location = config.get('history.location')
209 history_location = config.get('history.location')
206 if not os.path.exists(history_location):
210 if not os.path.exists(history_location):
207 return
211 return
208 try:
212 try:
209 LOCK.acquire_write_lock()
213 LOCK.acquire_write_lock()
210 filepath = log_filepath(history_location, message['channel'])
214 filepath = log_filepath(history_location, message['channel'])
211 with open(filepath, 'ab') as f:
215 with open(filepath, 'ab') as f:
212 json.dump(message, f)
216 json.dump(message, f)
213 f.write('\n')
217 f.write('\n')
214 finally:
218 finally:
215 LOCK.release_write_lock()
219 LOCK.release_write_lock()
216
220
217
221
218 def get_connection_validators(registry):
222 def get_connection_validators(registry):
219 validators = []
223 validators = []
220 for k, config in registry.rhodecode_plugins.iteritems():
224 for k, config in registry.rhodecode_plugins.iteritems():
221 validator = config.get('channelstream', {}).get('connect_validator')
225 validator = config.get('channelstream', {}).get('connect_validator')
222 if validator:
226 if validator:
223 validators.append(validator)
227 validators.append(validator)
224 return validators
228 return validators
225
229
226
230
227 def post_message(channel, message, username, registry=None):
231 def post_message(channel, message, username, registry=None):
228
232
229 if not registry:
233 if not registry:
230 registry = get_current_registry()
234 registry = get_current_registry()
231
235
232 log.debug('Channelstream: sending notification to channel %s', channel)
236 log.debug('Channelstream: sending notification to channel %s', channel)
233 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
237 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
234 channelstream_config = rhodecode_plugins.get('channelstream', {})
238 channelstream_config = rhodecode_plugins.get('channelstream', {})
235 if channelstream_config.get('enabled'):
239 if channelstream_config.get('enabled'):
236 payload = {
240 payload = {
237 'type': 'message',
241 'type': 'message',
238 'timestamp': datetime.datetime.utcnow(),
242 'timestamp': datetime.datetime.utcnow(),
239 'user': 'system',
243 'user': 'system',
240 'exclude_users': [username],
244 'exclude_users': [username],
241 'channel': channel,
245 'channel': channel,
242 'message': {
246 'message': {
243 'message': message,
247 'message': message,
244 'level': 'success',
248 'level': 'success',
245 'topic': '/notifications'
249 'topic': '/notifications'
246 }
250 }
247 }
251 }
248
252
249 try:
253 try:
250 return channelstream_request(
254 return channelstream_request(
251 channelstream_config, [payload], '/message',
255 channelstream_config, [payload], '/message',
252 raise_exc=False)
256 raise_exc=False)
253 except ChannelstreamException:
257 except ChannelstreamException:
254 log.exception('Failed to send channelstream data')
258 log.exception('Failed to send channelstream data')
255 raise
259 raise
General Comments 0
You need to be logged in to leave comments. Login now