##// END OF EJS Templates
vcs-http: explicitly pass in vcs-type into vcsserver.
marcink -
r1126:e9e0b5e3 default
parent child Browse files
Show More
@@ -1,304 +1,304 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2016 RhodeCode GmbH
3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Various version Control System version lib (vcs) management abstraction layer
22 Various version Control System version lib (vcs) management abstraction layer
23 for Python. Build with server client architecture.
23 for Python. Build with server client architecture.
24 """
24 """
25
25
26
26
27 VERSION = (0, 5, 0, 'dev')
27 VERSION = (0, 5, 0, 'dev')
28
28
29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
30
30
31 __all__ = [
31 __all__ = [
32 'get_version', 'get_vcs_instance', 'get_backend',
32 'get_version', 'get_vcs_instance', 'get_backend',
33 'VCSError', 'RepositoryError', 'CommitError'
33 'VCSError', 'RepositoryError', 'CommitError'
34 ]
34 ]
35
35
36 import atexit
36 import atexit
37 import logging
37 import logging
38 import subprocess32
38 import subprocess32
39 import time
39 import time
40 import urlparse
40 import urlparse
41 from cStringIO import StringIO
41 from cStringIO import StringIO
42
42
43 import Pyro4
43 import Pyro4
44 from Pyro4.errors import CommunicationError
44 from Pyro4.errors import CommunicationError
45
45
46 from rhodecode.lib.vcs.conf import settings
46 from rhodecode.lib.vcs.conf import settings
47 from rhodecode.lib.vcs.backends import get_vcs_instance, get_backend
47 from rhodecode.lib.vcs.backends import get_vcs_instance, get_backend
48 from rhodecode.lib.vcs.exceptions import (
48 from rhodecode.lib.vcs.exceptions import (
49 VCSError, RepositoryError, CommitError, VCSCommunicationError)
49 VCSError, RepositoryError, CommitError, VCSCommunicationError)
50
50
51 log = logging.getLogger(__name__)
51 log = logging.getLogger(__name__)
52
52
53 # The pycurl library directly accesses C API functions and is not patched by
53 # The pycurl library directly accesses C API functions and is not patched by
54 # gevent. This will potentially lead to deadlocks due to incompatibility to
54 # gevent. This will potentially lead to deadlocks due to incompatibility to
55 # gevent. Therefore we check if gevent is active and import a gevent compatible
55 # gevent. Therefore we check if gevent is active and import a gevent compatible
56 # wrapper in that case.
56 # wrapper in that case.
57 try:
57 try:
58 from gevent import monkey
58 from gevent import monkey
59 if monkey.is_module_patched('__builtin__'):
59 if monkey.is_module_patched('__builtin__'):
60 import geventcurl as pycurl
60 import geventcurl as pycurl
61 log.debug('Using gevent comapatible pycurl: %s', pycurl)
61 log.debug('Using gevent comapatible pycurl: %s', pycurl)
62 else:
62 else:
63 import pycurl
63 import pycurl
64 except ImportError:
64 except ImportError:
65 import pycurl
65 import pycurl
66
66
67
67
68 def get_version():
68 def get_version():
69 """
69 """
70 Returns shorter version (digit parts only) as string.
70 Returns shorter version (digit parts only) as string.
71 """
71 """
72 return '.'.join((str(each) for each in VERSION[:3]))
72 return '.'.join((str(each) for each in VERSION[:3]))
73
73
74
74
75 def connect_pyro4(server_and_port):
75 def connect_pyro4(server_and_port):
76 from rhodecode.lib.vcs import connection, client
76 from rhodecode.lib.vcs import connection, client
77 from rhodecode.lib.middleware.utils import scm_app
77 from rhodecode.lib.middleware.utils import scm_app
78
78
79 git_remote = client.RequestScopeProxyFactory(
79 git_remote = client.RequestScopeProxyFactory(
80 settings.pyro_remote(settings.PYRO_GIT, server_and_port))
80 settings.pyro_remote(settings.PYRO_GIT, server_and_port))
81 hg_remote = client.RequestScopeProxyFactory(
81 hg_remote = client.RequestScopeProxyFactory(
82 settings.pyro_remote(settings.PYRO_HG, server_and_port))
82 settings.pyro_remote(settings.PYRO_HG, server_and_port))
83 svn_remote = client.RequestScopeProxyFactory(
83 svn_remote = client.RequestScopeProxyFactory(
84 settings.pyro_remote(settings.PYRO_SVN, server_and_port))
84 settings.pyro_remote(settings.PYRO_SVN, server_and_port))
85
85
86 connection.Git = client.RepoMaker(proxy_factory=git_remote)
86 connection.Git = client.RepoMaker(proxy_factory=git_remote)
87 connection.Hg = client.RepoMaker(proxy_factory=hg_remote)
87 connection.Hg = client.RepoMaker(proxy_factory=hg_remote)
88 connection.Svn = client.RepoMaker(proxy_factory=svn_remote)
88 connection.Svn = client.RepoMaker(proxy_factory=svn_remote)
89
89
90 scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
90 scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
91 settings.pyro_remote(
91 settings.pyro_remote(
92 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
92 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
93 scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
93 scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
94 settings.pyro_remote(
94 settings.pyro_remote(
95 settings.PYRO_HG_REMOTE_WSGI, server_and_port))
95 settings.PYRO_HG_REMOTE_WSGI, server_and_port))
96
96
97 @atexit.register
97 @atexit.register
98 def free_connection_resources():
98 def free_connection_resources():
99 connection.Git = None
99 connection.Git = None
100 connection.Hg = None
100 connection.Hg = None
101 connection.Svn = None
101 connection.Svn = None
102 connection.Service = None
102 connection.Service = None
103
103
104
104
105 def connect_http(server_and_port):
105 def connect_http(server_and_port):
106 from rhodecode.lib.vcs import connection, client_http
106 from rhodecode.lib.vcs import connection, client_http
107 from rhodecode.lib.middleware.utils import scm_app
107 from rhodecode.lib.middleware.utils import scm_app
108
108
109 session_factory = client_http.ThreadlocalSessionFactory()
109 session_factory = client_http.ThreadlocalSessionFactory()
110
110
111 connection.Git = client_http.RepoMaker(
111 connection.Git = client_http.RepoMaker(
112 server_and_port, '/git', session_factory)
112 server_and_port, '/git', 'git', session_factory)
113 connection.Hg = client_http.RepoMaker(
113 connection.Hg = client_http.RepoMaker(
114 server_and_port, '/hg', session_factory)
114 server_and_port, '/hg', 'hg', session_factory)
115 connection.Svn = client_http.RepoMaker(
115 connection.Svn = client_http.RepoMaker(
116 server_and_port, '/svn', session_factory)
116 server_and_port, '/svn', 'svn', session_factory)
117 connection.Service = client_http.ServiceConnection(
117 connection.Service = client_http.ServiceConnection(
118 server_and_port, '/_service', session_factory)
118 server_and_port, '/_service', session_factory)
119
119
120 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
120 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
121 server_and_port, '/proxy/hg')
121 server_and_port, '/proxy/hg')
122 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
122 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
123 server_and_port, '/proxy/git')
123 server_and_port, '/proxy/git')
124
124
125 @atexit.register
125 @atexit.register
126 def free_connection_resources():
126 def free_connection_resources():
127 connection.Git = None
127 connection.Git = None
128 connection.Hg = None
128 connection.Hg = None
129 connection.Svn = None
129 connection.Svn = None
130 connection.Service = None
130 connection.Service = None
131
131
132
132
133 def connect_vcs(server_and_port, protocol):
133 def connect_vcs(server_and_port, protocol):
134 """
134 """
135 Initializes the connection to the vcs server.
135 Initializes the connection to the vcs server.
136
136
137 :param server_and_port: str, e.g. "localhost:9900"
137 :param server_and_port: str, e.g. "localhost:9900"
138 :param protocol: str, "pyro4" or "http"
138 :param protocol: str, "pyro4" or "http"
139 """
139 """
140 if protocol == 'pyro4':
140 if protocol == 'pyro4':
141 connect_pyro4(server_and_port)
141 connect_pyro4(server_and_port)
142 elif protocol == 'http':
142 elif protocol == 'http':
143 connect_http(server_and_port)
143 connect_http(server_and_port)
144 else:
144 else:
145 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
145 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
146
146
147
147
148 # TODO: johbo: This function should be moved into our test suite, there is
148 # TODO: johbo: This function should be moved into our test suite, there is
149 # no reason to support starting the vcsserver in Enterprise itself.
149 # no reason to support starting the vcsserver in Enterprise itself.
150 def start_vcs_server(server_and_port, protocol, log_level=None):
150 def start_vcs_server(server_and_port, protocol, log_level=None):
151 """
151 """
152 Starts the vcs server in a subprocess.
152 Starts the vcs server in a subprocess.
153 """
153 """
154 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
154 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
155 if protocol == 'http':
155 if protocol == 'http':
156 return _start_http_vcs_server(server_and_port, log_level)
156 return _start_http_vcs_server(server_and_port, log_level)
157 elif protocol == 'pyro4':
157 elif protocol == 'pyro4':
158 return _start_pyro4_vcs_server(server_and_port, log_level)
158 return _start_pyro4_vcs_server(server_and_port, log_level)
159 else:
159 else:
160 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
160 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
161
161
162
162
163 def _start_pyro4_vcs_server(server_and_port, log_level=None):
163 def _start_pyro4_vcs_server(server_and_port, log_level=None):
164 _try_to_shutdown_running_server(server_and_port, protocol='pyro4')
164 _try_to_shutdown_running_server(server_and_port, protocol='pyro4')
165 host, port = server_and_port.rsplit(":", 1)
165 host, port = server_and_port.rsplit(":", 1)
166 host = host.strip('[]')
166 host = host.strip('[]')
167 args = [
167 args = [
168 'vcsserver', '--port', port, '--host', host, '--locale', 'en_US.UTF-8',
168 'vcsserver', '--port', port, '--host', host, '--locale', 'en_US.UTF-8',
169 '--threadpool', '32']
169 '--threadpool', '32']
170 if log_level:
170 if log_level:
171 args += ['--log-level', log_level]
171 args += ['--log-level', log_level]
172 proc = subprocess32.Popen(args)
172 proc = subprocess32.Popen(args)
173
173
174 def cleanup_server_process():
174 def cleanup_server_process():
175 proc.kill()
175 proc.kill()
176 atexit.register(cleanup_server_process)
176 atexit.register(cleanup_server_process)
177
177
178 server = create_vcsserver_proxy(server_and_port, protocol='pyro4')
178 server = create_vcsserver_proxy(server_and_port, protocol='pyro4')
179 _wait_until_vcs_server_is_reachable(server)
179 _wait_until_vcs_server_is_reachable(server)
180
180
181
181
182 def _start_http_vcs_server(server_and_port, log_level=None):
182 def _start_http_vcs_server(server_and_port, log_level=None):
183 # TODO: mikhail: shutdown if an http server already runs
183 # TODO: mikhail: shutdown if an http server already runs
184
184
185 host, port = server_and_port.rsplit(":", 1)
185 host, port = server_and_port.rsplit(":", 1)
186 args = [
186 args = [
187 'pserve', 'rhodecode/tests/vcsserver_http.ini',
187 'pserve', 'rhodecode/tests/vcsserver_http.ini',
188 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
188 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
189 proc = subprocess32.Popen(args)
189 proc = subprocess32.Popen(args)
190
190
191 def cleanup_server_process():
191 def cleanup_server_process():
192 proc.kill()
192 proc.kill()
193 atexit.register(cleanup_server_process)
193 atexit.register(cleanup_server_process)
194
194
195 server = create_vcsserver_proxy(server_and_port, protocol='http')
195 server = create_vcsserver_proxy(server_and_port, protocol='http')
196 _wait_until_vcs_server_is_reachable(server)
196 _wait_until_vcs_server_is_reachable(server)
197
197
198
198
199 def _wait_until_vcs_server_is_reachable(server, timeout=40):
199 def _wait_until_vcs_server_is_reachable(server, timeout=40):
200 begin = time.time()
200 begin = time.time()
201 while (time.time() - begin) < timeout:
201 while (time.time() - begin) < timeout:
202 try:
202 try:
203 server.ping()
203 server.ping()
204 return
204 return
205 except (VCSCommunicationError, CommunicationError, pycurl.error):
205 except (VCSCommunicationError, CommunicationError, pycurl.error):
206 log.debug('VCSServer not started yet, retry to connect.')
206 log.debug('VCSServer not started yet, retry to connect.')
207 time.sleep(0.5)
207 time.sleep(0.5)
208 raise Exception(
208 raise Exception(
209 'Starting the VCSServer failed or took more than {} '
209 'Starting the VCSServer failed or took more than {} '
210 'seconds.'.format(timeout))
210 'seconds.'.format(timeout))
211
211
212
212
213 def _try_to_shutdown_running_server(server_and_port, protocol):
213 def _try_to_shutdown_running_server(server_and_port, protocol):
214 server = create_vcsserver_proxy(server_and_port, protocol)
214 server = create_vcsserver_proxy(server_and_port, protocol)
215 try:
215 try:
216 server.shutdown()
216 server.shutdown()
217 except (CommunicationError, pycurl.error):
217 except (CommunicationError, pycurl.error):
218 return
218 return
219
219
220 # TODO: Not sure why this is important, but without it the following start
220 # TODO: Not sure why this is important, but without it the following start
221 # of the server fails.
221 # of the server fails.
222 server = create_vcsserver_proxy(server_and_port, protocol)
222 server = create_vcsserver_proxy(server_and_port, protocol)
223 server.ping()
223 server.ping()
224
224
225
225
226 def create_vcsserver_proxy(server_and_port, protocol):
226 def create_vcsserver_proxy(server_and_port, protocol):
227 if protocol == 'pyro4':
227 if protocol == 'pyro4':
228 return _create_vcsserver_proxy_pyro4(server_and_port)
228 return _create_vcsserver_proxy_pyro4(server_and_port)
229 elif protocol == 'http':
229 elif protocol == 'http':
230 return _create_vcsserver_proxy_http(server_and_port)
230 return _create_vcsserver_proxy_http(server_and_port)
231 else:
231 else:
232 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
232 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
233
233
234
234
235 def _create_vcsserver_proxy_pyro4(server_and_port):
235 def _create_vcsserver_proxy_pyro4(server_and_port):
236 server = Pyro4.Proxy(
236 server = Pyro4.Proxy(
237 settings.pyro_remote(settings.PYRO_VCSSERVER, server_and_port))
237 settings.pyro_remote(settings.PYRO_VCSSERVER, server_and_port))
238 return server
238 return server
239
239
240
240
241 def _create_vcsserver_proxy_http(server_and_port):
241 def _create_vcsserver_proxy_http(server_and_port):
242 from rhodecode.lib.vcs import client_http
242 from rhodecode.lib.vcs import client_http
243
243
244 session = _create_http_rpc_session()
244 session = _create_http_rpc_session()
245 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
245 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
246 return client_http.RemoteObject(url, session)
246 return client_http.RemoteObject(url, session)
247
247
248
248
249 class CurlSession(object):
249 class CurlSession(object):
250 """
250 """
251 Modeled so that it provides a subset of the requests interface.
251 Modeled so that it provides a subset of the requests interface.
252
252
253 This has been created so that it does only provide a minimal API for our
253 This has been created so that it does only provide a minimal API for our
254 needs. The parts which it provides are based on the API of the library
254 needs. The parts which it provides are based on the API of the library
255 `requests` which allows us to easily benchmark against it.
255 `requests` which allows us to easily benchmark against it.
256
256
257 Please have a look at the class :class:`requests.Session` when you extend
257 Please have a look at the class :class:`requests.Session` when you extend
258 it.
258 it.
259 """
259 """
260
260
261 def __init__(self):
261 def __init__(self):
262 curl = pycurl.Curl()
262 curl = pycurl.Curl()
263 # TODO: johbo: I did test with 7.19 of libcurl. This version has
263 # TODO: johbo: I did test with 7.19 of libcurl. This version has
264 # trouble with 100 - continue being set in the expect header. This
264 # trouble with 100 - continue being set in the expect header. This
265 # can lead to massive performance drops, switching it off here.
265 # can lead to massive performance drops, switching it off here.
266 curl.setopt(curl.HTTPHEADER, ["Expect:"])
266 curl.setopt(curl.HTTPHEADER, ["Expect:"])
267 curl.setopt(curl.TCP_NODELAY, True)
267 curl.setopt(curl.TCP_NODELAY, True)
268 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
268 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
269 self._curl = curl
269 self._curl = curl
270
270
271 def post(self, url, data, allow_redirects=False):
271 def post(self, url, data, allow_redirects=False):
272 response_buffer = StringIO()
272 response_buffer = StringIO()
273
273
274 curl = self._curl
274 curl = self._curl
275 curl.setopt(curl.URL, url)
275 curl.setopt(curl.URL, url)
276 curl.setopt(curl.POST, True)
276 curl.setopt(curl.POST, True)
277 curl.setopt(curl.POSTFIELDS, data)
277 curl.setopt(curl.POSTFIELDS, data)
278 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
278 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
279 curl.setopt(curl.WRITEDATA, response_buffer)
279 curl.setopt(curl.WRITEDATA, response_buffer)
280 curl.perform()
280 curl.perform()
281
281
282 return CurlResponse(response_buffer)
282 return CurlResponse(response_buffer)
283
283
284
284
285 class CurlResponse(object):
285 class CurlResponse(object):
286 """
286 """
287 The response of a request, modeled after the requests API.
287 The response of a request, modeled after the requests API.
288
288
289 This class provides a subset of the response interface known from the
289 This class provides a subset of the response interface known from the
290 library `requests`. It is intentionally kept similar, so that we can use
290 library `requests`. It is intentionally kept similar, so that we can use
291 `requests` as a drop in replacement for benchmarking purposes.
291 `requests` as a drop in replacement for benchmarking purposes.
292 """
292 """
293
293
294 def __init__(self, response_buffer):
294 def __init__(self, response_buffer):
295 self._response_buffer = response_buffer
295 self._response_buffer = response_buffer
296
296
297 @property
297 @property
298 def content(self):
298 def content(self):
299 return self._response_buffer.getvalue()
299 return self._response_buffer.getvalue()
300
300
301
301
302 def _create_http_rpc_session():
302 def _create_http_rpc_session():
303 session = CurlSession()
303 session = CurlSession()
304 return session
304 return session
@@ -1,283 +1,285 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23
23
24
24
25 Status
25 Status
26 ------
26 ------
27
27
28 This client implementation shall eventually replace the Pyro4 based
28 This client implementation shall eventually replace the Pyro4 based
29 implementation.
29 implementation.
30 """
30 """
31
31
32 import copy
32 import copy
33 import logging
33 import logging
34 import threading
34 import threading
35 import urllib2
35 import urllib2
36 import urlparse
36 import urlparse
37 import uuid
37 import uuid
38
38
39 import pycurl
39 import pycurl
40 import msgpack
40 import msgpack
41 import requests
41 import requests
42
42
43 from . import exceptions, CurlSession
43 from . import exceptions, CurlSession
44
44
45
45
46 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
47
47
48
48
49 # TODO: mikhail: Keep it in sync with vcsserver's
49 # TODO: mikhail: Keep it in sync with vcsserver's
50 # HTTPApplication.ALLOWED_EXCEPTIONS
50 # HTTPApplication.ALLOWED_EXCEPTIONS
51 EXCEPTIONS_MAP = {
51 EXCEPTIONS_MAP = {
52 'KeyError': KeyError,
52 'KeyError': KeyError,
53 'URLError': urllib2.URLError,
53 'URLError': urllib2.URLError,
54 }
54 }
55
55
56
56
57 class RepoMaker(object):
57 class RepoMaker(object):
58
58
59 def __init__(self, server_and_port, backend_endpoint, session_factory):
59 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
60 self.url = urlparse.urljoin(
60 self.url = urlparse.urljoin(
61 'http://%s' % server_and_port, backend_endpoint)
61 'http://%s' % server_and_port, backend_endpoint)
62 self._session_factory = session_factory
62 self._session_factory = session_factory
63 self.backend_type = backend_type
63
64
64 def __call__(self, path, config, with_wire=None):
65 def __call__(self, path, config, with_wire=None):
65 log.debug('RepoMaker call on %s', path)
66 log.debug('RepoMaker call on %s', path)
66 return RemoteRepo(
67 return RemoteRepo(
67 path, config, self.url, self._session_factory(),
68 path, config, self.url, self._session_factory(),
68 with_wire=with_wire)
69 with_wire=with_wire)
69
70
70 def __getattr__(self, name):
71 def __getattr__(self, name):
71 def f(*args, **kwargs):
72 def f(*args, **kwargs):
72 return self._call(name, *args, **kwargs)
73 return self._call(name, *args, **kwargs)
73 return f
74 return f
74
75
75 @exceptions.map_vcs_exceptions
76 @exceptions.map_vcs_exceptions
76 def _call(self, name, *args, **kwargs):
77 def _call(self, name, *args, **kwargs):
77 payload = {
78 payload = {
78 'id': str(uuid.uuid4()),
79 'id': str(uuid.uuid4()),
79 'method': name,
80 'method': name,
81 'backend': self.backend_type,
80 'params': {'args': args, 'kwargs': kwargs}
82 'params': {'args': args, 'kwargs': kwargs}
81 }
83 }
82 return _remote_call(
84 return _remote_call(
83 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
85 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
84
86
85
87
86 class ServiceConnection(object):
88 class ServiceConnection(object):
87 def __init__(self, server_and_port, backend_endpoint, session_factory):
89 def __init__(self, server_and_port, backend_endpoint, session_factory):
88 self.url = urlparse.urljoin(
90 self.url = urlparse.urljoin(
89 'http://%s' % server_and_port, backend_endpoint)
91 'http://%s' % server_and_port, backend_endpoint)
90 self._session_factory = session_factory
92 self._session_factory = session_factory
91
93
92 def __getattr__(self, name):
94 def __getattr__(self, name):
93 def f(*args, **kwargs):
95 def f(*args, **kwargs):
94 return self._call(name, *args, **kwargs)
96 return self._call(name, *args, **kwargs)
95
97
96 return f
98 return f
97
99
98 @exceptions.map_vcs_exceptions
100 @exceptions.map_vcs_exceptions
99 def _call(self, name, *args, **kwargs):
101 def _call(self, name, *args, **kwargs):
100 payload = {
102 payload = {
101 'id': str(uuid.uuid4()),
103 'id': str(uuid.uuid4()),
102 'method': name,
104 'method': name,
103 'params': {'args': args, 'kwargs': kwargs}
105 'params': {'args': args, 'kwargs': kwargs}
104 }
106 }
105 return _remote_call(
107 return _remote_call(
106 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
108 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
107
109
108
110
109 class RemoteRepo(object):
111 class RemoteRepo(object):
110
112
111 def __init__(self, path, config, url, session, with_wire=None):
113 def __init__(self, path, config, url, session, with_wire=None):
112 self.url = url
114 self.url = url
113 self._session = session
115 self._session = session
114 self._wire = {
116 self._wire = {
115 "path": path,
117 "path": path,
116 "config": config,
118 "config": config,
117 "context": self._create_vcs_cache_context(),
119 "context": self._create_vcs_cache_context(),
118 }
120 }
119 if with_wire:
121 if with_wire:
120 self._wire.update(with_wire)
122 self._wire.update(with_wire)
121
123
122 # johbo: Trading complexity for performance. Avoiding the call to
124 # johbo: Trading complexity for performance. Avoiding the call to
123 # log.debug brings a few percent gain even if is is not active.
125 # log.debug brings a few percent gain even if is is not active.
124 if log.isEnabledFor(logging.DEBUG):
126 if log.isEnabledFor(logging.DEBUG):
125 self._call = self._call_with_logging
127 self._call = self._call_with_logging
126
128
127 def __getattr__(self, name):
129 def __getattr__(self, name):
128 def f(*args, **kwargs):
130 def f(*args, **kwargs):
129 return self._call(name, *args, **kwargs)
131 return self._call(name, *args, **kwargs)
130 return f
132 return f
131
133
132 @exceptions.map_vcs_exceptions
134 @exceptions.map_vcs_exceptions
133 def _call(self, name, *args, **kwargs):
135 def _call(self, name, *args, **kwargs):
134 # TODO: oliver: This is currently necessary pre-call since the
136 # TODO: oliver: This is currently necessary pre-call since the
135 # config object is being changed for hooking scenarios
137 # config object is being changed for hooking scenarios
136 wire = copy.deepcopy(self._wire)
138 wire = copy.deepcopy(self._wire)
137 wire["config"] = wire["config"].serialize()
139 wire["config"] = wire["config"].serialize()
138 payload = {
140 payload = {
139 'id': str(uuid.uuid4()),
141 'id': str(uuid.uuid4()),
140 'method': name,
142 'method': name,
141 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
143 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
142 }
144 }
143 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
145 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
144
146
145 def _call_with_logging(self, name, *args, **kwargs):
147 def _call_with_logging(self, name, *args, **kwargs):
146 log.debug('Calling %s@%s', self.url, name)
148 log.debug('Calling %s@%s', self.url, name)
147 return RemoteRepo._call(self, name, *args, **kwargs)
149 return RemoteRepo._call(self, name, *args, **kwargs)
148
150
149 def __getitem__(self, key):
151 def __getitem__(self, key):
150 return self.revision(key)
152 return self.revision(key)
151
153
152 def _create_vcs_cache_context(self):
154 def _create_vcs_cache_context(self):
153 """
155 """
154 Creates a unique string which is passed to the VCSServer on every
156 Creates a unique string which is passed to the VCSServer on every
155 remote call. It is used as cache key in the VCSServer.
157 remote call. It is used as cache key in the VCSServer.
156 """
158 """
157 return str(uuid.uuid4())
159 return str(uuid.uuid4())
158
160
159 def invalidate_vcs_cache(self):
161 def invalidate_vcs_cache(self):
160 """
162 """
161 This invalidates the context which is sent to the VCSServer on every
163 This invalidates the context which is sent to the VCSServer on every
162 call to a remote method. It forces the VCSServer to create a fresh
164 call to a remote method. It forces the VCSServer to create a fresh
163 repository instance on the next call to a remote method.
165 repository instance on the next call to a remote method.
164 """
166 """
165 self._wire['context'] = self._create_vcs_cache_context()
167 self._wire['context'] = self._create_vcs_cache_context()
166
168
167
169
168 class RemoteObject(object):
170 class RemoteObject(object):
169
171
170 def __init__(self, url, session):
172 def __init__(self, url, session):
171 self._url = url
173 self._url = url
172 self._session = session
174 self._session = session
173
175
174 # johbo: Trading complexity for performance. Avoiding the call to
176 # johbo: Trading complexity for performance. Avoiding the call to
175 # log.debug brings a few percent gain even if is is not active.
177 # log.debug brings a few percent gain even if is is not active.
176 if log.isEnabledFor(logging.DEBUG):
178 if log.isEnabledFor(logging.DEBUG):
177 self._call = self._call_with_logging
179 self._call = self._call_with_logging
178
180
179 def __getattr__(self, name):
181 def __getattr__(self, name):
180 def f(*args, **kwargs):
182 def f(*args, **kwargs):
181 return self._call(name, *args, **kwargs)
183 return self._call(name, *args, **kwargs)
182 return f
184 return f
183
185
184 @exceptions.map_vcs_exceptions
186 @exceptions.map_vcs_exceptions
185 def _call(self, name, *args, **kwargs):
187 def _call(self, name, *args, **kwargs):
186 payload = {
188 payload = {
187 'id': str(uuid.uuid4()),
189 'id': str(uuid.uuid4()),
188 'method': name,
190 'method': name,
189 'params': {'args': args, 'kwargs': kwargs}
191 'params': {'args': args, 'kwargs': kwargs}
190 }
192 }
191 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
193 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
192
194
193 def _call_with_logging(self, name, *args, **kwargs):
195 def _call_with_logging(self, name, *args, **kwargs):
194 log.debug('Calling %s@%s', self._url, name)
196 log.debug('Calling %s@%s', self._url, name)
195 return RemoteObject._call(self, name, *args, **kwargs)
197 return RemoteObject._call(self, name, *args, **kwargs)
196
198
197
199
198 def _remote_call(url, payload, exceptions_map, session):
200 def _remote_call(url, payload, exceptions_map, session):
199 try:
201 try:
200 response = session.post(url, data=msgpack.packb(payload))
202 response = session.post(url, data=msgpack.packb(payload))
201 except pycurl.error as e:
203 except pycurl.error as e:
202 raise exceptions.HttpVCSCommunicationError(e)
204 raise exceptions.HttpVCSCommunicationError(e)
203
205
204 try:
206 try:
205 response = msgpack.unpackb(response.content)
207 response = msgpack.unpackb(response.content)
206 except Exception:
208 except Exception:
207 log.exception('Failed to decode repsponse %r', response.content)
209 log.exception('Failed to decode repsponse %r', response.content)
208 raise
210 raise
209
211
210 error = response.get('error')
212 error = response.get('error')
211 if error:
213 if error:
212 type_ = error.get('type', 'Exception')
214 type_ = error.get('type', 'Exception')
213 exc = exceptions_map.get(type_, Exception)
215 exc = exceptions_map.get(type_, Exception)
214 exc = exc(error.get('message'))
216 exc = exc(error.get('message'))
215 try:
217 try:
216 exc._vcs_kind = error['_vcs_kind']
218 exc._vcs_kind = error['_vcs_kind']
217 except KeyError:
219 except KeyError:
218 pass
220 pass
219 raise exc
221 raise exc
220 return response.get('result')
222 return response.get('result')
221
223
222
224
223 class VcsHttpProxy(object):
225 class VcsHttpProxy(object):
224
226
225 CHUNK_SIZE = 16384
227 CHUNK_SIZE = 16384
226
228
227 def __init__(self, server_and_port, backend_endpoint):
229 def __init__(self, server_and_port, backend_endpoint):
228 adapter = requests.adapters.HTTPAdapter(max_retries=5)
230 adapter = requests.adapters.HTTPAdapter(max_retries=5)
229 self.base_url = urlparse.urljoin(
231 self.base_url = urlparse.urljoin(
230 'http://%s' % server_and_port, backend_endpoint)
232 'http://%s' % server_and_port, backend_endpoint)
231 self.session = requests.Session()
233 self.session = requests.Session()
232 self.session.mount('http://', adapter)
234 self.session.mount('http://', adapter)
233
235
234 def handle(self, environment, input_data, *args, **kwargs):
236 def handle(self, environment, input_data, *args, **kwargs):
235 data = {
237 data = {
236 'environment': environment,
238 'environment': environment,
237 'input_data': input_data,
239 'input_data': input_data,
238 'args': args,
240 'args': args,
239 'kwargs': kwargs
241 'kwargs': kwargs
240 }
242 }
241 result = self.session.post(
243 result = self.session.post(
242 self.base_url, msgpack.packb(data), stream=True)
244 self.base_url, msgpack.packb(data), stream=True)
243 return self._get_result(result)
245 return self._get_result(result)
244
246
245 def _deserialize_and_raise(self, error):
247 def _deserialize_and_raise(self, error):
246 exception = Exception(error['message'])
248 exception = Exception(error['message'])
247 try:
249 try:
248 exception._vcs_kind = error['_vcs_kind']
250 exception._vcs_kind = error['_vcs_kind']
249 except KeyError:
251 except KeyError:
250 pass
252 pass
251 raise exception
253 raise exception
252
254
253 def _iterate(self, result):
255 def _iterate(self, result):
254 unpacker = msgpack.Unpacker()
256 unpacker = msgpack.Unpacker()
255 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
257 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
256 unpacker.feed(line)
258 unpacker.feed(line)
257 for chunk in unpacker:
259 for chunk in unpacker:
258 yield chunk
260 yield chunk
259
261
260 def _get_result(self, result):
262 def _get_result(self, result):
261 iterator = self._iterate(result)
263 iterator = self._iterate(result)
262 error = iterator.next()
264 error = iterator.next()
263 if error:
265 if error:
264 self._deserialize_and_raise(error)
266 self._deserialize_and_raise(error)
265
267
266 status = iterator.next()
268 status = iterator.next()
267 headers = iterator.next()
269 headers = iterator.next()
268
270
269 return iterator, status, headers
271 return iterator, status, headers
270
272
271
273
272 class ThreadlocalSessionFactory(object):
274 class ThreadlocalSessionFactory(object):
273 """
275 """
274 Creates one CurlSession per thread on demand.
276 Creates one CurlSession per thread on demand.
275 """
277 """
276
278
277 def __init__(self):
279 def __init__(self):
278 self._thread_local = threading.local()
280 self._thread_local = threading.local()
279
281
280 def __call__(self):
282 def __call__(self):
281 if not hasattr(self._thread_local, 'curl_session'):
283 if not hasattr(self._thread_local, 'curl_session'):
282 self._thread_local.curl_session = CurlSession()
284 self._thread_local.curl_session = CurlSession()
283 return self._thread_local.curl_session
285 return self._thread_local.curl_session
@@ -1,100 +1,100 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import logging
21 import logging
22
22
23 import mock
23 import mock
24 import msgpack
24 import msgpack
25 import pytest
25 import pytest
26
26
27 from rhodecode.lib import vcs
27 from rhodecode.lib import vcs
28 from rhodecode.lib.vcs import client_http
28 from rhodecode.lib.vcs import client_http
29
29
30
30
31 def test_uses_persistent_http_connections(caplog, vcsbackend_hg):
31 def test_uses_persistent_http_connections(caplog, vcsbackend_hg):
32 repo = vcsbackend_hg.repo
32 repo = vcsbackend_hg.repo
33 remote_call = repo._remote.branches
33 remote_call = repo._remote.branches
34
34
35 with caplog.at_level(logging.INFO):
35 with caplog.at_level(logging.INFO):
36 for x in range(5):
36 for x in range(5):
37 remote_call(normal=True, closed=False)
37 remote_call(normal=True, closed=False)
38
38
39 new_connections = [
39 new_connections = [
40 r for r in caplog.record_tuples() if is_new_connection(*r)]
40 r for r in caplog.record_tuples() if is_new_connection(*r)]
41 assert len(new_connections) <= 1
41 assert len(new_connections) <= 1
42
42
43
43
44 def is_new_connection(logger, level, message):
44 def is_new_connection(logger, level, message):
45 return (
45 return (
46 logger == 'requests.packages.urllib3.connectionpool' and
46 logger == 'requests.packages.urllib3.connectionpool' and
47 message.startswith('Starting new HTTP'))
47 message.startswith('Starting new HTTP'))
48
48
49
49
50 @pytest.fixture
50 @pytest.fixture
51 def stub_session():
51 def stub_session():
52 """
52 """
53 Stub of `requests.Session()`.
53 Stub of `requests.Session()`.
54 """
54 """
55 session = mock.Mock()
55 session = mock.Mock()
56 session.post().content = msgpack.packb({})
56 session.post().content = msgpack.packb({})
57 session.reset_mock()
57 session.reset_mock()
58 return session
58 return session
59
59
60
60
61 @pytest.fixture
61 @pytest.fixture
62 def stub_session_factory(stub_session):
62 def stub_session_factory(stub_session):
63 """
63 """
64 Stub of `rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory`.
64 Stub of `rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory`.
65 """
65 """
66 session_factory = mock.Mock()
66 session_factory = mock.Mock()
67 session_factory.return_value = stub_session
67 session_factory.return_value = stub_session
68 return session_factory
68 return session_factory
69
69
70
70
71 def test_repo_maker_uses_session_for_classmethods(stub_session_factory):
71 def test_repo_maker_uses_session_for_classmethods(stub_session_factory):
72 repo_maker = client_http.RepoMaker(
72 repo_maker = client_http.RepoMaker(
73 'server_and_port', 'endpoint', stub_session_factory)
73 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
74 repo_maker.example_call()
74 repo_maker.example_call()
75 stub_session_factory().post.assert_called_with(
75 stub_session_factory().post.assert_called_with(
76 'http://server_and_port/endpoint', data=mock.ANY)
76 'http://server_and_port/endpoint', data=mock.ANY)
77
77
78
78
79 def test_repo_maker_uses_session_for_instance_methods(
79 def test_repo_maker_uses_session_for_instance_methods(
80 stub_session_factory, config):
80 stub_session_factory, config):
81 repo_maker = client_http.RepoMaker(
81 repo_maker = client_http.RepoMaker(
82 'server_and_port', 'endpoint', stub_session_factory)
82 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
83 repo = repo_maker('stub_path', config)
83 repo = repo_maker('stub_path', config)
84 repo.example_call()
84 repo.example_call()
85 stub_session_factory().post.assert_called_with(
85 stub_session_factory().post.assert_called_with(
86 'http://server_and_port/endpoint', data=mock.ANY)
86 'http://server_and_port/endpoint', data=mock.ANY)
87
87
88
88
89 @mock.patch('rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory')
89 @mock.patch('rhodecode.lib.vcs.client_http.ThreadlocalSessionFactory')
90 @mock.patch('rhodecode.lib.vcs.connection')
90 @mock.patch('rhodecode.lib.vcs.connection')
91 def test_connect_passes_in_the_same_session(
91 def test_connect_passes_in_the_same_session(
92 connection, session_factory_class, stub_session):
92 connection, session_factory_class, stub_session):
93 session_factory = session_factory_class.return_value
93 session_factory = session_factory_class.return_value
94 session_factory.return_value = stub_session
94 session_factory.return_value = stub_session
95
95
96 vcs.connect_http('server_and_port')
96 vcs.connect_http('server_and_port')
97
97
98 assert connection.Hg._session_factory() == stub_session
98 assert connection.Hg._session_factory() == stub_session
99 assert connection.Svn._session_factory() == stub_session
99 assert connection.Svn._session_factory() == stub_session
100 assert connection.Git._session_factory() == stub_session
100 assert connection.Git._session_factory() == stub_session
General Comments 0
You need to be logged in to leave comments. Login now