##// END OF EJS Templates
http: use better logging with vcsserver context UUID.
marcink -
r2886:7def1637 default
parent child Browse files
Show More
@@ -1,302 +1,303 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2018 RhodeCode GmbH
3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import urllib2
28 import urllib2
29 import urlparse
29 import urlparse
30 import uuid
30 import uuid
31
31
32 import pycurl
32 import pycurl
33 import msgpack
33 import msgpack
34 import requests
34 import requests
35 from requests.packages.urllib3.util.retry import Retry
35 from requests.packages.urllib3.util.retry import Retry
36
36
37 from . import exceptions, CurlSession
37 from . import exceptions, CurlSession
38
38
39
39
40 log = logging.getLogger(__name__)
40 log = logging.getLogger(__name__)
41
41
42
42
43 # TODO: mikhail: Keep it in sync with vcsserver's
43 # TODO: mikhail: Keep it in sync with vcsserver's
44 # HTTPApplication.ALLOWED_EXCEPTIONS
44 # HTTPApplication.ALLOWED_EXCEPTIONS
45 EXCEPTIONS_MAP = {
45 EXCEPTIONS_MAP = {
46 'KeyError': KeyError,
46 'KeyError': KeyError,
47 'URLError': urllib2.URLError,
47 'URLError': urllib2.URLError,
48 }
48 }
49
49
50
50
51 class RepoMaker(object):
51 class RepoMaker(object):
52
52
53 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
53 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
54 self.url = urlparse.urljoin(
54 self.url = urlparse.urljoin(
55 'http://%s' % server_and_port, backend_endpoint)
55 'http://%s' % server_and_port, backend_endpoint)
56 self._session_factory = session_factory
56 self._session_factory = session_factory
57 self.backend_type = backend_type
57 self.backend_type = backend_type
58
58
59 def __call__(self, path, config, with_wire=None):
59 def __call__(self, path, config, with_wire=None):
60 log.debug('RepoMaker call on %s', path)
60 log.debug('RepoMaker call on %s', path)
61 return RemoteRepo(
61 return RemoteRepo(
62 path, config, self.url, self._session_factory(),
62 path, config, self.url, self._session_factory(),
63 with_wire=with_wire)
63 with_wire=with_wire)
64
64
65 def __getattr__(self, name):
65 def __getattr__(self, name):
66 def f(*args, **kwargs):
66 def f(*args, **kwargs):
67 return self._call(name, *args, **kwargs)
67 return self._call(name, *args, **kwargs)
68 return f
68 return f
69
69
70 @exceptions.map_vcs_exceptions
70 @exceptions.map_vcs_exceptions
71 def _call(self, name, *args, **kwargs):
71 def _call(self, name, *args, **kwargs):
72 payload = {
72 payload = {
73 'id': str(uuid.uuid4()),
73 'id': str(uuid.uuid4()),
74 'method': name,
74 'method': name,
75 'backend': self.backend_type,
75 'backend': self.backend_type,
76 'params': {'args': args, 'kwargs': kwargs}
76 'params': {'args': args, 'kwargs': kwargs}
77 }
77 }
78 return _remote_call(
78 return _remote_call(
79 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
79 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
80
80
81
81
82 class ServiceConnection(object):
82 class ServiceConnection(object):
83 def __init__(self, server_and_port, backend_endpoint, session_factory):
83 def __init__(self, server_and_port, backend_endpoint, session_factory):
84 self.url = urlparse.urljoin(
84 self.url = urlparse.urljoin(
85 'http://%s' % server_and_port, backend_endpoint)
85 'http://%s' % server_and_port, backend_endpoint)
86 self._session_factory = session_factory
86 self._session_factory = session_factory
87
87
88 def __getattr__(self, name):
88 def __getattr__(self, name):
89 def f(*args, **kwargs):
89 def f(*args, **kwargs):
90 return self._call(name, *args, **kwargs)
90 return self._call(name, *args, **kwargs)
91
91
92 return f
92 return f
93
93
94 @exceptions.map_vcs_exceptions
94 @exceptions.map_vcs_exceptions
95 def _call(self, name, *args, **kwargs):
95 def _call(self, name, *args, **kwargs):
96 payload = {
96 payload = {
97 'id': str(uuid.uuid4()),
97 'id': str(uuid.uuid4()),
98 'method': name,
98 'method': name,
99 'params': {'args': args, 'kwargs': kwargs}
99 'params': {'args': args, 'kwargs': kwargs}
100 }
100 }
101 return _remote_call(
101 return _remote_call(
102 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
102 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
103
103
104
104
105 class RemoteRepo(object):
105 class RemoteRepo(object):
106
106
107 def __init__(self, path, config, url, session, with_wire=None):
107 def __init__(self, path, config, url, session, with_wire=None):
108 self.url = url
108 self.url = url
109 self._session = session
109 self._session = session
110 self._wire = {
110 self._wire = {
111 "path": path,
111 "path": path,
112 "config": config,
112 "config": config,
113 "context": self._create_vcs_cache_context(),
113 "context": self._create_vcs_cache_context(),
114 }
114 }
115 if with_wire:
115 if with_wire:
116 self._wire.update(with_wire)
116 self._wire.update(with_wire)
117
117
118 # johbo: Trading complexity for performance. Avoiding the call to
118 # johbo: Trading complexity for performance. Avoiding the call to
119 # log.debug brings a few percent gain even if is is not active.
119 # log.debug brings a few percent gain even if is is not active.
120 if log.isEnabledFor(logging.DEBUG):
120 if log.isEnabledFor(logging.DEBUG):
121 self._call = self._call_with_logging
121 self._call = self._call_with_logging
122
122
123 def __getattr__(self, name):
123 def __getattr__(self, name):
124 def f(*args, **kwargs):
124 def f(*args, **kwargs):
125 return self._call(name, *args, **kwargs)
125 return self._call(name, *args, **kwargs)
126 return f
126 return f
127
127
128 @exceptions.map_vcs_exceptions
128 @exceptions.map_vcs_exceptions
129 def _call(self, name, *args, **kwargs):
129 def _call(self, name, *args, **kwargs):
130 # TODO: oliver: This is currently necessary pre-call since the
130 # TODO: oliver: This is currently necessary pre-call since the
131 # config object is being changed for hooking scenarios
131 # config object is being changed for hooking scenarios
132 wire = copy.deepcopy(self._wire)
132 wire = copy.deepcopy(self._wire)
133 wire["config"] = wire["config"].serialize()
133 wire["config"] = wire["config"].serialize()
134 payload = {
134 payload = {
135 'id': str(uuid.uuid4()),
135 'id': str(uuid.uuid4()),
136 'method': name,
136 'method': name,
137 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
137 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
138 }
138 }
139 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
139 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
140
140
141 def _call_with_logging(self, name, *args, **kwargs):
141 def _call_with_logging(self, name, *args, **kwargs):
142
142 context_uid = self._wire.get('context')
143 log.debug('Calling %s@%s with args:%r', self.url, name, args)
143 log.debug('Calling %s@%s with args:%r. wire_context: %s',
144 self.url, name, args, context_uid)
144 return RemoteRepo._call(self, name, *args, **kwargs)
145 return RemoteRepo._call(self, name, *args, **kwargs)
145
146
146 def __getitem__(self, key):
147 def __getitem__(self, key):
147 return self.revision(key)
148 return self.revision(key)
148
149
149 def _create_vcs_cache_context(self):
150 def _create_vcs_cache_context(self):
150 """
151 """
151 Creates a unique string which is passed to the VCSServer on every
152 Creates a unique string which is passed to the VCSServer on every
152 remote call. It is used as cache key in the VCSServer.
153 remote call. It is used as cache key in the VCSServer.
153 """
154 """
154 return str(uuid.uuid4())
155 return str(uuid.uuid4())
155
156
156 def invalidate_vcs_cache(self):
157 def invalidate_vcs_cache(self):
157 """
158 """
158 This invalidates the context which is sent to the VCSServer on every
159 This invalidates the context which is sent to the VCSServer on every
159 call to a remote method. It forces the VCSServer to create a fresh
160 call to a remote method. It forces the VCSServer to create a fresh
160 repository instance on the next call to a remote method.
161 repository instance on the next call to a remote method.
161 """
162 """
162 self._wire['context'] = self._create_vcs_cache_context()
163 self._wire['context'] = self._create_vcs_cache_context()
163
164
164
165
165 class RemoteObject(object):
166 class RemoteObject(object):
166
167
167 def __init__(self, url, session):
168 def __init__(self, url, session):
168 self._url = url
169 self._url = url
169 self._session = session
170 self._session = session
170
171
171 # johbo: Trading complexity for performance. Avoiding the call to
172 # johbo: Trading complexity for performance. Avoiding the call to
172 # log.debug brings a few percent gain even if is is not active.
173 # log.debug brings a few percent gain even if is is not active.
173 if log.isEnabledFor(logging.DEBUG):
174 if log.isEnabledFor(logging.DEBUG):
174 self._call = self._call_with_logging
175 self._call = self._call_with_logging
175
176
176 def __getattr__(self, name):
177 def __getattr__(self, name):
177 def f(*args, **kwargs):
178 def f(*args, **kwargs):
178 return self._call(name, *args, **kwargs)
179 return self._call(name, *args, **kwargs)
179 return f
180 return f
180
181
181 @exceptions.map_vcs_exceptions
182 @exceptions.map_vcs_exceptions
182 def _call(self, name, *args, **kwargs):
183 def _call(self, name, *args, **kwargs):
183 payload = {
184 payload = {
184 'id': str(uuid.uuid4()),
185 'id': str(uuid.uuid4()),
185 'method': name,
186 'method': name,
186 'params': {'args': args, 'kwargs': kwargs}
187 'params': {'args': args, 'kwargs': kwargs}
187 }
188 }
188 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
189 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
189
190
190 def _call_with_logging(self, name, *args, **kwargs):
191 def _call_with_logging(self, name, *args, **kwargs):
191 log.debug('Calling %s@%s', self._url, name)
192 log.debug('Calling %s@%s', self._url, name)
192 return RemoteObject._call(self, name, *args, **kwargs)
193 return RemoteObject._call(self, name, *args, **kwargs)
193
194
194
195
195 def _remote_call(url, payload, exceptions_map, session):
196 def _remote_call(url, payload, exceptions_map, session):
196 try:
197 try:
197 response = session.post(url, data=msgpack.packb(payload))
198 response = session.post(url, data=msgpack.packb(payload))
198 except pycurl.error as e:
199 except pycurl.error as e:
199 raise exceptions.HttpVCSCommunicationError(e)
200 raise exceptions.HttpVCSCommunicationError(e)
200 except Exception as e:
201 except Exception as e:
201 message = getattr(e, 'message', '')
202 message = getattr(e, 'message', '')
202 if 'Failed to connect' in message:
203 if 'Failed to connect' in message:
203 # gevent doesn't return proper pycurl errors
204 # gevent doesn't return proper pycurl errors
204 raise exceptions.HttpVCSCommunicationError(e)
205 raise exceptions.HttpVCSCommunicationError(e)
205 else:
206 else:
206 raise
207 raise
207
208
208 if response.status_code >= 400:
209 if response.status_code >= 400:
209 log.error('Call to %s returned non 200 HTTP code: %s',
210 log.error('Call to %s returned non 200 HTTP code: %s',
210 url, response.status_code)
211 url, response.status_code)
211 raise exceptions.HttpVCSCommunicationError(repr(response.content))
212 raise exceptions.HttpVCSCommunicationError(repr(response.content))
212
213
213 try:
214 try:
214 response = msgpack.unpackb(response.content)
215 response = msgpack.unpackb(response.content)
215 except Exception:
216 except Exception:
216 log.exception('Failed to decode response %r', response.content)
217 log.exception('Failed to decode response %r', response.content)
217 raise
218 raise
218
219
219 error = response.get('error')
220 error = response.get('error')
220 if error:
221 if error:
221 type_ = error.get('type', 'Exception')
222 type_ = error.get('type', 'Exception')
222 exc = exceptions_map.get(type_, Exception)
223 exc = exceptions_map.get(type_, Exception)
223 exc = exc(error.get('message'))
224 exc = exc(error.get('message'))
224 try:
225 try:
225 exc._vcs_kind = error['_vcs_kind']
226 exc._vcs_kind = error['_vcs_kind']
226 except KeyError:
227 except KeyError:
227 pass
228 pass
228
229
229 try:
230 try:
230 exc._vcs_server_traceback = error['traceback']
231 exc._vcs_server_traceback = error['traceback']
231 except KeyError:
232 except KeyError:
232 pass
233 pass
233
234
234 raise exc
235 raise exc
235 return response.get('result')
236 return response.get('result')
236
237
237
238
238 class VcsHttpProxy(object):
239 class VcsHttpProxy(object):
239
240
240 CHUNK_SIZE = 16384
241 CHUNK_SIZE = 16384
241
242
242 def __init__(self, server_and_port, backend_endpoint):
243 def __init__(self, server_and_port, backend_endpoint):
243
244
244
245
245 retries = Retry(total=5, connect=None, read=None, redirect=None)
246 retries = Retry(total=5, connect=None, read=None, redirect=None)
246
247
247 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
248 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
248 self.base_url = urlparse.urljoin(
249 self.base_url = urlparse.urljoin(
249 'http://%s' % server_and_port, backend_endpoint)
250 'http://%s' % server_and_port, backend_endpoint)
250 self.session = requests.Session()
251 self.session = requests.Session()
251 self.session.mount('http://', adapter)
252 self.session.mount('http://', adapter)
252
253
253 def handle(self, environment, input_data, *args, **kwargs):
254 def handle(self, environment, input_data, *args, **kwargs):
254 data = {
255 data = {
255 'environment': environment,
256 'environment': environment,
256 'input_data': input_data,
257 'input_data': input_data,
257 'args': args,
258 'args': args,
258 'kwargs': kwargs
259 'kwargs': kwargs
259 }
260 }
260 result = self.session.post(
261 result = self.session.post(
261 self.base_url, msgpack.packb(data), stream=True)
262 self.base_url, msgpack.packb(data), stream=True)
262 return self._get_result(result)
263 return self._get_result(result)
263
264
264 def _deserialize_and_raise(self, error):
265 def _deserialize_and_raise(self, error):
265 exception = Exception(error['message'])
266 exception = Exception(error['message'])
266 try:
267 try:
267 exception._vcs_kind = error['_vcs_kind']
268 exception._vcs_kind = error['_vcs_kind']
268 except KeyError:
269 except KeyError:
269 pass
270 pass
270 raise exception
271 raise exception
271
272
272 def _iterate(self, result):
273 def _iterate(self, result):
273 unpacker = msgpack.Unpacker()
274 unpacker = msgpack.Unpacker()
274 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
275 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
275 unpacker.feed(line)
276 unpacker.feed(line)
276 for chunk in unpacker:
277 for chunk in unpacker:
277 yield chunk
278 yield chunk
278
279
279 def _get_result(self, result):
280 def _get_result(self, result):
280 iterator = self._iterate(result)
281 iterator = self._iterate(result)
281 error = iterator.next()
282 error = iterator.next()
282 if error:
283 if error:
283 self._deserialize_and_raise(error)
284 self._deserialize_and_raise(error)
284
285
285 status = iterator.next()
286 status = iterator.next()
286 headers = iterator.next()
287 headers = iterator.next()
287
288
288 return iterator, status, headers
289 return iterator, status, headers
289
290
290
291
291 class ThreadlocalSessionFactory(object):
292 class ThreadlocalSessionFactory(object):
292 """
293 """
293 Creates one CurlSession per thread on demand.
294 Creates one CurlSession per thread on demand.
294 """
295 """
295
296
296 def __init__(self):
297 def __init__(self):
297 self._thread_local = threading.local()
298 self._thread_local = threading.local()
298
299
299 def __call__(self):
300 def __call__(self):
300 if not hasattr(self._thread_local, 'curl_session'):
301 if not hasattr(self._thread_local, 'curl_session'):
301 self._thread_local.curl_session = CurlSession()
302 self._thread_local.curl_session = CurlSession()
302 return self._thread_local.curl_session
303 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now