##// END OF EJS Templates
http-protocol: Invalidate the VCSServer cache when calling a writing method.
Martin Bornhold -
r405:6ab62426 default
parent child Browse files
Show More
@@ -1,250 +1,270 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23
24 24
25 25 Status
26 26 ------
27 27
28 28 This client implementation shall eventually replace the Pyro4 based
29 29 implementation.
30 30 """
31 31
32 32 import copy
33 33 import logging
34 34 import threading
35 35 import urllib2
36 36 import urlparse
37 37 import uuid
38 38
39 39 import msgpack
40 40 import requests
41 41
42 42 from . import exceptions, CurlSession
43 43
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 # TODO: mikhail: Keep it in sync with vcsserver's
49 49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 50 EXCEPTIONS_MAP = {
51 51 'KeyError': KeyError,
52 52 'URLError': urllib2.URLError,
53 53 }
54 54
55 55
56 56 class RepoMaker(object):
57 57
58 58 def __init__(self, server_and_port, backend_endpoint, session_factory):
59 59 self.url = urlparse.urljoin(
60 60 'http://%s' % server_and_port, backend_endpoint)
61 61 self._session_factory = session_factory
62 62
63 63 def __call__(self, path, config, with_wire=None):
64 64 log.debug('RepoMaker call on %s', path)
65 65 return RemoteRepo(
66 66 path, config, self.url, self._session_factory(),
67 67 with_wire=with_wire)
68 68
69 69 def __getattr__(self, name):
70 70 def f(*args, **kwargs):
71 71 return self._call(name, *args, **kwargs)
72 72 return f
73 73
74 74 @exceptions.map_vcs_exceptions
75 75 def _call(self, name, *args, **kwargs):
76 76 payload = {
77 77 'id': str(uuid.uuid4()),
78 78 'method': name,
79 79 'params': {'args': args, 'kwargs': kwargs}
80 80 }
81 81 return _remote_call(
82 82 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
83 83
84 84
85 85 class RemoteRepo(object):
86 86
87 # List of method names that act like a write to the repository. After these
88 # methods we have to invalidate the VCSServer cache.
89 _writing_methods = [
90 'bookmark',
91 'commit',
92 'pull',
93 'pull_cmd',
94 'push',
95 'rebase',
96 'strip',
97 ]
98
87 99 def __init__(self, path, config, url, session, with_wire=None):
88 100 self.url = url
89 101 self._session = session
90 102 self._wire = {
91 103 "path": path,
92 104 "config": config,
93 105 "context": self._create_vcs_cache_context(),
94 106 }
95 107 if with_wire:
96 108 self._wire.update(with_wire)
97 109
98 110 # johbo: Trading complexity for performance. Avoiding the call to
99 111 # log.debug brings a few percent gain even if is is not active.
100 112 if log.isEnabledFor(logging.DEBUG):
101 113 self._call = self._call_with_logging
102 114
103 115 def __getattr__(self, name):
104 116 def f(*args, **kwargs):
105 117 return self._call(name, *args, **kwargs)
106 118 return f
107 119
108 120 @exceptions.map_vcs_exceptions
109 121 def _call(self, name, *args, **kwargs):
110 122 # TODO: oliver: This is currently necessary pre-call since the
111 123 # config object is being changed for hooking scenarios
112 124 wire = copy.deepcopy(self._wire)
113 125 wire["config"] = wire["config"].serialize()
114 126 payload = {
115 127 'id': str(uuid.uuid4()),
116 128 'method': name,
117 129 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
118 130 }
119 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
131
132 try:
133 response = _remote_call(
134 self.url, payload, EXCEPTIONS_MAP, self._session)
135 finally:
136 if name in self._writing_methods:
137 self.invalidate_vcs_cache()
138
139 return response
120 140
121 141 def _call_with_logging(self, name, *args, **kwargs):
122 142 log.debug('Calling %s@%s', self.url, name)
123 143 return RemoteRepo._call(self, name, *args, **kwargs)
124 144
125 145 def __getitem__(self, key):
126 146 return self.revision(key)
127 147
128 148 def _create_vcs_cache_context(self):
129 149 """
130 150 Creates a unique string which is passed to the VCSServer on every
131 151 remote call. It is used as cache key in the VCSServer.
132 152 """
133 153 return str(uuid.uuid4())
134 154
135 155 def invalidate_vcs_cache(self):
136 156 """
137 157 This invalidates the context which is sent to the VCSServer on every
138 158 call to a remote method. It forces the VCSServer to create a fresh
139 159 repository instance on the next call to a remote method.
140 160 """
141 161 self._wire['context'] = self._create_vcs_cache_context()
142 162
143 163
144 164 class RemoteObject(object):
145 165
146 166 def __init__(self, url, session):
147 167 self._url = url
148 168 self._session = session
149 169
150 170 # johbo: Trading complexity for performance. Avoiding the call to
151 171 # log.debug brings a few percent gain even if is is not active.
152 172 if log.isEnabledFor(logging.DEBUG):
153 173 self._call = self._call_with_logging
154 174
155 175 def __getattr__(self, name):
156 176 def f(*args, **kwargs):
157 177 return self._call(name, *args, **kwargs)
158 178 return f
159 179
160 180 @exceptions.map_vcs_exceptions
161 181 def _call(self, name, *args, **kwargs):
162 182 payload = {
163 183 'id': str(uuid.uuid4()),
164 184 'method': name,
165 185 'params': {'args': args, 'kwargs': kwargs}
166 186 }
167 187 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
168 188
169 189 def _call_with_logging(self, name, *args, **kwargs):
170 190 log.debug('Calling %s@%s', self._url, name)
171 191 return RemoteObject._call(self, name, *args, **kwargs)
172 192
173 193
174 194 def _remote_call(url, payload, exceptions_map, session):
175 195 response = session.post(url, data=msgpack.packb(payload))
176 196 response = msgpack.unpackb(response.content)
177 197 error = response.get('error')
178 198 if error:
179 199 type_ = error.get('type', 'Exception')
180 200 exc = exceptions_map.get(type_, Exception)
181 201 exc = exc(error.get('message'))
182 202 try:
183 203 exc._vcs_kind = error['_vcs_kind']
184 204 except KeyError:
185 205 pass
186 206 raise exc
187 207 return response.get('result')
188 208
189 209
190 210 class VcsHttpProxy(object):
191 211
192 212 CHUNK_SIZE = 16384
193 213
194 214 def __init__(self, server_and_port, backend_endpoint):
195 215 adapter = requests.adapters.HTTPAdapter(max_retries=5)
196 216 self.base_url = urlparse.urljoin(
197 217 'http://%s' % server_and_port, backend_endpoint)
198 218 self.session = requests.Session()
199 219 self.session.mount('http://', adapter)
200 220
201 221 def handle(self, environment, input_data, *args, **kwargs):
202 222 data = {
203 223 'environment': environment,
204 224 'input_data': input_data,
205 225 'args': args,
206 226 'kwargs': kwargs
207 227 }
208 228 result = self.session.post(
209 229 self.base_url, msgpack.packb(data), stream=True)
210 230 return self._get_result(result)
211 231
212 232 def _deserialize_and_raise(self, error):
213 233 exception = Exception(error['message'])
214 234 try:
215 235 exception._vcs_kind = error['_vcs_kind']
216 236 except KeyError:
217 237 pass
218 238 raise exception
219 239
220 240 def _iterate(self, result):
221 241 unpacker = msgpack.Unpacker()
222 242 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
223 243 unpacker.feed(line)
224 244 for chunk in unpacker:
225 245 yield chunk
226 246
227 247 def _get_result(self, result):
228 248 iterator = self._iterate(result)
229 249 error = iterator.next()
230 250 if error:
231 251 self._deserialize_and_raise(error)
232 252
233 253 status = iterator.next()
234 254 headers = iterator.next()
235 255
236 256 return iterator, status, headers
237 257
238 258
239 259 class ThreadlocalSessionFactory(object):
240 260 """
241 261 Creates one CurlSession per thread on demand.
242 262 """
243 263
244 264 def __init__(self):
245 265 self._thread_local = threading.local()
246 266
247 267 def __call__(self):
248 268 if not hasattr(self._thread_local, 'curl_session'):
249 269 self._thread_local.curl_session = CurlSession()
250 270 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now