##// END OF EJS Templates
threading: Use the curl session factory.
Martin Bornhold -
r244:a99bca28 default
parent child Browse files
Show More
@@ -1,275 +1,278 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Various Version Control System (vcs) management abstraction layer
23 23 for Python. Built with a client-server architecture.
24 24 """
25 25
26 26
27 27 VERSION = (0, 5, 0, 'dev')
28 28
29 29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
30 30
31 31 __all__ = [
32 32 'get_version', 'get_repo', 'get_backend',
33 33 'VCSError', 'RepositoryError', 'CommitError'
34 34 ]
35 35
36 36 import atexit
37 37 import logging
38 38 import subprocess
39 39 import time
40 40 import urlparse
41 41 from cStringIO import StringIO
42 42
43 43 import pycurl
44 44 import Pyro4
45 45 from Pyro4.errors import CommunicationError
46 46
47 47 from rhodecode.lib.vcs.conf import settings
48 48 from rhodecode.lib.vcs.backends import get_repo, get_backend
49 49 from rhodecode.lib.vcs.exceptions import (
50 50 VCSError, RepositoryError, CommitError)
51 51
52 52
53 53 log = logging.getLogger(__name__)
54 54
55 55
def get_version():
    """
    Return the short version string made of the numeric parts only.

    Only the first three entries of ``VERSION`` are joined with dots, so a
    fourth entry (the release tag) is left out.
    """
    numeric_parts = VERSION[:3]
    return '.'.join(map(str, numeric_parts))
61 61
62 62
def connect_pyro4(server_and_port):
    """
    Wire up the Pyro4 based connections to the vcs server.

    Sets ``connection.Git``, ``connection.Hg`` and ``connection.Svn`` to
    ``client.RepoMaker`` instances and sets the two remote WSGI proxies on
    ``scm_app``. Also registers an ``atexit`` hook which resets the three
    connection attributes to ``None``.

    :param server_and_port: str, e.g. "localhost:9900"
    """
    # NOTE(review): imported inside the function, presumably to avoid an
    # import cycle with this package - confirm before moving to module level.
    from rhodecode.lib.vcs import connection, client
    from rhodecode.lib.middleware.utils import scm_app

    def remote_uri(object_name):
        return settings.pyro_remote(object_name, server_and_port)

    backend_names = (settings.PYRO_GIT, settings.PYRO_HG, settings.PYRO_SVN)
    git_factory, hg_factory, svn_factory = [
        client.ThreadlocalProxyFactory(remote_uri(object_name))
        for object_name in backend_names]

    connection.Git = client.RepoMaker(proxy_factory=git_factory)
    connection.Hg = client.RepoMaker(proxy_factory=hg_factory)
    connection.Svn = client.RepoMaker(proxy_factory=svn_factory)

    scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
        remote_uri(settings.PYRO_GIT_REMOTE_WSGI))
    scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
        remote_uri(settings.PYRO_HG_REMOTE_WSGI))

    def free_connection_resources():
        connection.Git = connection.Hg = connection.Svn = None

    atexit.register(free_connection_resources)
90 90
91 91
def connect_http(server_and_port):
    """
    Wire up the HTTP based connections to the vcs server.

    Sets ``connection.Git``, ``connection.Hg`` and ``connection.Svn`` to
    ``client_http.RepoMaker`` instances and sets the two remote WSGI proxies
    on ``scm_app``. Also registers an ``atexit`` hook which resets the three
    connection attributes to ``None``.

    :param server_and_port: str, e.g. "localhost:9900"
    """
    from rhodecode.lib.vcs import connection, client_http
    from rhodecode.lib.middleware.utils import scm_app

    # A single factory is shared by all three makers. It creates one
    # CurlSession per thread on demand, so the makers no longer share one
    # curl session object between all threads.
    session_factory = client_http.ThreadlocalSessionFactory()

    connection.Git = client_http.RepoMaker(
        server_and_port, '/git', session_factory)
    connection.Hg = client_http.RepoMaker(
        server_and_port, '/hg', session_factory)
    connection.Svn = client_http.RepoMaker(
        server_and_port, '/svn', session_factory)

    scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
        server_and_port, '/proxy/hg')
    scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
        server_and_port, '/proxy/git')

    @atexit.register
    def free_connection_resources():
        connection.Git = None
        connection.Hg = None
        connection.Svn = None
112 115
113 116
def connect_vcs(server_and_port, protocol='pyro4'):
    """
    Initializes the connection to the vcs server.

    Any `protocol` value other than the two listed below is ignored and no
    connection is set up.

    :param server_and_port: str, e.g. "localhost:9900"
    :param protocol: str, "pyro4" or "http"
    """
    if protocol == 'http':
        connect_http(server_and_port)
        return
    if protocol == 'pyro4':
        connect_pyro4(server_and_port)
125 128
126 129
# TODO: johbo: This function should be moved into our test suite, there is
# no reason to support starting the vcsserver in Enterprise itself.
def start_vcs_server(server_and_port, protocol='pyro4', log_level=None):
    """
    Starts the vcs server in a subprocess.

    Returns whatever the protocol specific starter returns, or ``None`` when
    `protocol` is neither "http" nor "pyro4".

    :param server_and_port: str, e.g. "localhost:9900"
    :param protocol: str, "pyro4" or "http"
    :param log_level: passed on to the protocol specific starter
    """
    log.info('Starting VCSServer as a sub process with %s protocol', protocol)
    if protocol == 'http':
        starter = _start_http_vcs_server
    elif protocol == 'pyro4':
        starter = _start_pyro4_vcs_server
    else:
        return None
    return starter(server_and_port, log_level)
138 141
139 142
def _start_pyro4_vcs_server(server_and_port, log_level=None):
    """
    Start the Pyro4 flavour of the vcs server as a subprocess.

    First tries to shut down a server at `server_and_port`, then spawns the
    ``vcsserver`` executable, registers an ``atexit`` hook which kills the
    process and finally waits until the new server answers a ping.

    :param server_and_port: str, e.g. "localhost:9900"
    :param log_level: if truthy, passed to the server as ``--log-level``
    """
    _try_to_shutdown_running_server(server_and_port)

    host, port = server_and_port.rsplit(":", 1)
    command = [
        'vcsserver',
        '--port', port,
        # Surrounding square brackets are removed from the host part
        # (presumably for IPv6 literals like "[::1]" - confirm).
        '--host', host.strip('[]'),
        '--locale', 'en_US.UTF-8',
        '--threadpool', '32',
    ]
    if log_level:
        command.extend(['--log-level', log_level])
    server_process = subprocess.Popen(command)

    # Make sure the subprocess does not outlive this interpreter.
    atexit.register(server_process.kill)

    proxy = create_vcsserver_proxy(server_and_port, protocol='pyro4')
    _wait_until_vcs_server_is_reachable(proxy)
157 160
158 161
def _start_http_vcs_server(server_and_port, log_level=None):
    """
    Start the HTTP flavour of the vcs server as a subprocess.

    Spawns ``pserve`` with ``vcsserver/development_pyramid.ini``, registers
    an ``atexit`` hook which kills the process and waits until the new
    server answers a ping.

    :param server_and_port: str, e.g. "localhost:9900"
    :param log_level: accepted, but currently not passed on to the server
    """
    # TODO: mikhail: shutdown if an http server already runs

    host, port = server_and_port.rsplit(":", 1)
    command = [
        'pserve',
        'vcsserver/development_pyramid.ini',
        'http_port=%s' % (port, ),
        'http_host=%s' % (host, ),
    ]
    server_process = subprocess.Popen(command)

    # Make sure the subprocess does not outlive this interpreter.
    atexit.register(server_process.kill)

    proxy = create_vcsserver_proxy(server_and_port, protocol='http')
    _wait_until_vcs_server_is_reachable(proxy)
174 177
175 178
def _wait_until_vcs_server_is_reachable(server):
    """
    Poll the vcs server until it answers a ping or the attempts run out.

    The server is pinged up to 80 times with a pause of 0.5 seconds after
    each failed attempt, so the function sleeps for at most 40 seconds. It
    returns ``None`` in both cases; when the server never answered a warning
    is logged.

    :param server: proxy object with a ``ping`` method, as returned by
        ``create_vcsserver_proxy``
    """
    max_attempts = 80
    pause_seconds = 0.5

    # A `for` loop is required here: a `while` over a non-empty range object
    # is always true and would poll forever if the server never comes up.
    for _ in range(max_attempts):
        try:
            server.ping()
        except (CommunicationError, pycurl.error):
            time.sleep(pause_seconds)
        else:
            return

    log.warning(
        'VCSServer did not respond to ping after %s attempts', max_attempts)
184 187
185 188
def _try_to_shutdown_running_server(server_and_port):
    """
    Ask a vcs server at `server_and_port` to shut down, if one is reachable.

    The proxies are created with the default protocol of
    ``create_vcsserver_proxy``. When the shutdown call fails with a
    communication error the function returns without doing anything else.

    :param server_and_port: str, e.g. "localhost:9900"
    """
    running_server = create_vcsserver_proxy(server_and_port)
    try:
        running_server.shutdown()
    except (CommunicationError, pycurl.error):
        return

    # TODO: Not sure why this is important, but without it the following start
    # of the server fails.
    fresh_proxy = create_vcsserver_proxy(server_and_port)
    fresh_proxy.ping()
197 200
198 201
def create_vcsserver_proxy(server_and_port, protocol='pyro4'):
    """
    Create a proxy object for the vcs server itself.

    :param server_and_port: str, e.g. "localhost:9900"
    :param protocol: str, "pyro4" or "http"
    :return: the protocol specific proxy, or ``None`` for any other
        `protocol` value
    """
    if protocol == 'http':
        return _create_vcsserver_proxy_http(server_and_port)
    if protocol == 'pyro4':
        return _create_vcsserver_proxy_pyro4(server_and_port)
    return None
204 207
205 208
def _create_vcsserver_proxy_pyro4(server_and_port):
    """
    Return a ``Pyro4.Proxy`` for the ``settings.PYRO_VCSSERVER`` object.

    :param server_and_port: str, e.g. "localhost:9900"
    """
    remote_uri = settings.pyro_remote(
        settings.PYRO_VCSSERVER, server_and_port)
    return Pyro4.Proxy(remote_uri)
210 213
211 214
def _create_vcsserver_proxy_http(server_and_port):
    """
    Return a ``client_http.RemoteObject`` bound to the ``/server`` endpoint.

    A new RPC session is created for every proxy.

    :param server_and_port: str, e.g. "localhost:9900"
    """
    from rhodecode.lib.vcs import client_http

    rpc_session = _create_http_rpc_session()
    base_url = 'http://%s' % server_and_port
    server_url = urlparse.urljoin(base_url, '/server')
    return client_http.RemoteObject(server_url, rpc_session)
218 221
219 222
class CurlSession(object):
    """
    Minimal session object which mimics a subset of `requests.Session`.

    Only the small part of the API which is needed here is provided. It
    follows the API of the library `requests`, which makes it easy to
    benchmark against that library.

    Please have a look at the class :class:`requests.Session` when you extend
    it.

    One ``pycurl.Curl`` handle is created per instance and reused for every
    call to :meth:`post`.
    """

    def __init__(self):
        handle = pycurl.Curl()
        base_options = (
            # TODO: johbo: I did test with 7.19 of libcurl. This version has
            # trouble with 100 - continue being set in the expect header. This
            # can lead to massive performance drops, switching it off here.
            (handle.HTTPHEADER, ["Expect:"]),
            (handle.TCP_NODELAY, True),
            (handle.PROTOCOLS, handle.PROTO_HTTP),
        )
        for option, value in base_options:
            handle.setopt(option, value)
        self._curl = handle

    def post(self, url, data, allow_redirects=False):
        """
        Send `data` as the body of a POST request to `url`.

        :param url: target URL of the request
        :param data: request body, set as ``POSTFIELDS``
        :param allow_redirects: set as the ``FOLLOWLOCATION`` option
        :return: :class:`CurlResponse` wrapping the buffer which received
            the response body
        """
        handle = self._curl
        body_buffer = StringIO()

        request_options = (
            (handle.URL, url),
            (handle.POST, True),
            (handle.POSTFIELDS, data),
            (handle.FOLLOWLOCATION, allow_redirects),
            (handle.WRITEDATA, body_buffer),
        )
        for option, value in request_options:
            handle.setopt(option, value)
        handle.perform()

        return CurlResponse(body_buffer)
254 257
255 258
class CurlResponse(object):
    """
    Response of a request, shaped like the response object of `requests`.

    Only a subset of the response interface known from the library
    `requests` is provided. It is kept similar on purpose, so that
    `requests` can be used as a drop in replacement for benchmarking
    purposes.
    """

    def __init__(self, response_buffer):
        # Buffer object which holds the response body; only its `getvalue`
        # method is used.
        self._response_buffer = response_buffer

    @property
    def content(self):
        """Return the complete content of the response buffer."""
        body_buffer = self._response_buffer
        return body_buffer.getvalue()
271 274
272 275
def _create_http_rpc_session():
    """Return a new :class:`CurlSession` for HTTP based RPC calls."""
    return CurlSession()
@@ -1,233 +1,235 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23
24 24
25 25 Status
26 26 ------
27 27
28 28 This client implementation shall eventually replace the Pyro4 based
29 29 implementation.
30 30 """
31 31
32 32 import copy
33 33 import logging
34 34 import threading
35 35 import urllib2
36 36 import urlparse
37 37 import uuid
38 38
39 39 import msgpack
40 40 import requests
41 41
42 42 from . import exceptions, CurlSession
43 43
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 # TODO: mikhail: Keep it in sync with vcsserver's
49 49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 50 EXCEPTIONS_MAP = {
51 51 'KeyError': KeyError,
52 52 'URLError': urllib2.URLError,
53 53 }
54 54
55 55
class RepoMaker(object):
    """
    Factory for :class:`RemoteRepo` objects of one backend endpoint.

    Calling the instance creates a :class:`RemoteRepo`. Any other attribute
    access returns a function which performs a remote call of that name
    against the backend endpoint.

    :param server_and_port: str, e.g. "localhost:9900"
    :param backend_endpoint: path of the backend, e.g. "/git"
    :param session_factory: callable without arguments which returns the
        session object to use, e.g. :class:`ThreadlocalSessionFactory`
    """

    def __init__(self, server_and_port, backend_endpoint, session_factory):
        self.url = urlparse.urljoin(
            'http://%s' % server_and_port, backend_endpoint)
        self._session_factory = session_factory

    def __call__(self, path, config, with_wire=None):
        log.debug('RepoMaker call on %s', path)
        # NOTE(review): the session is resolved here, at creation time, and
        # stored on the RemoteRepo. A RemoteRepo which is later used from a
        # different thread keeps using the session of the creating thread.
        return RemoteRepo(
            path, config, self.url, self._session_factory(),
            with_wire=with_wire)

    def __getattr__(self, name):
        # Only invoked for attributes which are not found the regular way;
        # every such name is turned into a remote method call.
        def f(*args, **kwargs):
            return self._call(name, *args, **kwargs)
        return f

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': {'args': args, 'kwargs': kwargs}
        }
        # The session is requested from the factory for every call.
        return _remote_call(
            self.url, payload, EXCEPTIONS_MAP, self._session_factory())
