##// END OF EJS Templates
http-backend: catch errors from HTTP calls that are not raising exceptions, but...
marcink -
r1410:a4133cfc default
parent child Browse files
Show More
@@ -1,241 +1,248 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2017 RhodeCode GmbH
3 # Copyright (C) 2014-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Various version Control System version lib (vcs) management abstraction layer
22 Various version Control System version lib (vcs) management abstraction layer
23 for Python. Build with server client architecture.
23 for Python. Build with server client architecture.
24 """
24 """
25
25
26
26
27 VERSION = (0, 5, 0, 'dev')
27 VERSION = (0, 5, 0, 'dev')
28
28
29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
30
30
31 __all__ = [
31 __all__ = [
32 'get_version', 'get_vcs_instance', 'get_backend',
32 'get_version', 'get_vcs_instance', 'get_backend',
33 'VCSError', 'RepositoryError', 'CommitError'
33 'VCSError', 'RepositoryError', 'CommitError'
34 ]
34 ]
35
35
36 import atexit
36 import atexit
37 import logging
37 import logging
38 import subprocess32
38 import subprocess32
39 import time
39 import time
40 import urlparse
40 import urlparse
41 from cStringIO import StringIO
41 from cStringIO import StringIO
42
42
43
43
44 from rhodecode.lib.vcs.conf import settings
44 from rhodecode.lib.vcs.conf import settings
45 from rhodecode.lib.vcs.backends import get_vcs_instance, get_backend
45 from rhodecode.lib.vcs.backends import get_vcs_instance, get_backend
46 from rhodecode.lib.vcs.exceptions import (
46 from rhodecode.lib.vcs.exceptions import (
47 VCSError, RepositoryError, CommitError, VCSCommunicationError)
47 VCSError, RepositoryError, CommitError, VCSCommunicationError)
48
48
49 log = logging.getLogger(__name__)
49 log = logging.getLogger(__name__)
50
50
51 # The pycurl library directly accesses C API functions and is not patched by
51 # The pycurl library directly accesses C API functions and is not patched by
52 # gevent. This will potentially lead to deadlocks due to incompatibility to
52 # gevent. This will potentially lead to deadlocks due to incompatibility to
53 # gevent. Therefore we check if gevent is active and import a gevent compatible
53 # gevent. Therefore we check if gevent is active and import a gevent compatible
54 # wrapper in that case.
54 # wrapper in that case.
55 try:
55 try:
56 from gevent import monkey
56 from gevent import monkey
57 if monkey.is_module_patched('__builtin__'):
57 if monkey.is_module_patched('__builtin__'):
58 import geventcurl as pycurl
58 import geventcurl as pycurl
59 log.debug('Using gevent comapatible pycurl: %s', pycurl)
59 log.debug('Using gevent comapatible pycurl: %s', pycurl)
60 else:
60 else:
61 import pycurl
61 import pycurl
62 except ImportError:
62 except ImportError:
63 import pycurl
63 import pycurl
64
64
65
65
66 def get_version():
66 def get_version():
67 """
67 """
68 Returns shorter version (digit parts only) as string.
68 Returns shorter version (digit parts only) as string.
69 """
69 """
70 return '.'.join((str(each) for each in VERSION[:3]))
70 return '.'.join((str(each) for each in VERSION[:3]))
71
71
72
72
73 def connect_http(server_and_port):
73 def connect_http(server_and_port):
74 from rhodecode.lib.vcs import connection, client_http
74 from rhodecode.lib.vcs import connection, client_http
75 from rhodecode.lib.middleware.utils import scm_app
75 from rhodecode.lib.middleware.utils import scm_app
76
76
77 session_factory = client_http.ThreadlocalSessionFactory()
77 session_factory = client_http.ThreadlocalSessionFactory()
78
78
79 connection.Git = client_http.RepoMaker(
79 connection.Git = client_http.RepoMaker(
80 server_and_port, '/git', 'git', session_factory)
80 server_and_port, '/git', 'git', session_factory)
81 connection.Hg = client_http.RepoMaker(
81 connection.Hg = client_http.RepoMaker(
82 server_and_port, '/hg', 'hg', session_factory)
82 server_and_port, '/hg', 'hg', session_factory)
83 connection.Svn = client_http.RepoMaker(
83 connection.Svn = client_http.RepoMaker(
84 server_and_port, '/svn', 'svn', session_factory)
84 server_and_port, '/svn', 'svn', session_factory)
85 connection.Service = client_http.ServiceConnection(
85 connection.Service = client_http.ServiceConnection(
86 server_and_port, '/_service', session_factory)
86 server_and_port, '/_service', session_factory)
87
87
88 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
88 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
89 server_and_port, '/proxy/hg')
89 server_and_port, '/proxy/hg')
90 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
90 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
91 server_and_port, '/proxy/git')
91 server_and_port, '/proxy/git')
92
92
93 @atexit.register
93 @atexit.register
94 def free_connection_resources():
94 def free_connection_resources():
95 connection.Git = None
95 connection.Git = None
96 connection.Hg = None
96 connection.Hg = None
97 connection.Svn = None
97 connection.Svn = None
98 connection.Service = None
98 connection.Service = None
99
99
100
100
101 def connect_vcs(server_and_port, protocol):
101 def connect_vcs(server_and_port, protocol):
102 """
102 """
103 Initializes the connection to the vcs server.
103 Initializes the connection to the vcs server.
104
104
105 :param server_and_port: str, e.g. "localhost:9900"
105 :param server_and_port: str, e.g. "localhost:9900"
106 :param protocol: str or "http"
106 :param protocol: str or "http"
107 """
107 """
108 if protocol == 'http':
108 if protocol == 'http':
109 connect_http(server_and_port)
109 connect_http(server_and_port)
110 else:
110 else:
111 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
111 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
112
112
113
113
114 # TODO: johbo: This function should be moved into our test suite, there is
114 # TODO: johbo: This function should be moved into our test suite, there is
115 # no reason to support starting the vcsserver in Enterprise itself.
115 # no reason to support starting the vcsserver in Enterprise itself.
116 def start_vcs_server(server_and_port, protocol, log_level=None):
116 def start_vcs_server(server_and_port, protocol, log_level=None):
117 """
117 """
118 Starts the vcs server in a subprocess.
118 Starts the vcs server in a subprocess.
119 """
119 """
120 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
120 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
121 if protocol == 'http':
121 if protocol == 'http':
122 return _start_http_vcs_server(server_and_port, log_level)
122 return _start_http_vcs_server(server_and_port, log_level)
123 else:
123 else:
124 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
124 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
125
125
126
126
127 def _start_http_vcs_server(server_and_port, log_level=None):
127 def _start_http_vcs_server(server_and_port, log_level=None):
128 # TODO: mikhail: shutdown if an http server already runs
128 # TODO: mikhail: shutdown if an http server already runs
129
129
130 host, port = server_and_port.rsplit(":", 1)
130 host, port = server_and_port.rsplit(":", 1)
131 args = [
131 args = [
132 'pserve', 'rhodecode/tests/vcsserver_http.ini',
132 'pserve', 'rhodecode/tests/vcsserver_http.ini',
133 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
133 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
134 proc = subprocess32.Popen(args)
134 proc = subprocess32.Popen(args)
135
135
136 def cleanup_server_process():
136 def cleanup_server_process():
137 proc.kill()
137 proc.kill()
138 atexit.register(cleanup_server_process)
138 atexit.register(cleanup_server_process)
139
139
140 server = create_vcsserver_proxy(server_and_port, protocol='http')
140 server = create_vcsserver_proxy(server_and_port, protocol='http')
141 _wait_until_vcs_server_is_reachable(server)
141 _wait_until_vcs_server_is_reachable(server)
142
142
143
143
144 def _wait_until_vcs_server_is_reachable(server, timeout=40):
144 def _wait_until_vcs_server_is_reachable(server, timeout=40):
145 begin = time.time()
145 begin = time.time()
146 while (time.time() - begin) < timeout:
146 while (time.time() - begin) < timeout:
147 try:
147 try:
148 server.ping()
148 server.ping()
149 return
149 return
150 except (VCSCommunicationError, pycurl.error):
150 except (VCSCommunicationError, pycurl.error):
151 log.debug('VCSServer not started yet, retry to connect.')
151 log.debug('VCSServer not started yet, retry to connect.')
152 time.sleep(0.5)
152 time.sleep(0.5)
153 raise Exception(
153 raise Exception(
154 'Starting the VCSServer failed or took more than {} '
154 'Starting the VCSServer failed or took more than {} '
155 'seconds.'.format(timeout))
155 'seconds.'.format(timeout))
156
156
157
157
158 def _try_to_shutdown_running_server(server_and_port, protocol):
158 def _try_to_shutdown_running_server(server_and_port, protocol):
159 server = create_vcsserver_proxy(server_and_port, protocol)
159 server = create_vcsserver_proxy(server_and_port, protocol)
160 try:
160 try:
161 server.shutdown()
161 server.shutdown()
162 except pycurl.error:
162 except pycurl.error:
163 return
163 return
164
164
165 # TODO: Not sure why this is important, but without it the following start
165 # TODO: Not sure why this is important, but without it the following start
166 # of the server fails.
166 # of the server fails.
167 server = create_vcsserver_proxy(server_and_port, protocol)
167 server = create_vcsserver_proxy(server_and_port, protocol)
168 server.ping()
168 server.ping()
169
169
170
170
171 def create_vcsserver_proxy(server_and_port, protocol):
171 def create_vcsserver_proxy(server_and_port, protocol):
172 if protocol == 'http':
172 if protocol == 'http':
173 return _create_vcsserver_proxy_http(server_and_port)
173 return _create_vcsserver_proxy_http(server_and_port)
174 else:
174 else:
175 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
175 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
176
176
177
177
178 def _create_vcsserver_proxy_http(server_and_port):
178 def _create_vcsserver_proxy_http(server_and_port):
179 from rhodecode.lib.vcs import client_http
179 from rhodecode.lib.vcs import client_http
180
180
181 session = _create_http_rpc_session()
181 session = _create_http_rpc_session()
182 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
182 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
183 return client_http.RemoteObject(url, session)
183 return client_http.RemoteObject(url, session)
184
184
185
185
186 class CurlSession(object):
186 class CurlSession(object):
187 """
187 """
188 Modeled so that it provides a subset of the requests interface.
188 Modeled so that it provides a subset of the requests interface.
189
189
190 This has been created so that it does only provide a minimal API for our
190 This has been created so that it does only provide a minimal API for our
191 needs. The parts which it provides are based on the API of the library
191 needs. The parts which it provides are based on the API of the library
192 `requests` which allows us to easily benchmark against it.
192 `requests` which allows us to easily benchmark against it.
193
193
194 Please have a look at the class :class:`requests.Session` when you extend
194 Please have a look at the class :class:`requests.Session` when you extend
195 it.
195 it.
196 """
196 """
197
197
198 def __init__(self):
198 def __init__(self):
199 curl = pycurl.Curl()
199 curl = pycurl.Curl()
200 # TODO: johbo: I did test with 7.19 of libcurl. This version has
200 # TODO: johbo: I did test with 7.19 of libcurl. This version has
201 # trouble with 100 - continue being set in the expect header. This
201 # trouble with 100 - continue being set in the expect header. This
202 # can lead to massive performance drops, switching it off here.
202 # can lead to massive performance drops, switching it off here.
203 curl.setopt(curl.HTTPHEADER, ["Expect:"])
203 curl.setopt(curl.HTTPHEADER, ["Expect:"])
204 curl.setopt(curl.TCP_NODELAY, True)
204 curl.setopt(curl.TCP_NODELAY, True)
205 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
205 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
206 self._curl = curl
206 self._curl = curl
207
207
208 def post(self, url, data, allow_redirects=False):
208 def post(self, url, data, allow_redirects=False):
209 response_buffer = StringIO()
209 response_buffer = StringIO()
210
210
211 curl = self._curl
211 curl = self._curl
212 curl.setopt(curl.URL, url)
212 curl.setopt(curl.URL, url)
213 curl.setopt(curl.POST, True)
213 curl.setopt(curl.POST, True)
214 curl.setopt(curl.POSTFIELDS, data)
214 curl.setopt(curl.POSTFIELDS, data)
215 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
215 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
216 curl.setopt(curl.WRITEDATA, response_buffer)
216 curl.setopt(curl.WRITEDATA, response_buffer)
217 curl.perform()
217 curl.perform()
218
218
219 return CurlResponse(response_buffer)
219 status_code = curl.getinfo(pycurl.HTTP_CODE)
220
221 return CurlResponse(response_buffer, status_code)
220
222
221
223
222 class CurlResponse(object):
224 class CurlResponse(object):
223 """
225 """
224 The response of a request, modeled after the requests API.
226 The response of a request, modeled after the requests API.
225
227
226 This class provides a subset of the response interface known from the
228 This class provides a subset of the response interface known from the
227 library `requests`. It is intentionally kept similar, so that we can use
229 library `requests`. It is intentionally kept similar, so that we can use
228 `requests` as a drop in replacement for benchmarking purposes.
230 `requests` as a drop in replacement for benchmarking purposes.
229 """
231 """
230
232
231 def __init__(self, response_buffer):
233 def __init__(self, response_buffer, status_code):
232 self._response_buffer = response_buffer
234 self._response_buffer = response_buffer
235 self._status_code = status_code
233
236
234 @property
237 @property
235 def content(self):
238 def content(self):
236 return self._response_buffer.getvalue()
239 return self._response_buffer.getvalue()
237
240
241 @property
242 def status_code(self):
243 return self._status_code
244
238
245
239 def _create_http_rpc_session():
246 def _create_http_rpc_session():
240 session = CurlSession()
247 session = CurlSession()
241 return session
248 return session
@@ -1,284 +1,289 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2017 RhodeCode GmbH
3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23 """
23 """
24
24
25 import copy
25 import copy
26 import logging
26 import logging
27 import threading
27 import threading
28 import urllib2
28 import urllib2
29 import urlparse
29 import urlparse
30 import uuid
30 import uuid
31
31
32 import pycurl
32 import pycurl
33 import msgpack
33 import msgpack
34 import requests
34 import requests
35
35
36 from . import exceptions, CurlSession
36 from . import exceptions, CurlSession
37
37
38
38
39 log = logging.getLogger(__name__)
39 log = logging.getLogger(__name__)
40
40
41
41
42 # TODO: mikhail: Keep it in sync with vcsserver's
42 # TODO: mikhail: Keep it in sync with vcsserver's
43 # HTTPApplication.ALLOWED_EXCEPTIONS
43 # HTTPApplication.ALLOWED_EXCEPTIONS
44 EXCEPTIONS_MAP = {
44 EXCEPTIONS_MAP = {
45 'KeyError': KeyError,
45 'KeyError': KeyError,
46 'URLError': urllib2.URLError,
46 'URLError': urllib2.URLError,
47 }
47 }
48
48
49
49
50 class RepoMaker(object):
50 class RepoMaker(object):
51
51
52 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
52 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
53 self.url = urlparse.urljoin(
53 self.url = urlparse.urljoin(
54 'http://%s' % server_and_port, backend_endpoint)
54 'http://%s' % server_and_port, backend_endpoint)
55 self._session_factory = session_factory
55 self._session_factory = session_factory
56 self.backend_type = backend_type
56 self.backend_type = backend_type
57
57
58 def __call__(self, path, config, with_wire=None):
58 def __call__(self, path, config, with_wire=None):
59 log.debug('RepoMaker call on %s', path)
59 log.debug('RepoMaker call on %s', path)
60 return RemoteRepo(
60 return RemoteRepo(
61 path, config, self.url, self._session_factory(),
61 path, config, self.url, self._session_factory(),
62 with_wire=with_wire)
62 with_wire=with_wire)
63
63
64 def __getattr__(self, name):
64 def __getattr__(self, name):
65 def f(*args, **kwargs):
65 def f(*args, **kwargs):
66 return self._call(name, *args, **kwargs)
66 return self._call(name, *args, **kwargs)
67 return f
67 return f
68
68
69 @exceptions.map_vcs_exceptions
69 @exceptions.map_vcs_exceptions
70 def _call(self, name, *args, **kwargs):
70 def _call(self, name, *args, **kwargs):
71 payload = {
71 payload = {
72 'id': str(uuid.uuid4()),
72 'id': str(uuid.uuid4()),
73 'method': name,
73 'method': name,
74 'backend': self.backend_type,
74 'backend': self.backend_type,
75 'params': {'args': args, 'kwargs': kwargs}
75 'params': {'args': args, 'kwargs': kwargs}
76 }
76 }
77 return _remote_call(
77 return _remote_call(
78 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
78 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
79
79
80
80
81 class ServiceConnection(object):
81 class ServiceConnection(object):
82 def __init__(self, server_and_port, backend_endpoint, session_factory):
82 def __init__(self, server_and_port, backend_endpoint, session_factory):
83 self.url = urlparse.urljoin(
83 self.url = urlparse.urljoin(
84 'http://%s' % server_and_port, backend_endpoint)
84 'http://%s' % server_and_port, backend_endpoint)
85 self._session_factory = session_factory
85 self._session_factory = session_factory
86
86
87 def __getattr__(self, name):
87 def __getattr__(self, name):
88 def f(*args, **kwargs):
88 def f(*args, **kwargs):
89 return self._call(name, *args, **kwargs)
89 return self._call(name, *args, **kwargs)
90
90
91 return f
91 return f
92
92
93 @exceptions.map_vcs_exceptions
93 @exceptions.map_vcs_exceptions
94 def _call(self, name, *args, **kwargs):
94 def _call(self, name, *args, **kwargs):
95 payload = {
95 payload = {
96 'id': str(uuid.uuid4()),
96 'id': str(uuid.uuid4()),
97 'method': name,
97 'method': name,
98 'params': {'args': args, 'kwargs': kwargs}
98 'params': {'args': args, 'kwargs': kwargs}
99 }
99 }
100 return _remote_call(
100 return _remote_call(
101 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
101 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
102
102
103
103
104 class RemoteRepo(object):
104 class RemoteRepo(object):
105
105
106 def __init__(self, path, config, url, session, with_wire=None):
106 def __init__(self, path, config, url, session, with_wire=None):
107 self.url = url
107 self.url = url
108 self._session = session
108 self._session = session
109 self._wire = {
109 self._wire = {
110 "path": path,
110 "path": path,
111 "config": config,
111 "config": config,
112 "context": self._create_vcs_cache_context(),
112 "context": self._create_vcs_cache_context(),
113 }
113 }
114 if with_wire:
114 if with_wire:
115 self._wire.update(with_wire)
115 self._wire.update(with_wire)
116
116
117 # johbo: Trading complexity for performance. Avoiding the call to
117 # johbo: Trading complexity for performance. Avoiding the call to
118 # log.debug brings a few percent gain even if is is not active.
118 # log.debug brings a few percent gain even if is is not active.
119 if log.isEnabledFor(logging.DEBUG):
119 if log.isEnabledFor(logging.DEBUG):
120 self._call = self._call_with_logging
120 self._call = self._call_with_logging
121
121
122 def __getattr__(self, name):
122 def __getattr__(self, name):
123 def f(*args, **kwargs):
123 def f(*args, **kwargs):
124 return self._call(name, *args, **kwargs)
124 return self._call(name, *args, **kwargs)
125 return f
125 return f
126
126
127 @exceptions.map_vcs_exceptions
127 @exceptions.map_vcs_exceptions
128 def _call(self, name, *args, **kwargs):
128 def _call(self, name, *args, **kwargs):
129 # TODO: oliver: This is currently necessary pre-call since the
129 # TODO: oliver: This is currently necessary pre-call since the
130 # config object is being changed for hooking scenarios
130 # config object is being changed for hooking scenarios
131 wire = copy.deepcopy(self._wire)
131 wire = copy.deepcopy(self._wire)
132 wire["config"] = wire["config"].serialize()
132 wire["config"] = wire["config"].serialize()
133 payload = {
133 payload = {
134 'id': str(uuid.uuid4()),
134 'id': str(uuid.uuid4()),
135 'method': name,
135 'method': name,
136 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
136 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
137 }
137 }
138 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
138 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
139
139
140 def _call_with_logging(self, name, *args, **kwargs):
140 def _call_with_logging(self, name, *args, **kwargs):
141 log.debug('Calling %s@%s', self.url, name)
141 log.debug('Calling %s@%s', self.url, name)
142 return RemoteRepo._call(self, name, *args, **kwargs)
142 return RemoteRepo._call(self, name, *args, **kwargs)
143
143
144 def __getitem__(self, key):
144 def __getitem__(self, key):
145 return self.revision(key)
145 return self.revision(key)
146
146
147 def _create_vcs_cache_context(self):
147 def _create_vcs_cache_context(self):
148 """
148 """
149 Creates a unique string which is passed to the VCSServer on every
149 Creates a unique string which is passed to the VCSServer on every
150 remote call. It is used as cache key in the VCSServer.
150 remote call. It is used as cache key in the VCSServer.
151 """
151 """
152 return str(uuid.uuid4())
152 return str(uuid.uuid4())
153
153
154 def invalidate_vcs_cache(self):
154 def invalidate_vcs_cache(self):
155 """
155 """
156 This invalidates the context which is sent to the VCSServer on every
156 This invalidates the context which is sent to the VCSServer on every
157 call to a remote method. It forces the VCSServer to create a fresh
157 call to a remote method. It forces the VCSServer to create a fresh
158 repository instance on the next call to a remote method.
158 repository instance on the next call to a remote method.
159 """
159 """
160 self._wire['context'] = self._create_vcs_cache_context()
160 self._wire['context'] = self._create_vcs_cache_context()
161
161
162
162
163 class RemoteObject(object):
163 class RemoteObject(object):
164
164
165 def __init__(self, url, session):
165 def __init__(self, url, session):
166 self._url = url
166 self._url = url
167 self._session = session
167 self._session = session
168
168
169 # johbo: Trading complexity for performance. Avoiding the call to
169 # johbo: Trading complexity for performance. Avoiding the call to
170 # log.debug brings a few percent gain even if is is not active.
170 # log.debug brings a few percent gain even if is is not active.
171 if log.isEnabledFor(logging.DEBUG):
171 if log.isEnabledFor(logging.DEBUG):
172 self._call = self._call_with_logging
172 self._call = self._call_with_logging
173
173
174 def __getattr__(self, name):
174 def __getattr__(self, name):
175 def f(*args, **kwargs):
175 def f(*args, **kwargs):
176 return self._call(name, *args, **kwargs)
176 return self._call(name, *args, **kwargs)
177 return f
177 return f
178
178
179 @exceptions.map_vcs_exceptions
179 @exceptions.map_vcs_exceptions
180 def _call(self, name, *args, **kwargs):
180 def _call(self, name, *args, **kwargs):
181 payload = {
181 payload = {
182 'id': str(uuid.uuid4()),
182 'id': str(uuid.uuid4()),
183 'method': name,
183 'method': name,
184 'params': {'args': args, 'kwargs': kwargs}
184 'params': {'args': args, 'kwargs': kwargs}
185 }
185 }
186 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
186 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
187
187
188 def _call_with_logging(self, name, *args, **kwargs):
188 def _call_with_logging(self, name, *args, **kwargs):
189 log.debug('Calling %s@%s', self._url, name)
189 log.debug('Calling %s@%s', self._url, name)
190 return RemoteObject._call(self, name, *args, **kwargs)
190 return RemoteObject._call(self, name, *args, **kwargs)
191
191
192
192
193 def _remote_call(url, payload, exceptions_map, session):
193 def _remote_call(url, payload, exceptions_map, session):
194 try:
194 try:
195 response = session.post(url, data=msgpack.packb(payload))
195 response = session.post(url, data=msgpack.packb(payload))
196 except pycurl.error as e:
196 except pycurl.error as e:
197 raise exceptions.HttpVCSCommunicationError(e)
197 raise exceptions.HttpVCSCommunicationError(e)
198
198
199 if response.status_code >= 400:
200 log.error('Call to %s returned non 200 HTTP code: %s',
201 url, response.status_code)
202 raise exceptions.HttpVCSCommunicationError(repr(response.content))
203
199 try:
204 try:
200 response = msgpack.unpackb(response.content)
205 response = msgpack.unpackb(response.content)
201 except Exception:
206 except Exception:
202 log.exception('Failed to decode repsponse %r', response.content)
207 log.exception('Failed to decode response %r', response.content)
203 raise
208 raise
204
209
205 error = response.get('error')
210 error = response.get('error')
206 if error:
211 if error:
207 type_ = error.get('type', 'Exception')
212 type_ = error.get('type', 'Exception')
208 exc = exceptions_map.get(type_, Exception)
213 exc = exceptions_map.get(type_, Exception)
209 exc = exc(error.get('message'))
214 exc = exc(error.get('message'))
210 try:
215 try:
211 exc._vcs_kind = error['_vcs_kind']
216 exc._vcs_kind = error['_vcs_kind']
212 except KeyError:
217 except KeyError:
213 pass
218 pass
214
219
215 try:
220 try:
216 exc._vcs_server_traceback = error['traceback']
221 exc._vcs_server_traceback = error['traceback']
217 except KeyError:
222 except KeyError:
218 pass
223 pass
219
224
220 raise exc
225 raise exc
221 return response.get('result')
226 return response.get('result')
222
227
223
228
224 class VcsHttpProxy(object):
229 class VcsHttpProxy(object):
225
230
226 CHUNK_SIZE = 16384
231 CHUNK_SIZE = 16384
227
232
228 def __init__(self, server_and_port, backend_endpoint):
233 def __init__(self, server_and_port, backend_endpoint):
229 adapter = requests.adapters.HTTPAdapter(max_retries=5)
234 adapter = requests.adapters.HTTPAdapter(max_retries=5)
230 self.base_url = urlparse.urljoin(
235 self.base_url = urlparse.urljoin(
231 'http://%s' % server_and_port, backend_endpoint)
236 'http://%s' % server_and_port, backend_endpoint)
232 self.session = requests.Session()
237 self.session = requests.Session()
233 self.session.mount('http://', adapter)
238 self.session.mount('http://', adapter)
234
239
235 def handle(self, environment, input_data, *args, **kwargs):
240 def handle(self, environment, input_data, *args, **kwargs):
236 data = {
241 data = {
237 'environment': environment,
242 'environment': environment,
238 'input_data': input_data,
243 'input_data': input_data,
239 'args': args,
244 'args': args,
240 'kwargs': kwargs
245 'kwargs': kwargs
241 }
246 }
242 result = self.session.post(
247 result = self.session.post(
243 self.base_url, msgpack.packb(data), stream=True)
248 self.base_url, msgpack.packb(data), stream=True)
244 return self._get_result(result)
249 return self._get_result(result)
245
250
246 def _deserialize_and_raise(self, error):
251 def _deserialize_and_raise(self, error):
247 exception = Exception(error['message'])
252 exception = Exception(error['message'])
248 try:
253 try:
249 exception._vcs_kind = error['_vcs_kind']
254 exception._vcs_kind = error['_vcs_kind']
250 except KeyError:
255 except KeyError:
251 pass
256 pass
252 raise exception
257 raise exception
253
258
254 def _iterate(self, result):
259 def _iterate(self, result):
255 unpacker = msgpack.Unpacker()
260 unpacker = msgpack.Unpacker()
256 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
261 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
257 unpacker.feed(line)
262 unpacker.feed(line)
258 for chunk in unpacker:
263 for chunk in unpacker:
259 yield chunk
264 yield chunk
260
265
261 def _get_result(self, result):
266 def _get_result(self, result):
262 iterator = self._iterate(result)
267 iterator = self._iterate(result)
263 error = iterator.next()
268 error = iterator.next()
264 if error:
269 if error:
265 self._deserialize_and_raise(error)
270 self._deserialize_and_raise(error)
266
271
267 status = iterator.next()
272 status = iterator.next()
268 headers = iterator.next()
273 headers = iterator.next()
269
274
270 return iterator, status, headers
275 return iterator, status, headers
271
276
272
277
273 class ThreadlocalSessionFactory(object):
278 class ThreadlocalSessionFactory(object):
274 """
279 """
275 Creates one CurlSession per thread on demand.
280 Creates one CurlSession per thread on demand.
276 """
281 """
277
282
278 def __init__(self):
283 def __init__(self):
279 self._thread_local = threading.local()
284 self._thread_local = threading.local()
280
285
281 def __call__(self):
286 def __call__(self):
282 if not hasattr(self._thread_local, 'curl_session'):
287 if not hasattr(self._thread_local, 'curl_session'):
283 self._thread_local.curl_session = CurlSession()
288 self._thread_local.curl_session = CurlSession()
284 return self._thread_local.curl_session
289 return self._thread_local.curl_session
@@ -1,96 +1,133 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import logging
21 import logging
22
22
23 import mock
23 import mock
24 import msgpack
24 import msgpack
25 import pytest
25 import pytest
26
26
27 from rhodecode.lib import vcs
27 from rhodecode.lib import vcs
28 from rhodecode.lib.vcs import client_http
28 from rhodecode.lib.vcs import client_http, exceptions
29
30
31 def test_uses_persistent_http_connections(caplog, vcsbackend_hg):
32 repo = vcsbackend_hg.repo
33 remote_call = repo._remote.branches
34
35 with caplog.at_level(logging.INFO):
36 for x in range(5):
37 remote_call(normal=True, closed=False)
38
39 new_connections = [
40 r for r in caplog.record_tuples if is_new_connection(*r)]
41 assert len(new_connections) <= 1
42
29
43
30
44 def is_new_connection(logger, level, message):
31 def is_new_connection(logger, level, message):
45 return (
32 return (
46 logger == 'requests.packages.urllib3.connectionpool' and
33 logger == 'requests.packages.urllib3.connectionpool' and
47 message.startswith('Starting new HTTP'))
34 message.startswith('Starting new HTTP'))
48
35
49
36
50 @pytest.fixture
37 @pytest.fixture
51 def stub_session():
38 def stub_session():
52 """
39 """
53 Stub of `requests.Session()`.
40 Stub of `requests.Session()`.
54 """
41 """
55 session = mock.Mock()
42 session = mock.Mock()
56 session.post().content = msgpack.packb({})
43 post = session.post()
44 post.content = msgpack.packb({})
45 post.status_code = 200
46
47 session.reset_mock()
48 return session
49
50
51 @pytest.fixture
52 def stub_fail_session():
53 """
54 Stub of `requests.Session()`.
55 """
56 session = mock.Mock()
57 post = session.post()
58 post.content = msgpack.packb({'error': '500'})
59 post.status_code = 500
60
57 session.reset_mock()
61 session.reset_mock()
58 return session
62 return session
59
63
60
64
61 @pytest.fixture
65 @pytest.fixture
62 def stub_session_factory(stub_session):
66 def stub_session_factory(stub_session):
63 """
67 """
64 Stub of `rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory`.
68 Stub of `rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory`.
65 """
69 """
66 session_factory = mock.Mock()
70 session_factory = mock.Mock()
67 session_factory.return_value = stub_session
71 session_factory.return_value = stub_session
68 return session_factory
72 return session_factory
69
73
70
74
75 @pytest.fixture
76 def stub_session_failing_factory(stub_fail_session):
77 """
78 Stub of `rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory`.
79 """
80 session_factory = mock.Mock()
81 session_factory.return_value = stub_fail_session
82 return session_factory
83
84
85 def test_uses_persistent_http_connections(caplog, vcsbackend_hg):
86 repo = vcsbackend_hg.repo
87 remote_call = repo._remote.branches
88
89 with caplog.at_level(logging.INFO):
90 for x in range(5):
91 remote_call(normal=True, closed=False)
92
93 new_connections = [
94 r for r in caplog.record_tuples if is_new_connection(*r)]
95 assert len(new_connections) <= 1
96
97
71 def test_repo_maker_uses_session_for_classmethods(stub_session_factory):
98 def test_repo_maker_uses_session_for_classmethods(stub_session_factory):
72 repo_maker = client_http.RepoMaker(
99 repo_maker = client_http.RepoMaker(
73 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
100 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
74 repo_maker.example_call()
101 repo_maker.example_call()
75 stub_session_factory().post.assert_called_with(
102 stub_session_factory().post.assert_called_with(
76 'http://server_and_port/endpoint', data=mock.ANY)
103 'http://server_and_port/endpoint', data=mock.ANY)
77
104
78
105
79 def test_repo_maker_uses_session_for_instance_methods(
106 def test_repo_maker_uses_session_for_instance_methods(
80 stub_session_factory, config):
107 stub_session_factory, config):
81 repo_maker = client_http.RepoMaker(
108 repo_maker = client_http.RepoMaker(
82 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
109 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
83 repo = repo_maker('stub_path', config)
110 repo = repo_maker('stub_path', config)
84 repo.example_call()
111 repo.example_call()
85 stub_session_factory().post.assert_called_with(
112 stub_session_factory().post.assert_called_with(
86 'http://server_and_port/endpoint', data=mock.ANY)
113 'http://server_and_port/endpoint', data=mock.ANY)
87
114
88
115
89 @mock.patch('rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory')
116 @mock.patch('rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory')
90 @mock.patch('rhodecode.lib.vcs.connection')
117 @mock.patch('rhodecode.lib.vcs.connection')
91 def test_connect_passes_in_the_same_session(
118 def test_connect_passes_in_the_same_session(
92 connection, session_factory_class, stub_session):
119 connection, session_factory_class, stub_session):
93 session_factory = session_factory_class.return_value
120 session_factory = session_factory_class.return_value
94 session_factory.return_value = stub_session
121 session_factory.return_value = stub_session
95
122
96 vcs.connect_http('server_and_port')
123 vcs.connect_http('server_and_port')
124
125
126 def test_repo_maker_uses_session_that_throws_error(
127 stub_session_failing_factory, config):
128 repo_maker = client_http.RepoMaker(
129 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_failing_factory)
130 repo = repo_maker('stub_path', config)
131
132 with pytest.raises(exceptions.HttpVCSCommunicationError):
133 repo.example_call()
General Comments 0
You need to be logged in to leave comments. Login now