##// END OF EJS Templates
vcs-client: report more data about the call for better request tracking
super-admin -
r4887:ae398e61 default
parent child Browse files
Show More
@@ -1,186 +1,190 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Various version Control System version lib (vcs) management abstraction layer
23 23 for Python. Build with server client architecture.
24 24 """
25 25 import atexit
26 26 import logging
27 27 import urlparse
28 28 from cStringIO import StringIO
29 29
30 30 import rhodecode
31 31 from rhodecode.lib.vcs.conf import settings
32 32 from rhodecode.lib.vcs.backends import get_vcs_instance, get_backend
33 33 from rhodecode.lib.vcs.exceptions import (
34 34 VCSError, RepositoryError, CommitError, VCSCommunicationError)
35 35
36 36 VERSION = (0, 5, 0, 'dev')
37 37
38 38 __version__ = '.'.join((str(each) for each in VERSION[:4]))
39 39
40 40 __all__ = [
41 41 'get_version', 'get_vcs_instance', 'get_backend',
42 42 'VCSError', 'RepositoryError', 'CommitError', 'VCSCommunicationError'
43 43 ]
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47 # The pycurl library directly accesses C API functions and is not patched by
48 48 # gevent. This will potentially lead to deadlocks due to incompatibility to
49 49 # gevent. Therefore we check if gevent is active and import a gevent compatible
50 50 # wrapper in that case.
51 51 try:
52 52 from gevent import monkey
53 53 if monkey.is_module_patched('__builtin__'):
54 54 import geventcurl as pycurl
55 55 log.debug('Using gevent comapatible pycurl: %s', pycurl)
56 56 else:
57 57 import pycurl
58 58 except ImportError:
59 59 import pycurl
60 60
61 61
62 62 def get_version():
63 63 """
64 64 Returns shorter version (digit parts only) as string.
65 65 """
66 66 return '.'.join((str(each) for each in VERSION[:3]))
67 67
68 68
69 69 def connect_http(server_and_port):
70 70 from rhodecode.lib.vcs import connection, client_http
71 71 from rhodecode.lib.middleware.utils import scm_app
72 72
73 73 session_factory = client_http.ThreadlocalSessionFactory()
74 74
75 75 connection.Git = client_http.RemoteVCSMaker(
76 76 server_and_port, '/git', 'git', session_factory)
77 77 connection.Hg = client_http.RemoteVCSMaker(
78 78 server_and_port, '/hg', 'hg', session_factory)
79 79 connection.Svn = client_http.RemoteVCSMaker(
80 80 server_and_port, '/svn', 'svn', session_factory)
81 81 connection.Service = client_http.ServiceConnection(
82 82 server_and_port, '/_service', session_factory)
83 83
84 84 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
85 85 server_and_port, '/proxy/hg')
86 86 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
87 87 server_and_port, '/proxy/git')
88 88
89 89 @atexit.register
90 90 def free_connection_resources():
91 91 connection.Git = None
92 92 connection.Hg = None
93 93 connection.Svn = None
94 94 connection.Service = None
95 95
96 96
97 97 def connect_vcs(server_and_port, protocol):
98 98 """
99 99 Initializes the connection to the vcs server.
100 100
101 101 :param server_and_port: str, e.g. "localhost:9900"
102 102 :param protocol: str or "http"
103 103 """
104 104 if protocol == 'http':
105 105 connect_http(server_and_port)
106 106 else:
107 107 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
108 108
109 109
110 110 class CurlSession(object):
111 111 """
112 112 Modeled so that it provides a subset of the requests interface.
113 113
114 114 This has been created so that it does only provide a minimal API for our
115 115 needs. The parts which it provides are based on the API of the library
116 116 `requests` which allows us to easily benchmark against it.
117 117
118 118 Please have a look at the class :class:`requests.Session` when you extend
119 119 it.
120 120 """
121 121
122 122 def __init__(self):
123 123 curl = pycurl.Curl()
124 124 # TODO: johbo: I did test with 7.19 of libcurl. This version has
125 125 # trouble with 100 - continue being set in the expect header. This
126 126 # can lead to massive performance drops, switching it off here.
127 curl.setopt(curl.HTTPHEADER, ["Expect:"])
127
128 128 curl.setopt(curl.TCP_NODELAY, True)
129 129 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
130 130 curl.setopt(curl.USERAGENT, 'RhodeCode HTTP {}'.format(rhodecode.__version__))
131 131 curl.setopt(curl.SSL_VERIFYPEER, 0)
132 132 curl.setopt(curl.SSL_VERIFYHOST, 0)
133 133 self._curl = curl
134 134
135 def post(self, url, data, allow_redirects=False):
135 def post(self, url, data, allow_redirects=False, headers=None):
136 headers = headers or {}
137 # format is ['header_name1: header_value1', 'header_name2: header_value2'])
138 headers_list = ["Expect:"] + ['{}: {}'.format(k, v) for k, v in headers.items()]
136 139 response_buffer = StringIO()
137 140
138 141 curl = self._curl
139 142 curl.setopt(curl.URL, url)
140 143 curl.setopt(curl.POST, True)
141 144 curl.setopt(curl.POSTFIELDS, data)
142 145 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
143 146 curl.setopt(curl.WRITEDATA, response_buffer)
147 curl.setopt(curl.HTTPHEADER, headers_list)
144 148 curl.perform()
145 149
146 150 status_code = curl.getinfo(pycurl.HTTP_CODE)
147 151
148 152 return CurlResponse(response_buffer, status_code)
149 153
150 154
151 155 class CurlResponse(object):
152 156 """
153 157 The response of a request, modeled after the requests API.
154 158
155 159 This class provides a subset of the response interface known from the
156 160 library `requests`. It is intentionally kept similar, so that we can use
157 161 `requests` as a drop in replacement for benchmarking purposes.
158 162 """
159 163
160 164 def __init__(self, response_buffer, status_code):
161 165 self._response_buffer = response_buffer
162 166 self._status_code = status_code
163 167
164 168 @property
165 169 def content(self):
166 170 try:
167 171 return self._response_buffer.getvalue()
168 172 finally:
169 173 self._response_buffer.close()
170 174
171 175 @property
172 176 def status_code(self):
173 177 return self._status_code
174 178
175 179 def iter_content(self, chunk_size):
176 180 self._response_buffer.seek(0)
177 181 while 1:
178 182 chunk = self._response_buffer.read(chunk_size)
179 183 if not chunk:
180 184 break
181 185 yield chunk
182 186
183 187
184 188 def _create_http_rpc_session():
185 189 session = CurlSession()
186 190 return session
@@ -1,396 +1,408 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23 """
24 24
25 25 import copy
26 26 import logging
27 27 import threading
28 28 import time
29 29 import urllib2
30 30 import urlparse
31 31 import uuid
32 32 import traceback
33 33
34 34 import pycurl
35 35 import msgpack
36 36 import requests
37 37 from requests.packages.urllib3.util.retry import Retry
38 38
39 39 import rhodecode
40 40 from rhodecode.lib import rc_cache
41 41 from rhodecode.lib.rc_cache.utils import compute_key_from_params
42 42 from rhodecode.lib.system_info import get_cert_path
43 43 from rhodecode.lib.vcs import exceptions, CurlSession
44 44 from rhodecode.lib.utils2 import str2bool
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 # TODO: mikhail: Keep it in sync with vcsserver's
50 50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 51 EXCEPTIONS_MAP = {
52 52 'KeyError': KeyError,
53 53 'URLError': urllib2.URLError,
54 54 }
55 55
56 56
57 57 def _remote_call(url, payload, exceptions_map, session):
58 58 try:
59 response = session.post(url, data=msgpack.packb(payload))
59 headers = {
60 'X-RC-Method': payload.get('method'),
61 'X-RC-Repo-Name': payload.get('_repo_name')
62 }
63 response = session.post(url, data=msgpack.packb(payload), headers=headers)
60 64 except pycurl.error as e:
61 65 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
62 66 raise exceptions.HttpVCSCommunicationError(msg)
63 67 except Exception as e:
64 68 message = getattr(e, 'message', '')
65 69 if 'Failed to connect' in message:
66 70 # gevent doesn't return proper pycurl errors
67 71 raise exceptions.HttpVCSCommunicationError(e)
68 72 else:
69 73 raise
70 74
71 75 if response.status_code >= 400:
72 76 log.error('Call to %s returned non 200 HTTP code: %s',
73 77 url, response.status_code)
74 78 raise exceptions.HttpVCSCommunicationError(repr(response.content))
75 79
76 80 try:
77 81 response = msgpack.unpackb(response.content)
78 82 except Exception:
79 83 log.exception('Failed to decode response %r', response.content)
80 84 raise
81 85
82 86 error = response.get('error')
83 87 if error:
84 88 type_ = error.get('type', 'Exception')
85 89 exc = exceptions_map.get(type_, Exception)
86 90 exc = exc(error.get('message'))
87 91 try:
88 92 exc._vcs_kind = error['_vcs_kind']
89 93 except KeyError:
90 94 pass
91 95
92 96 try:
93 97 exc._vcs_server_traceback = error['traceback']
94 98 exc._vcs_server_org_exc_name = error['org_exc']
95 99 exc._vcs_server_org_exc_tb = error['org_exc_tb']
96 100 except KeyError:
97 101 pass
98 102
99 103 raise exc
100 104 return response.get('result')
101 105
102 106
103 107 def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
104 108 try:
105 109 response = session.post(url, data=msgpack.packb(payload))
106 110 except pycurl.error as e:
107 111 msg = '{}. \npycurl traceback: {}'.format(e, traceback.format_exc())
108 112 raise exceptions.HttpVCSCommunicationError(msg)
109 113 except Exception as e:
110 114 message = getattr(e, 'message', '')
111 115 if 'Failed to connect' in message:
112 116 # gevent doesn't return proper pycurl errors
113 117 raise exceptions.HttpVCSCommunicationError(e)
114 118 else:
115 119 raise
116 120
117 121 if response.status_code >= 400:
118 122 log.error('Call to %s returned non 200 HTTP code: %s',
119 123 url, response.status_code)
120 124 raise exceptions.HttpVCSCommunicationError(repr(response.content))
121 125
122 126 return response.iter_content(chunk_size=chunk_size)
123 127
124 128
125 129 class ServiceConnection(object):
126 130 def __init__(self, server_and_port, backend_endpoint, session_factory):
127 131 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
128 132 self._session_factory = session_factory
129 133
130 134 def __getattr__(self, name):
131 135 def f(*args, **kwargs):
132 136 return self._call(name, *args, **kwargs)
133 137 return f
134 138
135 139 @exceptions.map_vcs_exceptions
136 140 def _call(self, name, *args, **kwargs):
137 141 payload = {
138 142 'id': str(uuid.uuid4()),
139 143 'method': name,
140 144 'params': {'args': args, 'kwargs': kwargs}
141 145 }
142 146 return _remote_call(
143 147 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
144 148
145 149
146 150 class RemoteVCSMaker(object):
147 151
148 152 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
149 153 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
150 154 self.stream_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint+'/stream')
151 155
152 156 self._session_factory = session_factory
153 157 self.backend_type = backend_type
154 158
155 159 @classmethod
156 160 def init_cache_region(cls, repo_id):
157 161 cache_namespace_uid = 'cache_repo.{}'.format(repo_id)
158 162 region = rc_cache.get_or_create_region('cache_repo', cache_namespace_uid)
159 163 return region, cache_namespace_uid
160 164
161 165 def __call__(self, path, repo_id, config, with_wire=None):
162 166 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
163 167 return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)
164 168
165 169 def __getattr__(self, name):
166 170 def remote_attr(*args, **kwargs):
167 171 return self._call(name, *args, **kwargs)
168 172 return remote_attr
169 173
170 174 @exceptions.map_vcs_exceptions
171 175 def _call(self, func_name, *args, **kwargs):
172 176 payload = {
173 177 'id': str(uuid.uuid4()),
174 178 'method': func_name,
175 179 'backend': self.backend_type,
176 180 'params': {'args': args, 'kwargs': kwargs}
177 181 }
178 182 url = self.url
179 183 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session_factory())
180 184
181 185
182 186 class RemoteRepo(object):
183 187 CHUNK_SIZE = 16384
184 188
185 189 def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
186 190 self.url = remote_maker.url
187 191 self.stream_url = remote_maker.stream_url
188 192 self._session = remote_maker._session_factory()
189 193
190 194 cache_repo_id = self._repo_id_sanitizer(repo_id)
195 _repo_name = self._get_repo_name(config, path)
191 196 self._cache_region, self._cache_namespace = \
192 197 remote_maker.init_cache_region(cache_repo_id)
193 198
194 199 with_wire = with_wire or {}
195 200
196 201 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
202
197 203 self._wire = {
204 "_repo_name": _repo_name,
198 205 "path": path, # repo path
199 206 "repo_id": repo_id,
200 207 "cache_repo_id": cache_repo_id,
201 208 "config": config,
202 209 "repo_state_uid": repo_state_uid,
203 210 "context": self._create_vcs_cache_context(path, repo_state_uid)
204 211 }
205 212
206 213 if with_wire:
207 214 self._wire.update(with_wire)
208 215
209 216 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
210 217 # log.debug brings a few percent gain even if is is not active.
211 218 if log.isEnabledFor(logging.DEBUG):
212 219 self._call_with_logging = True
213 220
214 221 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
215 222
223 def _get_repo_name(self, config, path):
224 repo_store = config.get('paths', '/')
225 return path.split(repo_store)[-1].lstrip('/')
226
216 227 def _repo_id_sanitizer(self, repo_id):
217 228 pathless = repo_id.replace('/', '__').replace('-', '_')
218 229 return ''.join(char if ord(char) < 128 else '_{}_'.format(ord(char)) for char in pathless)
219 230
220 231 def __getattr__(self, name):
221 232
222 233 if name.startswith('stream:'):
223 234 def repo_remote_attr(*args, **kwargs):
224 235 return self._call_stream(name, *args, **kwargs)
225 236 else:
226 237 def repo_remote_attr(*args, **kwargs):
227 238 return self._call(name, *args, **kwargs)
228 239
229 240 return repo_remote_attr
230 241
231 242 def _base_call(self, name, *args, **kwargs):
232 243 # TODO: oliver: This is currently necessary pre-call since the
233 244 # config object is being changed for hooking scenarios
234 245 wire = copy.deepcopy(self._wire)
235 246 wire["config"] = wire["config"].serialize()
236 247 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
237 248
238 249 payload = {
239 250 'id': str(uuid.uuid4()),
240 251 'method': name,
252 "_repo_name": wire['_repo_name'],
241 253 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
242 254 }
243 255
244 256 context_uid = wire.get('context')
245 257 return context_uid, payload
246 258
247 259 def get_local_cache(self, name, args):
248 260 cache_on = False
249 261 cache_key = ''
250 262 local_cache_on = str2bool(rhodecode.CONFIG.get('vcs.methods.cache'))
251 263
252 264 cache_methods = [
253 265 'branches', 'tags', 'bookmarks',
254 266 'is_large_file', 'is_binary',
255 267 'fctx_size', 'stream:fctx_node_data', 'blob_raw_length',
256 268 'node_history',
257 269 'revision', 'tree_items',
258 270 'ctx_list', 'ctx_branch', 'ctx_description',
259 271 'bulk_request',
260 272 'assert_correct_path'
261 273 ]
262 274
263 275 if local_cache_on and name in cache_methods:
264 276 cache_on = True
265 277 repo_state_uid = self._wire['repo_state_uid']
266 278 call_args = [a for a in args]
267 279 cache_key = compute_key_from_params(repo_state_uid, name, *call_args)
268 280
269 281 return cache_on, cache_key
270 282
271 283 @exceptions.map_vcs_exceptions
272 284 def _call(self, name, *args, **kwargs):
273 285 context_uid, payload = self._base_call(name, *args, **kwargs)
274 286 url = self.url
275 287
276 288 start = time.time()
277 289 cache_on, cache_key = self.get_local_cache(name, args)
278 290
279 291 @self._cache_region.conditional_cache_on_arguments(
280 292 namespace=self._cache_namespace, condition=cache_on and cache_key)
281 293 def remote_call(_cache_key):
282 294 if self._call_with_logging:
283 295 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
284 296 url, name, args, context_uid, cache_on)
285 297 return _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
286 298
287 299 result = remote_call(cache_key)
288 300 if self._call_with_logging:
289 301 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
290 302 url, name, time.time()-start, context_uid)
291 303 return result
292 304
293 305 @exceptions.map_vcs_exceptions
294 306 def _call_stream(self, name, *args, **kwargs):
295 307 context_uid, payload = self._base_call(name, *args, **kwargs)
296 308 payload['chunk_size'] = self.CHUNK_SIZE
297 309 url = self.stream_url
298 310
299 311 start = time.time()
300 312 cache_on, cache_key = self.get_local_cache(name, args)
301 313
302 314 # Cache is a problem because this is a stream
303 315 def streaming_remote_call(_cache_key):
304 316 if self._call_with_logging:
305 317 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s cache_on: %s',
306 318 url, name, args, context_uid, cache_on)
307 319 return _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session, self.CHUNK_SIZE)
308 320
309 321 result = streaming_remote_call(cache_key)
310 322 if self._call_with_logging:
311 323 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
312 324 url, name, time.time()-start, context_uid)
313 325 return result
314 326
315 327 def __getitem__(self, key):
316 328 return self.revision(key)
317 329
318 330 def _create_vcs_cache_context(self, *args):
319 331 """
320 332 Creates a unique string which is passed to the VCSServer on every
321 333 remote call. It is used as cache key in the VCSServer.
322 334 """
323 335 hash_key = '-'.join(map(str, args))
324 336 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
325 337
326 338 def invalidate_vcs_cache(self):
327 339 """
328 340 This invalidates the context which is sent to the VCSServer on every
329 341 call to a remote method. It forces the VCSServer to create a fresh
330 342 repository instance on the next call to a remote method.
331 343 """
332 344 self._wire['context'] = str(uuid.uuid4())
333 345
334 346
335 347 class VcsHttpProxy(object):
336 348
337 349 CHUNK_SIZE = 16384
338 350
339 351 def __init__(self, server_and_port, backend_endpoint):
340 352 retries = Retry(total=5, connect=None, read=None, redirect=None)
341 353
342 354 adapter = requests.adapters.HTTPAdapter(max_retries=retries)
343 355 self.base_url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
344 356 self.session = requests.Session()
345 357 self.session.mount('http://', adapter)
346 358
347 359 def handle(self, environment, input_data, *args, **kwargs):
348 360 data = {
349 361 'environment': environment,
350 362 'input_data': input_data,
351 363 'args': args,
352 364 'kwargs': kwargs
353 365 }
354 366 result = self.session.post(
355 367 self.base_url, msgpack.packb(data), stream=True)
356 368 return self._get_result(result)
357 369
358 370 def _deserialize_and_raise(self, error):
359 371 exception = Exception(error['message'])
360 372 try:
361 373 exception._vcs_kind = error['_vcs_kind']
362 374 except KeyError:
363 375 pass
364 376 raise exception
365 377
366 378 def _iterate(self, result):
367 379 unpacker = msgpack.Unpacker()
368 380 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
369 381 unpacker.feed(line)
370 382 for chunk in unpacker:
371 383 yield chunk
372 384
373 385 def _get_result(self, result):
374 386 iterator = self._iterate(result)
375 387 error = iterator.next()
376 388 if error:
377 389 self._deserialize_and_raise(error)
378 390
379 391 status = iterator.next()
380 392 headers = iterator.next()
381 393
382 394 return iterator, status, headers
383 395
384 396
385 397 class ThreadlocalSessionFactory(object):
386 398 """
387 399 Creates one CurlSession per thread on demand.
388 400 """
389 401
390 402 def __init__(self):
391 403 self._thread_local = threading.local()
392 404
393 405 def __call__(self):
394 406 if not hasattr(self._thread_local, 'curl_session'):
395 407 self._thread_local.curl_session = CurlSession()
396 408 return self._thread_local.curl_session
@@ -1,133 +1,135 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 22
23 23 import mock
24 24 import msgpack
25 25 import pytest
26 26
27 27 from rhodecode.lib import vcs
28 28 from rhodecode.lib.vcs import client_http, exceptions
29 29
30 30
31 31 def is_new_connection(logger, level, message):
32 32 return (
33 33 logger == 'requests.packages.urllib3.connectionpool' and
34 34 message.startswith('Starting new HTTP'))
35 35
36 36
37 37 @pytest.fixture()
38 38 def stub_session():
39 39 """
40 40 Stub of `requests.Session()`.
41 41 """
42 42 session = mock.Mock()
43 43 post = session.post()
44 44 post.content = msgpack.packb({})
45 45 post.status_code = 200
46 46
47 47 session.reset_mock()
48 48 return session
49 49
50 50
51 51 @pytest.fixture()
52 52 def stub_fail_session():
53 53 """
54 54 Stub of `requests.Session()`.
55 55 """
56 56 session = mock.Mock()
57 57 post = session.post()
58 58 post.content = msgpack.packb({'error': '500'})
59 59 post.status_code = 500
60 60
61 61 session.reset_mock()
62 62 return session
63 63
64 64
65 65 @pytest.fixture()
66 66 def stub_session_factory(stub_session):
67 67 """
68 68 Stub of `rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory`.
69 69 """
70 70 session_factory = mock.Mock()
71 71 session_factory.return_value = stub_session
72 72 return session_factory
73 73
74 74
75 75 @pytest.fixture()
76 76 def stub_session_failing_factory(stub_fail_session):
77 77 """
78 78 Stub of `rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory`.
79 79 """
80 80 session_factory = mock.Mock()
81 81 session_factory.return_value = stub_fail_session
82 82 return session_factory
83 83
84 84
85 85 def test_uses_persistent_http_connections(caplog, vcsbackend_hg):
86 86 repo = vcsbackend_hg.repo
87 87 remote_call = repo._remote.branches
88 88
89 89 with caplog.at_level(logging.INFO):
90 90 for x in range(5):
91 91 remote_call(normal=True, closed=False)
92 92
93 93 new_connections = [
94 94 r for r in caplog.record_tuples if is_new_connection(*r)]
95 95 assert len(new_connections) <= 1
96 96
97 97
98 98 def test_repo_maker_uses_session_for_classmethods(stub_session_factory):
99 99 repo_maker = client_http.RemoteVCSMaker(
100 100 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
101 101 repo_maker.example_call()
102 102 stub_session_factory().post.assert_called_with(
103 'http://server_and_port/endpoint', data=mock.ANY)
103 'http://server_and_port/endpoint', data=mock.ANY,
104 headers={'X-RC-Method': 'example_call', 'X-RC-Repo-Name': None})
104 105
105 106
106 107 def test_repo_maker_uses_session_for_instance_methods(
107 108 stub_session_factory, config):
108 109 repo_maker = client_http.RemoteVCSMaker(
109 110 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
110 111 repo = repo_maker('stub_path', 'stub_repo_id', config)
111 112 repo.example_call()
112 113 stub_session_factory().post.assert_called_with(
113 'http://server_and_port/endpoint', data=mock.ANY)
114 'http://server_and_port/endpoint', data=mock.ANY,
115 headers={'X-RC-Method': 'example_call', 'X-RC-Repo-Name': 'stub_path'})
114 116
115 117
116 118 @mock.patch('rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory')
117 119 @mock.patch('rhodecode.lib.vcs.connection')
118 120 def test_connect_passes_in_the_same_session(
119 121 connection, session_factory_class, stub_session):
120 122 session_factory = session_factory_class.return_value
121 123 session_factory.return_value = stub_session
122 124
123 125 vcs.connect_http('server_and_port')
124 126
125 127
126 128 def test_repo_maker_uses_session_that_throws_error(
127 129 stub_session_failing_factory, config):
128 130 repo_maker = client_http.RemoteVCSMaker(
129 131 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_failing_factory)
130 132 repo = repo_maker('stub_path', 'stub_repo_id', config)
131 133
132 134 with pytest.raises(exceptions.HttpVCSCommunicationError):
133 135 repo.example_call()
General Comments 0
You need to be logged in to leave comments. Login now