81 83
82 84
class RemoteRepo(object):
    """
    Client side handle of a repository which lives on the vcs server.

    Attribute access for unknown names returns a function which performs a
    remote call of that name. Each remote call sends the "wire" dictionary
    (path, config and a context id) along with the arguments.

    :param path: stored in the wire under the key "path"
    :param config: stored in the wire under the key "config"; its
        ``serialize`` method is called before every remote call
    :param url: URL of the backend endpoint
    :param session: session object used to post the remote calls
    :param with_wire: optional dict of additional wire entries
    """

    def __init__(self, path, config, url, session, with_wire=None):
        self.url = url
        self._session = session

        wire = {
            "path": path,
            "config": config,
            "context": str(uuid.uuid4()),
        }
        if with_wire:
            wire.update(with_wire)
        self._wire = wire

        # johbo: Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        if log.isEnabledFor(logging.DEBUG):
            self._call = self._call_with_logging

    def __getattr__(self, name):
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)
        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()

        params = {'wire': wire, 'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': params,
        }
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)

    def _call_with_logging(self, name, *args, **kwargs):
        log.debug('Calling %s@%s', self.url, name)
        return RemoteRepo._call(self, name, *args, **kwargs)

    def __getitem__(self, key):
        # Item access is forwarded as the remote method "revision".
        return self.revision(key)
