##// END OF EJS Templates
http-protocol: Add a method to invalidate the VCSServer cache....
Martin Bornhold -
r403:e1d0ae4c default
parent child Browse files
Show More
@@ -1,235 +1,250 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23
24 24
25 25 Status
26 26 ------
27 27
28 28 This client implementation shall eventually replace the Pyro4 based
29 29 implementation.
30 30 """
31 31
32 32 import copy
33 33 import logging
34 34 import threading
35 35 import urllib2
36 36 import urlparse
37 37 import uuid
38 38
39 39 import msgpack
40 40 import requests
41 41
42 42 from . import exceptions, CurlSession
43 43
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 # TODO: mikhail: Keep it in sync with vcsserver's
49 49 # HTTPApplication.ALLOWED_EXCEPTIONS
50 50 EXCEPTIONS_MAP = {
51 51 'KeyError': KeyError,
52 52 'URLError': urllib2.URLError,
53 53 }
54 54
55 55
56 56 class RepoMaker(object):
57 57
58 58 def __init__(self, server_and_port, backend_endpoint, session_factory):
59 59 self.url = urlparse.urljoin(
60 60 'http://%s' % server_and_port, backend_endpoint)
61 61 self._session_factory = session_factory
62 62
63 63 def __call__(self, path, config, with_wire=None):
64 64 log.debug('RepoMaker call on %s', path)
65 65 return RemoteRepo(
66 66 path, config, self.url, self._session_factory(),
67 67 with_wire=with_wire)
68 68
69 69 def __getattr__(self, name):
70 70 def f(*args, **kwargs):
71 71 return self._call(name, *args, **kwargs)
72 72 return f
73 73
74 74 @exceptions.map_vcs_exceptions
75 75 def _call(self, name, *args, **kwargs):
76 76 payload = {
77 77 'id': str(uuid.uuid4()),
78 78 'method': name,
79 79 'params': {'args': args, 'kwargs': kwargs}
80 80 }
81 81 return _remote_call(
82 82 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
83 83
84 84
85 85 class RemoteRepo(object):
86 86
87 87 def __init__(self, path, config, url, session, with_wire=None):
88 88 self.url = url
89 89 self._session = session
90 90 self._wire = {
91 91 "path": path,
92 92 "config": config,
93 "context": str(uuid.uuid4()),
93 "context": self._create_vcs_cache_context(),
94 94 }
95 95 if with_wire:
96 96 self._wire.update(with_wire)
97 97
98 98 # johbo: Trading complexity for performance. Avoiding the call to
99 99 # log.debug brings a few percent gain even if is is not active.
100 100 if log.isEnabledFor(logging.DEBUG):
101 101 self._call = self._call_with_logging
102 102
103 103 def __getattr__(self, name):
104 104 def f(*args, **kwargs):
105 105 return self._call(name, *args, **kwargs)
106 106 return f
107 107
108 108 @exceptions.map_vcs_exceptions
109 109 def _call(self, name, *args, **kwargs):
110 110 # TODO: oliver: This is currently necessary pre-call since the
111 111 # config object is being changed for hooking scenarios
112 112 wire = copy.deepcopy(self._wire)
113 113 wire["config"] = wire["config"].serialize()
114 114 payload = {
115 115 'id': str(uuid.uuid4()),
116 116 'method': name,
117 117 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
118 118 }
119 119 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
120 120
121 121 def _call_with_logging(self, name, *args, **kwargs):
122 122 log.debug('Calling %s@%s', self.url, name)
123 123 return RemoteRepo._call(self, name, *args, **kwargs)
124 124
125 125 def __getitem__(self, key):
126 126 return self.revision(key)
127 127
128 def _create_vcs_cache_context(self):
129 """
130 Creates a unique string which is passed to the VCSServer on every
131 remote call. It is used as cache key in the VCSServer.
132 """
133 return str(uuid.uuid4())
134
135 def invalidate_vcs_cache(self):
136 """
137 This invalidates the context which is sent to the VCSServer on every
138 call to a remote method. It forces the VCSServer to create a fresh
139 repository instance on the next call to a remote method.
140 """
141 self._wire['context'] = self._create_vcs_cache_context()
142
128 143
129 144 class RemoteObject(object):
130 145
131 146 def __init__(self, url, session):
132 147 self._url = url
133 148 self._session = session
134 149
135 150 # johbo: Trading complexity for performance. Avoiding the call to
136 151 # log.debug brings a few percent gain even if is is not active.
137 152 if log.isEnabledFor(logging.DEBUG):
138 153 self._call = self._call_with_logging
139 154
140 155 def __getattr__(self, name):
141 156 def f(*args, **kwargs):
142 157 return self._call(name, *args, **kwargs)
143 158 return f
144 159
145 160 @exceptions.map_vcs_exceptions
146 161 def _call(self, name, *args, **kwargs):
147 162 payload = {
148 163 'id': str(uuid.uuid4()),
149 164 'method': name,
150 165 'params': {'args': args, 'kwargs': kwargs}
151 166 }
152 167 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
153 168
154 169 def _call_with_logging(self, name, *args, **kwargs):
155 170 log.debug('Calling %s@%s', self._url, name)
156 171 return RemoteObject._call(self, name, *args, **kwargs)
157 172
158 173
159 174 def _remote_call(url, payload, exceptions_map, session):
160 175 response = session.post(url, data=msgpack.packb(payload))
161 176 response = msgpack.unpackb(response.content)
162 177 error = response.get('error')
163 178 if error:
164 179 type_ = error.get('type', 'Exception')
165 180 exc = exceptions_map.get(type_, Exception)
166 181 exc = exc(error.get('message'))
167 182 try:
168 183 exc._vcs_kind = error['_vcs_kind']
169 184 except KeyError:
170 185 pass
171 186 raise exc
172 187 return response.get('result')
173 188
174 189
175 190 class VcsHttpProxy(object):
176 191
177 192 CHUNK_SIZE = 16384
178 193
179 194 def __init__(self, server_and_port, backend_endpoint):
180 195 adapter = requests.adapters.HTTPAdapter(max_retries=5)
181 196 self.base_url = urlparse.urljoin(
182 197 'http://%s' % server_and_port, backend_endpoint)
183 198 self.session = requests.Session()
184 199 self.session.mount('http://', adapter)
185 200
186 201 def handle(self, environment, input_data, *args, **kwargs):
187 202 data = {
188 203 'environment': environment,
189 204 'input_data': input_data,
190 205 'args': args,
191 206 'kwargs': kwargs
192 207 }
193 208 result = self.session.post(
194 209 self.base_url, msgpack.packb(data), stream=True)
195 210 return self._get_result(result)
196 211
197 212 def _deserialize_and_raise(self, error):
198 213 exception = Exception(error['message'])
199 214 try:
200 215 exception._vcs_kind = error['_vcs_kind']
201 216 except KeyError:
202 217 pass
203 218 raise exception
204 219
205 220 def _iterate(self, result):
206 221 unpacker = msgpack.Unpacker()
207 222 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
208 223 unpacker.feed(line)
209 224 for chunk in unpacker:
210 225 yield chunk
211 226
212 227 def _get_result(self, result):
213 228 iterator = self._iterate(result)
214 229 error = iterator.next()
215 230 if error:
216 231 self._deserialize_and_raise(error)
217 232
218 233 status = iterator.next()
219 234 headers = iterator.next()
220 235
221 236 return iterator, status, headers
222 237
223 238
224 239 class ThreadlocalSessionFactory(object):
225 240 """
226 241 Creates one CurlSession per thread on demand.
227 242 """
228 243
229 244 def __init__(self):
230 245 self._thread_local = threading.local()
231 246
232 247 def __call__(self):
233 248 if not hasattr(self._thread_local, 'curl_session'):
234 249 self._thread_local.curl_session = CurlSession()
235 250 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now