##// END OF EJS Templates
pyro4: Create pyro proxy factory which returns instaces scoped by the request.
Martin Bornhold -
r341:5b26b74b default
parent child Browse files
Show More
@@ -1,278 +1,278 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Various version Control System version lib (vcs) management abstraction layer
23 23 for Python. Build with server client architecture.
24 24 """
25 25
26 26
27 27 VERSION = (0, 5, 0, 'dev')
28 28
29 29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
30 30
31 31 __all__ = [
32 32 'get_version', 'get_repo', 'get_backend',
33 33 'VCSError', 'RepositoryError', 'CommitError'
34 34 ]
35 35
36 36 import atexit
37 37 import logging
38 38 import subprocess
39 39 import time
40 40 import urlparse
41 41 from cStringIO import StringIO
42 42
43 43 import pycurl
44 44 import Pyro4
45 45 from Pyro4.errors import CommunicationError
46 46
47 47 from rhodecode.lib.vcs.conf import settings
48 48 from rhodecode.lib.vcs.backends import get_repo, get_backend
49 49 from rhodecode.lib.vcs.exceptions import (
50 50 VCSError, RepositoryError, CommitError)
51 51
52 52
53 53 log = logging.getLogger(__name__)
54 54
55 55
56 56 def get_version():
57 57 """
58 58 Returns shorter version (digit parts only) as string.
59 59 """
60 60 return '.'.join((str(each) for each in VERSION[:3]))
61 61
62 62
63 63 def connect_pyro4(server_and_port):
64 64 from rhodecode.lib.vcs import connection, client
65 65 from rhodecode.lib.middleware.utils import scm_app
66 66
67 git_remote = client.ThreadlocalProxyFactory(
67 git_remote = client.RequestScopeProxyFactory(
68 68 settings.pyro_remote(settings.PYRO_GIT, server_and_port))
69 hg_remote = client.ThreadlocalProxyFactory(
69 hg_remote = client.RequestScopeProxyFactory(
70 70 settings.pyro_remote(settings.PYRO_HG, server_and_port))
71 svn_remote = client.ThreadlocalProxyFactory(
71 svn_remote = client.RequestScopeProxyFactory(
72 72 settings.pyro_remote(settings.PYRO_SVN, server_and_port))
73 73
74 74 connection.Git = client.RepoMaker(proxy_factory=git_remote)
75 75 connection.Hg = client.RepoMaker(proxy_factory=hg_remote)
76 76 connection.Svn = client.RepoMaker(proxy_factory=svn_remote)
77 77
78 78 scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
79 79 settings.pyro_remote(
80 80 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
81 81 scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
82 82 settings.pyro_remote(
83 83 settings.PYRO_HG_REMOTE_WSGI, server_and_port))
84 84
85 85 @atexit.register
86 86 def free_connection_resources():
87 87 connection.Git = None
88 88 connection.Hg = None
89 89 connection.Svn = None
90 90
91 91
92 92 def connect_http(server_and_port):
93 93 from rhodecode.lib.vcs import connection, client_http
94 94 from rhodecode.lib.middleware.utils import scm_app
95 95
96 96 session_factory = client_http.ThreadlocalSessionFactory()
97 97
98 98 connection.Git = client_http.RepoMaker(
99 99 server_and_port, '/git', session_factory)
100 100 connection.Hg = client_http.RepoMaker(
101 101 server_and_port, '/hg', session_factory)
102 102 connection.Svn = client_http.RepoMaker(
103 103 server_and_port, '/svn', session_factory)
104 104
105 105 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
106 106 server_and_port, '/proxy/hg')
107 107 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
108 108 server_and_port, '/proxy/git')
109 109
110 110 @atexit.register
111 111 def free_connection_resources():
112 112 connection.Git = None
113 113 connection.Hg = None
114 114 connection.Svn = None
115 115
116 116
117 117 def connect_vcs(server_and_port, protocol='pyro4'):
118 118 """
119 119 Initializes the connection to the vcs server.
120 120
121 121 :param server_and_port: str, e.g. "localhost:9900"
122 122 :param protocol: str, "pyro4" or "http"
123 123 """
124 124 if protocol == 'pyro4':
125 125 connect_pyro4(server_and_port)
126 126 elif protocol == 'http':
127 127 connect_http(server_and_port)
128 128
129 129
130 130 # TODO: johbo: This function should be moved into our test suite, there is
131 131 # no reason to support starting the vcsserver in Enterprise itself.
132 132 def start_vcs_server(server_and_port, protocol='pyro4', log_level=None):
133 133 """
134 134 Starts the vcs server in a subprocess.
135 135 """
136 136 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
137 137 if protocol == 'http':
138 138 return _start_http_vcs_server(server_and_port, log_level)
139 139 elif protocol == 'pyro4':
140 140 return _start_pyro4_vcs_server(server_and_port, log_level)
141 141
142 142
143 143 def _start_pyro4_vcs_server(server_and_port, log_level=None):
144 144 _try_to_shutdown_running_server(server_and_port)
145 145 host, port = server_and_port.rsplit(":", 1)
146 146 host = host.strip('[]')
147 147 args = [
148 148 'vcsserver', '--port', port, '--host', host, '--locale', 'en_US.UTF-8',
149 149 '--threadpool', '32']
150 150 if log_level:
151 151 args += ['--log-level', log_level]
152 152 proc = subprocess.Popen(args)
153 153
154 154 def cleanup_server_process():
155 155 proc.kill()
156 156 atexit.register(cleanup_server_process)
157 157
158 158 server = create_vcsserver_proxy(server_and_port, protocol='pyro4')
159 159 _wait_until_vcs_server_is_reachable(server)
160 160
161 161
162 162 def _start_http_vcs_server(server_and_port, log_level=None):
163 163 # TODO: mikhail: shutdown if an http server already runs
164 164
165 165 host, port = server_and_port.rsplit(":", 1)
166 166 args = [
167 167 'pserve', 'vcsserver/development_pyramid.ini',
168 168 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
169 169 proc = subprocess.Popen(args)
170 170
171 171 def cleanup_server_process():
172 172 proc.kill()
173 173 atexit.register(cleanup_server_process)
174 174
175 175 server = create_vcsserver_proxy(server_and_port, protocol='http')
176 176 _wait_until_vcs_server_is_reachable(server)
177 177
178 178
179 179 def _wait_until_vcs_server_is_reachable(server):
180 180 while xrange(80): # max 40s of sleep
181 181 try:
182 182 server.ping()
183 183 break
184 184 except (CommunicationError, pycurl.error):
185 185 pass
186 186 time.sleep(0.5)
187 187
188 188
189 189 def _try_to_shutdown_running_server(server_and_port):
190 190 server = create_vcsserver_proxy(server_and_port)
191 191 try:
192 192 server.shutdown()
193 193 except (CommunicationError, pycurl.error):
194 194 return
195 195
196 196 # TODO: Not sure why this is important, but without it the following start
197 197 # of the server fails.
198 198 server = create_vcsserver_proxy(server_and_port)
199 199 server.ping()
200 200
201 201
202 202 def create_vcsserver_proxy(server_and_port, protocol='pyro4'):
203 203 if protocol == 'pyro4':
204 204 return _create_vcsserver_proxy_pyro4(server_and_port)
205 205 elif protocol == 'http':
206 206 return _create_vcsserver_proxy_http(server_and_port)
207 207
208 208
209 209 def _create_vcsserver_proxy_pyro4(server_and_port):
210 210 server = Pyro4.Proxy(
211 211 settings.pyro_remote(settings.PYRO_VCSSERVER, server_and_port))
212 212 return server
213 213
214 214
215 215 def _create_vcsserver_proxy_http(server_and_port):
216 216 from rhodecode.lib.vcs import client_http
217 217
218 218 session = _create_http_rpc_session()
219 219 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
220 220 return client_http.RemoteObject(url, session)
221 221
222 222
223 223 class CurlSession(object):
224 224 """
225 225 Modeled so that it provides a subset of the requests interface.
226 226
227 227 This has been created so that it does only provide a minimal API for our
228 228 needs. The parts which it provides are based on the API of the library
229 229 `requests` which allows us to easily benchmark against it.
230 230
231 231 Please have a look at the class :class:`requests.Session` when you extend
232 232 it.
233 233 """
234 234
235 235 def __init__(self):
236 236 curl = pycurl.Curl()
237 237 # TODO: johbo: I did test with 7.19 of libcurl. This version has
238 238 # trouble with 100 - continue being set in the expect header. This
239 239 # can lead to massive performance drops, switching it off here.
240 240 curl.setopt(curl.HTTPHEADER, ["Expect:"])
241 241 curl.setopt(curl.TCP_NODELAY, True)
242 242 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
243 243 self._curl = curl
244 244
245 245 def post(self, url, data, allow_redirects=False):
246 246 response_buffer = StringIO()
247 247
248 248 curl = self._curl
249 249 curl.setopt(curl.URL, url)
250 250 curl.setopt(curl.POST, True)
251 251 curl.setopt(curl.POSTFIELDS, data)
252 252 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
253 253 curl.setopt(curl.WRITEDATA, response_buffer)
254 254 curl.perform()
255 255
256 256 return CurlResponse(response_buffer)
257 257
258 258
259 259 class CurlResponse(object):
260 260 """
261 261 The response of a request, modeled after the requests API.
262 262
263 263 This class provides a subset of the response interface known from the
264 264 library `requests`. It is intentionally kept similar, so that we can use
265 265 `requests` as a drop in replacement for benchmarking purposes.
266 266 """
267 267
268 268 def __init__(self, response_buffer):
269 269 self._response_buffer = response_buffer
270 270
271 271 @property
272 272 def content(self):
273 273 return self._response_buffer.getvalue()
274 274
275 275
276 276 def _create_http_rpc_session():
277 277 session = CurlSession()
278 278 return session
@@ -1,284 +1,327 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Provides the implementation of various client utilities to reach the vcsserver.
23 23 """
24 24
25 25
26 26 import copy
27 27 import logging
28 28 import threading
29 29 import urlparse
30 30 import uuid
31 31 import weakref
32 32 from urllib2 import URLError
33 33
34 34 import msgpack
35 35 import Pyro4
36 36 import requests
37 from pyramid.threadlocal import get_current_request
37 38 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
38 39
39 40 from rhodecode.lib.vcs import exceptions
40 41 from rhodecode.lib.vcs.conf import settings
41 42
42 43 log = logging.getLogger(__name__)
43 44
44 45
45 46 # TODO: mikhail: Keep it in sync with vcsserver's
46 47 # HTTPApplication.ALLOWED_EXCEPTIONS
47 48 EXCEPTIONS_MAP = {
48 49 'KeyError': KeyError,
49 50 'URLError': URLError,
50 51 }
51 52
52 53
53 54 class HTTPRepoMaker(object):
54 55 def __init__(self, server_and_port, backend_endpoint):
55 56 self.url = urlparse.urljoin(
56 57 'http://%s' % server_and_port, backend_endpoint)
57 58
58 59 def __call__(self, path, config, with_wire=None):
59 60 log.debug('HTTPRepoMaker call on %s', path)
60 61 return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
61 62
62 63 def __getattr__(self, name):
63 64 def f(*args, **kwargs):
64 65 return self._call(name, *args, **kwargs)
65 66 return f
66 67
67 68 @exceptions.map_vcs_exceptions
68 69 def _call(self, name, *args, **kwargs):
69 70 payload = {
70 71 'id': str(uuid.uuid4()),
71 72 'method': name,
72 73 'params': {'args': args, 'kwargs': kwargs}
73 74 }
74 75 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
75 76
76 77
77 78 class VcsHttpProxy(object):
78 79
79 80 CHUNK_SIZE = 16384
80 81
81 82 def __init__(self, server_and_port, backend_endpoint):
82 83 adapter = requests.adapters.HTTPAdapter(max_retries=5)
83 84 self.base_url = urlparse.urljoin(
84 85 'http://%s' % server_and_port, backend_endpoint)
85 86 self.session = requests.Session()
86 87 self.session.mount('http://', adapter)
87 88
88 89 def handle(self, environment, input_data, *args, **kwargs):
89 90 data = {
90 91 'environment': environment,
91 92 'input_data': input_data,
92 93 'args': args,
93 94 'kwargs': kwargs
94 95 }
95 96 result = self.session.post(
96 97 self.base_url, msgpack.packb(data), stream=True)
97 98 return self._get_result(result)
98 99
99 100 def _deserialize_and_raise(self, error):
100 101 exception = Exception(error['message'])
101 102 try:
102 103 exception._vcs_kind = error['_vcs_kind']
103 104 except KeyError:
104 105 pass
105 106 raise exception
106 107
107 108 def _iterate(self, result):
108 109 unpacker = msgpack.Unpacker()
109 110 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
110 111 unpacker.feed(line)
111 112 for chunk in unpacker:
112 113 yield chunk
113 114
114 115 def _get_result(self, result):
115 116 iterator = self._iterate(result)
116 117 error = iterator.next()
117 118 if error:
118 119 self._deserialize_and_raise(error)
119 120
120 121 status = iterator.next()
121 122 headers = iterator.next()
122 123
123 124 return iterator, status, headers
124 125
125 126
126 127 class HTTPRemoteRepo(object):
127 128 def __init__(self, path, config, url, with_wire=None):
128 129 self.url = url
129 130 self._wire = {
130 131 "path": path,
131 132 "config": config,
132 133 "context": str(uuid.uuid4()),
133 134 }
134 135 if with_wire:
135 136 self._wire.update(with_wire)
136 137
137 138 def __getattr__(self, name):
138 139 def f(*args, **kwargs):
139 140 return self._call(name, *args, **kwargs)
140 141 return f
141 142
142 143 @exceptions.map_vcs_exceptions
143 144 def _call(self, name, *args, **kwargs):
144 145 log.debug('Calling %s@%s', self.url, name)
145 146 # TODO: oliver: This is currently necessary pre-call since the
146 147 # config object is being changed for hooking scenarios
147 148 wire = copy.deepcopy(self._wire)
148 149 wire["config"] = wire["config"].serialize()
149 150 payload = {
150 151 'id': str(uuid.uuid4()),
151 152 'method': name,
152 153 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
153 154 }
154 155 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
155 156
156 157 def __getitem__(self, key):
157 158 return self.revision(key)
158 159
159 160
160 161 def _remote_call(url, payload, exceptions_map):
161 162 response = requests.post(url, data=msgpack.packb(payload))
162 163 response = msgpack.unpackb(response.content)
163 164 error = response.get('error')
164 165 if error:
165 166 type_ = error.get('type', 'Exception')
166 167 exc = exceptions_map.get(type_, Exception)
167 168 exc = exc(error.get('message'))
168 169 try:
169 170 exc._vcs_kind = error['_vcs_kind']
170 171 except KeyError:
171 172 pass
172 173 raise exc
173 174 return response.get('result')
174 175
175 176
176 177 class RepoMaker(object):
177 178
178 179 def __init__(self, proxy_factory):
179 180 self._proxy_factory = proxy_factory
180 181
181 182 def __call__(self, path, config, with_wire=None):
182 183 log.debug('RepoMaker call on %s', path)
183 184 return RemoteRepo(
184 185 path, config, remote_proxy=self._proxy_factory(),
185 186 with_wire=with_wire)
186 187
187 188 def __getattr__(self, name):
188 189 remote_proxy = self._proxy_factory()
189 190 func = _get_proxy_method(remote_proxy, name)
190 191 return _wrap_remote_call(remote_proxy, func)
191 192
192 193
193 class ThreadlocalProxyFactory(object):
194 class RequestScopeProxyFactory(object):
194 195 """
195 Creates one Pyro4 proxy per thread on demand.
196 This factory returns pyro proxy instances based on a per request scope.
197 It returns the same instance if called from within the same request and
198 different instances if called from different requests.
196 199 """
197 200
198 201 def __init__(self, remote_uri):
199 202 self._remote_uri = remote_uri
200 self._thread_local = threading.local()
203 self._proxy_pool = []
204 self._borrowed_proxies = {}
205
206 def __call__(self, request=None):
207 """
208 Wrapper around `getProxy`.
209 """
210 request = request or get_current_request()
211 return self.getProxy(request)
212
213 def getProxy(self, request=None):
214 """
215 Call this to get the pyro proxy instance for the request.
216 """
217 request = request or get_current_request()
218
219 # Return already borrowed proxy for this request
220 if request in self._borrowed_proxies:
221 return self._borrowed_proxies[request]
201 222
202 def __call__(self):
203 if not hasattr(self._thread_local, 'proxy'):
204 self._thread_local.proxy = Pyro4.Proxy(self._remote_uri)
205 return self._thread_local.proxy
223 # Get proxy from pool or create new instance.
224 try:
225 proxy = self._proxy_pool.pop()
226 except IndexError:
227 log.info('Creating new proxy for remote_uri=%s', self._remote_uri)
228 proxy = Pyro4.Proxy(self._remote_uri)
229
230 # Store proxy instance as borrowed and add request callback.
231 self._borrowed_proxies[request] = proxy
232 request.add_finished_callback(self._returnProxy)
233
234 return proxy
235
236 def _returnProxy(self, request=None):
237 """
238 Callback that gets called by pyramid when the request is finished.
239 It puts the proxy back into the pool.
240 """
241 request = request or get_current_request()
242
243 if request in self._borrowed_proxies:
244 proxy = self._borrowed_proxies.pop(request)
245 self._proxy_pool.append(proxy)
246 else:
247 log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
248 'for this request.', self._remote_uri)
206 249
207 250
208 251 class RemoteRepo(object):
209 252
210 253 def __init__(self, path, config, remote_proxy, with_wire=None):
211 254 self._wire = {
212 255 "path": path,
213 256 "config": config,
214 257 "context": uuid.uuid4(),
215 258 }
216 259 if with_wire:
217 260 self._wire.update(with_wire)
218 261 self._remote_proxy = remote_proxy
219 262 self.refs = RefsWrapper(self)
220 263
221 264 def __getattr__(self, name):
222 265 log.debug('Calling %s@%s', self._remote_proxy, name)
223 266 # TODO: oliver: This is currently necessary pre-call since the
224 267 # config object is being changed for hooking scenarios
225 268 wire = copy.deepcopy(self._wire)
226 269 wire["config"] = wire["config"].serialize()
227 270
228 271 try:
229 272 func = _get_proxy_method(self._remote_proxy, name)
230 273 except DaemonError as e:
231 274 if e.message == 'unknown object':
232 275 raise exceptions.VCSBackendNotSupportedError
233 276 else:
234 277 raise
235 278
236 279 return _wrap_remote_call(self._remote_proxy, func, wire)
237 280
238 281 def __getitem__(self, key):
239 282 return self.revision(key)
240 283
241 284
242 285 def _get_proxy_method(proxy, name):
243 286 try:
244 287 return getattr(proxy, name)
245 288 except CommunicationError:
246 289 raise CommunicationError(
247 290 'Unable to connect to remote pyro server %s' % proxy)
248 291
249 292
250 293 def _wrap_remote_call(proxy, func, *args):
251 294 all_args = list(args)
252 295
253 296 @exceptions.map_vcs_exceptions
254 297 def caller(*args, **kwargs):
255 298 all_args.extend(args)
256 299 try:
257 300 return func(*all_args, **kwargs)
258 301 except ConnectionClosedError:
259 302 log.debug('Connection to VCSServer closed, trying to reconnect.')
260 303 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
261 304
262 305 return func(*all_args, **kwargs)
263 306
264 307 return caller
265 308
266 309
267 310 class RefsWrapper(object):
268 311
269 312 def __init__(self, repo):
270 313 self._repo = weakref.proxy(repo)
271 314
272 315 def __setitem__(self, key, value):
273 316 self._repo._assign_ref(key, value)
274 317
275 318
276 319 class FunctionWrapper(object):
277 320
278 321 def __init__(self, func, wire):
279 322 self._func = func
280 323 self._wire = wire
281 324
282 325 @exceptions.map_vcs_exceptions
283 326 def __call__(self, *args, **kwargs):
284 327 return self._func(self._wire, *args, **kwargs)
General Comments 0
You need to be logged in to leave comments. Login now