125 127
126 128
class RemoteObject(object):
    """
    Client side handle of a remote object which is reachable at one URL.

    Attribute access for unknown names returns a function which performs a
    remote call of that name.

    :param url: URL the remote calls are posted to
    :param session: session object used to post the remote calls
    """

    def __init__(self, url, session):
        self._url = url
        self._session = session

        # johbo: Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        if log.isEnabledFor(logging.DEBUG):
            self._call = self._call_with_logging

    def __getattr__(self, name):
        def remote_method(*args, **kwargs):
            return self._call(name, *args, **kwargs)
        return remote_method

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': params,
        }
        return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)

    def _call_with_logging(self, name, *args, **kwargs):
        log.debug('Calling %s@%s', self._url, name)
        return RemoteObject._call(self, name, *args, **kwargs)
155 157
156 158
def _remote_call(url, payload, exceptions_map, session):
    """
    Post `payload` to `url` and return the result of the remote call.

    The payload is packed with msgpack and the response content is unpacked
    with msgpack. A truthy "error" entry in the response is turned into an
    exception and raised.

    :param url: URL to post to
    :param payload: msgpack serializable request data
    :param exceptions_map: dict which maps the "type" of an error to the
        exception class to raise; unknown types raise ``Exception``
    :param session: object with a ``post(url, data=...)`` method whose
        return value has a ``content`` attribute
    :return: the "result" entry of the response
    """
    http_response = session.post(url, data=msgpack.packb(payload))
    message = msgpack.unpackb(http_response.content)

    error = message.get('error')
    if not error:
        return message.get('result')

    exc_class = exceptions_map.get(error.get('type', 'Exception'), Exception)
    exc = exc_class(error.get('message'))
    # The marker is only set when the server sent one.
    if '_vcs_kind' in error:
        exc._vcs_kind = error['_vcs_kind']
    raise exc
