##// END OF EJS Templates
pyro4: Create pyro proxy factory which returns instaces scoped by the request.
Martin Bornhold -
r341:5b26b74b default
parent child Browse files
Show More
@@ -1,278 +1,278 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2016 RhodeCode GmbH
3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Various version Control System version lib (vcs) management abstraction layer
22 Various version Control System version lib (vcs) management abstraction layer
23 for Python. Build with server client architecture.
23 for Python. Build with server client architecture.
24 """
24 """
25
25
26
26
27 VERSION = (0, 5, 0, 'dev')
27 VERSION = (0, 5, 0, 'dev')
28
28
29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
29 __version__ = '.'.join((str(each) for each in VERSION[:4]))
30
30
31 __all__ = [
31 __all__ = [
32 'get_version', 'get_repo', 'get_backend',
32 'get_version', 'get_repo', 'get_backend',
33 'VCSError', 'RepositoryError', 'CommitError'
33 'VCSError', 'RepositoryError', 'CommitError'
34 ]
34 ]
35
35
36 import atexit
36 import atexit
37 import logging
37 import logging
38 import subprocess
38 import subprocess
39 import time
39 import time
40 import urlparse
40 import urlparse
41 from cStringIO import StringIO
41 from cStringIO import StringIO
42
42
43 import pycurl
43 import pycurl
44 import Pyro4
44 import Pyro4
45 from Pyro4.errors import CommunicationError
45 from Pyro4.errors import CommunicationError
46
46
47 from rhodecode.lib.vcs.conf import settings
47 from rhodecode.lib.vcs.conf import settings
48 from rhodecode.lib.vcs.backends import get_repo, get_backend
48 from rhodecode.lib.vcs.backends import get_repo, get_backend
49 from rhodecode.lib.vcs.exceptions import (
49 from rhodecode.lib.vcs.exceptions import (
50 VCSError, RepositoryError, CommitError)
50 VCSError, RepositoryError, CommitError)
51
51
52
52
53 log = logging.getLogger(__name__)
53 log = logging.getLogger(__name__)
54
54
55
55
56 def get_version():
56 def get_version():
57 """
57 """
58 Returns shorter version (digit parts only) as string.
58 Returns shorter version (digit parts only) as string.
59 """
59 """
60 return '.'.join((str(each) for each in VERSION[:3]))
60 return '.'.join((str(each) for each in VERSION[:3]))
61
61
62
62
63 def connect_pyro4(server_and_port):
63 def connect_pyro4(server_and_port):
64 from rhodecode.lib.vcs import connection, client
64 from rhodecode.lib.vcs import connection, client
65 from rhodecode.lib.middleware.utils import scm_app
65 from rhodecode.lib.middleware.utils import scm_app
66
66
67 git_remote = client.ThreadlocalProxyFactory(
67 git_remote = client.RequestScopeProxyFactory(
68 settings.pyro_remote(settings.PYRO_GIT, server_and_port))
68 settings.pyro_remote(settings.PYRO_GIT, server_and_port))
69 hg_remote = client.ThreadlocalProxyFactory(
69 hg_remote = client.RequestScopeProxyFactory(
70 settings.pyro_remote(settings.PYRO_HG, server_and_port))
70 settings.pyro_remote(settings.PYRO_HG, server_and_port))
71 svn_remote = client.ThreadlocalProxyFactory(
71 svn_remote = client.RequestScopeProxyFactory(
72 settings.pyro_remote(settings.PYRO_SVN, server_and_port))
72 settings.pyro_remote(settings.PYRO_SVN, server_and_port))
73
73
74 connection.Git = client.RepoMaker(proxy_factory=git_remote)
74 connection.Git = client.RepoMaker(proxy_factory=git_remote)
75 connection.Hg = client.RepoMaker(proxy_factory=hg_remote)
75 connection.Hg = client.RepoMaker(proxy_factory=hg_remote)
76 connection.Svn = client.RepoMaker(proxy_factory=svn_remote)
76 connection.Svn = client.RepoMaker(proxy_factory=svn_remote)
77
77
78 scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
78 scm_app.GIT_REMOTE_WSGI = Pyro4.Proxy(
79 settings.pyro_remote(
79 settings.pyro_remote(
80 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
80 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
81 scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
81 scm_app.HG_REMOTE_WSGI = Pyro4.Proxy(
82 settings.pyro_remote(
82 settings.pyro_remote(
83 settings.PYRO_HG_REMOTE_WSGI, server_and_port))
83 settings.PYRO_HG_REMOTE_WSGI, server_and_port))
84
84
85 @atexit.register
85 @atexit.register
86 def free_connection_resources():
86 def free_connection_resources():
87 connection.Git = None
87 connection.Git = None
88 connection.Hg = None
88 connection.Hg = None
89 connection.Svn = None
89 connection.Svn = None
90
90
91
91
92 def connect_http(server_and_port):
92 def connect_http(server_and_port):
93 from rhodecode.lib.vcs import connection, client_http
93 from rhodecode.lib.vcs import connection, client_http
94 from rhodecode.lib.middleware.utils import scm_app
94 from rhodecode.lib.middleware.utils import scm_app
95
95
96 session_factory = client_http.ThreadlocalSessionFactory()
96 session_factory = client_http.ThreadlocalSessionFactory()
97
97
98 connection.Git = client_http.RepoMaker(
98 connection.Git = client_http.RepoMaker(
99 server_and_port, '/git', session_factory)
99 server_and_port, '/git', session_factory)
100 connection.Hg = client_http.RepoMaker(
100 connection.Hg = client_http.RepoMaker(
101 server_and_port, '/hg', session_factory)
101 server_and_port, '/hg', session_factory)
102 connection.Svn = client_http.RepoMaker(
102 connection.Svn = client_http.RepoMaker(
103 server_and_port, '/svn', session_factory)
103 server_and_port, '/svn', session_factory)
104
104
105 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
105 scm_app.HG_REMOTE_WSGI = client_http.VcsHttpProxy(
106 server_and_port, '/proxy/hg')
106 server_and_port, '/proxy/hg')
107 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
107 scm_app.GIT_REMOTE_WSGI = client_http.VcsHttpProxy(
108 server_and_port, '/proxy/git')
108 server_and_port, '/proxy/git')
109
109
110 @atexit.register
110 @atexit.register
111 def free_connection_resources():
111 def free_connection_resources():
112 connection.Git = None
112 connection.Git = None
113 connection.Hg = None
113 connection.Hg = None
114 connection.Svn = None
114 connection.Svn = None
115
115
116
116
117 def connect_vcs(server_and_port, protocol='pyro4'):
117 def connect_vcs(server_and_port, protocol='pyro4'):
118 """
118 """
119 Initializes the connection to the vcs server.
119 Initializes the connection to the vcs server.
120
120
121 :param server_and_port: str, e.g. "localhost:9900"
121 :param server_and_port: str, e.g. "localhost:9900"
122 :param protocol: str, "pyro4" or "http"
122 :param protocol: str, "pyro4" or "http"
123 """
123 """
124 if protocol == 'pyro4':
124 if protocol == 'pyro4':
125 connect_pyro4(server_and_port)
125 connect_pyro4(server_and_port)
126 elif protocol == 'http':
126 elif protocol == 'http':
127 connect_http(server_and_port)
127 connect_http(server_and_port)
128
128
129
129
130 # TODO: johbo: This function should be moved into our test suite, there is
130 # TODO: johbo: This function should be moved into our test suite, there is
131 # no reason to support starting the vcsserver in Enterprise itself.
131 # no reason to support starting the vcsserver in Enterprise itself.
132 def start_vcs_server(server_and_port, protocol='pyro4', log_level=None):
132 def start_vcs_server(server_and_port, protocol='pyro4', log_level=None):
133 """
133 """
134 Starts the vcs server in a subprocess.
134 Starts the vcs server in a subprocess.
135 """
135 """
136 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
136 log.info('Starting VCSServer as a sub process with %s protocol', protocol)
137 if protocol == 'http':
137 if protocol == 'http':
138 return _start_http_vcs_server(server_and_port, log_level)
138 return _start_http_vcs_server(server_and_port, log_level)
139 elif protocol == 'pyro4':
139 elif protocol == 'pyro4':
140 return _start_pyro4_vcs_server(server_and_port, log_level)
140 return _start_pyro4_vcs_server(server_and_port, log_level)
141
141
142
142
143 def _start_pyro4_vcs_server(server_and_port, log_level=None):
143 def _start_pyro4_vcs_server(server_and_port, log_level=None):
144 _try_to_shutdown_running_server(server_and_port)
144 _try_to_shutdown_running_server(server_and_port)
145 host, port = server_and_port.rsplit(":", 1)
145 host, port = server_and_port.rsplit(":", 1)
146 host = host.strip('[]')
146 host = host.strip('[]')
147 args = [
147 args = [
148 'vcsserver', '--port', port, '--host', host, '--locale', 'en_US.UTF-8',
148 'vcsserver', '--port', port, '--host', host, '--locale', 'en_US.UTF-8',
149 '--threadpool', '32']
149 '--threadpool', '32']
150 if log_level:
150 if log_level:
151 args += ['--log-level', log_level]
151 args += ['--log-level', log_level]
152 proc = subprocess.Popen(args)
152 proc = subprocess.Popen(args)
153
153
154 def cleanup_server_process():
154 def cleanup_server_process():
155 proc.kill()
155 proc.kill()
156 atexit.register(cleanup_server_process)
156 atexit.register(cleanup_server_process)
157
157
158 server = create_vcsserver_proxy(server_and_port, protocol='pyro4')
158 server = create_vcsserver_proxy(server_and_port, protocol='pyro4')
159 _wait_until_vcs_server_is_reachable(server)
159 _wait_until_vcs_server_is_reachable(server)
160
160
161
161
162 def _start_http_vcs_server(server_and_port, log_level=None):
162 def _start_http_vcs_server(server_and_port, log_level=None):
163 # TODO: mikhail: shutdown if an http server already runs
163 # TODO: mikhail: shutdown if an http server already runs
164
164
165 host, port = server_and_port.rsplit(":", 1)
165 host, port = server_and_port.rsplit(":", 1)
166 args = [
166 args = [
167 'pserve', 'vcsserver/development_pyramid.ini',
167 'pserve', 'vcsserver/development_pyramid.ini',
168 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
168 'http_port=%s' % (port, ), 'http_host=%s' % (host, )]
169 proc = subprocess.Popen(args)
169 proc = subprocess.Popen(args)
170
170
171 def cleanup_server_process():
171 def cleanup_server_process():
172 proc.kill()
172 proc.kill()
173 atexit.register(cleanup_server_process)
173 atexit.register(cleanup_server_process)
174
174
175 server = create_vcsserver_proxy(server_and_port, protocol='http')
175 server = create_vcsserver_proxy(server_and_port, protocol='http')
176 _wait_until_vcs_server_is_reachable(server)
176 _wait_until_vcs_server_is_reachable(server)
177
177
178
178
179 def _wait_until_vcs_server_is_reachable(server):
179 def _wait_until_vcs_server_is_reachable(server):
180 while xrange(80): # max 40s of sleep
180 while xrange(80): # max 40s of sleep
181 try:
181 try:
182 server.ping()
182 server.ping()
183 break
183 break
184 except (CommunicationError, pycurl.error):
184 except (CommunicationError, pycurl.error):
185 pass
185 pass
186 time.sleep(0.5)
186 time.sleep(0.5)
187
187
188
188
189 def _try_to_shutdown_running_server(server_and_port):
189 def _try_to_shutdown_running_server(server_and_port):
190 server = create_vcsserver_proxy(server_and_port)
190 server = create_vcsserver_proxy(server_and_port)
191 try:
191 try:
192 server.shutdown()
192 server.shutdown()
193 except (CommunicationError, pycurl.error):
193 except (CommunicationError, pycurl.error):
194 return
194 return
195
195
196 # TODO: Not sure why this is important, but without it the following start
196 # TODO: Not sure why this is important, but without it the following start
197 # of the server fails.
197 # of the server fails.
198 server = create_vcsserver_proxy(server_and_port)
198 server = create_vcsserver_proxy(server_and_port)
199 server.ping()
199 server.ping()
200
200
201
201
202 def create_vcsserver_proxy(server_and_port, protocol='pyro4'):
202 def create_vcsserver_proxy(server_and_port, protocol='pyro4'):
203 if protocol == 'pyro4':
203 if protocol == 'pyro4':
204 return _create_vcsserver_proxy_pyro4(server_and_port)
204 return _create_vcsserver_proxy_pyro4(server_and_port)
205 elif protocol == 'http':
205 elif protocol == 'http':
206 return _create_vcsserver_proxy_http(server_and_port)
206 return _create_vcsserver_proxy_http(server_and_port)
207
207
208
208
209 def _create_vcsserver_proxy_pyro4(server_and_port):
209 def _create_vcsserver_proxy_pyro4(server_and_port):
210 server = Pyro4.Proxy(
210 server = Pyro4.Proxy(
211 settings.pyro_remote(settings.PYRO_VCSSERVER, server_and_port))
211 settings.pyro_remote(settings.PYRO_VCSSERVER, server_and_port))
212 return server
212 return server
213
213
214
214
215 def _create_vcsserver_proxy_http(server_and_port):
215 def _create_vcsserver_proxy_http(server_and_port):
216 from rhodecode.lib.vcs import client_http
216 from rhodecode.lib.vcs import client_http
217
217
218 session = _create_http_rpc_session()
218 session = _create_http_rpc_session()
219 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
219 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
220 return client_http.RemoteObject(url, session)
220 return client_http.RemoteObject(url, session)
221
221
222
222
223 class CurlSession(object):
223 class CurlSession(object):
224 """
224 """
225 Modeled so that it provides a subset of the requests interface.
225 Modeled so that it provides a subset of the requests interface.
226
226
227 This has been created so that it does only provide a minimal API for our
227 This has been created so that it does only provide a minimal API for our
228 needs. The parts which it provides are based on the API of the library
228 needs. The parts which it provides are based on the API of the library
229 `requests` which allows us to easily benchmark against it.
229 `requests` which allows us to easily benchmark against it.
230
230
231 Please have a look at the class :class:`requests.Session` when you extend
231 Please have a look at the class :class:`requests.Session` when you extend
232 it.
232 it.
233 """
233 """
234
234
235 def __init__(self):
235 def __init__(self):
236 curl = pycurl.Curl()
236 curl = pycurl.Curl()
237 # TODO: johbo: I did test with 7.19 of libcurl. This version has
237 # TODO: johbo: I did test with 7.19 of libcurl. This version has
238 # trouble with 100 - continue being set in the expect header. This
238 # trouble with 100 - continue being set in the expect header. This
239 # can lead to massive performance drops, switching it off here.
239 # can lead to massive performance drops, switching it off here.
240 curl.setopt(curl.HTTPHEADER, ["Expect:"])
240 curl.setopt(curl.HTTPHEADER, ["Expect:"])
241 curl.setopt(curl.TCP_NODELAY, True)
241 curl.setopt(curl.TCP_NODELAY, True)
242 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
242 curl.setopt(curl.PROTOCOLS, curl.PROTO_HTTP)
243 self._curl = curl
243 self._curl = curl
244
244
245 def post(self, url, data, allow_redirects=False):
245 def post(self, url, data, allow_redirects=False):
246 response_buffer = StringIO()
246 response_buffer = StringIO()
247
247
248 curl = self._curl
248 curl = self._curl
249 curl.setopt(curl.URL, url)
249 curl.setopt(curl.URL, url)
250 curl.setopt(curl.POST, True)
250 curl.setopt(curl.POST, True)
251 curl.setopt(curl.POSTFIELDS, data)
251 curl.setopt(curl.POSTFIELDS, data)
252 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
252 curl.setopt(curl.FOLLOWLOCATION, allow_redirects)
253 curl.setopt(curl.WRITEDATA, response_buffer)
253 curl.setopt(curl.WRITEDATA, response_buffer)
254 curl.perform()
254 curl.perform()
255
255
256 return CurlResponse(response_buffer)
256 return CurlResponse(response_buffer)
257
257
258
258
259 class CurlResponse(object):
259 class CurlResponse(object):
260 """
260 """
261 The response of a request, modeled after the requests API.
261 The response of a request, modeled after the requests API.
262
262
263 This class provides a subset of the response interface known from the
263 This class provides a subset of the response interface known from the
264 library `requests`. It is intentionally kept similar, so that we can use
264 library `requests`. It is intentionally kept similar, so that we can use
265 `requests` as a drop in replacement for benchmarking purposes.
265 `requests` as a drop in replacement for benchmarking purposes.
266 """
266 """
267
267
268 def __init__(self, response_buffer):
268 def __init__(self, response_buffer):
269 self._response_buffer = response_buffer
269 self._response_buffer = response_buffer
270
270
271 @property
271 @property
272 def content(self):
272 def content(self):
273 return self._response_buffer.getvalue()
273 return self._response_buffer.getvalue()
274
274
275
275
276 def _create_http_rpc_session():
276 def _create_http_rpc_session():
277 session = CurlSession()
277 session = CurlSession()
278 return session
278 return session
@@ -1,284 +1,327 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2016 RhodeCode GmbH
3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Provides the implementation of various client utilities to reach the vcsserver.
22 Provides the implementation of various client utilities to reach the vcsserver.
23 """
23 """
24
24
25
25
26 import copy
26 import copy
27 import logging
27 import logging
28 import threading
28 import threading
29 import urlparse
29 import urlparse
30 import uuid
30 import uuid
31 import weakref
31 import weakref
32 from urllib2 import URLError
32 from urllib2 import URLError
33
33
34 import msgpack
34 import msgpack
35 import Pyro4
35 import Pyro4
36 import requests
36 import requests
37 from pyramid.threadlocal import get_current_request
37 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
38 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
38
39
39 from rhodecode.lib.vcs import exceptions
40 from rhodecode.lib.vcs import exceptions
40 from rhodecode.lib.vcs.conf import settings
41 from rhodecode.lib.vcs.conf import settings
41
42
42 log = logging.getLogger(__name__)
43 log = logging.getLogger(__name__)
43
44
44
45
45 # TODO: mikhail: Keep it in sync with vcsserver's
46 # TODO: mikhail: Keep it in sync with vcsserver's
46 # HTTPApplication.ALLOWED_EXCEPTIONS
47 # HTTPApplication.ALLOWED_EXCEPTIONS
47 EXCEPTIONS_MAP = {
48 EXCEPTIONS_MAP = {
48 'KeyError': KeyError,
49 'KeyError': KeyError,
49 'URLError': URLError,
50 'URLError': URLError,
50 }
51 }
51
52
52
53
53 class HTTPRepoMaker(object):
54 class HTTPRepoMaker(object):
54 def __init__(self, server_and_port, backend_endpoint):
55 def __init__(self, server_and_port, backend_endpoint):
55 self.url = urlparse.urljoin(
56 self.url = urlparse.urljoin(
56 'http://%s' % server_and_port, backend_endpoint)
57 'http://%s' % server_and_port, backend_endpoint)
57
58
58 def __call__(self, path, config, with_wire=None):
59 def __call__(self, path, config, with_wire=None):
59 log.debug('HTTPRepoMaker call on %s', path)
60 log.debug('HTTPRepoMaker call on %s', path)
60 return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
61 return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
61
62
62 def __getattr__(self, name):
63 def __getattr__(self, name):
63 def f(*args, **kwargs):
64 def f(*args, **kwargs):
64 return self._call(name, *args, **kwargs)
65 return self._call(name, *args, **kwargs)
65 return f
66 return f
66
67
67 @exceptions.map_vcs_exceptions
68 @exceptions.map_vcs_exceptions
68 def _call(self, name, *args, **kwargs):
69 def _call(self, name, *args, **kwargs):
69 payload = {
70 payload = {
70 'id': str(uuid.uuid4()),
71 'id': str(uuid.uuid4()),
71 'method': name,
72 'method': name,
72 'params': {'args': args, 'kwargs': kwargs}
73 'params': {'args': args, 'kwargs': kwargs}
73 }
74 }
74 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
75 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
75
76
76
77
77 class VcsHttpProxy(object):
78 class VcsHttpProxy(object):
78
79
79 CHUNK_SIZE = 16384
80 CHUNK_SIZE = 16384
80
81
81 def __init__(self, server_and_port, backend_endpoint):
82 def __init__(self, server_and_port, backend_endpoint):
82 adapter = requests.adapters.HTTPAdapter(max_retries=5)
83 adapter = requests.adapters.HTTPAdapter(max_retries=5)
83 self.base_url = urlparse.urljoin(
84 self.base_url = urlparse.urljoin(
84 'http://%s' % server_and_port, backend_endpoint)
85 'http://%s' % server_and_port, backend_endpoint)
85 self.session = requests.Session()
86 self.session = requests.Session()
86 self.session.mount('http://', adapter)
87 self.session.mount('http://', adapter)
87
88
88 def handle(self, environment, input_data, *args, **kwargs):
89 def handle(self, environment, input_data, *args, **kwargs):
89 data = {
90 data = {
90 'environment': environment,
91 'environment': environment,
91 'input_data': input_data,
92 'input_data': input_data,
92 'args': args,
93 'args': args,
93 'kwargs': kwargs
94 'kwargs': kwargs
94 }
95 }
95 result = self.session.post(
96 result = self.session.post(
96 self.base_url, msgpack.packb(data), stream=True)
97 self.base_url, msgpack.packb(data), stream=True)
97 return self._get_result(result)
98 return self._get_result(result)
98
99
99 def _deserialize_and_raise(self, error):
100 def _deserialize_and_raise(self, error):
100 exception = Exception(error['message'])
101 exception = Exception(error['message'])
101 try:
102 try:
102 exception._vcs_kind = error['_vcs_kind']
103 exception._vcs_kind = error['_vcs_kind']
103 except KeyError:
104 except KeyError:
104 pass
105 pass
105 raise exception
106 raise exception
106
107
107 def _iterate(self, result):
108 def _iterate(self, result):
108 unpacker = msgpack.Unpacker()
109 unpacker = msgpack.Unpacker()
109 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
110 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
110 unpacker.feed(line)
111 unpacker.feed(line)
111 for chunk in unpacker:
112 for chunk in unpacker:
112 yield chunk
113 yield chunk
113
114
114 def _get_result(self, result):
115 def _get_result(self, result):
115 iterator = self._iterate(result)
116 iterator = self._iterate(result)
116 error = iterator.next()
117 error = iterator.next()
117 if error:
118 if error:
118 self._deserialize_and_raise(error)
119 self._deserialize_and_raise(error)
119
120
120 status = iterator.next()
121 status = iterator.next()
121 headers = iterator.next()
122 headers = iterator.next()
122
123
123 return iterator, status, headers
124 return iterator, status, headers
124
125
125
126
126 class HTTPRemoteRepo(object):
127 class HTTPRemoteRepo(object):
127 def __init__(self, path, config, url, with_wire=None):
128 def __init__(self, path, config, url, with_wire=None):
128 self.url = url
129 self.url = url
129 self._wire = {
130 self._wire = {
130 "path": path,
131 "path": path,
131 "config": config,
132 "config": config,
132 "context": str(uuid.uuid4()),
133 "context": str(uuid.uuid4()),
133 }
134 }
134 if with_wire:
135 if with_wire:
135 self._wire.update(with_wire)
136 self._wire.update(with_wire)
136
137
137 def __getattr__(self, name):
138 def __getattr__(self, name):
138 def f(*args, **kwargs):
139 def f(*args, **kwargs):
139 return self._call(name, *args, **kwargs)
140 return self._call(name, *args, **kwargs)
140 return f
141 return f
141
142
142 @exceptions.map_vcs_exceptions
143 @exceptions.map_vcs_exceptions
143 def _call(self, name, *args, **kwargs):
144 def _call(self, name, *args, **kwargs):
144 log.debug('Calling %s@%s', self.url, name)
145 log.debug('Calling %s@%s', self.url, name)
145 # TODO: oliver: This is currently necessary pre-call since the
146 # TODO: oliver: This is currently necessary pre-call since the
146 # config object is being changed for hooking scenarios
147 # config object is being changed for hooking scenarios
147 wire = copy.deepcopy(self._wire)
148 wire = copy.deepcopy(self._wire)
148 wire["config"] = wire["config"].serialize()
149 wire["config"] = wire["config"].serialize()
149 payload = {
150 payload = {
150 'id': str(uuid.uuid4()),
151 'id': str(uuid.uuid4()),
151 'method': name,
152 'method': name,
152 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
153 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
153 }
154 }
154 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
155 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
155
156
156 def __getitem__(self, key):
157 def __getitem__(self, key):
157 return self.revision(key)
158 return self.revision(key)
158
159
159
160
160 def _remote_call(url, payload, exceptions_map):
161 def _remote_call(url, payload, exceptions_map):
161 response = requests.post(url, data=msgpack.packb(payload))
162 response = requests.post(url, data=msgpack.packb(payload))
162 response = msgpack.unpackb(response.content)
163 response = msgpack.unpackb(response.content)
163 error = response.get('error')
164 error = response.get('error')
164 if error:
165 if error:
165 type_ = error.get('type', 'Exception')
166 type_ = error.get('type', 'Exception')
166 exc = exceptions_map.get(type_, Exception)
167 exc = exceptions_map.get(type_, Exception)
167 exc = exc(error.get('message'))
168 exc = exc(error.get('message'))
168 try:
169 try:
169 exc._vcs_kind = error['_vcs_kind']
170 exc._vcs_kind = error['_vcs_kind']
170 except KeyError:
171 except KeyError:
171 pass
172 pass
172 raise exc
173 raise exc
173 return response.get('result')
174 return response.get('result')
174
175
175
176
176 class RepoMaker(object):
177 class RepoMaker(object):
177
178
178 def __init__(self, proxy_factory):
179 def __init__(self, proxy_factory):
179 self._proxy_factory = proxy_factory
180 self._proxy_factory = proxy_factory
180
181
181 def __call__(self, path, config, with_wire=None):
182 def __call__(self, path, config, with_wire=None):
182 log.debug('RepoMaker call on %s', path)
183 log.debug('RepoMaker call on %s', path)
183 return RemoteRepo(
184 return RemoteRepo(
184 path, config, remote_proxy=self._proxy_factory(),
185 path, config, remote_proxy=self._proxy_factory(),
185 with_wire=with_wire)
186 with_wire=with_wire)
186
187
187 def __getattr__(self, name):
188 def __getattr__(self, name):
188 remote_proxy = self._proxy_factory()
189 remote_proxy = self._proxy_factory()
189 func = _get_proxy_method(remote_proxy, name)
190 func = _get_proxy_method(remote_proxy, name)
190 return _wrap_remote_call(remote_proxy, func)
191 return _wrap_remote_call(remote_proxy, func)
191
192
192
193
193 class ThreadlocalProxyFactory(object):
194 class RequestScopeProxyFactory(object):
194 """
195 """
195 Creates one Pyro4 proxy per thread on demand.
196 This factory returns pyro proxy instances based on a per request scope.
197 It returns the same instance if called from within the same request and
198 different instances if called from different requests.
196 """
199 """
197
200
198 def __init__(self, remote_uri):
201 def __init__(self, remote_uri):
199 self._remote_uri = remote_uri
202 self._remote_uri = remote_uri
200 self._thread_local = threading.local()
203 self._proxy_pool = []
204 self._borrowed_proxies = {}
205
206 def __call__(self, request=None):
207 """
208 Wrapper around `getProxy`.
209 """
210 request = request or get_current_request()
211 return self.getProxy(request)
212
213 def getProxy(self, request=None):
214 """
215 Call this to get the pyro proxy instance for the request.
216 """
217 request = request or get_current_request()
218
219 # Return already borrowed proxy for this request
220 if request in self._borrowed_proxies:
221 return self._borrowed_proxies[request]
201
222
202 def __call__(self):
223 # Get proxy from pool or create new instance.
203 if not hasattr(self._thread_local, 'proxy'):
224 try:
204 self._thread_local.proxy = Pyro4.Proxy(self._remote_uri)
225 proxy = self._proxy_pool.pop()
205 return self._thread_local.proxy
226 except IndexError:
227 log.info('Creating new proxy for remote_uri=%s', self._remote_uri)
228 proxy = Pyro4.Proxy(self._remote_uri)
229
230 # Store proxy instance as borrowed and add request callback.
231 self._borrowed_proxies[request] = proxy
232 request.add_finished_callback(self._returnProxy)
233
234 return proxy
235
236 def _returnProxy(self, request=None):
237 """
238 Callback that gets called by pyramid when the request is finished.
239 It puts the proxy back into the pool.
240 """
241 request = request or get_current_request()
242
243 if request in self._borrowed_proxies:
244 proxy = self._borrowed_proxies.pop(request)
245 self._proxy_pool.append(proxy)
246 else:
247 log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
248 'for this request.', self._remote_uri)
206
249
207
250
208 class RemoteRepo(object):
251 class RemoteRepo(object):
209
252
210 def __init__(self, path, config, remote_proxy, with_wire=None):
253 def __init__(self, path, config, remote_proxy, with_wire=None):
211 self._wire = {
254 self._wire = {
212 "path": path,
255 "path": path,
213 "config": config,
256 "config": config,
214 "context": uuid.uuid4(),
257 "context": uuid.uuid4(),
215 }
258 }
216 if with_wire:
259 if with_wire:
217 self._wire.update(with_wire)
260 self._wire.update(with_wire)
218 self._remote_proxy = remote_proxy
261 self._remote_proxy = remote_proxy
219 self.refs = RefsWrapper(self)
262 self.refs = RefsWrapper(self)
220
263
221 def __getattr__(self, name):
264 def __getattr__(self, name):
222 log.debug('Calling %s@%s', self._remote_proxy, name)
265 log.debug('Calling %s@%s', self._remote_proxy, name)
223 # TODO: oliver: This is currently necessary pre-call since the
266 # TODO: oliver: This is currently necessary pre-call since the
224 # config object is being changed for hooking scenarios
267 # config object is being changed for hooking scenarios
225 wire = copy.deepcopy(self._wire)
268 wire = copy.deepcopy(self._wire)
226 wire["config"] = wire["config"].serialize()
269 wire["config"] = wire["config"].serialize()
227
270
228 try:
271 try:
229 func = _get_proxy_method(self._remote_proxy, name)
272 func = _get_proxy_method(self._remote_proxy, name)
230 except DaemonError as e:
273 except DaemonError as e:
231 if e.message == 'unknown object':
274 if e.message == 'unknown object':
232 raise exceptions.VCSBackendNotSupportedError
275 raise exceptions.VCSBackendNotSupportedError
233 else:
276 else:
234 raise
277 raise
235
278
236 return _wrap_remote_call(self._remote_proxy, func, wire)
279 return _wrap_remote_call(self._remote_proxy, func, wire)
237
280
238 def __getitem__(self, key):
281 def __getitem__(self, key):
239 return self.revision(key)
282 return self.revision(key)
240
283
241
284
242 def _get_proxy_method(proxy, name):
285 def _get_proxy_method(proxy, name):
243 try:
286 try:
244 return getattr(proxy, name)
287 return getattr(proxy, name)
245 except CommunicationError:
288 except CommunicationError:
246 raise CommunicationError(
289 raise CommunicationError(
247 'Unable to connect to remote pyro server %s' % proxy)
290 'Unable to connect to remote pyro server %s' % proxy)
248
291
249
292
250 def _wrap_remote_call(proxy, func, *args):
293 def _wrap_remote_call(proxy, func, *args):
251 all_args = list(args)
294 all_args = list(args)
252
295
253 @exceptions.map_vcs_exceptions
296 @exceptions.map_vcs_exceptions
254 def caller(*args, **kwargs):
297 def caller(*args, **kwargs):
255 all_args.extend(args)
298 all_args.extend(args)
256 try:
299 try:
257 return func(*all_args, **kwargs)
300 return func(*all_args, **kwargs)
258 except ConnectionClosedError:
301 except ConnectionClosedError:
259 log.debug('Connection to VCSServer closed, trying to reconnect.')
302 log.debug('Connection to VCSServer closed, trying to reconnect.')
260 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
303 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
261
304
262 return func(*all_args, **kwargs)
305 return func(*all_args, **kwargs)
263
306
264 return caller
307 return caller
265
308
266
309
267 class RefsWrapper(object):
310 class RefsWrapper(object):
268
311
269 def __init__(self, repo):
312 def __init__(self, repo):
270 self._repo = weakref.proxy(repo)
313 self._repo = weakref.proxy(repo)
271
314
272 def __setitem__(self, key, value):
315 def __setitem__(self, key, value):
273 self._repo._assign_ref(key, value)
316 self._repo._assign_ref(key, value)
274
317
275
318
276 class FunctionWrapper(object):
319 class FunctionWrapper(object):
277
320
278 def __init__(self, func, wire):
321 def __init__(self, func, wire):
279 self._func = func
322 self._func = func
280 self._wire = wire
323 self._wire = wire
281
324
282 @exceptions.map_vcs_exceptions
325 @exceptions.map_vcs_exceptions
283 def __call__(self, *args, **kwargs):
326 def __call__(self, *args, **kwargs):
284 return self._func(self._wire, *args, **kwargs)
327 return self._func(self._wire, *args, **kwargs)
General Comments 0
You need to be logged in to leave comments. Login now