##// END OF EJS Templates
pyro4: Adapt pyro4 RemoteRepo to have the same API as http RemoteRepo class.
Martin Bornhold -
r404:94c8766a default
parent child Browse files
Show More
@@ -1,333 +1,346 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Provides the implementation of various client utilities to reach the vcsserver.
23 23 """
24 24
25 25
26 26 import copy
27 27 import logging
28 28 import threading
29 29 import urlparse
30 30 import uuid
31 31 import weakref
32 32 from urllib2 import URLError
33 33
34 34 import msgpack
35 35 import Pyro4
36 36 import requests
37 37 from pyramid.threadlocal import get_current_request
38 38 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
39 39
40 40 from rhodecode.lib.vcs import exceptions
41 41 from rhodecode.lib.vcs.conf import settings
42 42
43 43 log = logging.getLogger(__name__)
44 44
45 45
46 46 # TODO: mikhail: Keep it in sync with vcsserver's
47 47 # HTTPApplication.ALLOWED_EXCEPTIONS
48 48 EXCEPTIONS_MAP = {
49 49 'KeyError': KeyError,
50 50 'URLError': URLError,
51 51 }
52 52
53 53
54 54 class HTTPRepoMaker(object):
55 55 def __init__(self, server_and_port, backend_endpoint):
56 56 self.url = urlparse.urljoin(
57 57 'http://%s' % server_and_port, backend_endpoint)
58 58
59 59 def __call__(self, path, config, with_wire=None):
60 60 log.debug('HTTPRepoMaker call on %s', path)
61 61 return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
62 62
63 63 def __getattr__(self, name):
64 64 def f(*args, **kwargs):
65 65 return self._call(name, *args, **kwargs)
66 66 return f
67 67
68 68 @exceptions.map_vcs_exceptions
69 69 def _call(self, name, *args, **kwargs):
70 70 payload = {
71 71 'id': str(uuid.uuid4()),
72 72 'method': name,
73 73 'params': {'args': args, 'kwargs': kwargs}
74 74 }
75 75 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
76 76
77 77
78 78 class VcsHttpProxy(object):
79 79
80 80 CHUNK_SIZE = 16384
81 81
82 82 def __init__(self, server_and_port, backend_endpoint):
83 83 adapter = requests.adapters.HTTPAdapter(max_retries=5)
84 84 self.base_url = urlparse.urljoin(
85 85 'http://%s' % server_and_port, backend_endpoint)
86 86 self.session = requests.Session()
87 87 self.session.mount('http://', adapter)
88 88
89 89 def handle(self, environment, input_data, *args, **kwargs):
90 90 data = {
91 91 'environment': environment,
92 92 'input_data': input_data,
93 93 'args': args,
94 94 'kwargs': kwargs
95 95 }
96 96 result = self.session.post(
97 97 self.base_url, msgpack.packb(data), stream=True)
98 98 return self._get_result(result)
99 99
100 100 def _deserialize_and_raise(self, error):
101 101 exception = Exception(error['message'])
102 102 try:
103 103 exception._vcs_kind = error['_vcs_kind']
104 104 except KeyError:
105 105 pass
106 106 raise exception
107 107
108 108 def _iterate(self, result):
109 109 unpacker = msgpack.Unpacker()
110 110 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
111 111 unpacker.feed(line)
112 112 for chunk in unpacker:
113 113 yield chunk
114 114
115 115 def _get_result(self, result):
116 116 iterator = self._iterate(result)
117 117 error = iterator.next()
118 118 if error:
119 119 self._deserialize_and_raise(error)
120 120
121 121 status = iterator.next()
122 122 headers = iterator.next()
123 123
124 124 return iterator, status, headers
125 125
126 126
127 127 class HTTPRemoteRepo(object):
128 128 def __init__(self, path, config, url, with_wire=None):
129 129 self.url = url
130 130 self._wire = {
131 131 "path": path,
132 132 "config": config,
133 133 "context": str(uuid.uuid4()),
134 134 }
135 135 if with_wire:
136 136 self._wire.update(with_wire)
137 137
138 138 def __getattr__(self, name):
139 139 def f(*args, **kwargs):
140 140 return self._call(name, *args, **kwargs)
141 141 return f
142 142
143 143 @exceptions.map_vcs_exceptions
144 144 def _call(self, name, *args, **kwargs):
145 145 log.debug('Calling %s@%s', self.url, name)
146 146 # TODO: oliver: This is currently necessary pre-call since the
147 147 # config object is being changed for hooking scenarios
148 148 wire = copy.deepcopy(self._wire)
149 149 wire["config"] = wire["config"].serialize()
150 150 payload = {
151 151 'id': str(uuid.uuid4()),
152 152 'method': name,
153 153 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
154 154 }
155 155 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
156 156
157 157 def __getitem__(self, key):
158 158 return self.revision(key)
159 159
160 160
161 161 def _remote_call(url, payload, exceptions_map):
162 162 response = requests.post(url, data=msgpack.packb(payload))
163 163 response = msgpack.unpackb(response.content)
164 164 error = response.get('error')
165 165 if error:
166 166 type_ = error.get('type', 'Exception')
167 167 exc = exceptions_map.get(type_, Exception)
168 168 exc = exc(error.get('message'))
169 169 try:
170 170 exc._vcs_kind = error['_vcs_kind']
171 171 except KeyError:
172 172 pass
173 173 raise exc
174 174 return response.get('result')
175 175
176 176
177 177 class RepoMaker(object):
178 178
179 179 def __init__(self, proxy_factory):
180 180 self._proxy_factory = proxy_factory
181 181
182 182 def __call__(self, path, config, with_wire=None):
183 183 log.debug('RepoMaker call on %s', path)
184 184 return RemoteRepo(
185 185 path, config, remote_proxy=self._proxy_factory(),
186 186 with_wire=with_wire)
187 187
188 188 def __getattr__(self, name):
189 189 remote_proxy = self._proxy_factory()
190 190 func = _get_proxy_method(remote_proxy, name)
191 191 return _wrap_remote_call(remote_proxy, func)
192 192
193 193
194 194 class RequestScopeProxyFactory(object):
195 195 """
196 196 This factory returns pyro proxy instances based on a per request scope.
197 197 It returns the same instance if called from within the same request and
198 198 different instances if called from different requests.
199 199 """
200 200
201 201 def __init__(self, remote_uri):
202 202 self._remote_uri = remote_uri
203 203 self._proxy_pool = []
204 204 self._borrowed_proxies = {}
205 205
206 206 def __call__(self, request=None):
207 207 """
208 208 Wrapper around `getProxy`.
209 209 """
210 210 request = request or get_current_request()
211 211 return self.getProxy(request)
212 212
213 213 def getProxy(self, request):
214 214 """
215 215 Call this to get the pyro proxy instance for the request.
216 216 """
217 217
218 218 # If called without a request context we return new proxy instances
219 219 # on every call. This allows to run e.g. invoke tasks.
220 220 if request is None:
221 221 log.info('Creating pyro proxy without request context for '
222 222 'remote_uri=%s', self._remote_uri)
223 223 return Pyro4.Proxy(self._remote_uri)
224 224
225 225 # If there is an already borrowed proxy for the request context we
226 226 # return that instance instead of creating a new one.
227 227 if request in self._borrowed_proxies:
228 228 return self._borrowed_proxies[request]
229 229
230 230 # Get proxy from pool or create new instance.
231 231 try:
232 232 proxy = self._proxy_pool.pop()
233 233 except IndexError:
234 234 log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri)
235 235 proxy = Pyro4.Proxy(self._remote_uri)
236 236
237 237 # Mark proxy as borrowed for the request context and add a callback
238 238 # that returns it when the request processing is finished.
239 239 self._borrowed_proxies[request] = proxy
240 240 request.add_finished_callback(self._returnProxy)
241 241
242 242 return proxy
243 243
244 244 def _returnProxy(self, request):
245 245 """
246 246 Callback that gets called by pyramid when the request is finished.
247 247 It puts the proxy back into the pool.
248 248 """
249 249 if request in self._borrowed_proxies:
250 250 proxy = self._borrowed_proxies.pop(request)
251 251 self._proxy_pool.append(proxy)
252 252 else:
253 253 log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
254 254 'for this request.', self._remote_uri)
255 255
256 256
257 257 class RemoteRepo(object):
258 258
259 259 def __init__(self, path, config, remote_proxy, with_wire=None):
260 260 self._wire = {
261 261 "path": path,
262 262 "config": config,
263 "context": uuid.uuid4(),
263 "context": self._create_vcs_cache_context(),
264 264 }
265 265 if with_wire:
266 266 self._wire.update(with_wire)
267 267 self._remote_proxy = remote_proxy
268 268 self.refs = RefsWrapper(self)
269 269
270 270 def __getattr__(self, name):
271 271 log.debug('Calling %s@%s', self._remote_proxy, name)
272 272 # TODO: oliver: This is currently necessary pre-call since the
273 273 # config object is being changed for hooking scenarios
274 274 wire = copy.deepcopy(self._wire)
275 275 wire["config"] = wire["config"].serialize()
276 276
277 277 try:
278 278 func = _get_proxy_method(self._remote_proxy, name)
279 279 except DaemonError as e:
280 280 if e.message == 'unknown object':
281 281 raise exceptions.VCSBackendNotSupportedError
282 282 else:
283 283 raise
284 284
285 285 return _wrap_remote_call(self._remote_proxy, func, wire)
286 286
287 287 def __getitem__(self, key):
288 288 return self.revision(key)
289 289
290 def _create_vcs_cache_context(self):
291 """
292 Creates a unique string which is passed to the VCSServer on every
293 remote call. It is used as cache key in the VCSServer.
294 """
295 return str(uuid.uuid4())
296
297 def invalidate_vcs_cache(self):
298 """
299 This is a no-op method for the pyro4 backend but we want to have the
300 same API for client.RemoteRepo and client_http.RemoteRepo classes.
301 """
302
290 303
291 304 def _get_proxy_method(proxy, name):
292 305 try:
293 306 return getattr(proxy, name)
294 307 except CommunicationError:
295 308 raise CommunicationError(
296 309 'Unable to connect to remote pyro server %s' % proxy)
297 310
298 311
299 312 def _wrap_remote_call(proxy, func, *args):
300 313 all_args = list(args)
301 314
302 315 @exceptions.map_vcs_exceptions
303 316 def caller(*args, **kwargs):
304 317 all_args.extend(args)
305 318 try:
306 319 return func(*all_args, **kwargs)
307 320 except ConnectionClosedError:
308 321 log.debug('Connection to VCSServer closed, trying to reconnect.')
309 322 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
310 323
311 324 return func(*all_args, **kwargs)
312 325
313 326 return caller
314 327
315 328
316 329 class RefsWrapper(object):
317 330
318 331 def __init__(self, repo):
319 332 self._repo = weakref.proxy(repo)
320 333
321 334 def __setitem__(self, key, value):
322 335 self._repo._assign_ref(key, value)
323 336
324 337
325 338 class FunctionWrapper(object):
326 339
327 340 def __init__(self, func, wire):
328 341 self._func = func
329 342 self._wire = wire
330 343
331 344 @exceptions.map_vcs_exceptions
332 345 def __call__(self, *args, **kwargs):
333 346 return self._func(self._wire, *args, **kwargs)
General Comments 0
You need to be logged in to leave comments. Login now