##// END OF EJS Templates
vcs: Remove dead code.
Martin Bornhold -
r928:bccd79cf default
parent child Browse files
Show More
@@ -1,346 +1,210 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Provides the implementation of various client utilities to reach the vcsserver.
23 23 """
24 24
25 25
26 26 import copy
27 27 import logging
28 import threading
29 import urlparse
30 28 import uuid
31 29 import weakref
32 from urllib2 import URLError
33 30
34 import msgpack
35 31 import Pyro4
36 import requests
37 32 from pyramid.threadlocal import get_current_request
38 33 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
39 34
40 35 from rhodecode.lib.vcs import exceptions
41 36 from rhodecode.lib.vcs.conf import settings
42 37
43 38 log = logging.getLogger(__name__)
44 39
45 40
46 # TODO: mikhail: Keep it in sync with vcsserver's
47 # HTTPApplication.ALLOWED_EXCEPTIONS
48 EXCEPTIONS_MAP = {
49 'KeyError': KeyError,
50 'URLError': URLError,
51 }
52
53
54 class HTTPRepoMaker(object):
55 def __init__(self, server_and_port, backend_endpoint):
56 self.url = urlparse.urljoin(
57 'http://%s' % server_and_port, backend_endpoint)
58
59 def __call__(self, path, config, with_wire=None):
60 log.debug('HTTPRepoMaker call on %s', path)
61 return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
62
63 def __getattr__(self, name):
64 def f(*args, **kwargs):
65 return self._call(name, *args, **kwargs)
66 return f
67
68 @exceptions.map_vcs_exceptions
69 def _call(self, name, *args, **kwargs):
70 payload = {
71 'id': str(uuid.uuid4()),
72 'method': name,
73 'params': {'args': args, 'kwargs': kwargs}
74 }
75 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
76
77
78 class VcsHttpProxy(object):
79
80 CHUNK_SIZE = 16384
81
82 def __init__(self, server_and_port, backend_endpoint):
83 adapter = requests.adapters.HTTPAdapter(max_retries=5)
84 self.base_url = urlparse.urljoin(
85 'http://%s' % server_and_port, backend_endpoint)
86 self.session = requests.Session()
87 self.session.mount('http://', adapter)
88
89 def handle(self, environment, input_data, *args, **kwargs):
90 data = {
91 'environment': environment,
92 'input_data': input_data,
93 'args': args,
94 'kwargs': kwargs
95 }
96 result = self.session.post(
97 self.base_url, msgpack.packb(data), stream=True)
98 return self._get_result(result)
99
100 def _deserialize_and_raise(self, error):
101 exception = Exception(error['message'])
102 try:
103 exception._vcs_kind = error['_vcs_kind']
104 except KeyError:
105 pass
106 raise exception
107
108 def _iterate(self, result):
109 unpacker = msgpack.Unpacker()
110 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
111 unpacker.feed(line)
112 for chunk in unpacker:
113 yield chunk
114
115 def _get_result(self, result):
116 iterator = self._iterate(result)
117 error = iterator.next()
118 if error:
119 self._deserialize_and_raise(error)
120
121 status = iterator.next()
122 headers = iterator.next()
123
124 return iterator, status, headers
125
126
127 class HTTPRemoteRepo(object):
128 def __init__(self, path, config, url, with_wire=None):
129 self.url = url
130 self._wire = {
131 "path": path,
132 "config": config,
133 "context": str(uuid.uuid4()),
134 }
135 if with_wire:
136 self._wire.update(with_wire)
137
138 def __getattr__(self, name):
139 def f(*args, **kwargs):
140 return self._call(name, *args, **kwargs)
141 return f
142
143 @exceptions.map_vcs_exceptions
144 def _call(self, name, *args, **kwargs):
145 log.debug('Calling %s@%s', self.url, name)
146 # TODO: oliver: This is currently necessary pre-call since the
147 # config object is being changed for hooking scenarios
148 wire = copy.deepcopy(self._wire)
149 wire["config"] = wire["config"].serialize()
150 payload = {
151 'id': str(uuid.uuid4()),
152 'method': name,
153 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
154 }
155 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
156
157 def __getitem__(self, key):
158 return self.revision(key)
159
160
161 def _remote_call(url, payload, exceptions_map):
162 response = requests.post(url, data=msgpack.packb(payload))
163 response = msgpack.unpackb(response.content)
164 error = response.get('error')
165 if error:
166 type_ = error.get('type', 'Exception')
167 exc = exceptions_map.get(type_, Exception)
168 exc = exc(error.get('message'))
169 try:
170 exc._vcs_kind = error['_vcs_kind']
171 except KeyError:
172 pass
173 raise exc
174 return response.get('result')
175
176
177 41 class RepoMaker(object):
178 42
179 43 def __init__(self, proxy_factory):
180 44 self._proxy_factory = proxy_factory
181 45
182 46 def __call__(self, path, config, with_wire=None):
183 47 log.debug('RepoMaker call on %s', path)
184 48 return RemoteRepo(
185 49 path, config, remote_proxy=self._proxy_factory(),
186 50 with_wire=with_wire)
187 51
188 52 def __getattr__(self, name):
189 53 remote_proxy = self._proxy_factory()
190 54 func = _get_proxy_method(remote_proxy, name)
191 55 return _wrap_remote_call(remote_proxy, func)
192 56
193 57
194 58 class RequestScopeProxyFactory(object):
195 59 """
196 60 This factory returns pyro proxy instances based on a per request scope.
197 61 It returns the same instance if called from within the same request and
198 62 different instances if called from different requests.
199 63 """
200 64
201 65 def __init__(self, remote_uri):
202 66 self._remote_uri = remote_uri
203 67 self._proxy_pool = []
204 68 self._borrowed_proxies = {}
205 69
206 70 def __call__(self, request=None):
207 71 """
208 72 Wrapper around `getProxy`.
209 73 """
210 74 request = request or get_current_request()
211 75 return self.getProxy(request)
212 76
213 77 def getProxy(self, request):
214 78 """
215 79 Call this to get the pyro proxy instance for the request.
216 80 """
217 81
218 82 # If called without a request context we return new proxy instances
219 83 # on every call. This allows to run e.g. invoke tasks.
220 84 if request is None:
221 85 log.info('Creating pyro proxy without request context for '
222 86 'remote_uri=%s', self._remote_uri)
223 87 return Pyro4.Proxy(self._remote_uri)
224 88
225 89 # If there is an already borrowed proxy for the request context we
226 90 # return that instance instead of creating a new one.
227 91 if request in self._borrowed_proxies:
228 92 return self._borrowed_proxies[request]
229 93
230 94 # Get proxy from pool or create new instance.
231 95 try:
232 96 proxy = self._proxy_pool.pop()
233 97 except IndexError:
234 98 log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri)
235 99 proxy = Pyro4.Proxy(self._remote_uri)
236 100
237 101 # Mark proxy as borrowed for the request context and add a callback
238 102 # that returns it when the request processing is finished.
239 103 self._borrowed_proxies[request] = proxy
240 104 request.add_finished_callback(self._returnProxy)
241 105
242 106 return proxy
243 107
244 108 def _returnProxy(self, request):
245 109 """
246 110 Callback that gets called by pyramid when the request is finished.
247 111 It puts the proxy back into the pool.
248 112 """
249 113 if request in self._borrowed_proxies:
250 114 proxy = self._borrowed_proxies.pop(request)
251 115 self._proxy_pool.append(proxy)
252 116 else:
253 117 log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
254 118 'for this request.', self._remote_uri)
255 119
256 120
257 121 class RemoteRepo(object):
258 122
259 123 def __init__(self, path, config, remote_proxy, with_wire=None):
260 124 self._wire = {
261 125 "path": path,
262 126 "config": config,
263 127 "context": self._create_vcs_cache_context(),
264 128 }
265 129 if with_wire:
266 130 self._wire.update(with_wire)
267 131 self._remote_proxy = remote_proxy
268 132 self.refs = RefsWrapper(self)
269 133
270 134 def __getattr__(self, name):
271 135 log.debug('Calling %s@%s', self._remote_proxy, name)
272 136 # TODO: oliver: This is currently necessary pre-call since the
273 137 # config object is being changed for hooking scenarios
274 138 wire = copy.deepcopy(self._wire)
275 139 wire["config"] = wire["config"].serialize()
276 140
277 141 try:
278 142 func = _get_proxy_method(self._remote_proxy, name)
279 143 except DaemonError as e:
280 144 if e.message == 'unknown object':
281 145 raise exceptions.VCSBackendNotSupportedError
282 146 else:
283 147 raise
284 148
285 149 return _wrap_remote_call(self._remote_proxy, func, wire)
286 150
287 151 def __getitem__(self, key):
288 152 return self.revision(key)
289 153
290 154 def _create_vcs_cache_context(self):
291 155 """
292 156 Creates a unique string which is passed to the VCSServer on every
293 157 remote call. It is used as cache key in the VCSServer.
294 158 """
295 159 return str(uuid.uuid4())
296 160
297 161 def invalidate_vcs_cache(self):
298 162 """
299 163 This is a no-op method for the pyro4 backend but we want to have the
300 164 same API for client.RemoteRepo and client_http.RemoteRepo classes.
301 165 """
302 166
303 167
304 168 def _get_proxy_method(proxy, name):
305 169 try:
306 170 return getattr(proxy, name)
307 171 except CommunicationError:
308 172 raise exceptions.PyroVCSCommunicationError(
309 173 'Unable to connect to remote pyro server %s' % proxy)
310 174
311 175
312 176 def _wrap_remote_call(proxy, func, *args):
313 177 all_args = list(args)
314 178
315 179 @exceptions.map_vcs_exceptions
316 180 def caller(*args, **kwargs):
317 181 all_args.extend(args)
318 182 try:
319 183 return func(*all_args, **kwargs)
320 184 except ConnectionClosedError:
321 185 log.debug('Connection to VCSServer closed, trying to reconnect.')
322 186 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
323 187
324 188 return func(*all_args, **kwargs)
325 189
326 190 return caller
327 191
328 192
329 193 class RefsWrapper(object):
330 194
331 195 def __init__(self, repo):
332 196 self._repo = weakref.proxy(repo)
333 197
334 198 def __setitem__(self, key, value):
335 199 self._repo._assign_ref(key, value)
336 200
337 201
338 202 class FunctionWrapper(object):
339 203
340 204 def __init__(self, func, wire):
341 205 self._func = func
342 206 self._wire = wire
343 207
344 208 @exceptions.map_vcs_exceptions
345 209 def __call__(self, *args, **kwargs):
346 210 return self._func(self._wire, *args, **kwargs)
General Comments 0
You need to be logged in to leave comments. Login now