##// END OF EJS Templates
http-protocol: Add a method to invalidate the VCSServer cache....
Martin Bornhold -
r403:e1d0ae4c default
parent child Browse files
Show More
@@ -1,235 +1,250 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23
23
24
24
25 Status
25 Status
26 ------
26 ------
27
27
28 This client implementation shall eventually replace the Pyro4 based
28 This client implementation shall eventually replace the Pyro4 based
29 implementation.
29 implementation.
30 """
30 """
31
31
32 import copy
32 import copy
33 import logging
33 import logging
34 import threading
34 import threading
35 import urllib2
35 import urllib2
36 import urlparse
36 import urlparse
37 import uuid
37 import uuid
38
38
39 import msgpack
39 import msgpack
40 import requests
40 import requests
41
41
42 from . import exceptions, CurlSession
42 from . import exceptions, CurlSession
43
43
44
44
45 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
46
46
47
47
48 # TODO: mikhail: Keep it in sync with vcsserver's
48 # TODO: mikhail: Keep it in sync with vcsserver's
49 # HTTPApplication.ALLOWED_EXCEPTIONS
49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 EXCEPTIONS_MAP = {
50 EXCEPTIONS_MAP = {
51 'KeyError': KeyError,
51 'KeyError': KeyError,
52 'URLError': urllib2.URLError,
52 'URLError': urllib2.URLError,
53 }
53 }
54
54
55
55
56 class RepoMaker(object):
56 class RepoMaker(object):
57
57
58 def __init__(self, server_and_port, backend_endpoint, session_factory):
58 def __init__(self, server_and_port, backend_endpoint, session_factory):
59 self.url = urlparse.urljoin(
59 self.url = urlparse.urljoin(
60 'http://%s' % server_and_port, backend_endpoint)
60 'http://%s' % server_and_port, backend_endpoint)
61 self._session_factory = session_factory
61 self._session_factory = session_factory
62
62
63 def __call__(self, path, config, with_wire=None):
63 def __call__(self, path, config, with_wire=None):
64 log.debug('RepoMaker call on %s', path)
64 log.debug('RepoMaker call on %s', path)
65 return RemoteRepo(
65 return RemoteRepo(
66 path, config, self.url, self._session_factory(),
66 path, config, self.url, self._session_factory(),
67 with_wire=with_wire)
67 with_wire=with_wire)
68
68
69 def __getattr__(self, name):
69 def __getattr__(self, name):
70 def f(*args, **kwargs):
70 def f(*args, **kwargs):
71 return self._call(name, *args, **kwargs)
71 return self._call(name, *args, **kwargs)
72 return f
72 return f
73
73
74 @exceptions.map_vcs_exceptions
74 @exceptions.map_vcs_exceptions
75 def _call(self, name, *args, **kwargs):
75 def _call(self, name, *args, **kwargs):
76 payload = {
76 payload = {
77 'id': str(uuid.uuid4()),
77 'id': str(uuid.uuid4()),
78 'method': name,
78 'method': name,
79 'params': {'args': args, 'kwargs': kwargs}
79 'params': {'args': args, 'kwargs': kwargs}
80 }
80 }
81 return _remote_call(
81 return _remote_call(
82 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
82 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
83
83
84
84
85 class RemoteRepo(object):
85 class RemoteRepo(object):
86
86
87 def __init__(self, path, config, url, session, with_wire=None):
87 def __init__(self, path, config, url, session, with_wire=None):
88 self.url = url
88 self.url = url
89 self._session = session
89 self._session = session
90 self._wire = {
90 self._wire = {
91 "path": path,
91 "path": path,
92 "config": config,
92 "config": config,
93 "context": str(uuid.uuid4()),
93 "context": self._create_vcs_cache_context(),
94 }
94 }
95 if with_wire:
95 if with_wire:
96 self._wire.update(with_wire)
96 self._wire.update(with_wire)
97
97
98 # johbo: Trading complexity for performance. Avoiding the call to
98 # johbo: Trading complexity for performance. Avoiding the call to
99 # log.debug brings a few percent gain even if is is not active.
99 # log.debug brings a few percent gain even if is is not active.
100 if log.isEnabledFor(logging.DEBUG):
100 if log.isEnabledFor(logging.DEBUG):
101 self._call = self._call_with_logging
101 self._call = self._call_with_logging
102
102
103 def __getattr__(self, name):
103 def __getattr__(self, name):
104 def f(*args, **kwargs):
104 def f(*args, **kwargs):
105 return self._call(name, *args, **kwargs)
105 return self._call(name, *args, **kwargs)
106 return f
106 return f
107
107
108 @exceptions.map_vcs_exceptions
108 @exceptions.map_vcs_exceptions
109 def _call(self, name, *args, **kwargs):
109 def _call(self, name, *args, **kwargs):
110 # TODO: oliver: This is currently necessary pre-call since the
110 # TODO: oliver: This is currently necessary pre-call since the
111 # config object is being changed for hooking scenarios
111 # config object is being changed for hooking scenarios
112 wire = copy.deepcopy(self._wire)
112 wire = copy.deepcopy(self._wire)
113 wire["config"] = wire["config"].serialize()
113 wire["config"] = wire["config"].serialize()
114 payload = {
114 payload = {
115 'id': str(uuid.uuid4()),
115 'id': str(uuid.uuid4()),
116 'method': name,
116 'method': name,
117 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
117 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
118 }
118 }
119 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
119 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
120
120
121 def _call_with_logging(self, name, *args, **kwargs):
121 def _call_with_logging(self, name, *args, **kwargs):
122 log.debug('Calling %s@%s', self.url, name)
122 log.debug('Calling %s@%s', self.url, name)
123 return RemoteRepo._call(self, name, *args, **kwargs)
123 return RemoteRepo._call(self, name, *args, **kwargs)
124
124
125 def __getitem__(self, key):
125 def __getitem__(self, key):
126 return self.revision(key)
126 return self.revision(key)
127
127
128 def _create_vcs_cache_context(self):
129 """
130 Creates a unique string which is passed to the VCSServer on every
131 remote call. It is used as cache key in the VCSServer.
132 """
133 return str(uuid.uuid4())
134
135 def invalidate_vcs_cache(self):
136 """
137 This invalidates the context which is sent to the VCSServer on every
138 call to a remote method. It forces the VCSServer to create a fresh
139 repository instance on the next call to a remote method.
140 """
141 self._wire['context'] = self._create_vcs_cache_context()
142
128
143
129 class RemoteObject(object):
144 class RemoteObject(object):
130
145
131 def __init__(self, url, session):
146 def __init__(self, url, session):
132 self._url = url
147 self._url = url
133 self._session = session
148 self._session = session
134
149
135 # johbo: Trading complexity for performance. Avoiding the call to
150 # johbo: Trading complexity for performance. Avoiding the call to
136 # log.debug brings a few percent gain even if is is not active.
151 # log.debug brings a few percent gain even if is is not active.
137 if log.isEnabledFor(logging.DEBUG):
152 if log.isEnabledFor(logging.DEBUG):
138 self._call = self._call_with_logging
153 self._call = self._call_with_logging
139
154
140 def __getattr__(self, name):
155 def __getattr__(self, name):
141 def f(*args, **kwargs):
156 def f(*args, **kwargs):
142 return self._call(name, *args, **kwargs)
157 return self._call(name, *args, **kwargs)
143 return f
158 return f
144
159
145 @exceptions.map_vcs_exceptions
160 @exceptions.map_vcs_exceptions
146 def _call(self, name, *args, **kwargs):
161 def _call(self, name, *args, **kwargs):
147 payload = {
162 payload = {
148 'id': str(uuid.uuid4()),
163 'id': str(uuid.uuid4()),
149 'method': name,
164 'method': name,
150 'params': {'args': args, 'kwargs': kwargs}
165 'params': {'args': args, 'kwargs': kwargs}
151 }
166 }
152 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
167 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
153
168
154 def _call_with_logging(self, name, *args, **kwargs):
169 def _call_with_logging(self, name, *args, **kwargs):
155 log.debug('Calling %s@%s', self._url, name)
170 log.debug('Calling %s@%s', self._url, name)
156 return RemoteObject._call(self, name, *args, **kwargs)
171 return RemoteObject._call(self, name, *args, **kwargs)
157
172
158
173
159 def _remote_call(url, payload, exceptions_map, session):
174 def _remote_call(url, payload, exceptions_map, session):
160 response = session.post(url, data=msgpack.packb(payload))
175 response = session.post(url, data=msgpack.packb(payload))
161 response = msgpack.unpackb(response.content)
176 response = msgpack.unpackb(response.content)
162 error = response.get('error')
177 error = response.get('error')
163 if error:
178 if error:
164 type_ = error.get('type', 'Exception')
179 type_ = error.get('type', 'Exception')
165 exc = exceptions_map.get(type_, Exception)
180 exc = exceptions_map.get(type_, Exception)
166 exc = exc(error.get('message'))
181 exc = exc(error.get('message'))
167 try:
182 try:
168 exc._vcs_kind = error['_vcs_kind']
183 exc._vcs_kind = error['_vcs_kind']
169 except KeyError:
184 except KeyError:
170 pass
185 pass
171 raise exc
186 raise exc
172 return response.get('result')
187 return response.get('result')
173
188
174
189
175 class VcsHttpProxy(object):
190 class VcsHttpProxy(object):
176
191
177 CHUNK_SIZE = 16384
192 CHUNK_SIZE = 16384
178
193
179 def __init__(self, server_and_port, backend_endpoint):
194 def __init__(self, server_and_port, backend_endpoint):
180 adapter = requests.adapters.HTTPAdapter(max_retries=5)
195 adapter = requests.adapters.HTTPAdapter(max_retries=5)
181 self.base_url = urlparse.urljoin(
196 self.base_url = urlparse.urljoin(
182 'http://%s' % server_and_port, backend_endpoint)
197 'http://%s' % server_and_port, backend_endpoint)
183 self.session = requests.Session()
198 self.session = requests.Session()
184 self.session.mount('http://', adapter)
199 self.session.mount('http://', adapter)
185
200
186 def handle(self, environment, input_data, *args, **kwargs):
201 def handle(self, environment, input_data, *args, **kwargs):
187 data = {
202 data = {
188 'environment': environment,
203 'environment': environment,
189 'input_data': input_data,
204 'input_data': input_data,
190 'args': args,
205 'args': args,
191 'kwargs': kwargs
206 'kwargs': kwargs
192 }
207 }
193 result = self.session.post(
208 result = self.session.post(
194 self.base_url, msgpack.packb(data), stream=True)
209 self.base_url, msgpack.packb(data), stream=True)
195 return self._get_result(result)
210 return self._get_result(result)
196
211
197 def _deserialize_and_raise(self, error):
212 def _deserialize_and_raise(self, error):
198 exception = Exception(error['message'])
213 exception = Exception(error['message'])
199 try:
214 try:
200 exception._vcs_kind = error['_vcs_kind']
215 exception._vcs_kind = error['_vcs_kind']
201 except KeyError:
216 except KeyError:
202 pass
217 pass
203 raise exception
218 raise exception
204
219
205 def _iterate(self, result):
220 def _iterate(self, result):
206 unpacker = msgpack.Unpacker()
221 unpacker = msgpack.Unpacker()
207 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
222 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
208 unpacker.feed(line)
223 unpacker.feed(line)
209 for chunk in unpacker:
224 for chunk in unpacker:
210 yield chunk
225 yield chunk
211
226
212 def _get_result(self, result):
227 def _get_result(self, result):
213 iterator = self._iterate(result)
228 iterator = self._iterate(result)
214 error = iterator.next()
229 error = iterator.next()
215 if error:
230 if error:
216 self._deserialize_and_raise(error)
231 self._deserialize_and_raise(error)
217
232
218 status = iterator.next()
233 status = iterator.next()
219 headers = iterator.next()
234 headers = iterator.next()
220
235
221 return iterator, status, headers
236 return iterator, status, headers
222
237
223
238
224 class ThreadlocalSessionFactory(object):
239 class ThreadlocalSessionFactory(object):
225 """
240 """
226 Creates one CurlSession per thread on demand.
241 Creates one CurlSession per thread on demand.
227 """
242 """
228
243
229 def __init__(self):
244 def __init__(self):
230 self._thread_local = threading.local()
245 self._thread_local = threading.local()
231
246
232 def __call__(self):
247 def __call__(self):
233 if not hasattr(self._thread_local, 'curl_session'):
248 if not hasattr(self._thread_local, 'curl_session'):
234 self._thread_local.curl_session = CurlSession()
249 self._thread_local.curl_session = CurlSession()
235 return self._thread_local.curl_session
250 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now