##// END OF EJS Templates
logging: http logging should limit the data to some sane ammount....
marcink -
r3813:8aa49fab stable
parent child Browse files
Show More
@@ -1,311 +1,311 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import urllib2
29 29 import urlparse
30 30 import uuid
31 31 import traceback
32 32
33 33 import pycurl
34 34 import msgpack
35 35 import requests
36 36 from requests.packages.urllib3.util.retry import Retry
37 37
38 38 import rhodecode
39 39 from rhodecode.lib.system_info import get_cert_path
40 40 from rhodecode.lib.vcs import exceptions, CurlSession
41 41
42 42
43 43 log = logging.getLogger(__name__)
44 44
45 45
46 46 # TODO: mikhail: Keep it in sync with vcsserver's
47 47 # HTTPApplication.ALLOWED_EXCEPTIONS
48 48 EXCEPTIONS_MAP = {
49 49 'KeyError': KeyError,
50 50 'URLError': urllib2.URLError,
51 51 }
52 52
53 53
54 54 class RepoMaker(object):
55 55
56 56 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
57 57 self.url = urlparse.urljoin(
58 58 'http://%s' % server_and_port, backend_endpoint)
59 59 self._session_factory = session_factory
60 60 self.backend_type = backend_type
61 61
62 62 def __call__(self, path, config, with_wire=None):
63 63 log.debug('RepoMaker call on %s', path)
64 64 return RemoteRepo(
65 65 path, config, self.url, self._session_factory(),
66 66 with_wire=with_wire)
67 67
68 68 def __getattr__(self, name):
69 69 def f(*args, **kwargs):
70 70 return self._call(name, *args, **kwargs)
71 71 return f
72 72
73 73 @exceptions.map_vcs_exceptions
74 74 def _call(self, name, *args, **kwargs):
75 75 payload = {
76 76 'id': str(uuid.uuid4()),
77 77 'method': name,
78 78 'backend': self.backend_type,
79 79 'params': {'args': args, 'kwargs': kwargs}
80 80 }
81 81 return _remote_call(
82 82 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
83 83
84 84
85 85 class ServiceConnection(object):
86 86 def __init__(self, server_and_port, backend_endpoint, session_factory):
87 87 self.url = urlparse.urljoin(
88 88 'http://%s' % server_and_port, backend_endpoint)
89 89 self._session_factory = session_factory
90 90
91 91 def __getattr__(self, name):
92 92 def f(*args, **kwargs):
93 93 return self._call(name, *args, **kwargs)
94 94
95 95 return f
96 96
97 97 @exceptions.map_vcs_exceptions
98 98 def _call(self, name, *args, **kwargs):
99 99 payload = {
100 100 'id': str(uuid.uuid4()),
101 101 'method': name,
102 102 'params': {'args': args, 'kwargs': kwargs}
103 103 }
104 104 return _remote_call(
105 105 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
106 106
107 107
108 108 class RemoteRepo(object):
109 109
110 110 def __init__(self, path, config, url, session, with_wire=None):
111 111 self.url = url
112 112 self._session = session
113 113 self._wire = {
114 114 "path": path,
115 115 "config": config,
116 116 "context": self._create_vcs_cache_context(),
117 117 }
118 118 if with_wire:
119 119 self._wire.update(with_wire)
120 120
121 121 # johbo: Trading complexity for performance. Avoiding the call to
122 122 # log.debug brings a few percent gain even if is is not active.
123 123 if log.isEnabledFor(logging.DEBUG):
124 124 self._call = self._call_with_logging
125 125
126 126 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
127 127
128 128 def __getattr__(self, name):
129 129 def f(*args, **kwargs):
130 130 return self._call(name, *args, **kwargs)
131 131 return f
132 132
133 133 @exceptions.map_vcs_exceptions
134 134 def _call(self, name, *args, **kwargs):
135 135 # TODO: oliver: This is currently necessary pre-call since the
136 136 # config object is being changed for hooking scenarios
137 137 wire = copy.deepcopy(self._wire)
138 138 wire["config"] = wire["config"].serialize()
139 139
140 140 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
141 141 payload = {
142 142 'id': str(uuid.uuid4()),
143 143 'method': name,
144 144 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
145 145 }
146 146 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
147 147
148 148 def _call_with_logging(self, name, *args, **kwargs):
149 149 context_uid = self._wire.get('context')
150 log.debug('Calling %s@%s with args:%r. wire_context: %s',
150 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
151 151 self.url, name, args, context_uid)
152 152 return RemoteRepo._call(self, name, *args, **kwargs)
153 153
154 154 def __getitem__(self, key):
155 155 return self.revision(key)
156 156
157 157 def _create_vcs_cache_context(self):
158 158 """
159 159 Creates a unique string which is passed to the VCSServer on every
160 160 remote call. It is used as cache key in the VCSServer.
161 161 """
162 162 return str(uuid.uuid4())
163 163
164 164 def invalidate_vcs_cache(self):
165 165 """
166 166 This invalidates the context which is sent to the VCSServer on every
167 167 call to a remote method. It forces the VCSServer to create a fresh
168 168 repository instance on the next call to a remote method.
169 169 """
170 170 self._wire['context'] = self._create_vcs_cache_context()
171 171
172 172
173 173 class RemoteObject(object):
174 174
175 175 def __init__(self, url, session):
176 176 self._url = url
177 177 self._session = session
178 178
179 179 # johbo: Trading complexity for performance. Avoiding the call to
180 180 # log.debug brings a few percent gain even if is is not active.
181 181 if log.isEnabledFor(logging.DEBUG):
182 182 self._call = self._call_with_logging
183 183
184 184 def __getattr__(self, name):
185 185 def f(*args, **kwargs):
186 186 return self._call(name, *args, **kwargs)
187 187 return f
188 188
189 189 @exceptions.map_vcs_exceptions
190 190 def _call(self, name, *args, **kwargs):
191 191 payload = {
192 192 'id': str(uuid.uuid4()),
193 193 'method': name,
194 194 'params': {'args': args, 'kwargs': kwargs}
195 195 }
196 196 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
197 197
198 198 def _call_with_logging(self, name, *args, **kwargs):
199 199 log.debug('Calling %s@%s', self._url, name)
200 200 return RemoteObject._call(self, name, *args, **kwargs)
201 201
202 202
203 203 def _remote_call(url, payload, exceptions_map, session):
204 204 try:
205 205 response = session.post(url, data=msgpack.packb(payload))
206 206 except pycurl.error as e:
207 207 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
208 208 raise exceptions.HttpVCSCommunicationError(msg)
209 209 except Exception as e:
210 210 message = getattr(e, 'message', '')
211 211 if 'Failed to connect' in message:
212 212 # gevent doesn't return proper pycurl errors
213 213 raise exceptions.HttpVCSCommunicationError(e)
214 214 else:
215 215 raise
216 216
217 217 if response.status_code >= 400:
218 218 log.error('Call to %s returned non 200 HTTP code: %s',
219 219 url, response.status_code)
220 220 raise exceptions.HttpVCSCommunicationError(repr(response.content))
221 221
222 222 try:
223 223 response = msgpack.unpackb(response.content)
224 224 except Exception:
225 225 log.exception('Failed to decode response %r', response.content)
226 226 raise
227 227
228 228 error = response.get('error')
229 229 if error:
230 230 type_ = error.get('type', 'Exception')
231 231 exc = exceptions_map.get(type_, Exception)
232 232 exc = exc(error.get('message'))
233 233 try:
234 234 exc._vcs_kind = error['_vcs_kind']
235 235 except KeyError:
236 236 pass
237 237
238 238 try:
239 239 exc._vcs_server_traceback = error['traceback']
240 240 exc._vcs_server_org_exc_name = error['org_exc']
241 241 exc._vcs_server_org_exc_tb = error['org_exc_tb']
242 242 except KeyError:
243 243 pass
244 244
245 245 raise exc
246 246 return response.get('result')
247 247
248 248
249 249 class VcsHttpProxy(object):
250 250
251 251 CHUNK_SIZE = 16384
252 252
253 253 def __init__(self, server_and_port, backend_endpoint):
254 254 retries = Retry(total=5, connect=None, read=None, redirect=None)
255 255
256 256 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
257 257 self.base_url = urlparse.urljoin(
258 258 'http://%s' % server_and_port, backend_endpoint)
259 259 self.session = requests.Session()
260 260 self.session.mount('http://', adapter)
261 261
262 262 def handle(self, environment, input_data, *args, **kwargs):
263 263 data = {
264 264 'environment': environment,
265 265 'input_data': input_data,
266 266 'args': args,
267 267 'kwargs': kwargs
268 268 }
269 269 result = self.session.post(
270 270 self.base_url, msgpack.packb(data), stream=True)
271 271 return self._get_result(result)
272 272
273 273 def _deserialize_and_raise(self, error):
274 274 exception = Exception(error['message'])
275 275 try:
276 276 exception._vcs_kind = error['_vcs_kind']
277 277 except KeyError:
278 278 pass
279 279 raise exception
280 280
281 281 def _iterate(self, result):
282 282 unpacker = msgpack.Unpacker()
283 283 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
284 284 unpacker.feed(line)
285 285 for chunk in unpacker:
286 286 yield chunk
287 287
288 288 def _get_result(self, result):
289 289 iterator = self._iterate(result)
290 290 error = iterator.next()
291 291 if error:
292 292 self._deserialize_and_raise(error)
293 293
294 294 status = iterator.next()
295 295 headers = iterator.next()
296 296
297 297 return iterator, status, headers
298 298
299 299
300 300 class ThreadlocalSessionFactory(object):
301 301 """
302 302 Creates one CurlSession per thread on demand.
303 303 """
304 304
305 305 def __init__(self):
306 306 self._thread_local = threading.local()
307 307
308 308 def __call__(self):
309 309 if not hasattr(self._thread_local, 'curl_session'):
310 310 self._thread_local.curl_session = CurlSession()
311 311 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now