##// END OF EJS Templates
vcs: Remove dead code.
Martin Bornhold -
r928:bccd79cf default
parent child Browse files
Show More
@@ -1,346 +1,210 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2016 RhodeCode GmbH
3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Provides the implementation of various client utilities to reach the vcsserver.
22 Provides the implementation of various client utilities to reach the vcsserver.
23 """
23 """
24
24
25
25
26 import copy
26 import copy
27 import logging
27 import logging
28 import threading
29 import urlparse
30 import uuid
28 import uuid
31 import weakref
29 import weakref
32 from urllib2 import URLError
33
30
34 import msgpack
35 import Pyro4
31 import Pyro4
36 import requests
37 from pyramid.threadlocal import get_current_request
32 from pyramid.threadlocal import get_current_request
38 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
33 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
39
34
40 from rhodecode.lib.vcs import exceptions
35 from rhodecode.lib.vcs import exceptions
41 from rhodecode.lib.vcs.conf import settings
36 from rhodecode.lib.vcs.conf import settings
42
37
43 log = logging.getLogger(__name__)
38 log = logging.getLogger(__name__)
44
39
45
40
46 # TODO: mikhail: Keep it in sync with vcsserver's
47 # HTTPApplication.ALLOWED_EXCEPTIONS
48 EXCEPTIONS_MAP = {
49 'KeyError': KeyError,
50 'URLError': URLError,
51 }
52
53
54 class HTTPRepoMaker(object):
55 def __init__(self, server_and_port, backend_endpoint):
56 self.url = urlparse.urljoin(
57 'http://%s' % server_and_port, backend_endpoint)
58
59 def __call__(self, path, config, with_wire=None):
60 log.debug('HTTPRepoMaker call on %s', path)
61 return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
62
63 def __getattr__(self, name):
64 def f(*args, **kwargs):
65 return self._call(name, *args, **kwargs)
66 return f
67
68 @exceptions.map_vcs_exceptions
69 def _call(self, name, *args, **kwargs):
70 payload = {
71 'id': str(uuid.uuid4()),
72 'method': name,
73 'params': {'args': args, 'kwargs': kwargs}
74 }
75 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
76
77
78 class VcsHttpProxy(object):
79
80 CHUNK_SIZE = 16384
81
82 def __init__(self, server_and_port, backend_endpoint):
83 adapter = requests.adapters.HTTPAdapter(max_retries=5)
84 self.base_url = urlparse.urljoin(
85 'http://%s' % server_and_port, backend_endpoint)
86 self.session = requests.Session()
87 self.session.mount('http://', adapter)
88
89 def handle(self, environment, input_data, *args, **kwargs):
90 data = {
91 'environment': environment,
92 'input_data': input_data,
93 'args': args,
94 'kwargs': kwargs
95 }
96 result = self.session.post(
97 self.base_url, msgpack.packb(data), stream=True)
98 return self._get_result(result)
99
100 def _deserialize_and_raise(self, error):
101 exception = Exception(error['message'])
102 try:
103 exception._vcs_kind = error['_vcs_kind']
104 except KeyError:
105 pass
106 raise exception
107
108 def _iterate(self, result):
109 unpacker = msgpack.Unpacker()
110 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
111 unpacker.feed(line)
112 for chunk in unpacker:
113 yield chunk
114
115 def _get_result(self, result):
116 iterator = self._iterate(result)
117 error = iterator.next()
118 if error:
119 self._deserialize_and_raise(error)
120
121 status = iterator.next()
122 headers = iterator.next()
123
124 return iterator, status, headers
125
126
127 class HTTPRemoteRepo(object):
128 def __init__(self, path, config, url, with_wire=None):
129 self.url = url
130 self._wire = {
131 "path": path,
132 "config": config,
133 "context": str(uuid.uuid4()),
134 }
135 if with_wire:
136 self._wire.update(with_wire)
137
138 def __getattr__(self, name):
139 def f(*args, **kwargs):
140 return self._call(name, *args, **kwargs)
141 return f
142
143 @exceptions.map_vcs_exceptions
144 def _call(self, name, *args, **kwargs):
145 log.debug('Calling %s@%s', self.url, name)
146 # TODO: oliver: This is currently necessary pre-call since the
147 # config object is being changed for hooking scenarios
148 wire = copy.deepcopy(self._wire)
149 wire["config"] = wire["config"].serialize()
150 payload = {
151 'id': str(uuid.uuid4()),
152 'method': name,
153 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
154 }
155 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
156
157 def __getitem__(self, key):
158 return self.revision(key)
159
160
161 def _remote_call(url, payload, exceptions_map):
162 response = requests.post(url, data=msgpack.packb(payload))
163 response = msgpack.unpackb(response.content)
164 error = response.get('error')
165 if error:
166 type_ = error.get('type', 'Exception')
167 exc = exceptions_map.get(type_, Exception)
168 exc = exc(error.get('message'))
169 try:
170 exc._vcs_kind = error['_vcs_kind']
171 except KeyError:
172 pass
173 raise exc
174 return response.get('result')
175
176
177 class RepoMaker(object):
41 class RepoMaker(object):
178
42
179 def __init__(self, proxy_factory):
43 def __init__(self, proxy_factory):
180 self._proxy_factory = proxy_factory
44 self._proxy_factory = proxy_factory
181
45
182 def __call__(self, path, config, with_wire=None):
46 def __call__(self, path, config, with_wire=None):
183 log.debug('RepoMaker call on %s', path)
47 log.debug('RepoMaker call on %s', path)
184 return RemoteRepo(
48 return RemoteRepo(
185 path, config, remote_proxy=self._proxy_factory(),
49 path, config, remote_proxy=self._proxy_factory(),
186 with_wire=with_wire)
50 with_wire=with_wire)
187
51
188 def __getattr__(self, name):
52 def __getattr__(self, name):
189 remote_proxy = self._proxy_factory()
53 remote_proxy = self._proxy_factory()
190 func = _get_proxy_method(remote_proxy, name)
54 func = _get_proxy_method(remote_proxy, name)
191 return _wrap_remote_call(remote_proxy, func)
55 return _wrap_remote_call(remote_proxy, func)
192
56
193
57
194 class RequestScopeProxyFactory(object):
58 class RequestScopeProxyFactory(object):
195 """
59 """
196 This factory returns pyro proxy instances based on a per request scope.
60 This factory returns pyro proxy instances based on a per request scope.
197 It returns the same instance if called from within the same request and
61 It returns the same instance if called from within the same request and
198 different instances if called from different requests.
62 different instances if called from different requests.
199 """
63 """
200
64
201 def __init__(self, remote_uri):
65 def __init__(self, remote_uri):
202 self._remote_uri = remote_uri
66 self._remote_uri = remote_uri
203 self._proxy_pool = []
67 self._proxy_pool = []
204 self._borrowed_proxies = {}
68 self._borrowed_proxies = {}
205
69
206 def __call__(self, request=None):
70 def __call__(self, request=None):
207 """
71 """
208 Wrapper around `getProxy`.
72 Wrapper around `getProxy`.
209 """
73 """
210 request = request or get_current_request()
74 request = request or get_current_request()
211 return self.getProxy(request)
75 return self.getProxy(request)
212
76
213 def getProxy(self, request):
77 def getProxy(self, request):
214 """
78 """
215 Call this to get the pyro proxy instance for the request.
79 Call this to get the pyro proxy instance for the request.
216 """
80 """
217
81
218 # If called without a request context we return new proxy instances
82 # If called without a request context we return new proxy instances
219 # on every call. This allows to run e.g. invoke tasks.
83 # on every call. This allows to run e.g. invoke tasks.
220 if request is None:
84 if request is None:
221 log.info('Creating pyro proxy without request context for '
85 log.info('Creating pyro proxy without request context for '
222 'remote_uri=%s', self._remote_uri)
86 'remote_uri=%s', self._remote_uri)
223 return Pyro4.Proxy(self._remote_uri)
87 return Pyro4.Proxy(self._remote_uri)
224
88
225 # If there is an already borrowed proxy for the request context we
89 # If there is an already borrowed proxy for the request context we
226 # return that instance instead of creating a new one.
90 # return that instance instead of creating a new one.
227 if request in self._borrowed_proxies:
91 if request in self._borrowed_proxies:
228 return self._borrowed_proxies[request]
92 return self._borrowed_proxies[request]
229
93
230 # Get proxy from pool or create new instance.
94 # Get proxy from pool or create new instance.
231 try:
95 try:
232 proxy = self._proxy_pool.pop()
96 proxy = self._proxy_pool.pop()
233 except IndexError:
97 except IndexError:
234 log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri)
98 log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri)
235 proxy = Pyro4.Proxy(self._remote_uri)
99 proxy = Pyro4.Proxy(self._remote_uri)
236
100
237 # Mark proxy as borrowed for the request context and add a callback
101 # Mark proxy as borrowed for the request context and add a callback
238 # that returns it when the request processing is finished.
102 # that returns it when the request processing is finished.
239 self._borrowed_proxies[request] = proxy
103 self._borrowed_proxies[request] = proxy
240 request.add_finished_callback(self._returnProxy)
104 request.add_finished_callback(self._returnProxy)
241
105
242 return proxy
106 return proxy
243
107
244 def _returnProxy(self, request):
108 def _returnProxy(self, request):
245 """
109 """
246 Callback that gets called by pyramid when the request is finished.
110 Callback that gets called by pyramid when the request is finished.
247 It puts the proxy back into the pool.
111 It puts the proxy back into the pool.
248 """
112 """
249 if request in self._borrowed_proxies:
113 if request in self._borrowed_proxies:
250 proxy = self._borrowed_proxies.pop(request)
114 proxy = self._borrowed_proxies.pop(request)
251 self._proxy_pool.append(proxy)
115 self._proxy_pool.append(proxy)
252 else:
116 else:
253 log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
117 log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
254 'for this request.', self._remote_uri)
118 'for this request.', self._remote_uri)
255
119
256
120
257 class RemoteRepo(object):
121 class RemoteRepo(object):
258
122
259 def __init__(self, path, config, remote_proxy, with_wire=None):
123 def __init__(self, path, config, remote_proxy, with_wire=None):
260 self._wire = {
124 self._wire = {
261 "path": path,
125 "path": path,
262 "config": config,
126 "config": config,
263 "context": self._create_vcs_cache_context(),
127 "context": self._create_vcs_cache_context(),
264 }
128 }
265 if with_wire:
129 if with_wire:
266 self._wire.update(with_wire)
130 self._wire.update(with_wire)
267 self._remote_proxy = remote_proxy
131 self._remote_proxy = remote_proxy
268 self.refs = RefsWrapper(self)
132 self.refs = RefsWrapper(self)
269
133
270 def __getattr__(self, name):
134 def __getattr__(self, name):
271 log.debug('Calling %s@%s', self._remote_proxy, name)
135 log.debug('Calling %s@%s', self._remote_proxy, name)
272 # TODO: oliver: This is currently necessary pre-call since the
136 # TODO: oliver: This is currently necessary pre-call since the
273 # config object is being changed for hooking scenarios
137 # config object is being changed for hooking scenarios
274 wire = copy.deepcopy(self._wire)
138 wire = copy.deepcopy(self._wire)
275 wire["config"] = wire["config"].serialize()
139 wire["config"] = wire["config"].serialize()
276
140
277 try:
141 try:
278 func = _get_proxy_method(self._remote_proxy, name)
142 func = _get_proxy_method(self._remote_proxy, name)
279 except DaemonError as e:
143 except DaemonError as e:
280 if e.message == 'unknown object':
144 if e.message == 'unknown object':
281 raise exceptions.VCSBackendNotSupportedError
145 raise exceptions.VCSBackendNotSupportedError
282 else:
146 else:
283 raise
147 raise
284
148
285 return _wrap_remote_call(self._remote_proxy, func, wire)
149 return _wrap_remote_call(self._remote_proxy, func, wire)
286
150
287 def __getitem__(self, key):
151 def __getitem__(self, key):
288 return self.revision(key)
152 return self.revision(key)
289
153
290 def _create_vcs_cache_context(self):
154 def _create_vcs_cache_context(self):
291 """
155 """
292 Creates a unique string which is passed to the VCSServer on every
156 Creates a unique string which is passed to the VCSServer on every
293 remote call. It is used as cache key in the VCSServer.
157 remote call. It is used as cache key in the VCSServer.
294 """
158 """
295 return str(uuid.uuid4())
159 return str(uuid.uuid4())
296
160
297 def invalidate_vcs_cache(self):
161 def invalidate_vcs_cache(self):
298 """
162 """
299 This is a no-op method for the pyro4 backend but we want to have the
163 This is a no-op method for the pyro4 backend but we want to have the
300 same API for client.RemoteRepo and client_http.RemoteRepo classes.
164 same API for client.RemoteRepo and client_http.RemoteRepo classes.
301 """
165 """
302
166
303
167
304 def _get_proxy_method(proxy, name):
168 def _get_proxy_method(proxy, name):
305 try:
169 try:
306 return getattr(proxy, name)
170 return getattr(proxy, name)
307 except CommunicationError:
171 except CommunicationError:
308 raise exceptions.PyroVCSCommunicationError(
172 raise exceptions.PyroVCSCommunicationError(
309 'Unable to connect to remote pyro server %s' % proxy)
173 'Unable to connect to remote pyro server %s' % proxy)
310
174
311
175
312 def _wrap_remote_call(proxy, func, *args):
176 def _wrap_remote_call(proxy, func, *args):
313 all_args = list(args)
177 all_args = list(args)
314
178
315 @exceptions.map_vcs_exceptions
179 @exceptions.map_vcs_exceptions
316 def caller(*args, **kwargs):
180 def caller(*args, **kwargs):
317 all_args.extend(args)
181 all_args.extend(args)
318 try:
182 try:
319 return func(*all_args, **kwargs)
183 return func(*all_args, **kwargs)
320 except ConnectionClosedError:
184 except ConnectionClosedError:
321 log.debug('Connection to VCSServer closed, trying to reconnect.')
185 log.debug('Connection to VCSServer closed, trying to reconnect.')
322 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
186 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
323
187
324 return func(*all_args, **kwargs)
188 return func(*all_args, **kwargs)
325
189
326 return caller
190 return caller
327
191
328
192
329 class RefsWrapper(object):
193 class RefsWrapper(object):
330
194
331 def __init__(self, repo):
195 def __init__(self, repo):
332 self._repo = weakref.proxy(repo)
196 self._repo = weakref.proxy(repo)
333
197
334 def __setitem__(self, key, value):
198 def __setitem__(self, key, value):
335 self._repo._assign_ref(key, value)
199 self._repo._assign_ref(key, value)
336
200
337
201
338 class FunctionWrapper(object):
202 class FunctionWrapper(object):
339
203
340 def __init__(self, func, wire):
204 def __init__(self, func, wire):
341 self._func = func
205 self._func = func
342 self._wire = wire
206 self._wire = wire
343
207
344 @exceptions.map_vcs_exceptions
208 @exceptions.map_vcs_exceptions
345 def __call__(self, *args, **kwargs):
209 def __call__(self, *args, **kwargs):
346 return self._func(self._wire, *args, **kwargs)
210 return self._func(self._wire, *args, **kwargs)
General Comments 0
You need to be logged in to leave comments. Login now