##// END OF EJS Templates
threading: Add factory that creates curl session for each thread....
Martin Bornhold -
r243:dae685be default
parent child Browse files
Show More
@@ -1,218 +1,233 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Client for the VCSServer implemented based on HTTP.
23 23
24 24
25 25 Status
26 26 ------
27 27
28 28 This client implementation shall eventually replace the Pyro4 based
29 29 implementation.
30 30 """
31 31
32 32 import copy
33 33 import logging
34 import threading
34 35 import urllib2
35 36 import urlparse
36 37 import uuid
37 38
38 39 import msgpack
39 40 import requests
40 41
41 from . import exceptions
42 from . import exceptions, CurlSession
42 43
43 44
44 45 log = logging.getLogger(__name__)
45 46
46 47
47 48 # TODO: mikhail: Keep it in sync with vcsserver's
48 49 # HTTPApplication.ALLOWED_EXCEPTIONS
49 50 EXCEPTIONS_MAP = {
50 51 'KeyError': KeyError,
51 52 'URLError': urllib2.URLError,
52 53 }
53 54
54 55
55 56 class RepoMaker(object):
56 57
57 58 def __init__(self, server_and_port, backend_endpoint, session):
58 59 self.url = urlparse.urljoin(
59 60 'http://%s' % server_and_port, backend_endpoint)
60 61 self._session = session
61 62
62 63 def __call__(self, path, config, with_wire=None):
63 64 log.debug('RepoMaker call on %s', path)
64 65 return RemoteRepo(
65 66 path, config, self.url, self._session, with_wire=with_wire)
66 67
67 68 def __getattr__(self, name):
68 69 def f(*args, **kwargs):
69 70 return self._call(name, *args, **kwargs)
70 71 return f
71 72
72 73 @exceptions.map_vcs_exceptions
73 74 def _call(self, name, *args, **kwargs):
74 75 payload = {
75 76 'id': str(uuid.uuid4()),
76 77 'method': name,
77 78 'params': {'args': args, 'kwargs': kwargs}
78 79 }
79 80 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
80 81
81 82
82 83 class RemoteRepo(object):
83 84
84 85 def __init__(self, path, config, url, session, with_wire=None):
85 86 self.url = url
86 87 self._session = session
87 88 self._wire = {
88 89 "path": path,
89 90 "config": config,
90 91 "context": str(uuid.uuid4()),
91 92 }
92 93 if with_wire:
93 94 self._wire.update(with_wire)
94 95
95 96 # johbo: Trading complexity for performance. Avoiding the call to
96 97 # log.debug brings a few percent gain even if is is not active.
97 98 if log.isEnabledFor(logging.DEBUG):
98 99 self._call = self._call_with_logging
99 100
100 101 def __getattr__(self, name):
101 102 def f(*args, **kwargs):
102 103 return self._call(name, *args, **kwargs)
103 104 return f
104 105
105 106 @exceptions.map_vcs_exceptions
106 107 def _call(self, name, *args, **kwargs):
107 108 # TODO: oliver: This is currently necessary pre-call since the
108 109 # config object is being changed for hooking scenarios
109 110 wire = copy.deepcopy(self._wire)
110 111 wire["config"] = wire["config"].serialize()
111 112 payload = {
112 113 'id': str(uuid.uuid4()),
113 114 'method': name,
114 115 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
115 116 }
116 117 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
117 118
118 119 def _call_with_logging(self, name, *args, **kwargs):
119 120 log.debug('Calling %s@%s', self.url, name)
120 121 return RemoteRepo._call(self, name, *args, **kwargs)
121 122
122 123 def __getitem__(self, key):
123 124 return self.revision(key)
124 125
125 126
126 127 class RemoteObject(object):
127 128
128 129 def __init__(self, url, session):
129 130 self._url = url
130 131 self._session = session
131 132
132 133 # johbo: Trading complexity for performance. Avoiding the call to
133 134 # log.debug brings a few percent gain even if is is not active.
134 135 if log.isEnabledFor(logging.DEBUG):
135 136 self._call = self._call_with_logging
136 137
137 138 def __getattr__(self, name):
138 139 def f(*args, **kwargs):
139 140 return self._call(name, *args, **kwargs)
140 141 return f
141 142
142 143 @exceptions.map_vcs_exceptions
143 144 def _call(self, name, *args, **kwargs):
144 145 payload = {
145 146 'id': str(uuid.uuid4()),
146 147 'method': name,
147 148 'params': {'args': args, 'kwargs': kwargs}
148 149 }
149 150 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
150 151
151 152 def _call_with_logging(self, name, *args, **kwargs):
152 153 log.debug('Calling %s@%s', self._url, name)
153 154 return RemoteObject._call(self, name, *args, **kwargs)
154 155
155 156
156 157 def _remote_call(url, payload, exceptions_map, session):
157 158 response = session.post(url, data=msgpack.packb(payload))
158 159 response = msgpack.unpackb(response.content)
159 160 error = response.get('error')
160 161 if error:
161 162 type_ = error.get('type', 'Exception')
162 163 exc = exceptions_map.get(type_, Exception)
163 164 exc = exc(error.get('message'))
164 165 try:
165 166 exc._vcs_kind = error['_vcs_kind']
166 167 except KeyError:
167 168 pass
168 169 raise exc
169 170 return response.get('result')
170 171
171 172
172 173 class VcsHttpProxy(object):
173 174
174 175 CHUNK_SIZE = 16384
175 176
176 177 def __init__(self, server_and_port, backend_endpoint):
177 178 adapter = requests.adapters.HTTPAdapter(max_retries=5)
178 179 self.base_url = urlparse.urljoin(
179 180 'http://%s' % server_and_port, backend_endpoint)
180 181 self.session = requests.Session()
181 182 self.session.mount('http://', adapter)
182 183
183 184 def handle(self, environment, input_data, *args, **kwargs):
184 185 data = {
185 186 'environment': environment,
186 187 'input_data': input_data,
187 188 'args': args,
188 189 'kwargs': kwargs
189 190 }
190 191 result = self.session.post(
191 192 self.base_url, msgpack.packb(data), stream=True)
192 193 return self._get_result(result)
193 194
194 195 def _deserialize_and_raise(self, error):
195 196 exception = Exception(error['message'])
196 197 try:
197 198 exception._vcs_kind = error['_vcs_kind']
198 199 except KeyError:
199 200 pass
200 201 raise exception
201 202
202 203 def _iterate(self, result):
203 204 unpacker = msgpack.Unpacker()
204 205 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
205 206 unpacker.feed(line)
206 207 for chunk in unpacker:
207 208 yield chunk
208 209
209 210 def _get_result(self, result):
210 211 iterator = self._iterate(result)
211 212 error = iterator.next()
212 213 if error:
213 214 self._deserialize_and_raise(error)
214 215
215 216 status = iterator.next()
216 217 headers = iterator.next()
217 218
218 219 return iterator, status, headers
220
221
222 class ThreadlocalSessionFactory(object):
223 """
224 Creates one CurlSession per thread on demand.
225 """
226
227 def __init__(self):
228 self._thread_local = threading.local()
229
230 def __call__(self):
231 if not hasattr(self._thread_local, 'curl_session'):
232 self._thread_local.curl_session = CurlSession()
233 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now