##// END OF EJS Templates
git: improve logging of git remote commands.
marcink -
r1574:aea16388 default
parent child Browse files
Show More
@@ -1,289 +1,290 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import urllib2
29 29 import urlparse
30 30 import uuid
31 31
32 32 import pycurl
33 33 import msgpack
34 34 import requests
35 35
36 36 from . import exceptions, CurlSession
37 37
38 38
39 39 log = logging.getLogger(__name__)
40 40
41 41
42 42 # TODO: mikhail: Keep it in sync with vcsserver's
43 43 # HTTPApplication.ALLOWED_EXCEPTIONS
44 44 EXCEPTIONS_MAP = {
45 45 'KeyError': KeyError,
46 46 'URLError': urllib2.URLError,
47 47 }
48 48
49 49
50 50 class RepoMaker(object):
51 51
52 52 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
53 53 self.url = urlparse.urljoin(
54 54 'http://%s' % server_and_port, backend_endpoint)
55 55 self._session_factory = session_factory
56 56 self.backend_type = backend_type
57 57
58 58 def __call__(self, path, config, with_wire=None):
59 59 log.debug('RepoMaker call on %s', path)
60 60 return RemoteRepo(
61 61 path, config, self.url, self._session_factory(),
62 62 with_wire=with_wire)
63 63
64 64 def __getattr__(self, name):
65 65 def f(*args, **kwargs):
66 66 return self._call(name, *args, **kwargs)
67 67 return f
68 68
69 69 @exceptions.map_vcs_exceptions
70 70 def _call(self, name, *args, **kwargs):
71 71 payload = {
72 72 'id': str(uuid.uuid4()),
73 73 'method': name,
74 74 'backend': self.backend_type,
75 75 'params': {'args': args, 'kwargs': kwargs}
76 76 }
77 77 return _remote_call(
78 78 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
79 79
80 80
81 81 class ServiceConnection(object):
82 82 def __init__(self, server_and_port, backend_endpoint, session_factory):
83 83 self.url = urlparse.urljoin(
84 84 'http://%s' % server_and_port, backend_endpoint)
85 85 self._session_factory = session_factory
86 86
87 87 def __getattr__(self, name):
88 88 def f(*args, **kwargs):
89 89 return self._call(name, *args, **kwargs)
90 90
91 91 return f
92 92
93 93 @exceptions.map_vcs_exceptions
94 94 def _call(self, name, *args, **kwargs):
95 95 payload = {
96 96 'id': str(uuid.uuid4()),
97 97 'method': name,
98 98 'params': {'args': args, 'kwargs': kwargs}
99 99 }
100 100 return _remote_call(
101 101 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
102 102
103 103
104 104 class RemoteRepo(object):
105 105
106 106 def __init__(self, path, config, url, session, with_wire=None):
107 107 self.url = url
108 108 self._session = session
109 109 self._wire = {
110 110 "path": path,
111 111 "config": config,
112 112 "context": self._create_vcs_cache_context(),
113 113 }
114 114 if with_wire:
115 115 self._wire.update(with_wire)
116 116
117 117 # johbo: Trading complexity for performance. Avoiding the call to
118 118 # log.debug brings a few percent gain even if is is not active.
119 119 if log.isEnabledFor(logging.DEBUG):
120 120 self._call = self._call_with_logging
121 121
122 122 def __getattr__(self, name):
123 123 def f(*args, **kwargs):
124 124 return self._call(name, *args, **kwargs)
125 125 return f
126 126
127 127 @exceptions.map_vcs_exceptions
128 128 def _call(self, name, *args, **kwargs):
129 129 # TODO: oliver: This is currently necessary pre-call since the
130 130 # config object is being changed for hooking scenarios
131 131 wire = copy.deepcopy(self._wire)
132 132 wire["config"] = wire["config"].serialize()
133 133 payload = {
134 134 'id': str(uuid.uuid4()),
135 135 'method': name,
136 136 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
137 137 }
138 138 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
139 139
140 140 def _call_with_logging(self, name, *args, **kwargs):
141 log.debug('Calling %s@%s', self.url, name)
141
142 log.debug('Calling %s@%s with args:%r', self.url, name, args)
142 143 return RemoteRepo._call(self, name, *args, **kwargs)
143 144
144 145 def __getitem__(self, key):
145 146 return self.revision(key)
146 147
147 148 def _create_vcs_cache_context(self):
148 149 """
149 150 Creates a unique string which is passed to the VCSServer on every
150 151 remote call. It is used as cache key in the VCSServer.
151 152 """
152 153 return str(uuid.uuid4())
153 154
154 155 def invalidate_vcs_cache(self):
155 156 """
156 157 This invalidates the context which is sent to the VCSServer on every
157 158 call to a remote method. It forces the VCSServer to create a fresh
158 159 repository instance on the next call to a remote method.
159 160 """
160 161 self._wire['context'] = self._create_vcs_cache_context()
161 162
162 163
163 164 class RemoteObject(object):
164 165
165 166 def __init__(self, url, session):
166 167 self._url = url
167 168 self._session = session
168 169
169 170 # johbo: Trading complexity for performance. Avoiding the call to
170 171 # log.debug brings a few percent gain even if is is not active.
171 172 if log.isEnabledFor(logging.DEBUG):
172 173 self._call = self._call_with_logging
173 174
174 175 def __getattr__(self, name):
175 176 def f(*args, **kwargs):
176 177 return self._call(name, *args, **kwargs)
177 178 return f
178 179
179 180 @exceptions.map_vcs_exceptions
180 181 def _call(self, name, *args, **kwargs):
181 182 payload = {
182 183 'id': str(uuid.uuid4()),
183 184 'method': name,
184 185 'params': {'args': args, 'kwargs': kwargs}
185 186 }
186 187 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
187 188
188 189 def _call_with_logging(self, name, *args, **kwargs):
189 190 log.debug('Calling %s@%s', self._url, name)
190 191 return RemoteObject._call(self, name, *args, **kwargs)
191 192
192 193
193 194 def _remote_call(url, payload, exceptions_map, session):
194 195 try:
195 196 response = session.post(url, data=msgpack.packb(payload))
196 197 except pycurl.error as e:
197 198 raise exceptions.HttpVCSCommunicationError(e)
198 199
199 200 if response.status_code >= 400:
200 201 log.error('Call to %s returned non 200 HTTP code: %s',
201 202 url, response.status_code)
202 203 raise exceptions.HttpVCSCommunicationError(repr(response.content))
203 204
204 205 try:
205 206 response = msgpack.unpackb(response.content)
206 207 except Exception:
207 208 log.exception('Failed to decode response %r', response.content)
208 209 raise
209 210
210 211 error = response.get('error')
211 212 if error:
212 213 type_ = error.get('type', 'Exception')
213 214 exc = exceptions_map.get(type_, Exception)
214 215 exc = exc(error.get('message'))
215 216 try:
216 217 exc._vcs_kind = error['_vcs_kind']
217 218 except KeyError:
218 219 pass
219 220
220 221 try:
221 222 exc._vcs_server_traceback = error['traceback']
222 223 except KeyError:
223 224 pass
224 225
225 226 raise exc
226 227 return response.get('result')
227 228
228 229
229 230 class VcsHttpProxy(object):
230 231
231 232 CHUNK_SIZE = 16384
232 233
233 234 def __init__(self, server_and_port, backend_endpoint):
234 235 adapter = requests.adapters.HTTPAdapter(max_retries=5)
235 236 self.base_url = urlparse.urljoin(
236 237 'http://%s' % server_and_port, backend_endpoint)
237 238 self.session = requests.Session()
238 239 self.session.mount('http://', adapter)
239 240
240 241 def handle(self, environment, input_data, *args, **kwargs):
241 242 data = {
242 243 'environment': environment,
243 244 'input_data': input_data,
244 245 'args': args,
245 246 'kwargs': kwargs
246 247 }
247 248 result = self.session.post(
248 249 self.base_url, msgpack.packb(data), stream=True)
249 250 return self._get_result(result)
250 251
251 252 def _deserialize_and_raise(self, error):
252 253 exception = Exception(error['message'])
253 254 try:
254 255 exception._vcs_kind = error['_vcs_kind']
255 256 except KeyError:
256 257 pass
257 258 raise exception
258 259
259 260 def _iterate(self, result):
260 261 unpacker = msgpack.Unpacker()
261 262 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
262 263 unpacker.feed(line)
263 264 for chunk in unpacker:
264 265 yield chunk
265 266
266 267 def _get_result(self, result):
267 268 iterator = self._iterate(result)
268 269 error = iterator.next()
269 270 if error:
270 271 self._deserialize_and_raise(error)
271 272
272 273 status = iterator.next()
273 274 headers = iterator.next()
274 275
275 276 return iterator, status, headers
276 277
277 278
278 279 class ThreadlocalSessionFactory(object):
279 280 """
280 281 Creates one CurlSession per thread on demand.
281 282 """
282 283
283 284 def __init__(self):
284 285 self._thread_local = threading.local()
285 286
286 287 def __call__(self):
287 288 if not hasattr(self._thread_local, 'curl_session'):
288 289 self._thread_local.curl_session = CurlSession()
289 290 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now