##// END OF EJS Templates
threading: Use the curl session factory.
Martin Bornhold -
r244:a99bca28 default
parent child Browse files
Show More
@@ -1,275 +1,278 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2016 RhodeCode GmbH
3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Various version Control System version lib (vcs) management abstraction layer
22 Various version Control System version lib (vcs) management abstraction layer
23 for Python. Build with server client architecture.
23 for Python. Build with server client architecture.
24 """
24 """
25
25
26
26
27 VERSION = (0, 5, 0, 'dev')
27 VERSION = (0, 5, 0, 'dev')
28
28
29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
30
30
31 __all__ = [
31 __all__ = [
32 'get_version', 'get_repo', 'get_backend',
32 'get_version', 'get_repo', 'get_backend',
33 'VCSError', 'RepositoryError', 'CommitError'
33 'VCSError', 'RepositoryError', 'CommitError'
34 ]
34 ]
35
35
36 import atexit
36 import atexit
37 import logging
37 import logging
38 import subprocess
38 import subprocess
39 import time
39 import time
40 import urlparse
40 import urlparse
41 from cStringIO import StringIO
41 from cStringIO import StringIO
42
42
43 import pycurl
43 import pycurl
44 import Pyro4
44 import Pyro4
45 from Pyro4.errors import CommunicationError
45 from Pyro4.errors import CommunicationError
46
46
47 from rhodecode.lib.vcs.conf import settings
47 from rhodecode.lib.vcs.conf import settings
48 from rhodecode.lib.vcs.backends import get_repo, get_backend
48 from rhodecode.lib.vcs.backends import get_repo, get_backend
49 from rhodecode.lib.vcs.exceptions import (
49 from rhodecode.lib.vcs.exceptions import (
50 VCSError, RepositoryError, CommitError)
50 VCSError, RepositoryError, CommitError)
51
51
52
52
53 log = logging.getLogger(__name__)
53 log = logging.getLogger(__name__)
54
54
55
55
56 def get_version():
56 def get_version():
57 """
57 """
58 Returns shorter version (digit parts only) as string.
58 Returns shorter version (digit parts only) as string.
59 """
59 """
60 return '.'.join((str(each) for each in VERSION[:3]))
60 return '.'.join((str(each) for each in VERSION[:3]))
61
61
62
62
63 def connect_pyro4(server_and_port):
63 def connect_pyro4(server_and_port):
64 from rhodecode.lib.vcs import connection, client
64 from rhodecode.lib.vcs import connection, client
65 from rhodecode.lib.middleware.utils import scm_app
65 from rhodecode.lib.middleware.utils import scm_app
66
66
67 git_remote = client.ThreadlocalProxyFactory(
67 git_remote = client.ThreadlocalProxyFactory(
68 settings.pyro_remote(settings.PYRO_GIT, server_and_port))
68 settings.pyro_remote(settings.PYRO_GIT, server_and_port))
69 hg_remote = client.ThreadlocalProxyFactory(
69 hg_remote = client.ThreadlocalProxyFactory(
70 settings.pyro_remote(settings.PYRO_HG, server_and_port))
70 settings.pyro_remote(settings.PYRO_HG, server_and_port))
71 svn_remote = client.ThreadlocalProxyFactory(
71 svn_remote = client.ThreadlocalProxyFactory(
72 settings.pyro_remote(settings.PYRO_SVN, server_and_port))
72 settings.pyro_remote(settings.PYRO_SVN, server_and_port))
73
73
74 connection.Git = client.RepoMaker(proxy_factory=git_remote)
74 connection.Git = client.RepoMaker(proxy_factory=git_remote)
75 connection.Hg = client.RepoMaker(proxy_factory=hg_remote)
75 connection.Hg = client.RepoMaker(proxy_factory=hg_remote)
76 connection.Svn = client.RepoMaker(proxy_factory=svn_remote)
76 connection.Svn = client.RepoMaker(proxy_factory=svn_remote)
77
77
78 scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
78 scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
79 settings.pyro_remote(
79 settings.pyro_remote(
80 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
80 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
81 scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
81 scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
82 settings.pyro_remote(
82 settings.pyro_remote(
83 settings.PYRO_HG_REMOTE_WSGI, server_and_port))
83 settings.PYRO_HG_REMOTE_WSGI, server_and_port))
84
84
85 @atexit.register
85 @atexit.register
86 def free_connection_resources():
86 def free_connection_resources():
87 connection.Git = None
87 connection.Git = None
88 connection.Hg = None
88 connection.Hg = None
89 connection.Svn = None
89 connection.Svn = None
90
90
91
91
92 def connect_http(server_and_port):
92 def connect_http(server_and_port):
93 from rhodecode.lib.vcs import connection, client_http
93 from rhodecode.lib.vcs import connection, client_http
94 from rhodecode.lib.middleware.utils import scm_app
94 from rhodecode.lib.middleware.utils import scm_app
95
95
96 session = _create_http_rpc_session()
96 session_factory = client_http.ThreadlocalSessionFactory()
97
97
98 connection.Git = client_http.RepoMaker(server_and_port, '/git', session)
98 connection.Git = client_http.RepoMaker(
99 connection.Hg = client_http.RepoMaker(server_and_port, '/hg', session)
99 server_and_port, '/git', session_factory)
100 connection.Svn = client_http.RepoMaker(server_and_port, '/svn', session)
100 connection.Hg = client_http.RepoMaker(
101 server_and_port, '/hg', session_factory)
102 connection.Svn = client_http.RepoMaker(
103 server_and_port, '/svn', session_factory)
101
104
102 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
105 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
103 server_and_port, '/proxy/hg')
106 server_and_port, '/proxy/hg')
104 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
107 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
105 server_and_port, '/proxy/git')
108 server_and_port, '/proxy/git')
106
109
107 @atexit.register
110 @atexit.register
108 def free_connection_resources():
111 def free_connection_resources():
109 connection.Git = None
112 connection.Git = None
110 connection.Hg = None
113 connection.Hg = None
111 connection.Svn = None
114 connection.Svn = None
112
115
113
116
114 def connect_vcs(server_and_port, protocol='pyro4'):
117 def connect_vcs(server_and_port, protocol='pyro4'):
115 """
118 """
116 Initializes the connection to the vcs server.
119 Initializes the connection to the vcs server.
117
120
118 :param server_and_port: str, e.g. "localhost:9900"
121 :param server_and_port: str, e.g. "localhost:9900"
119 :param protocol: str, "pyro4" or "http"
122 :param protocol: str, "pyro4" or "http"
120 """
123 """
121 if protocol == 'pyro4':
124 if protocol == 'pyro4':
122 connect_pyro4(server_and_port)
125 connect_pyro4(server_and_port)
123 elif protocol == 'http':
126 elif protocol == 'http':
124 connect_http(server_and_port)
127 connect_http(server_and_port)
125
128
126
129
127 # TODO: johbo: This function should be moved into our test suite, there is
130 # TODO: johbo: This function should be moved into our test suite, there is
128 # no reason to support starting the vcsserver in Enterprise itself.
131 # no reason to support starting the vcsserver in Enterprise itself.
129 def start_vcs_server(server_and_port, protocol='pyro4', log_level=None):
132 def start_vcs_server(server_and_port, protocol='pyro4', log_level=None):
130 """
133 """
131 Starts the vcs server in a subprocess.
134 Starts the vcs server in a subprocess.
132 """
135 """
133 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
136 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
134 if protocol == 'http':
137 if protocol == 'http':
135 return _start_http_vcs_server(server_and_port, log_level)
138 return _start_http_vcs_server(server_and_port, log_level)
136 elif protocol == 'pyro4':
139 elif protocol == 'pyro4':
137 return _start_pyro4_vcs_server(server_and_port, log_level)
140 return _start_pyro4_vcs_server(server_and_port, log_level)
138
141
139
142
140 def _start_pyro4_vcs_server(server_and_port, log_level=None):
143 def _start_pyro4_vcs_server(server_and_port, log_level=None):
141 _try_to_shutdown_running_server(server_and_port)
144 _try_to_shutdown_running_server(server_and_port)
142 host, port = server_and_port.rsplit(":", 1)
145 host, port = server_and_port.rsplit(":", 1)
143 host = host.strip('[]')
146 host = host.strip('[]')
144 args = [
147 args = [
145 'vcsserver', '--port', port, '--host', host, '--locale', 'en_US.UTF-8',
148 'vcsserver', '--port', port, '--host', host, '--locale', 'en_US.UTF-8',
146 '--threadpool', '32']
149 '--threadpool', '32']
147 if log_level:
150 if log_level:
148 args += ['--log-level', log_level]
151 args += ['--log-level', log_level]
149 proc = subprocess.Popen(args)
152 proc = subprocess.Popen(args)
150
153
151 def cleanup_server_process():
154 def cleanup_server_process():
152 proc.kill()
155 proc.kill()
153 atexit.register(cleanup_server_process)
156 atexit.register(cleanup_server_process)
154
157
155 server = create_vcsserver_proxy(server_and_port, protocol='pyro4')
158 server = create_vcsserver_proxy(server_and_port, protocol='pyro4')
156 _wait_until_vcs_server_is_reachable(server)
159 _wait_until_vcs_server_is_reachable(server)
157
160
158
161
159 def _start_http_vcs_server(server_and_port, log_level=None):
162 def _start_http_vcs_server(server_and_port, log_level=None):
160 # TODO: mikhail: shutdown if an http server already runs
163 # TODO: mikhail: shutdown if an http server already runs
161
164
162 host, port = server_and_port.rsplit(":", 1)
165 host, port = server_and_port.rsplit(":", 1)
163 args = [
166 args = [
164 'pserve', 'vcsserver/development_pyramid.ini',
167 'pserve', 'vcsserver/development_pyramid.ini',
165 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
168 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
166 proc = subprocess.Popen(args)
169 proc = subprocess.Popen(args)
167
170
168 def cleanup_server_process():
171 def cleanup_server_process():
169 proc.kill()
172 proc.kill()
170 atexit.register(cleanup_server_process)
173 atexit.register(cleanup_server_process)
171
174
172 server = create_vcsserver_proxy(server_and_port, protocol='http')
175 server = create_vcsserver_proxy(server_and_port, protocol='http')
173 _wait_until_vcs_server_is_reachable(server)
176 _wait_until_vcs_server_is_reachable(server)
174
177
175
178
176 def _wait_until_vcs_server_is_reachable(server):
179 def _wait_until_vcs_server_is_reachable(server):
177 while xrange(80): # max 40s of sleep
180 while xrange(80): # max 40s of sleep
178 try:
181 try:
179 server.ping()
182 server.ping()
180 break
183 break
181 except (CommunicationError, pycurl.error):
184 except (CommunicationError, pycurl.error):
182 pass
185 pass
183 time.sleep(0.5)
186 time.sleep(0.5)
184
187
185
188
186 def _try_to_shutdown_running_server(server_and_port):
189 def _try_to_shutdown_running_server(server_and_port):
187 server = create_vcsserver_proxy(server_and_port)
190 server = create_vcsserver_proxy(server_and_port)
188 try:
191 try:
189 server.shutdown()
192 server.shutdown()
190 except (CommunicationError, pycurl.error):
193 except (CommunicationError, pycurl.error):
191 return
194 return
192
195
193 # TODO: Not sure why this is important, but without it the following start
196 # TODO: Not sure why this is important, but without it the following start
194 # of the server fails.
197 # of the server fails.
195 server = create_vcsserver_proxy(server_and_port)
198 server = create_vcsserver_proxy(server_and_port)
196 server.ping()
199 server.ping()
197
200
198
201
199 def create_vcsserver_proxy(server_and_port, protocol='pyro4'):
202 def create_vcsserver_proxy(server_and_port, protocol='pyro4'):
200 if protocol == 'pyro4':
203 if protocol == 'pyro4':
201 return _create_vcsserver_proxy_pyro4(server_and_port)
204 return _create_vcsserver_proxy_pyro4(server_and_port)
202 elif protocol == 'http':
205 elif protocol == 'http':
203 return _create_vcsserver_proxy_http(server_and_port)
206 return _create_vcsserver_proxy_http(server_and_port)
204
207
205
208
206 def _create_vcsserver_proxy_pyro4(server_and_port):
209 def _create_vcsserver_proxy_pyro4(server_and_port):
207 server = Pyro4.Proxy(
210 server = Pyro4.Proxy(
208 settings.pyro_remote(settings.PYRO_VCSSERVER, server_and_port))
211 settings.pyro_remote(settings.PYRO_VCSSERVER, server_and_port))
209 return server
212 return server
210
213
211
214
212 def _create_vcsserver_proxy_http(server_and_port):
215 def _create_vcsserver_proxy_http(server_and_port):
213 from rhodecode.lib.vcs import client_http
216 from rhodecode.lib.vcs import client_http
214
217
215 session = _create_http_rpc_session()
218 session = _create_http_rpc_session()
216 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
219 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
217 return client_http.RemoteObject(url, session)
220 return client_http.RemoteObject(url, session)
218
221
219
222
220 class CurlSession(object):
223 class CurlSession(object):
221 """
224 """
222 Modeled so that it provides a subset of the requests interface.
225 Modeled so that it provides a subset of the requests interface.
223
226
224 This has been created so that it does only provide a minimal API for our
227 This has been created so that it does only provide a minimal API for our
225 needs. The parts which it provides are based on the API of the library
228 needs. The parts which it provides are based on the API of the library
226 `requests` which allows us to easily benchmark against it.
229 `requests` which allows us to easily benchmark against it.
227
230
228 Please have a look at the class :class:`requests.Session` when you extend
231 Please have a look at the class :class:`requests.Session` when you extend
229 it.
232 it.
230 """
233 """
231
234
232 def __init__(self):
235 def __init__(self):
233 curl = pycurl.Curl()
236 curl = pycurl.Curl()
234 # TODO: johbo: I did test with 7.19 of libcurl. This version has
237 # TODO: johbo: I did test with 7.19 of libcurl. This version has
235 # trouble with 100 - continue being set in the expect header. This
238 # trouble with 100 - continue being set in the expect header. This
236 # can lead to massive performance drops, switching it off here.
239 # can lead to massive performance drops, switching it off here.
237 curl.setopt(curl.HTTPHEADER, ["Expect:"])
240 curl.setopt(curl.HTTPHEADER, ["Expect:"])
238 curl.setopt(curl.TCP_NODELAY, True)
241 curl.setopt(curl.TCP_NODELAY, True)
239 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
242 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
240 self._curl = curl
243 self._curl = curl
241
244
242 def post(self, url, data, allow_redirects=False):
245 def post(self, url, data, allow_redirects=False):
243 response_buffer = StringIO()
246 response_buffer = StringIO()
244
247
245 curl = self._curl
248 curl = self._curl
246 curl.setopt(curl.URL, url)
249 curl.setopt(curl.URL, url)
247 curl.setopt(curl.POST, True)
250 curl.setopt(curl.POST, True)
248 curl.setopt(curl.POSTFIELDS, data)
251 curl.setopt(curl.POSTFIELDS, data)
249 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
252 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
250 curl.setopt(curl.WRITEDATA, response_buffer)
253 curl.setopt(curl.WRITEDATA, response_buffer)
251 curl.perform()
254 curl.perform()
252
255
253 return CurlResponse(response_buffer)
256 return CurlResponse(response_buffer)
254
257
255
258
256 class CurlResponse(object):
259 class CurlResponse(object):
257 """
260 """
258 The response of a request, modeled after the requests API.
261 The response of a request, modeled after the requests API.
259
262
260 This class provides a subset of the response interface known from the
263 This class provides a subset of the response interface known from the
261 library `requests`. It is intentionally kept similar, so that we can use
264 library `requests`. It is intentionally kept similar, so that we can use
262 `requests` as a drop in replacement for benchmarking purposes.
265 `requests` as a drop in replacement for benchmarking purposes.
263 """
266 """
264
267
265 def __init__(self, response_buffer):
268 def __init__(self, response_buffer):
266 self._response_buffer = response_buffer
269 self._response_buffer = response_buffer
267
270
268 @property
271 @property
269 def content(self):
272 def content(self):
270 return self._response_buffer.getvalue()
273 return self._response_buffer.getvalue()
271
274
272
275
273 def _create_http_rpc_session():
276 def _create_http_rpc_session():
274 session = CurlSession()
277 session = CurlSession()
275 return session
278 return session
@@ -1,233 +1,235 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23
23
24
24
25 Status
25 Status
26 ------
26 ------
27
27
28 This client implementation shall eventually replace the Pyro4 based
28 This client implementation shall eventually replace the Pyro4 based
29 implementation.
29 implementation.
30 """
30 """
31
31
32 import copy
32 import copy
33 import logging
33 import logging
34 import threading
34 import threading
35 import urllib2
35 import urllib2
36 import urlparse
36 import urlparse
37 import uuid
37 import uuid
38
38
39 import msgpack
39 import msgpack
40 import requests
40 import requests
41
41
42 from . import exceptions, CurlSession
42 from . import exceptions, CurlSession
43
43
44
44
45 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
46
46
47
47
48 # TODO: mikhail: Keep it in sync with vcsserver's
48 # TODO: mikhail: Keep it in sync with vcsserver's
49 # HTTPApplication.ALLOWED_EXCEPTIONS
49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 EXCEPTIONS_MAP = {
50 EXCEPTIONS_MAP = {
51 'KeyError': KeyError,
51 'KeyError': KeyError,
52 'URLError': urllib2.URLError,
52 'URLError': urllib2.URLError,
53 }
53 }
54
54
55
55
56 class RepoMaker(object):
56 class RepoMaker(object):
57
57
58 def __init__(self, server_and_port, backend_endpoint, session):
58 def __init__(self, server_and_port, backend_endpoint, session_factory):
59 self.url = urlparse.urljoin(
59 self.url = urlparse.urljoin(
60 'http://%s' % server_and_port, backend_endpoint)
60 'http://%s' % server_and_port, backend_endpoint)
61 self._session = session
61 self._session_factory = session_factory
62
62
63 def __call__(self, path, config, with_wire=None):
63 def __call__(self, path, config, with_wire=None):
64 log.debug('RepoMaker call on %s', path)
64 log.debug('RepoMaker call on %s', path)
65 return RemoteRepo(
65 return RemoteRepo(
66 path, config, self.url, self._session, with_wire=with_wire)
66 path, config, self.url, self._session_factory(),
67 with_wire=with_wire)
67
68
68 def __getattr__(self, name):
69 def __getattr__(self, name):
69 def f(*args, **kwargs):
70 def f(*args, **kwargs):
70 return self._call(name, *args, **kwargs)
71 return self._call(name, *args, **kwargs)
71 return f
72 return f
72
73
73 @exceptions.map_vcs_exceptions
74 @exceptions.map_vcs_exceptions
74 def _call(self, name, *args, **kwargs):
75 def _call(self, name, *args, **kwargs):
75 payload = {
76 payload = {
76 'id': str(uuid.uuid4()),
77 'id': str(uuid.uuid4()),
77 'method': name,
78 'method': name,
78 'params': {'args': args, 'kwargs': kwargs}
79 'params': {'args': args, 'kwargs': kwargs}
79 }
80 }
80 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
81 return _remote_call(
82 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
81
83
82
84
83 class RemoteRepo(object):
85 class RemoteRepo(object):
84
86
85 def __init__(self, path, config, url, session, with_wire=None):
87 def __init__(self, path, config, url, session, with_wire=None):
86 self.url = url
88 self.url = url
87 self._session = session
89 self._session = session
88 self._wire = {
90 self._wire = {
89 "path": path,
91 "path": path,
90 "config": config,
92 "config": config,
91 "context": str(uuid.uuid4()),
93 "context": str(uuid.uuid4()),
92 }
94 }
93 if with_wire:
95 if with_wire:
94 self._wire.update(with_wire)
96 self._wire.update(with_wire)
95
97
96 # johbo: Trading complexity for performance. Avoiding the call to
98 # johbo: Trading complexity for performance. Avoiding the call to
97 # log.debug brings a few percent gain even if is is not active.
99 # log.debug brings a few percent gain even if is is not active.
98 if log.isEnabledFor(logging.DEBUG):
100 if log.isEnabledFor(logging.DEBUG):
99 self._call = self._call_with_logging
101 self._call = self._call_with_logging
100
102
101 def __getattr__(self, name):
103 def __getattr__(self, name):
102 def f(*args, **kwargs):
104 def f(*args, **kwargs):
103 return self._call(name, *args, **kwargs)
105 return self._call(name, *args, **kwargs)
104 return f
106 return f
105
107
106 @exceptions.map_vcs_exceptions
108 @exceptions.map_vcs_exceptions
107 def _call(self, name, *args, **kwargs):
109 def _call(self, name, *args, **kwargs):
108 # TODO: oliver: This is currently necessary pre-call since the
110 # TODO: oliver: This is currently necessary pre-call since the
109 # config object is being changed for hooking scenarios
111 # config object is being changed for hooking scenarios
110 wire = copy.deepcopy(self._wire)
112 wire = copy.deepcopy(self._wire)
111 wire["config"] = wire["config"].serialize()
113 wire["config"] = wire["config"].serialize()
112 payload = {
114 payload = {
113 'id': str(uuid.uuid4()),
115 'id': str(uuid.uuid4()),
114 'method': name,
116 'method': name,
115 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
117 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
116 }
118 }
117 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
119 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
118
120
119 def _call_with_logging(self, name, *args, **kwargs):
121 def _call_with_logging(self, name, *args, **kwargs):
120 log.debug('Calling %s@%s', self.url, name)
122 log.debug('Calling %s@%s', self.url, name)
121 return RemoteRepo._call(self, name, *args, **kwargs)
123 return RemoteRepo._call(self, name, *args, **kwargs)
122
124
123 def __getitem__(self, key):
125 def __getitem__(self, key):
124 return self.revision(key)
126 return self.revision(key)
125
127
126
128
127 class RemoteObject(object):
129 class RemoteObject(object):
128
130
129 def __init__(self, url, session):
131 def __init__(self, url, session):
130 self._url = url
132 self._url = url
131 self._session = session
133 self._session = session
132
134
133 # johbo: Trading complexity for performance. Avoiding the call to
135 # johbo: Trading complexity for performance. Avoiding the call to
134 # log.debug brings a few percent gain even if is is not active.
136 # log.debug brings a few percent gain even if is is not active.
135 if log.isEnabledFor(logging.DEBUG):
137 if log.isEnabledFor(logging.DEBUG):
136 self._call = self._call_with_logging
138 self._call = self._call_with_logging
137
139
138 def __getattr__(self, name):
140 def __getattr__(self, name):
139 def f(*args, **kwargs):
141 def f(*args, **kwargs):
140 return self._call(name, *args, **kwargs)
142 return self._call(name, *args, **kwargs)
141 return f
143 return f
142
144
143 @exceptions.map_vcs_exceptions
145 @exceptions.map_vcs_exceptions
144 def _call(self, name, *args, **kwargs):
146 def _call(self, name, *args, **kwargs):
145 payload = {
147 payload = {
146 'id': str(uuid.uuid4()),
148 'id': str(uuid.uuid4()),
147 'method': name,
149 'method': name,
148 'params': {'args': args, 'kwargs': kwargs}
150 'params': {'args': args, 'kwargs': kwargs}
149 }
151 }
150 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
152 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
151
153
152 def _call_with_logging(self, name, *args, **kwargs):
154 def _call_with_logging(self, name, *args, **kwargs):
153 log.debug('Calling %s@%s', self._url, name)
155 log.debug('Calling %s@%s', self._url, name)
154 return RemoteObject._call(self, name, *args, **kwargs)
156 return RemoteObject._call(self, name, *args, **kwargs)
155
157
156
158
157 def _remote_call(url, payload, exceptions_map, session):
159 def _remote_call(url, payload, exceptions_map, session):
158 response = session.post(url, data=msgpack.packb(payload))
160 response = session.post(url, data=msgpack.packb(payload))
159 response = msgpack.unpackb(response.content)
161 response = msgpack.unpackb(response.content)
160 error = response.get('error')
162 error = response.get('error')
161 if error:
163 if error:
162 type_ = error.get('type', 'Exception')
164 type_ = error.get('type', 'Exception')
163 exc = exceptions_map.get(type_, Exception)
165 exc = exceptions_map.get(type_, Exception)
164 exc = exc(error.get('message'))
166 exc = exc(error.get('message'))
165 try:
167 try:
166 exc._vcs_kind = error['_vcs_kind']
168 exc._vcs_kind = error['_vcs_kind']
167 except KeyError:
169 except KeyError:
168 pass
170 pass
169 raise exc
171 raise exc
170 return response.get('result')
172 return response.get('result')
171
173
172
174
173 class VcsHttpProxy(object):
175 class VcsHttpProxy(object):
174
176
175 CHUNK_SIZE = 16384
177 CHUNK_SIZE = 16384
176
178
177 def __init__(self, server_and_port, backend_endpoint):
179 def __init__(self, server_and_port, backend_endpoint):
178 adapter = requests.adapters.HTTPAdapter(max_retries=5)
180 adapter = requests.adapters.HTTPAdapter(max_retries=5)
179 self.base_url = urlparse.urljoin(
181 self.base_url = urlparse.urljoin(
180 'http://%s' % server_and_port, backend_endpoint)
182 'http://%s' % server_and_port, backend_endpoint)
181 self.session = requests.Session()
183 self.session = requests.Session()
182 self.session.mount('http://', adapter)
184 self.session.mount('http://', adapter)
183
185
184 def handle(self, environment, input_data, *args, **kwargs):
186 def handle(self, environment, input_data, *args, **kwargs):
185 data = {
187 data = {
186 'environment': environment,
188 'environment': environment,
187 'input_data': input_data,
189 'input_data': input_data,
188 'args': args,
190 'args': args,
189 'kwargs': kwargs
191 'kwargs': kwargs
190 }
192 }
191 result = self.session.post(
193 result = self.session.post(
192 self.base_url, msgpack.packb(data), stream=True)
194 self.base_url, msgpack.packb(data), stream=True)
193 return self._get_result(result)
195 return self._get_result(result)
194
196
195 def _deserialize_and_raise(self, error):
197 def _deserialize_and_raise(self, error):
196 exception = Exception(error['message'])
198 exception = Exception(error['message'])
197 try:
199 try:
198 exception._vcs_kind = error['_vcs_kind']
200 exception._vcs_kind = error['_vcs_kind']
199 except KeyError:
201 except KeyError:
200 pass
202 pass
201 raise exception
203 raise exception
202
204
203 def _iterate(self, result):
205 def _iterate(self, result):
204 unpacker = msgpack.Unpacker()
206 unpacker = msgpack.Unpacker()
205 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
207 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
206 unpacker.feed(line)
208 unpacker.feed(line)
207 for chunk in unpacker:
209 for chunk in unpacker:
208 yield chunk
210 yield chunk
209
211
210 def _get_result(self, result):
212 def _get_result(self, result):
211 iterator = self._iterate(result)
213 iterator = self._iterate(result)
212 error = iterator.next()
214 error = iterator.next()
213 if error:
215 if error:
214 self._deserialize_and_raise(error)
216 self._deserialize_and_raise(error)
215
217
216 status = iterator.next()
218 status = iterator.next()
217 headers = iterator.next()
219 headers = iterator.next()
218
220
219 return iterator, status, headers
221 return iterator, status, headers
220
222
221
223
222 class ThreadlocalSessionFactory(object):
224 class ThreadlocalSessionFactory(object):
223 """
225 """
224 Creates one CurlSession per thread on demand.
226 Creates one CurlSession per thread on demand.
225 """
227 """
226
228
227 def __init__(self):
229 def __init__(self):
228 self._thread_local = threading.local()
230 self._thread_local = threading.local()
229
231
230 def __call__(self):
232 def __call__(self):
231 if not hasattr(self._thread_local, 'curl_session'):
233 if not hasattr(self._thread_local, 'curl_session'):
232 self._thread_local.curl_session = CurlSession()
234 self._thread_local.curl_session = CurlSession()
233 return self._thread_local.curl_session
235 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now