171 173
172 174
class VcsHttpProxy(object):
    """
    Proxy which forwards requests to a backend endpoint of the vcs server.

    Uses a ``requests.Session`` with an HTTP adapter configured with
    ``max_retries=5``. The response is read as a stream of msgpack objects.

    :param server_and_port: str, e.g. "localhost:9900"
    :param backend_endpoint: path of the endpoint, e.g. "/proxy/git"
    """

    # Chunk size used when iterating over the content of the response.
    CHUNK_SIZE = 16384

    def __init__(self, server_and_port, backend_endpoint):
        adapter = requests.adapters.HTTPAdapter(max_retries=5)
        self.base_url = urlparse.urljoin(
            'http://%s' % server_and_port, backend_endpoint)
        self.session = requests.Session()
        self.session.mount('http://', adapter)

    def handle(self, environment, input_data, *args, **kwargs):
        """
        Post the request data to the endpoint and return the parsed result.

        :return: tuple ``(iterator, status, headers)``, see `_get_result`
        """
        data = {
            'environment': environment,
            'input_data': input_data,
            'args': args,
            'kwargs': kwargs
        }
        result = self.session.post(
            self.base_url, msgpack.packb(data), stream=True)
        return self._get_result(result)

    def _deserialize_and_raise(self, error):
        """Raise an ``Exception`` built from the `error` dict."""
        exception = Exception(error['message'])
        try:
            exception._vcs_kind = error['_vcs_kind']
        except KeyError:
            pass
        raise exception

    def _iterate(self, result):
        """Yield the msgpack objects contained in the streamed response."""
        unpacker = msgpack.Unpacker()
        for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
            unpacker.feed(line)
            for chunk in unpacker:
                yield chunk

    def _get_result(self, result):
        """
        Split the response stream into error, status, headers and the rest.

        The first three objects of the stream are consumed in this order:
        error, status, headers. A truthy error is raised as an exception.

        :return: tuple ``(iterator, status, headers)`` where `iterator`
            yields the remaining objects of the stream
        """
        iterator = self._iterate(result)
        # The builtin `next` is used instead of the `.next()` method, which
        # only exists on Python 2 iterators.
        error = next(iterator)
        if error:
            self._deserialize_and_raise(error)

        status = next(iterator)
        headers = next(iterator)

        return iterator, status, headers
220 222
221 223
class ThreadlocalSessionFactory(object):
    """
    Callable which hands out one CurlSession per thread.

    The session of a thread is created on the first call from that thread
    and returned again on every later call from the same thread.
    """

    def __init__(self):
        self._thread_local = threading.local()

    def __call__(self):
        local_storage = self._thread_local
        try:
            return local_storage.curl_session
        except AttributeError:
            # First call from this thread, create its session now.
            local_storage.curl_session = CurlSession()
            return local_storage.curl_session
General Comments 0
You need to be logged in to leave comments. Login now