##// END OF EJS Templates
threading: Add factory that creates curl session for each thread....
Martin Bornhold -
r243:dae685be default
parent child Browse files
Show More
@@ -1,218 +1,233 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2016-2016 RhodeCode GmbH
3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 Client for the VCSServer implemented based on HTTP.
22 Client for the VCSServer implemented based on HTTP.
23
23
24
24
25 Status
25 Status
26 ------
26 ------
27
27
28 This client implementation shall eventually replace the Pyro4 based
28 This client implementation shall eventually replace the Pyro4 based
29 implementation.
29 implementation.
30 """
30 """
31
31
32 import copy
32 import copy
33 import logging
33 import logging
34 import threading
34 import urllib2
35 import urllib2
35 import urlparse
36 import urlparse
36 import uuid
37 import uuid
37
38
38 import msgpack
39 import msgpack
39 import requests
40 import requests
40
41
41 from . import exceptions
42 from . import exceptions, CurlSession
42
43
43
44
44 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
45
46
46
47
47 # TODO: mikhail: Keep it in sync with vcsserver's
48 # TODO: mikhail: Keep it in sync with vcsserver's
48 # HTTPApplication.ALLOWED_EXCEPTIONS
49 # HTTPApplication.ALLOWED_EXCEPTIONS
49 EXCEPTIONS_MAP = {
50 EXCEPTIONS_MAP = {
50 'KeyError': KeyError,
51 'KeyError': KeyError,
51 'URLError': urllib2.URLError,
52 'URLError': urllib2.URLError,
52 }
53 }
53
54
54
55
55 class RepoMaker(object):
56 class RepoMaker(object):
56
57
57 def __init__(self, server_and_port, backend_endpoint, session):
58 def __init__(self, server_and_port, backend_endpoint, session):
58 self.url = urlparse.urljoin(
59 self.url = urlparse.urljoin(
59 'http://%s' % server_and_port, backend_endpoint)
60 'http://%s' % server_and_port, backend_endpoint)
60 self._session = session
61 self._session = session
61
62
62 def __call__(self, path, config, with_wire=None):
63 def __call__(self, path, config, with_wire=None):
63 log.debug('RepoMaker call on %s', path)
64 log.debug('RepoMaker call on %s', path)
64 return RemoteRepo(
65 return RemoteRepo(
65 path, config, self.url, self._session, with_wire=with_wire)
66 path, config, self.url, self._session, with_wire=with_wire)
66
67
67 def __getattr__(self, name):
68 def __getattr__(self, name):
68 def f(*args, **kwargs):
69 def f(*args, **kwargs):
69 return self._call(name, *args, **kwargs)
70 return self._call(name, *args, **kwargs)
70 return f
71 return f
71
72
72 @exceptions.map_vcs_exceptions
73 @exceptions.map_vcs_exceptions
73 def _call(self, name, *args, **kwargs):
74 def _call(self, name, *args, **kwargs):
74 payload = {
75 payload = {
75 'id': str(uuid.uuid4()),
76 'id': str(uuid.uuid4()),
76 'method': name,
77 'method': name,
77 'params': {'args': args, 'kwargs': kwargs}
78 'params': {'args': args, 'kwargs': kwargs}
78 }
79 }
79 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
80 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
80
81
81
82
82 class RemoteRepo(object):
83 class RemoteRepo(object):
83
84
84 def __init__(self, path, config, url, session, with_wire=None):
85 def __init__(self, path, config, url, session, with_wire=None):
85 self.url = url
86 self.url = url
86 self._session = session
87 self._session = session
87 self._wire = {
88 self._wire = {
88 "path": path,
89 "path": path,
89 "config": config,
90 "config": config,
90 "context": str(uuid.uuid4()),
91 "context": str(uuid.uuid4()),
91 }
92 }
92 if with_wire:
93 if with_wire:
93 self._wire.update(with_wire)
94 self._wire.update(with_wire)
94
95
95 # johbo: Trading complexity for performance. Avoiding the call to
96 # johbo: Trading complexity for performance. Avoiding the call to
96 # log.debug brings a few percent gain even if is is not active.
97 # log.debug brings a few percent gain even if is is not active.
97 if log.isEnabledFor(logging.DEBUG):
98 if log.isEnabledFor(logging.DEBUG):
98 self._call = self._call_with_logging
99 self._call = self._call_with_logging
99
100
100 def __getattr__(self, name):
101 def __getattr__(self, name):
101 def f(*args, **kwargs):
102 def f(*args, **kwargs):
102 return self._call(name, *args, **kwargs)
103 return self._call(name, *args, **kwargs)
103 return f
104 return f
104
105
105 @exceptions.map_vcs_exceptions
106 @exceptions.map_vcs_exceptions
106 def _call(self, name, *args, **kwargs):
107 def _call(self, name, *args, **kwargs):
107 # TODO: oliver: This is currently necessary pre-call since the
108 # TODO: oliver: This is currently necessary pre-call since the
108 # config object is being changed for hooking scenarios
109 # config object is being changed for hooking scenarios
109 wire = copy.deepcopy(self._wire)
110 wire = copy.deepcopy(self._wire)
110 wire["config"] = wire["config"].serialize()
111 wire["config"] = wire["config"].serialize()
111 payload = {
112 payload = {
112 'id': str(uuid.uuid4()),
113 'id': str(uuid.uuid4()),
113 'method': name,
114 'method': name,
114 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
115 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
115 }
116 }
116 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
117 return _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
117
118
118 def _call_with_logging(self, name, *args, **kwargs):
119 def _call_with_logging(self, name, *args, **kwargs):
119 log.debug('Calling %s@%s', self.url, name)
120 log.debug('Calling %s@%s', self.url, name)
120 return RemoteRepo._call(self, name, *args, **kwargs)
121 return RemoteRepo._call(self, name, *args, **kwargs)
121
122
122 def __getitem__(self, key):
123 def __getitem__(self, key):
123 return self.revision(key)
124 return self.revision(key)
124
125
125
126
126 class RemoteObject(object):
127 class RemoteObject(object):
127
128
128 def __init__(self, url, session):
129 def __init__(self, url, session):
129 self._url = url
130 self._url = url
130 self._session = session
131 self._session = session
131
132
132 # johbo: Trading complexity for performance. Avoiding the call to
133 # johbo: Trading complexity for performance. Avoiding the call to
133 # log.debug brings a few percent gain even if is is not active.
134 # log.debug brings a few percent gain even if is is not active.
134 if log.isEnabledFor(logging.DEBUG):
135 if log.isEnabledFor(logging.DEBUG):
135 self._call = self._call_with_logging
136 self._call = self._call_with_logging
136
137
137 def __getattr__(self, name):
138 def __getattr__(self, name):
138 def f(*args, **kwargs):
139 def f(*args, **kwargs):
139 return self._call(name, *args, **kwargs)
140 return self._call(name, *args, **kwargs)
140 return f
141 return f
141
142
142 @exceptions.map_vcs_exceptions
143 @exceptions.map_vcs_exceptions
143 def _call(self, name, *args, **kwargs):
144 def _call(self, name, *args, **kwargs):
144 payload = {
145 payload = {
145 'id': str(uuid.uuid4()),
146 'id': str(uuid.uuid4()),
146 'method': name,
147 'method': name,
147 'params': {'args': args, 'kwargs': kwargs}
148 'params': {'args': args, 'kwargs': kwargs}
148 }
149 }
149 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
150 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
150
151
151 def _call_with_logging(self, name, *args, **kwargs):
152 def _call_with_logging(self, name, *args, **kwargs):
152 log.debug('Calling %s@%s', self._url, name)
153 log.debug('Calling %s@%s', self._url, name)
153 return RemoteObject._call(self, name, *args, **kwargs)
154 return RemoteObject._call(self, name, *args, **kwargs)
154
155
155
156
156 def _remote_call(url, payload, exceptions_map, session):
157 def _remote_call(url, payload, exceptions_map, session):
157 response = session.post(url, data=msgpack.packb(payload))
158 response = session.post(url, data=msgpack.packb(payload))
158 response = msgpack.unpackb(response.content)
159 response = msgpack.unpackb(response.content)
159 error = response.get('error')
160 error = response.get('error')
160 if error:
161 if error:
161 type_ = error.get('type', 'Exception')
162 type_ = error.get('type', 'Exception')
162 exc = exceptions_map.get(type_, Exception)
163 exc = exceptions_map.get(type_, Exception)
163 exc = exc(error.get('message'))
164 exc = exc(error.get('message'))
164 try:
165 try:
165 exc._vcs_kind = error['_vcs_kind']
166 exc._vcs_kind = error['_vcs_kind']
166 except KeyError:
167 except KeyError:
167 pass
168 pass
168 raise exc
169 raise exc
169 return response.get('result')
170 return response.get('result')
170
171
171
172
172 class VcsHttpProxy(object):
173 class VcsHttpProxy(object):
173
174
174 CHUNK_SIZE = 16384
175 CHUNK_SIZE = 16384
175
176
176 def __init__(self, server_and_port, backend_endpoint):
177 def __init__(self, server_and_port, backend_endpoint):
177 adapter = requests.adapters.HTTPAdapter(max_retries=5)
178 adapter = requests.adapters.HTTPAdapter(max_retries=5)
178 self.base_url = urlparse.urljoin(
179 self.base_url = urlparse.urljoin(
179 'http://%s' % server_and_port, backend_endpoint)
180 'http://%s' % server_and_port, backend_endpoint)
180 self.session = requests.Session()
181 self.session = requests.Session()
181 self.session.mount('http://', adapter)
182 self.session.mount('http://', adapter)
182
183
183 def handle(self, environment, input_data, *args, **kwargs):
184 def handle(self, environment, input_data, *args, **kwargs):
184 data = {
185 data = {
185 'environment': environment,
186 'environment': environment,
186 'input_data': input_data,
187 'input_data': input_data,
187 'args': args,
188 'args': args,
188 'kwargs': kwargs
189 'kwargs': kwargs
189 }
190 }
190 result = self.session.post(
191 result = self.session.post(
191 self.base_url, msgpack.packb(data), stream=True)
192 self.base_url, msgpack.packb(data), stream=True)
192 return self._get_result(result)
193 return self._get_result(result)
193
194
194 def _deserialize_and_raise(self, error):
195 def _deserialize_and_raise(self, error):
195 exception = Exception(error['message'])
196 exception = Exception(error['message'])
196 try:
197 try:
197 exception._vcs_kind = error['_vcs_kind']
198 exception._vcs_kind = error['_vcs_kind']
198 except KeyError:
199 except KeyError:
199 pass
200 pass
200 raise exception
201 raise exception
201
202
202 def _iterate(self, result):
203 def _iterate(self, result):
203 unpacker = msgpack.Unpacker()
204 unpacker = msgpack.Unpacker()
204 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
205 for line in result.iter_content(chunk_size=self.CHUNK_SIZE):
205 unpacker.feed(line)
206 unpacker.feed(line)
206 for chunk in unpacker:
207 for chunk in unpacker:
207 yield chunk
208 yield chunk
208
209
209 def _get_result(self, result):
210 def _get_result(self, result):
210 iterator = self._iterate(result)
211 iterator = self._iterate(result)
211 error = iterator.next()
212 error = iterator.next()
212 if error:
213 if error:
213 self._deserialize_and_raise(error)
214 self._deserialize_and_raise(error)
214
215
215 status = iterator.next()
216 status = iterator.next()
216 headers = iterator.next()
217 headers = iterator.next()
217
218
218 return iterator, status, headers
219 return iterator, status, headers
220
221
222 class ThreadlocalSessionFactory(object):
223 """
224 Creates one CurlSession per thread on demand.
225 """
226
227 def __init__(self):
228 self._thread_local = threading.local()
229
230 def __call__(self):
231 if not hasattr(self._thread_local, 'curl_session'):
232 self._thread_local.curl_session = CurlSession()
233 return self._thread_local.curl_session
General Comments 0
You need to be logged in to leave comments. Login now