##// END OF EJS Templates
pyro: Slightly improved log message and comment.
Martin Bornhold -
r355:12bfd543 default
parent child Browse files
Show More
@@ -1,332 +1,333 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2014-2016 RhodeCode GmbH
3 # Copyright (C) 2014-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Provides the implementation of various client utilities to reach the vcsserver.
22 Provides the implementation of various client utilities to reach the vcsserver.
23 """
23 """
24
24
25
25
26 import copy
26 import copy
27 import logging
27 import logging
28 import threading
28 import threading
29 import urlparse
29 import urlparse
30 import uuid
30 import uuid
31 import weakref
31 import weakref
32 from urllib2 import URLError
32 from urllib2 import URLError
33
33
34 import msgpack
34 import msgpack
35 import Pyro4
35 import Pyro4
36 import requests
36 import requests
37 from pyramid.threadlocal import get_current_request
37 from pyramid.threadlocal import get_current_request
38 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
38 from Pyro4.errors import CommunicationError, ConnectionClosedError, DaemonError
39
39
40 from rhodecode.lib.vcs import exceptions
40 from rhodecode.lib.vcs import exceptions
41 from rhodecode.lib.vcs.conf import settings
41 from rhodecode.lib.vcs.conf import settings
42
42
43 log = logging.getLogger(__name__)
43 log = logging.getLogger(__name__)
44
44
45
45
46 # TODO: mikhail: Keep it in sync with vcsserver's
46 # TODO: mikhail: Keep it in sync with vcsserver's
47 # HTTPApplication.ALLOWED_EXCEPTIONS
47 # HTTPApplication.ALLOWED_EXCEPTIONS
48 EXCEPTIONS_MAP = {
48 EXCEPTIONS_MAP = {
49 'KeyError': KeyError,
49 'KeyError': KeyError,
50 'URLError': URLError,
50 'URLError': URLError,
51 }
51 }
52
52
53
53
54 class HTTPRepoMaker(object):
54 class HTTPRepoMaker(object):
55 def __init__(self, server_and_port, backend_endpoint):
55 def __init__(self, server_and_port, backend_endpoint):
56 self.url = urlparse.urljoin(
56 self.url = urlparse.urljoin(
57 'http://%s' % server_and_port, backend_endpoint)
57 'http://%s' % server_and_port, backend_endpoint)
58
58
59 def __call__(self, path, config, with_wire=None):
59 def __call__(self, path, config, with_wire=None):
60 log.debug('HTTPRepoMaker call on %s', path)
60 log.debug('HTTPRepoMaker call on %s', path)
61 return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
61 return HTTPRemoteRepo(path, config, self.url, with_wire=with_wire)
62
62
63 def __getattr__(self, name):
63 def __getattr__(self, name):
64 def f(*args, **kwargs):
64 def f(*args, **kwargs):
65 return self._call(name, *args, **kwargs)
65 return self._call(name, *args, **kwargs)
66 return f
66 return f
67
67
68 @exceptions.map_vcs_exceptions
68 @exceptions.map_vcs_exceptions
69 def _call(self, name, *args, **kwargs):
69 def _call(self, name, *args, **kwargs):
70 payload = {
70 payload = {
71 'id': str(uuid.uuid4()),
71 'id': str(uuid.uuid4()),
72 'method': name,
72 'method': name,
73 'params': {'args': args, 'kwargs': kwargs}
73 'params': {'args': args, 'kwargs': kwargs}
74 }
74 }
75 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
75 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
76
76
77
77
78 class VcsHttpProxy(object):
78 class VcsHttpProxy(object):
79
79
80 CHUNK_SIZE = 16384
80 CHUNK_SIZE = 16384
81
81
82 def __init__(self, server_and_port, backend_endpoint):
82 def __init__(self, server_and_port, backend_endpoint):
83 adapter = requests.adapters.HTTPAdapter(max_retries=5)
83 adapter = requests.adapters.HTTPAdapter(max_retries=5)
84 self.base_url = urlparse.urljoin(
84 self.base_url = urlparse.urljoin(
85 'http://%s' % server_and_port, backend_endpoint)
85 'http://%s' % server_and_port, backend_endpoint)
86 self.session = requests.Session()
86 self.session = requests.Session()
87 self.session.mount('http://', adapter)
87 self.session.mount('http://', adapter)
88
88
89 def handle(self, environment, input_data, *args, **kwargs):
89 def handle(self, environment, input_data, *args, **kwargs):
90 data = {
90 data = {
91 'environment': environment,
91 'environment': environment,
92 'input_data': input_data,
92 'input_data': input_data,
93 'args': args,
93 'args': args,
94 'kwargs': kwargs
94 'kwargs': kwargs
95 }
95 }
96 result = self.session.post(
96 result = self.session.post(
97 self.base_url, msgpack.packb(data), stream=True)
97 self.base_url, msgpack.packb(data), stream=True)
98 return self._get_result(result)
98 return self._get_result(result)
99
99
100 def _deserialize_and_raise(self, error):
100 def _deserialize_and_raise(self, error):
101 exception = Exception(error['message'])
101 exception = Exception(error['message'])
102 try:
102 try:
103 exception._vcs_kind = error['_vcs_kind']
103 exception._vcs_kind = error['_vcs_kind']
104 except KeyError:
104 except KeyError:
105 pass
105 pass
106 raise exception
106 raise exception
107
107
108 def _iterate(self, result):
108 def _iterate(self, result):
109 unpacker = msgpack.Unpacker()
109 unpacker = msgpack.Unpacker()
110 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
110 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
111 unpacker.feed(line)
111 unpacker.feed(line)
112 for chunk in unpacker:
112 for chunk in unpacker:
113 yield chunk
113 yield chunk
114
114
115 def _get_result(self, result):
115 def _get_result(self, result):
116 iterator = self._iterate(result)
116 iterator = self._iterate(result)
117 error = iterator.next()
117 error = iterator.next()
118 if error:
118 if error:
119 self._deserialize_and_raise(error)
119 self._deserialize_and_raise(error)
120
120
121 status = iterator.next()
121 status = iterator.next()
122 headers = iterator.next()
122 headers = iterator.next()
123
123
124 return iterator, status, headers
124 return iterator, status, headers
125
125
126
126
127 class HTTPRemoteRepo(object):
127 class HTTPRemoteRepo(object):
128 def __init__(self, path, config, url, with_wire=None):
128 def __init__(self, path, config, url, with_wire=None):
129 self.url = url
129 self.url = url
130 self._wire = {
130 self._wire = {
131 "path": path,
131 "path": path,
132 "config": config,
132 "config": config,
133 "context": str(uuid.uuid4()),
133 "context": str(uuid.uuid4()),
134 }
134 }
135 if with_wire:
135 if with_wire:
136 self._wire.update(with_wire)
136 self._wire.update(with_wire)
137
137
138 def __getattr__(self, name):
138 def __getattr__(self, name):
139 def f(*args, **kwargs):
139 def f(*args, **kwargs):
140 return self._call(name, *args, **kwargs)
140 return self._call(name, *args, **kwargs)
141 return f
141 return f
142
142
143 @exceptions.map_vcs_exceptions
143 @exceptions.map_vcs_exceptions
144 def _call(self, name, *args, **kwargs):
144 def _call(self, name, *args, **kwargs):
145 log.debug('Calling %s@%s', self.url, name)
145 log.debug('Calling %s@%s', self.url, name)
146 # TODO: oliver: This is currently necessary pre-call since the
146 # TODO: oliver: This is currently necessary pre-call since the
147 # config object is being changed for hooking scenarios
147 # config object is being changed for hooking scenarios
148 wire = copy.deepcopy(self._wire)
148 wire = copy.deepcopy(self._wire)
149 wire["config"] = wire["config"].serialize()
149 wire["config"] = wire["config"].serialize()
150 payload = {
150 payload = {
151 'id': str(uuid.uuid4()),
151 'id': str(uuid.uuid4()),
152 'method': name,
152 'method': name,
153 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
153 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
154 }
154 }
155 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
155 return _remote_call(self.url, payload, EXCEPTIONS_MAP)
156
156
157 def __getitem__(self, key):
157 def __getitem__(self, key):
158 return self.revision(key)
158 return self.revision(key)
159
159
160
160
161 def _remote_call(url, payload, exceptions_map):
161 def _remote_call(url, payload, exceptions_map):
162 response = requests.post(url, data=msgpack.packb(payload))
162 response = requests.post(url, data=msgpack.packb(payload))
163 response = msgpack.unpackb(response.content)
163 response = msgpack.unpackb(response.content)
164 error = response.get('error')
164 error = response.get('error')
165 if error:
165 if error:
166 type_ = error.get('type', 'Exception')
166 type_ = error.get('type', 'Exception')
167 exc = exceptions_map.get(type_, Exception)
167 exc = exceptions_map.get(type_, Exception)
168 exc = exc(error.get('message'))
168 exc = exc(error.get('message'))
169 try:
169 try:
170 exc._vcs_kind = error['_vcs_kind']
170 exc._vcs_kind = error['_vcs_kind']
171 except KeyError:
171 except KeyError:
172 pass
172 pass
173 raise exc
173 raise exc
174 return response.get('result')
174 return response.get('result')
175
175
176
176
177 class RepoMaker(object):
177 class RepoMaker(object):
178
178
179 def __init__(self, proxy_factory):
179 def __init__(self, proxy_factory):
180 self._proxy_factory = proxy_factory
180 self._proxy_factory = proxy_factory
181
181
182 def __call__(self, path, config, with_wire=None):
182 def __call__(self, path, config, with_wire=None):
183 log.debug('RepoMaker call on %s', path)
183 log.debug('RepoMaker call on %s', path)
184 return RemoteRepo(
184 return RemoteRepo(
185 path, config, remote_proxy=self._proxy_factory(),
185 path, config, remote_proxy=self._proxy_factory(),
186 with_wire=with_wire)
186 with_wire=with_wire)
187
187
188 def __getattr__(self, name):
188 def __getattr__(self, name):
189 remote_proxy = self._proxy_factory()
189 remote_proxy = self._proxy_factory()
190 func = _get_proxy_method(remote_proxy, name)
190 func = _get_proxy_method(remote_proxy, name)
191 return _wrap_remote_call(remote_proxy, func)
191 return _wrap_remote_call(remote_proxy, func)
192
192
193
193
194 class RequestScopeProxyFactory(object):
194 class RequestScopeProxyFactory(object):
195 """
195 """
196 This factory returns pyro proxy instances based on a per request scope.
196 This factory returns pyro proxy instances based on a per request scope.
197 It returns the same instance if called from within the same request and
197 It returns the same instance if called from within the same request and
198 different instances if called from different requests.
198 different instances if called from different requests.
199 """
199 """
200
200
201 def __init__(self, remote_uri):
201 def __init__(self, remote_uri):
202 self._remote_uri = remote_uri
202 self._remote_uri = remote_uri
203 self._proxy_pool = []
203 self._proxy_pool = []
204 self._borrowed_proxies = {}
204 self._borrowed_proxies = {}
205
205
206 def __call__(self, request=None):
206 def __call__(self, request=None):
207 """
207 """
208 Wrapper around `getProxy`.
208 Wrapper around `getProxy`.
209 """
209 """
210 request = request or get_current_request()
210 request = request or get_current_request()
211 return self.getProxy(request)
211 return self.getProxy(request)
212
212
213 def getProxy(self, request):
213 def getProxy(self, request):
214 """
214 """
215 Call this to get the pyro proxy instance for the request.
215 Call this to get the pyro proxy instance for the request.
216 """
216 """
217
217
218 # If called without a request context we return new proxy instances
218 # If called without a request context we return new proxy instances
219 # on every call. This allows to run e.g. invoke tasks.
219 # on every call. This allows to run e.g. invoke tasks.
220 if request is None:
220 if request is None:
221 log.info('Creating pyro proxy without request context for '
221 log.info('Creating pyro proxy without request context for '
222 'remote_uri=%s', self._remote_uri)
222 'remote_uri=%s', self._remote_uri)
223 return Pyro4.Proxy(self._remote_uri)
223 return Pyro4.Proxy(self._remote_uri)
224
224
225 # If there is an already borrowed proxy for the request context we
225 # If there is an already borrowed proxy for the request context we
226 # return that instance instead of creating a new one.
226 # return that instance instead of creating a new one.
227 if request in self._borrowed_proxies:
227 if request in self._borrowed_proxies:
228 return self._borrowed_proxies[request]
228 return self._borrowed_proxies[request]
229
229
230 # Get proxy from pool or create new instance.
230 # Get proxy from pool or create new instance.
231 try:
231 try:
232 proxy = self._proxy_pool.pop()
232 proxy = self._proxy_pool.pop()
233 except IndexError:
233 except IndexError:
234 log.info('Creating new proxy for remote_uri=%s', self._remote_uri)
234 log.info('Creating pyro proxy for remote_uri=%s', self._remote_uri)
235 proxy = Pyro4.Proxy(self._remote_uri)
235 proxy = Pyro4.Proxy(self._remote_uri)
236
236
237 # Store proxy instance as borrowed and add request callback.
237 # Mark proxy as borrowed for the request context and add a callback
238 # that returns it when the request processing is finished.
238 self._borrowed_proxies[request] = proxy
239 self._borrowed_proxies[request] = proxy
239 request.add_finished_callback(self._returnProxy)
240 request.add_finished_callback(self._returnProxy)
240
241
241 return proxy
242 return proxy
242
243
243 def _returnProxy(self, request):
244 def _returnProxy(self, request):
244 """
245 """
245 Callback that gets called by pyramid when the request is finished.
246 Callback that gets called by pyramid when the request is finished.
246 It puts the proxy back into the pool.
247 It puts the proxy back into the pool.
247 """
248 """
248 if request in self._borrowed_proxies:
249 if request in self._borrowed_proxies:
249 proxy = self._borrowed_proxies.pop(request)
250 proxy = self._borrowed_proxies.pop(request)
250 self._proxy_pool.append(proxy)
251 self._proxy_pool.append(proxy)
251 else:
252 else:
252 log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
253 log.warn('Return proxy for remote_uri=%s but no proxy borrowed '
253 'for this request.', self._remote_uri)
254 'for this request.', self._remote_uri)
254
255
255
256
256 class RemoteRepo(object):
257 class RemoteRepo(object):
257
258
258 def __init__(self, path, config, remote_proxy, with_wire=None):
259 def __init__(self, path, config, remote_proxy, with_wire=None):
259 self._wire = {
260 self._wire = {
260 "path": path,
261 "path": path,
261 "config": config,
262 "config": config,
262 "context": uuid.uuid4(),
263 "context": uuid.uuid4(),
263 }
264 }
264 if with_wire:
265 if with_wire:
265 self._wire.update(with_wire)
266 self._wire.update(with_wire)
266 self._remote_proxy = remote_proxy
267 self._remote_proxy = remote_proxy
267 self.refs = RefsWrapper(self)
268 self.refs = RefsWrapper(self)
268
269
269 def __getattr__(self, name):
270 def __getattr__(self, name):
270 log.debug('Calling %s@%s', self._remote_proxy, name)
271 log.debug('Calling %s@%s', self._remote_proxy, name)
271 # TODO: oliver: This is currently necessary pre-call since the
272 # TODO: oliver: This is currently necessary pre-call since the
272 # config object is being changed for hooking scenarios
273 # config object is being changed for hooking scenarios
273 wire = copy.deepcopy(self._wire)
274 wire = copy.deepcopy(self._wire)
274 wire["config"] = wire["config"].serialize()
275 wire["config"] = wire["config"].serialize()
275
276
276 try:
277 try:
277 func = _get_proxy_method(self._remote_proxy, name)
278 func = _get_proxy_method(self._remote_proxy, name)
278 except DaemonError as e:
279 except DaemonError as e:
279 if e.message == 'unknown object':
280 if e.message == 'unknown object':
280 raise exceptions.VCSBackendNotSupportedError
281 raise exceptions.VCSBackendNotSupportedError
281 else:
282 else:
282 raise
283 raise
283
284
284 return _wrap_remote_call(self._remote_proxy, func, wire)
285 return _wrap_remote_call(self._remote_proxy, func, wire)
285
286
286 def __getitem__(self, key):
287 def __getitem__(self, key):
287 return self.revision(key)
288 return self.revision(key)
288
289
289
290
290 def _get_proxy_method(proxy, name):
291 def _get_proxy_method(proxy, name):
291 try:
292 try:
292 return getattr(proxy, name)
293 return getattr(proxy, name)
293 except CommunicationError:
294 except CommunicationError:
294 raise CommunicationError(
295 raise CommunicationError(
295 'Unable to connect to remote pyro server %s' % proxy)
296 'Unable to connect to remote pyro server %s' % proxy)
296
297
297
298
298 def _wrap_remote_call(proxy, func, *args):
299 def _wrap_remote_call(proxy, func, *args):
299 all_args = list(args)
300 all_args = list(args)
300
301
301 @exceptions.map_vcs_exceptions
302 @exceptions.map_vcs_exceptions
302 def caller(*args, **kwargs):
303 def caller(*args, **kwargs):
303 all_args.extend(args)
304 all_args.extend(args)
304 try:
305 try:
305 return func(*all_args, **kwargs)
306 return func(*all_args, **kwargs)
306 except ConnectionClosedError:
307 except ConnectionClosedError:
307 log.debug('Connection to VCSServer closed, trying to reconnect.')
308 log.debug('Connection to VCSServer closed, trying to reconnect.')
308 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
309 proxy._pyroReconnect(tries=settings.PYRO_RECONNECT_TRIES)
309
310
310 return func(*all_args, **kwargs)
311 return func(*all_args, **kwargs)
311
312
312 return caller
313 return caller
313
314
314
315
315 class RefsWrapper(object):
316 class RefsWrapper(object):
316
317
317 def __init__(self, repo):
318 def __init__(self, repo):
318 self._repo = weakref.proxy(repo)
319 self._repo = weakref.proxy(repo)
319
320
320 def __setitem__(self, key, value):
321 def __setitem__(self, key, value):
321 self._repo._assign_ref(key, value)
322 self._repo._assign_ref(key, value)
322
323
323
324
324 class FunctionWrapper(object):
325 class FunctionWrapper(object):
325
326
326 def __init__(self, func, wire):
327 def __init__(self, func, wire):
327 self._func = func
328 self._func = func
328 self._wire = wire
329 self._wire = wire
329
330
330 @exceptions.map_vcs_exceptions
331 @exceptions.map_vcs_exceptions
331 def __call__(self, *args, **kwargs):
332 def __call__(self, *args, **kwargs):
332 return self._func(self._wire, *args, **kwargs)
333 return self._func(self._wire, *args, **kwargs)
General Comments 0
You need to be logged in to leave comments. Login now