##// END OF EJS Templates
http: use better logging with vcsserver context UUID.
marcink -
r2886:7def1637 default
parent child Browse files
Show More
@@ -1,302 +1,303 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import urllib2
29 29 import urlparse
30 30 import uuid
31 31
32 32 import pycurl
33 33 import msgpack
34 34 import requests
35 35 from requests.packages.urllib3.util.retry import Retry
36 36
37 37 from . import exceptions, CurlSession
38 38
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 # TODO: mikhail: Keep it in sync with vcsserver's
44 44 # HTTPApplication.ALLOWED_EXCEPTIONS
45 45 EXCEPTIONS_MAP = {
46 46 'KeyError': KeyError,
47 47 'URLError': urllib2.URLError,
48 48 }
49 49
50 50
51 51 class RepoMaker(object):
52 52
53 53 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
54 54 self.url = urlparse.urljoin(
55 55 'http://%s' % server_and_port, backend_endpoint)
56 56 self._session_factory = session_factory
57 57 self.backend_type = backend_type
58 58
59 59 def __call__(self, path, config, with_wire=None):
60 60 log.debug('RepoMaker call on %s', path)
61 61 return RemoteRepo(
62 62 path, config, self.url, self._session_factory(),
63 63 with_wire=with_wire)
64 64
65 65 def __getattr__(self, name):
66 66 def f(*args, **kwargs):
67 67 return self._call(name, *args, **kwargs)
68 68 return f
69 69
70 70 @exceptions.map_vcs_exceptions
71 71 def _call(self, name, *args, **kwargs):
72 72 payload = {
73 73 'id': str(uuid.uuid4()),
74 74 'method': name,
75 75 'backend': self.backend_type,
76 76 'params': {'args': args, 'kwargs': kwargs}
77 77 }
78 78 return _remote_call(
79 79 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
80 80
81 81
82 82 class ServiceConnection(object):
83 83 def __init__(self, server_and_port, backend_endpoint, session_factory):
84 84 self.url = urlparse.urljoin(
85 85 'http://%s' % server_and_port, backend_endpoint)
86 86 self._session_factory = session_factory
87 87
88 88 def __getattr__(self, name):
89 89 def f(*args, **kwargs):
90 90 return self._call(name, *args, **kwargs)
91 91
92 92 return f
93 93
94 94 @exceptions.map_vcs_exceptions
95 95 def _call(self, name, *args, **kwargs):
96 96 payload = {
97 97 'id': str(uuid.uuid4()),
98 98 'method': name,
99 99 'params': {'args': args, 'kwargs': kwargs}
100 100 }
101 101 return _remote_call(
102 102 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
103 103
104 104
105 105 class RemoteRepo(object):
106 106
107 107 def __init__(self, path, config, url, session, with_wire=None):
108 108 self.url = url
109 109 self._session = session
110 110 self._wire = {
111 111 "path": path,
112 112 "config": config,
113 113 "context": self._create_vcs_cache_context(),
114 114 }
115 115 if with_wire:
116 116 self._wire.update(with_wire)
117 117
118 118 # johbo: Trading complexity for performance. Avoiding the call to
119 119 # log.debug brings a few percent gain even if is is not active.
120 120 if log.isEnabledFor(logging.DEBUG):
121 121 self._call = self._call_with_logging
122 122
123 123 def __getattr__(self, name):
124 124 def f(*args, **kwargs):
125 125 return self._call(name, *args, **kwargs)
126 126 return f
127 127
128 128 @exceptions.map_vcs_exceptions
129 129 def _call(self, name, *args, **kwargs):
130 130 # TODO: oliver: This is currently necessary pre-call since the
131 131 # config object is being changed for hooking scenarios
132 132 wire = copy.deepcopy(self._wire)
133 133 wire["config"] = wire["config"].serialize()
134 134 payload = {
135 135 'id': str(uuid.uuid4()),
136 136 'method': name,
137 137 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
138 138 }
139 139 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
140 140
141 141 def _call_with_logging(self, name, *args, **kwargs):
142
143 log.debug('Calling %s@%s with args:%r', self.url, name, args)
142 context_uid = self._wire.get('context')
143 log.debug('Calling %s@%s with args:%r. wire_context: %s',
144 self.url, name, args, context_uid)
144 145 return RemoteRepo._call(self, name, *args, **kwargs)
145 146
146 147 def __getitem__(self, key):
147 148 return self.revision(key)
148 149
149 150 def _create_vcs_cache_context(self):
150 151 """
151 152 Creates a unique string which is passed to the VCSServer on every
152 153 remote call. It is used as cache key in the VCSServer.
153 154 """
154 155 return str(uuid.uuid4())
155 156
156 157 def invalidate_vcs_cache(self):
157 158 """
158 159 This invalidates the context which is sent to the VCSServer on every
159 160 call to a remote method. It forces the VCSServer to create a fresh
160 161 repository instance on the next call to a remote method.
161 162 """
162 163 self._wire['context'] = self._create_vcs_cache_context()
163 164
164 165
165 166 class RemoteObject(object):
166 167
167 168 def __init__(self, url, session):
168 169 self._url = url
169 170 self._session = session
170 171
171 172 # johbo: Trading complexity for performance. Avoiding the call to
172 173 # log.debug brings a few percent gain even if is is not active.
173 174 if log.isEnabledFor(logging.DEBUG):
174 175 self._call = self._call_with_logging
175 176
176 177 def __getattr__(self, name):
177 178 def f(*args, **kwargs):
178 179 return self._call(name, *args, **kwargs)
179 180 return f
180 181
181 182 @exceptions.map_vcs_exceptions
182 183 def _call(self, name, *args, **kwargs):
183 184 payload = {
184 185 'id': str(uuid.uuid4()),
185 186 'method': name,
186 187 'params': {'args': args, 'kwargs': kwargs}
187 188 }
188 189 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
189 190
190 191 def _call_with_logging(self, name, *args, **kwargs):
191 192 log.debug('Calling %s@%s', self._url, name)
192 193 return RemoteObject._call(self, name, *args, **kwargs)
193 194
194 195
195 196 def _remote_call(url, payload, exceptions_map, session):
196 197 try:
197 198 response = session.post(url, data=msgpack.packb(payload))
198 199 except pycurl.error as e:
199 200 raise exceptions.HttpVCSCommunicationError(e)
200 201 except Exception as e:
201 202 message = getattr(e, 'message', '')
202 203 if 'Failed to connect' in message:
203 204 # gevent doesn't return proper pycurl errors
204 205 raise exceptions.HttpVCSCommunicationError(e)
205 206 else:
206 207 raise
207 208
208 209 if response.status_code >= 400:
209 210 log.error('Call to %s returned non 200 HTTP code: %s',
210 211 url, response.status_code)
211 212 raise exceptions.HttpVCSCommunicationError(repr(response.content))
212 213
213 214 try:
214 215 response = msgpack.unpackb(response.content)
215 216 except Exception:
216 217 log.exception('Failed to decode response %r', response.content)
217 218 raise
218 219
219 220 error = response.get('error')
220 221 if error:
221 222 type_ = error.get('type', 'Exception')
222 223 exc = exceptions_map.get(type_, Exception)
223 224 exc = exc(error.get('message'))
224 225 try:
225 226 exc._vcs_kind = error['_vcs_kind']
226 227 except KeyError:
227 228 pass
228 229
229 230 try:
230 231 exc._vcs_server_traceback = error['traceback']
231 232 except KeyError:
232 233 pass
233 234
234 235 raise exc
235 236 return response.get('result')
236 237
237 238
238 239 class VcsHttpProxy(object):
239 240
240 241 CHUNK_SIZE = 16384
241 242
242 243 def __init__(self, server_and_port, backend_endpoint):
243 244
244 245
245 246 retries = Retry(total=5, connect=None, read=None, redirect=None)
246 247
247 248 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
248 249 self.base_url = urlparse.urljoin(
249 250 'http://%s' % server_and_port, backend_endpoint)
250 251 self.session = requests.Session()
251 252 self.session.mount('http://', adapter)
252 253
253 254 def handle(self, environment, input_data, *args, **kwargs):
254 255 data = {
255 256 'environment': environment,
256 257 'input_data': input_data,
257 258 'args': args,
258 259 'kwargs': kwargs
259 260 }
260 261 result = self.session.post(
261 262 self.base_url, msgpack.packb(data), stream=True)
262 263 return self._get_result(result)
263 264
264 265 def _deserialize_and_raise(self, error):
265 266 exception = Exception(error['message'])
266 267 try:
267 268 exception._vcs_kind = error['_vcs_kind']
268 269 except KeyError:
269 270 pass
270 271 raise exception
271 272
272 273 def _iterate(self, result):
273 274 unpacker = msgpack.Unpacker()
274 275 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
275 276 unpacker.feed(line)
276 277 for chunk in unpacker:
277 278 yield chunk
278 279
279 280 def _get_result(self, result):
280 281 iterator = self._iterate(result)
281 282 error = iterator.next()
282 283 if error:
283 284 self._deserialize_and_raise(error)
284 285
285 286 status = iterator.next()
286 287 headers = iterator.next()
287 288
288 289 return iterator, status, headers
289 290
290 291
291 292 class ThreadlocalSessionFactory(object):
292 293 """
293 294 Creates one CurlSession per thread on demand.
294 295 """
295 296
296 297 def __init__(self):
297 298 self._thread_local = threading.local()
298 299
299 300 def __call__(self):
300 301 if not hasattr(self._thread_local, 'curl_session'):
301 302 self._thread_local.curl_session = CurlSession()
302 303 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now