##// END OF EJS Templates
git: improve logging of git remote commands.
marcink -
r1574:aea16388 default
parent child Browse files
Show More
@@ -1,289 +1,290 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2017 RhodeCode GmbH
3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import urllib2
28 import urllib2
29 import urlparse
29 import urlparse
30 import uuid
30 import uuid
31
31
32 import pycurl
32 import pycurl
33 import msgpack
33 import msgpack
34 import requests
34 import requests
35
35
36 from . import exceptions, CurlSession
36 from . import exceptions, CurlSession
37
37
38
38
39 log = logging.getLogger(__name__)
39 log = logging.getLogger(__name__)
40
40
41
41
42 # TODO: mikhail: Keep it in sync with vcsserver's
42 # TODO: mikhail: Keep it in sync with vcsserver's
43 # HTTPApplication.ALLOWED_EXCEPTIONS
43 # HTTPApplication.ALLOWED_EXCEPTIONS
44 EXCEPTIONS_MAP = {
44 EXCEPTIONS_MAP = {
45 'KeyError': KeyError,
45 'KeyError': KeyError,
46 'URLError': urllib2.URLError,
46 'URLError': urllib2.URLError,
47 }
47 }
48
48
49
49
50 class RepoMaker(object):
50 class RepoMaker(object):
51
51
52 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
52 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
53 self.url = urlparse.urljoin(
53 self.url = urlparse.urljoin(
54 'http://%s' % server_and_port, backend_endpoint)
54 'http://%s' % server_and_port, backend_endpoint)
55 self._session_factory = session_factory
55 self._session_factory = session_factory
56 self.backend_type = backend_type
56 self.backend_type = backend_type
57
57
58 def __call__(self, path, config, with_wire=None):
58 def __call__(self, path, config, with_wire=None):
59 log.debug('RepoMaker call on %s', path)
59 log.debug('RepoMaker call on %s', path)
60 return RemoteRepo(
60 return RemoteRepo(
61 path, config, self.url, self._session_factory(),
61 path, config, self.url, self._session_factory(),
62 with_wire=with_wire)
62 with_wire=with_wire)
63
63
64 def __getattr__(self, name):
64 def __getattr__(self, name):
65 def f(*args, **kwargs):
65 def f(*args, **kwargs):
66 return self._call(name, *args, **kwargs)
66 return self._call(name, *args, **kwargs)
67 return f
67 return f
68
68
69 @exceptions.map_vcs_exceptions
69 @exceptions.map_vcs_exceptions
70 def _call(self, name, *args, **kwargs):
70 def _call(self, name, *args, **kwargs):
71 payload = {
71 payload = {
72 'id': str(uuid.uuid4()),
72 'id': str(uuid.uuid4()),
73 'method': name,
73 'method': name,
74 'backend': self.backend_type,
74 'backend': self.backend_type,
75 'params': {'args': args, 'kwargs': kwargs}
75 'params': {'args': args, 'kwargs': kwargs}
76 }
76 }
77 return _remote_call(
77 return _remote_call(
78 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
78 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
79
79
80
80
81 class ServiceConnection(object):
81 class ServiceConnection(object):
82 def __init__(self, server_and_port, backend_endpoint, session_factory):
82 def __init__(self, server_and_port, backend_endpoint, session_factory):
83 self.url = urlparse.urljoin(
83 self.url = urlparse.urljoin(
84 'http://%s' % server_and_port, backend_endpoint)
84 'http://%s' % server_and_port, backend_endpoint)
85 self._session_factory = session_factory
85 self._session_factory = session_factory
86
86
87 def __getattr__(self, name):
87 def __getattr__(self, name):
88 def f(*args, **kwargs):
88 def f(*args, **kwargs):
89 return self._call(name, *args, **kwargs)
89 return self._call(name, *args, **kwargs)
90
90
91 return f
91 return f
92
92
93 @exceptions.map_vcs_exceptions
93 @exceptions.map_vcs_exceptions
94 def _call(self, name, *args, **kwargs):
94 def _call(self, name, *args, **kwargs):
95 payload = {
95 payload = {
96 'id': str(uuid.uuid4()),
96 'id': str(uuid.uuid4()),
97 'method': name,
97 'method': name,
98 'params': {'args': args, 'kwargs': kwargs}
98 'params': {'args': args, 'kwargs': kwargs}
99 }
99 }
100 return _remote_call(
100 return _remote_call(
101 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
101 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
102
102
103
103
104 class RemoteRepo(object):
104 class RemoteRepo(object):
105
105
106 def __init__(self, path, config, url, session, with_wire=None):
106 def __init__(self, path, config, url, session, with_wire=None):
107 self.url = url
107 self.url = url
108 self._session = session
108 self._session = session
109 self._wire = {
109 self._wire = {
110 "path": path,
110 "path": path,
111 "config": config,
111 "config": config,
112 "context": self._create_vcs_cache_context(),
112 "context": self._create_vcs_cache_context(),
113 }
113 }
114 if with_wire:
114 if with_wire:
115 self._wire.update(with_wire)
115 self._wire.update(with_wire)
116
116
117 # johbo: Trading complexity for performance. Avoiding the call to
117 # johbo: Trading complexity for performance. Avoiding the call to
118 # log.debug brings a few percent gain even if is is not active.
118 # log.debug brings a few percent gain even if is is not active.
119 if log.isEnabledFor(logging.DEBUG):
119 if log.isEnabledFor(logging.DEBUG):
120 self._call = self._call_with_logging
120 self._call = self._call_with_logging
121
121
122 def __getattr__(self, name):
122 def __getattr__(self, name):
123 def f(*args, **kwargs):
123 def f(*args, **kwargs):
124 return self._call(name, *args, **kwargs)
124 return self._call(name, *args, **kwargs)
125 return f
125 return f
126
126
127 @exceptions.map_vcs_exceptions
127 @exceptions.map_vcs_exceptions
128 def _call(self, name, *args, **kwargs):
128 def _call(self, name, *args, **kwargs):
129 # TODO: oliver: This is currently necessary pre-call since the
129 # TODO: oliver: This is currently necessary pre-call since the
130 # config object is being changed for hooking scenarios
130 # config object is being changed for hooking scenarios
131 wire = copy.deepcopy(self._wire)
131 wire = copy.deepcopy(self._wire)
132 wire["config"] = wire["config"].serialize()
132 wire["config"] = wire["config"].serialize()
133 payload = {
133 payload = {
134 'id': str(uuid.uuid4()),
134 'id': str(uuid.uuid4()),
135 'method': name,
135 'method': name,
136 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
136 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
137 }
137 }
138 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
138 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
139
139
140 def _call_with_logging(self, name, *args, **kwargs):
140 def _call_with_logging(self, name, *args, **kwargs):
141 log.debug('Calling %s@%s', self.url, name)
141
142 log.debug('Calling %s@%s with args:%r', self.url, name, args)
142 return RemoteRepo._call(self, name, *args, **kwargs)
143 return RemoteRepo._call(self, name, *args, **kwargs)
143
144
144 def __getitem__(self, key):
145 def __getitem__(self, key):
145 return self.revision(key)
146 return self.revision(key)
146
147
147 def _create_vcs_cache_context(self):
148 def _create_vcs_cache_context(self):
148 """
149 """
149 Creates a unique string which is passed to the VCSServer on every
150 Creates a unique string which is passed to the VCSServer on every
150 remote call. It is used as cache key in the VCSServer.
151 remote call. It is used as cache key in the VCSServer.
151 """
152 """
152 return str(uuid.uuid4())
153 return str(uuid.uuid4())
153
154
154 def invalidate_vcs_cache(self):
155 def invalidate_vcs_cache(self):
155 """
156 """
156 This invalidates the context which is sent to the VCSServer on every
157 This invalidates the context which is sent to the VCSServer on every
157 call to a remote method. It forces the VCSServer to create a fresh
158 call to a remote method. It forces the VCSServer to create a fresh
158 repository instance on the next call to a remote method.
159 repository instance on the next call to a remote method.
159 """
160 """
160 self._wire['context'] = self._create_vcs_cache_context()
161 self._wire['context'] = self._create_vcs_cache_context()
161
162
162
163
163 class RemoteObject(object):
164 class RemoteObject(object):
164
165
165 def __init__(self, url, session):
166 def __init__(self, url, session):
166 self._url = url
167 self._url = url
167 self._session = session
168 self._session = session
168
169
169 # johbo: Trading complexity for performance. Avoiding the call to
170 # johbo: Trading complexity for performance. Avoiding the call to
170 # log.debug brings a few percent gain even if is is not active.
171 # log.debug brings a few percent gain even if is is not active.
171 if log.isEnabledFor(logging.DEBUG):
172 if log.isEnabledFor(logging.DEBUG):
172 self._call = self._call_with_logging
173 self._call = self._call_with_logging
173
174
174 def __getattr__(self, name):
175 def __getattr__(self, name):
175 def f(*args, **kwargs):
176 def f(*args, **kwargs):
176 return self._call(name, *args, **kwargs)
177 return self._call(name, *args, **kwargs)
177 return f
178 return f
178
179
179 @exceptions.map_vcs_exceptions
180 @exceptions.map_vcs_exceptions
180 def _call(self, name, *args, **kwargs):
181 def _call(self, name, *args, **kwargs):
181 payload = {
182 payload = {
182 'id': str(uuid.uuid4()),
183 'id': str(uuid.uuid4()),
183 'method': name,
184 'method': name,
184 'params': {'args': args, 'kwargs': kwargs}
185 'params': {'args': args, 'kwargs': kwargs}
185 }
186 }
186 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
187 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
187
188
188 def _call_with_logging(self, name, *args, **kwargs):
189 def _call_with_logging(self, name, *args, **kwargs):
189 log.debug('Calling %s@%s', self._url, name)
190 log.debug('Calling %s@%s', self._url, name)
190 return RemoteObject._call(self, name, *args, **kwargs)
191 return RemoteObject._call(self, name, *args, **kwargs)
191
192
192
193
193 def _remote_call(url, payload, exceptions_map, session):
194 def _remote_call(url, payload, exceptions_map, session):
194 try:
195 try:
195 response = session.post(url, data=msgpack.packb(payload))
196 response = session.post(url, data=msgpack.packb(payload))
196 except pycurl.error as e:
197 except pycurl.error as e:
197 raise exceptions.HttpVCSCommunicationError(e)
198 raise exceptions.HttpVCSCommunicationError(e)
198
199
199 if response.status_code >= 400:
200 if response.status_code >= 400:
200 log.error('Call to %s returned non 200 HTTP code: %s',
201 log.error('Call to %s returned non 200 HTTP code: %s',
201 url, response.status_code)
202 url, response.status_code)
202 raise exceptions.HttpVCSCommunicationError(repr(response.content))
203 raise exceptions.HttpVCSCommunicationError(repr(response.content))
203
204
204 try:
205 try:
205 response = msgpack.unpackb(response.content)
206 response = msgpack.unpackb(response.content)
206 except Exception:
207 except Exception:
207 log.exception('Failed to decode response %r', response.content)
208 log.exception('Failed to decode response %r', response.content)
208 raise
209 raise
209
210
210 error = response.get('error')
211 error = response.get('error')
211 if error:
212 if error:
212 type_ = error.get('type', 'Exception')
213 type_ = error.get('type', 'Exception')
213 exc = exceptions_map.get(type_, Exception)
214 exc = exceptions_map.get(type_, Exception)
214 exc = exc(error.get('message'))
215 exc = exc(error.get('message'))
215 try:
216 try:
216 exc._vcs_kind = error['_vcs_kind']
217 exc._vcs_kind = error['_vcs_kind']
217 except KeyError:
218 except KeyError:
218 pass
219 pass
219
220
220 try:
221 try:
221 exc._vcs_server_traceback = error['traceback']
222 exc._vcs_server_traceback = error['traceback']
222 except KeyError:
223 except KeyError:
223 pass
224 pass
224
225
225 raise exc
226 raise exc
226 return response.get('result')
227 return response.get('result')
227
228
228
229
229 class VcsHttpProxy(object):
230 class VcsHttpProxy(object):
230
231
231 CHUNK_SIZE = 16384
232 CHUNK_SIZE = 16384
232
233
233 def __init__(self, server_and_port, backend_endpoint):
234 def __init__(self, server_and_port, backend_endpoint):
234 adapter = requests.adapters.HTTPAdapter(max_retries=5)
235 adapter = requests.adapters.HTTPAdapter(max_retries=5)
235 self.base_url = urlparse.urljoin(
236 self.base_url = urlparse.urljoin(
236 'http://%s' % server_and_port, backend_endpoint)
237 'http://%s' % server_and_port, backend_endpoint)
237 self.session = requests.Session()
238 self.session = requests.Session()
238 self.session.mount('http://', adapter)
239 self.session.mount('http://', adapter)
239
240
240 def handle(self, environment, input_data, *args, **kwargs):
241 def handle(self, environment, input_data, *args, **kwargs):
241 data = {
242 data = {
242 'environment': environment,
243 'environment': environment,
243 'input_data': input_data,
244 'input_data': input_data,
244 'args': args,
245 'args': args,
245 'kwargs': kwargs
246 'kwargs': kwargs
246 }
247 }
247 result = self.session.post(
248 result = self.session.post(
248 self.base_url, msgpack.packb(data), stream=True)
249 self.base_url, msgpack.packb(data), stream=True)
249 return self._get_result(result)
250 return self._get_result(result)
250
251
251 def _deserialize_and_raise(self, error):
252 def _deserialize_and_raise(self, error):
252 exception = Exception(error['message'])
253 exception = Exception(error['message'])
253 try:
254 try:
254 exception._vcs_kind = error['_vcs_kind']
255 exception._vcs_kind = error['_vcs_kind']
255 except KeyError:
256 except KeyError:
256 pass
257 pass
257 raise exception
258 raise exception
258
259
259 def _iterate(self, result):
260 def _iterate(self, result):
260 unpacker = msgpack.Unpacker()
261 unpacker = msgpack.Unpacker()
261 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
262 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
262 unpacker.feed(line)
263 unpacker.feed(line)
263 for chunk in unpacker:
264 for chunk in unpacker:
264 yield chunk
265 yield chunk
265
266
266 def _get_result(self, result):
267 def _get_result(self, result):
267 iterator = self._iterate(result)
268 iterator = self._iterate(result)
268 error = iterator.next()
269 error = iterator.next()
269 if error:
270 if error:
270 self._deserialize_and_raise(error)
271 self._deserialize_and_raise(error)
271
272
272 status = iterator.next()
273 status = iterator.next()
273 headers = iterator.next()
274 headers = iterator.next()
274
275
275 return iterator, status, headers
276 return iterator, status, headers
276
277
277
278
278 class ThreadlocalSessionFactory(object):
279 class ThreadlocalSessionFactory(object):
279 """
280 """
280 Creates one CurlSession per thread on demand.
281 Creates one CurlSession per thread on demand.
281 """
282 """
282
283
283 def __init__(self):
284 def __init__(self):
284 self._thread_local = threading.local()
285 self._thread_local = threading.local()
285
286
286 def __call__(self):
287 def __call__(self):
287 if not hasattr(self._thread_local, 'curl_session'):
288 if not hasattr(self._thread_local, 'curl_session'):
288 self._thread_local.curl_session = CurlSession()
289 self._thread_local.curl_session = CurlSession()
289 return self._thread_local.curl_session
290 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now