##// END OF EJS Templates
file-nodes: added streaming remote attributes for vcsserver....
dan -
r3895:2b1d7e0d default
parent child Browse files
Show More
@@ -844,10 +844,9 b' class RepoFilesView(RepoAppView):'
844 844 if disposition == 'attachment':
845 845 disposition = self._get_attachement_headers(f_path)
846 846
847 def stream_node():
848 yield file_node.raw_bytes
847 stream_content = file_node.stream_bytes()
849 848
850 response = Response(app_iter=stream_node())
849 response = Response(app_iter=stream_content)
851 850 response.content_disposition = disposition
852 851 response.content_type = mimetype
853 852
@@ -883,10 +882,9 b' class RepoFilesView(RepoAppView):'
883 882
884 883 disposition = self._get_attachement_headers(f_path)
885 884
886 def stream_node():
887 yield file_node.raw_bytes
885 stream_content = file_node.stream_bytes()
888 886
889 response = Response(app_iter=stream_node())
887 response = Response(app_iter=stream_content)
890 888 response.content_disposition = disposition
891 889 response.content_type = file_node.mimetype
892 890
@@ -72,11 +72,11 b' def connect_http(server_and_port):'
72 72
73 73 session_factory = client_http.ThreadlocalSessionFactory()
74 74
75 connection.Git = client_http.RepoMaker(
75 connection.Git = client_http.RemoteVCSMaker(
76 76 server_and_port, '/git', 'git', session_factory)
77 connection.Hg = client_http.RepoMaker(
77 connection.Hg = client_http.RemoteVCSMaker(
78 78 server_and_port, '/hg', 'hg', session_factory)
79 connection.Svn = client_http.RepoMaker(
79 connection.Svn = client_http.RemoteVCSMaker(
80 80 server_and_port, '/svn', 'svn', session_factory)
81 81 connection.Service = client_http.ServiceConnection(
82 82 server_and_port, '/_service', session_factory)
@@ -107,21 +107,6 b' def connect_vcs(server_and_port, protoco'
107 107 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
108 108
109 109
110 def create_vcsserver_proxy(server_and_port, protocol):
111 if protocol == 'http':
112 return _create_vcsserver_proxy_http(server_and_port)
113 else:
114 raise Exception('Invalid vcs server protocol "{}"'.format(protocol))
115
116
117 def _create_vcsserver_proxy_http(server_and_port):
118 from rhodecode.lib.vcs import client_http
119
120 session = _create_http_rpc_session()
121 url = urlparse.urljoin('http://%s' % server_and_port, '/server')
122 return client_http.RemoteObject(url, session)
123
124
125 110 class CurlSession(object):
126 111 """
127 112 Modeled so that it provides a subset of the requests interface.
@@ -176,12 +161,23 b' class CurlResponse(object):'
176 161
@property
def content(self):
    """
    Returns everything held by the response buffer.

    NOTE(review): the buffer is closed right after its value was fetched,
    so this property is presumably meant to be read only once per response
    and not to be combined with `iter_content` -- confirm against callers.
    """
    try:
        return self._response_buffer.getvalue()
    finally:
        # runs even when `getvalue()` raises, the buffer is always closed
        self._response_buffer.close()
180 168
181 169 @property
182 170 def status_code(self):
183 171 return self._status_code
184 172
def iter_content(self, chunk_size):
    """
    Yields the buffered response body piece by piece.

    The buffer is rewound to its start first. Every yielded piece is the
    result of a single `read(chunk_size)` call, iteration ends with the
    first empty read.
    """
    self._response_buffer.seek(0)
    piece = self._response_buffer.read(chunk_size)
    while piece:
        yield piece
        piece = self._response_buffer.read(chunk_size)
180
185 181
186 182 def _create_http_rpc_session():
187 183 session = CurlSession()
@@ -1060,6 +1060,12 b' class BaseCommit(object):'
1060 1060 """
1061 1061 raise NotImplementedError
1062 1062
def get_file_content_streamed(self, path):
    """
    Returns a streaming response from vcsserver with the content of the
    file at the given `path`.

    The base implementation only raises `NotImplementedError`.
    """
    raise NotImplementedError
1068
1063 1069 def get_file_size(self, path):
1064 1070 """
1065 1071 Returns size of the file at the given `path`.
@@ -1631,6 +1637,9 b' class EmptyCommit(BaseCommit):'
1631 1637 def get_file_content(self, path):
1632 1638 return u''
1633 1639
def get_file_content_streamed(self, path):
    """
    Streams the content of the file at the given `path`.

    Yields exactly one chunk: whatever `get_file_content` returns for
    `path`.
    """
    # `path` has to be forwarded: `get_file_content` takes it as a required
    # argument, calling it bare raises a TypeError on the first iteration.
    yield self.get_file_content(path)
1642
1634 1643 def get_file_size(self, path):
1635 1644 return 0
1636 1645
@@ -252,6 +252,11 b' class GitCommit(base.BaseCommit):'
252 252 tree_id, _ = self._get_tree_id_for_path(path)
253 253 return self._remote.blob_as_pretty_string(tree_id)
254 254
def get_file_content_streamed(self, path):
    """
    Returns the result of the remote `stream:blob_as_pretty_string` call
    made with the tree id resolved for `path`.
    """
    tree_id, _unused = self._get_tree_id_for_path(path)
    # the attribute name contains a colon, so it can only be fetched
    # through getattr
    return getattr(self._remote, 'stream:blob_as_pretty_string')(tree_id)
259
255 260 def get_file_size(self, path):
256 261 """
257 262 Returns size of the file at given `path`.
@@ -238,6 +238,11 b' class MercurialCommit(base.BaseCommit):'
238 238 path = self._get_filectx(path)
239 239 return self._remote.fctx_node_data(self.raw_id, path)
240 240
def get_file_content_streamed(self, path):
    """
    Returns the result of the remote `stream:fctx_node_data` call made with
    this commit's `raw_id` and the value `_get_filectx` gives for `path`.
    """
    filectx_path = self._get_filectx(path)
    # the attribute name contains a colon, so it can only be fetched
    # through getattr
    return getattr(self._remote, 'stream:fctx_node_data')(
        self.raw_id, filectx_path)
245
241 246 def get_file_size(self, path):
242 247 """
243 248 Returns size of the file at given ``path``.
@@ -117,6 +117,11 b' class SubversionCommit(base.BaseCommit):'
117 117 path = self._fix_path(path)
118 118 return self._remote.get_file_content(safe_str(path), self._svn_rev)
119 119
def get_file_content_streamed(self, path):
    """
    Returns the result of the remote `stream:get_file_content` call made
    with the fixed, `safe_str` converted `path` and this commit's svn
    revision.
    """
    fixed_path = self._fix_path(path)
    # the attribute name contains a colon, so it can only be fetched
    # through getattr
    streamer = getattr(self._remote, 'stream:get_file_content')
    return streamer(safe_str(fixed_path), self._svn_rev)
124
120 125 def get_file_size(self, path):
121 126 path = self._fix_path(path)
122 127 return self._remote.get_file_size(safe_str(path), self._svn_rev)
@@ -51,163 +51,6 b' EXCEPTIONS_MAP = {'
51 51 }
52 52
53 53
54 class RepoMaker(object):
55
56 def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
57 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
58 self._session_factory = session_factory
59 self.backend_type = backend_type
60
61 def __call__(self, path, repo_id, config, with_wire=None):
62 log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
63 return RemoteRepo(path, repo_id, config, self.url, self._session_factory(),
64 with_wire=with_wire)
65
66 def __getattr__(self, name):
67 def f(*args, **kwargs):
68 return self._call(name, *args, **kwargs)
69 return f
70
71 @exceptions.map_vcs_exceptions
72 def _call(self, name, *args, **kwargs):
73 payload = {
74 'id': str(uuid.uuid4()),
75 'method': name,
76 'backend': self.backend_type,
77 'params': {'args': args, 'kwargs': kwargs}
78 }
79 return _remote_call(
80 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
81
82
83 class ServiceConnection(object):
84 def __init__(self, server_and_port, backend_endpoint, session_factory):
85 self.url = urlparse.urljoin('http://%s' % server_and_port, backend_endpoint)
86 self._session_factory = session_factory
87
88 def __getattr__(self, name):
89 def f(*args, **kwargs):
90 return self._call(name, *args, **kwargs)
91
92 return f
93
94 @exceptions.map_vcs_exceptions
95 def _call(self, name, *args, **kwargs):
96 payload = {
97 'id': str(uuid.uuid4()),
98 'method': name,
99 'params': {'args': args, 'kwargs': kwargs}
100 }
101 return _remote_call(
102 self.url, payload, EXCEPTIONS_MAP, self._session_factory())
103
104
105 class RemoteRepo(object):
106
107 def __init__(self, path, repo_id, config, url, session, with_wire=None):
108 self.url = url
109 self._session = session
110 with_wire = with_wire or {}
111
112 repo_state_uid = with_wire.get('repo_state_uid') or 'state'
113 self._wire = {
114 "path": path, # repo path
115 "repo_id": repo_id,
116 "config": config,
117 "repo_state_uid": repo_state_uid,
118 "context": self._create_vcs_cache_context(path, repo_state_uid)
119 }
120
121 if with_wire:
122 self._wire.update(with_wire)
123
124 # NOTE(johbo): Trading complexity for performance. Avoiding the call to
125 # log.debug brings a few percent gain even if is is not active.
126 if log.isEnabledFor(logging.DEBUG):
127 self._call_with_logging = True
128
129 self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))
130
131 def __getattr__(self, name):
132 def f(*args, **kwargs):
133 return self._call(name, *args, **kwargs)
134 return f
135
136 @exceptions.map_vcs_exceptions
137 def _call(self, name, *args, **kwargs):
138 # TODO: oliver: This is currently necessary pre-call since the
139 # config object is being changed for hooking scenarios
140 wire = copy.deepcopy(self._wire)
141 wire["config"] = wire["config"].serialize()
142 wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))
143
144 payload = {
145 'id': str(uuid.uuid4()),
146 'method': name,
147 'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
148 }
149
150 if self._call_with_logging:
151 start = time.time()
152 context_uid = wire.get('context')
153 log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
154 self.url, name, args, context_uid)
155 result = _remote_call(self.url, payload, EXCEPTIONS_MAP, self._session)
156 if self._call_with_logging:
157 log.debug('Call %s@%s took: %.4fs. wire_context: %s',
158 self.url, name, time.time()-start, context_uid)
159 return result
160
161 def __getitem__(self, key):
162 return self.revision(key)
163
164 def _create_vcs_cache_context(self, *args):
165 """
166 Creates a unique string which is passed to the VCSServer on every
167 remote call. It is used as cache key in the VCSServer.
168 """
169 hash_key = '-'.join(map(str, args))
170 return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))
171
172 def invalidate_vcs_cache(self):
173 """
174 This invalidates the context which is sent to the VCSServer on every
175 call to a remote method. It forces the VCSServer to create a fresh
176 repository instance on the next call to a remote method.
177 """
178 self._wire['context'] = str(uuid.uuid4())
179
180
181 class RemoteObject(object):
182
183 def __init__(self, url, session):
184 self._url = url
185 self._session = session
186
187 # johbo: Trading complexity for performance. Avoiding the call to
188 # log.debug brings a few percent gain even if is is not active.
189 if log.isEnabledFor(logging.DEBUG):
190 self._call = self._call_with_logging
191
192 def __getattr__(self, name):
193 def f(*args, **kwargs):
194 return self._call(name, *args, **kwargs)
195 return f
196
197 @exceptions.map_vcs_exceptions
198 def _call(self, name, *args, **kwargs):
199 payload = {
200 'id': str(uuid.uuid4()),
201 'method': name,
202 'params': {'args': args, 'kwargs': kwargs}
203 }
204 return _remote_call(self._url, payload, EXCEPTIONS_MAP, self._session)
205
206 def _call_with_logging(self, name, *args, **kwargs):
207 log.debug('Calling %s@%s', self._url, name)
208 return RemoteObject._call(self, name, *args, **kwargs)
209
210
211 54 def _remote_call(url, payload, exceptions_map, session):
212 55 try:
213 56 response = session.post(url, data=msgpack.packb(payload))
@@ -254,6 +97,191 b' def _remote_call(url, payload, exception'
254 97 return response.get('result')
255 98
256 99
def _streaming_remote_call(url, payload, exceptions_map, session, chunk_size):
    """
    Posts the msgpack serialized `payload` to `url` and returns an iterator
    over the response body in pieces of `chunk_size`.

    Failures to reach the server and responses with a status code of 400 or
    above are raised as `exceptions.HttpVCSCommunicationError`, any other
    error of the POST is re-raised untouched. `exceptions_map` is accepted
    but not used in here.
    """
    try:
        response = session.post(url, data=msgpack.packb(payload))
    except pycurl.error as exc:
        msg = '{}. \npycurl traceback: {}'.format(exc, traceback.format_exc())
        raise exceptions.HttpVCSCommunicationError(msg)
    except Exception as exc:
        # gevent doesn't return proper pycurl errors, those are recognized
        # by their message only
        if 'Failed to connect' not in getattr(exc, 'message', ''):
            raise
        raise exceptions.HttpVCSCommunicationError(exc)

    status = response.status_code
    if status >= 400:
        log.error('Call to %s returned non 200 HTTP code: %s',
                  url, status)
        raise exceptions.HttpVCSCommunicationError(repr(response.content))

    return response.iter_content(chunk_size=chunk_size)
120
121
class ServiceConnection(object):
    """
    Proxy of a vcsserver service endpoint.

    Every attribute which is not found on the instance is turned into a
    function performing the remote call of the same name.
    """

    def __init__(self, server_and_port, backend_endpoint, session_factory):
        server_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(server_url, backend_endpoint)
        self._session_factory = session_factory

    def __getattr__(self, name):
        # only reached for names the regular attribute lookup did not find
        def remote_attr(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return remote_attr

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Sends the call of method `name` with the given arguments to the
        endpoint, a new session is requested from the factory for each call.
        """
        params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': params
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
142
143
class RemoteVCSMaker(object):
    """
    Factory of `RemoteRepo` objects for a single backend endpoint.

    Calling an instance creates a `RemoteRepo`. Every attribute which is not
    found on the instance is turned into a function performing the remote
    call of the same name against the backend itself, without a repository.
    """

    def __init__(self, server_and_port, backend_endpoint, backend_type, session_factory):
        server_url = 'http://%s' % server_and_port
        self.url = urlparse.urljoin(server_url, backend_endpoint)
        # streaming calls use a dedicated url below the backend endpoint
        self.stream_url = urlparse.urljoin(server_url, backend_endpoint + '/stream')

        self.backend_type = backend_type
        self._session_factory = session_factory

    def __call__(self, path, repo_id, config, with_wire=None):
        log.debug('%s RepoMaker call on %s', self.backend_type.upper(), path)
        return RemoteRepo(path, repo_id, config, self, with_wire=with_wire)

    def __getattr__(self, name):
        # only reached for names the regular attribute lookup did not find
        def remote_attr(*args, **kwargs):
            return self._call(name, *args, **kwargs)

        return remote_attr

    @exceptions.map_vcs_exceptions
    def _call(self, func_name, *args, **kwargs):
        """
        Sends the call of `func_name` for this backend type to the endpoint,
        a new session is requested from the factory for each call.
        """
        params = {'args': args, 'kwargs': kwargs}
        payload = {
            'id': str(uuid.uuid4()),
            'method': func_name,
            'backend': self.backend_type,
            'params': params
        }
        session = self._session_factory()
        return _remote_call(self.url, payload, EXCEPTIONS_MAP, session)
172
173
class RemoteRepo(object):
    """
    Client side proxy of a repository handled by the VCSServer.

    Every attribute which is not defined in here is turned into a function
    performing the remote call of the same name. Names starting with
    ``stream:`` are sent to the streaming url and return an iterator over
    the response in chunks of `CHUNK_SIZE`, all other names are sent to the
    regular url and return the result of the call.
    """
    CHUNK_SIZE = 16384

    # This default has to live on the class. Without it the lookup of
    # `self._call_with_logging` ends up in `__getattr__` whenever DEBUG
    # logging is disabled, and the function returned from there is truthy,
    # which switched the call logging on for every instance.
    _call_with_logging = False

    def __init__(self, path, repo_id, config, remote_maker, with_wire=None):
        """
        :param path: path of the repository
        :param repo_id: id of the repository, sent along in the wire
        :param config: config object, serialized on each call
        :param remote_maker: object providing `url`, `stream_url` and the
            `_session_factory` used to create the session of this repo
        :param with_wire: optional dict of values overriding the wire
        """
        self.url = remote_maker.url
        self.stream_url = remote_maker.stream_url
        self._session = remote_maker._session_factory()

        with_wire = with_wire or {}

        repo_state_uid = with_wire.get('repo_state_uid') or 'state'
        self._wire = {
            "path": path,  # repo path
            "repo_id": repo_id,
            "config": config,
            "repo_state_uid": repo_state_uid,
            "context": self._create_vcs_cache_context(path, repo_state_uid)
        }

        if with_wire:
            self._wire.update(with_wire)

        # NOTE(johbo): Trading complexity for performance. Avoiding the call to
        # log.debug brings a few percent gain even if it is not active.
        if log.isEnabledFor(logging.DEBUG):
            self._call_with_logging = True

        self.cert_dir = get_cert_path(rhodecode.CONFIG.get('__file__'))

    def __getattr__(self, name):
        # only reached for names the regular attribute lookup did not find

        if name.startswith('stream:'):
            def repo_remote_attr(*args, **kwargs):
                return self._call_stream(name, *args, **kwargs)
        else:
            def repo_remote_attr(*args, **kwargs):
                return self._call(name, *args, **kwargs)

        return repo_remote_attr

    def _base_call(self, name, *args, **kwargs):
        """
        Builds the payload for the remote call of `name`.

        :return: tuple of the wire context and the payload dict
        """
        # TODO: oliver: This is currently necessary pre-call since the
        # config object is being changed for hooking scenarios
        wire = copy.deepcopy(self._wire)
        wire["config"] = wire["config"].serialize()
        wire["config"].append(('vcs', 'ssl_dir', self.cert_dir))

        payload = {
            'id': str(uuid.uuid4()),
            'method': name,
            'params': {'wire': wire, 'args': args, 'kwargs': kwargs}
        }

        context_uid = wire.get('context')
        return context_uid, payload

    @exceptions.map_vcs_exceptions
    def _call(self, name, *args, **kwargs):
        """
        Performs the remote call of `name` against the regular url and
        returns its result.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        url = self.url

        start = time.time()
        if self._call_with_logging:
            log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
                      url, name, args, context_uid)

        result = _remote_call(url, payload, EXCEPTIONS_MAP, self._session)
        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    @exceptions.map_vcs_exceptions
    def _call_stream(self, name, *args, **kwargs):
        """
        Performs the remote call of `name` against the streaming url and
        returns an iterator over the response in chunks of `CHUNK_SIZE`.
        """
        context_uid, payload = self._base_call(name, *args, **kwargs)
        # the chunk size is sent along in the payload as well
        payload['chunk_size'] = self.CHUNK_SIZE
        url = self.stream_url

        start = time.time()
        if self._call_with_logging:
            log.debug('Calling %s@%s with args:%.10240r. wire_context: %s',
                      url, name, args, context_uid)

        result = _streaming_remote_call(url, payload, EXCEPTIONS_MAP, self._session,
                                        self.CHUNK_SIZE)

        if self._call_with_logging:
            log.debug('Call %s@%s took: %.4fs. wire_context: %s',
                      url, name, time.time()-start, context_uid)
        return result

    def __getitem__(self, key):
        # `revision` is not defined in here, this is a remote call
        return self.revision(key)

    def _create_vcs_cache_context(self, *args):
        """
        Creates a unique string which is passed to the VCSServer on every
        remote call. It is used as cache key in the VCSServer.
        """
        hash_key = '-'.join(map(str, args))
        return str(uuid.uuid5(uuid.NAMESPACE_URL, hash_key))

    def invalidate_vcs_cache(self):
        """
        This invalidates the context which is sent to the VCSServer on every
        call to a remote method. It forces the VCSServer to create a fresh
        repository instance on the next call to a remote method.
        """
        self._wire['context'] = str(uuid.uuid4())
283
284
257 285 class VcsHttpProxy(object):
258 286
259 287 CHUNK_SIZE = 16384
@@ -27,6 +27,7 b' import stat'
27 27
28 28 from zope.cachedescriptors.property import Lazy as LazyProperty
29 29
30 import rhodecode
30 31 from rhodecode.config.conf import LANGUAGES_EXTENSIONS_MAP
31 32 from rhodecode.lib.utils import safe_unicode, safe_str
32 33 from rhodecode.lib.utils2 import md5
@@ -369,6 +370,17 b' class FileNode(Node):'
369 370 content = self._content
370 371 return content
371 372
def stream_bytes(self):
    """
    Returns an iterator that will stream the content of the file directly from
    vcsserver without loading it to memory.

    The iterator is the one returned by `get_file_content_streamed` of the
    related commit for the path of this node.

    :raises NodeError: if the node has no related commit
    """
    if not self.commit:
        # the message names the streamed content, which is what cannot be
        # retrieved without a commit
        raise NodeError(
            "Cannot stream content of the file without related "
            "commit attribute")
    return self.commit.get_file_content_streamed(self.path)
383
372 384 @LazyProperty
373 385 def md5(self):
374 386 """
@@ -848,3 +860,11 b' class LargeFileNode(FileNode):'
848 860 Overwrites name to be the org lf path
849 861 """
850 862 return self.org_path
863
def stream_bytes(self):
    """
    Yields the content of the file stored at `self.path` in pieces of at
    most 16 KiB, the file is read in binary mode.
    """
    with open(self.path, 'rb') as stream:
        # the empty read at the end of the file stops the iteration
        for data in iter(lambda: stream.read(16 * 1024), b''):
            yield data
@@ -57,7 +57,6 b' from rhodecode.model.integration import '
57 57 from rhodecode.integrations import integration_type_registry
58 58 from rhodecode.integrations.types.base import IntegrationTypeBase
59 59 from rhodecode.lib.utils import repo2db_mapper
60 from rhodecode.lib.vcs import create_vcsserver_proxy
61 60 from rhodecode.lib.vcs.backends import get_backend
62 61 from rhodecode.lib.vcs.nodes import FileNode
63 62 from rhodecode.tests import (
@@ -1398,82 +1397,7 b' def testrun():'
1398 1397 }
1399 1398
1400 1399
1401 @pytest.fixture(autouse=True)
1402 def collect_appenlight_stats(request, testrun):
1403 """
1404 This fixture reports memory consumtion of single tests.
1405
1406 It gathers data based on `psutil` and sends them to Appenlight. The option
1407 ``--ae`` has te be used to enable this fixture and the API key for your
1408 application has to be provided in ``--ae-key``.
1409 """
1410 try:
1411 # cygwin cannot have yet psutil support.
1412 import psutil
1413 except ImportError:
1414 return
1415
1416 if not request.config.getoption('--appenlight'):
1417 return
1418 else:
1419 # Only request the baseapp fixture if appenlight tracking is
1420 # enabled. This will speed up a test run of unit tests by 2 to 3
1421 # seconds if appenlight is not enabled.
1422 baseapp = request.getfuncargvalue("baseapp")
1423 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1424 client = AppenlightClient(
1425 url=url,
1426 api_key=request.config.getoption('--appenlight-api-key'),
1427 namespace=request.node.nodeid,
1428 request=str(testrun['uuid']),
1429 testrun=testrun)
1430
1431 client.collect({
1432 'message': "Starting",
1433 })
1434
1435 server_and_port = baseapp.config.get_settings()['vcs.server']
1436 protocol = baseapp.config.get_settings()['vcs.server.protocol']
1437 server = create_vcsserver_proxy(server_and_port, protocol)
1438 with server:
1439 vcs_pid = server.get_pid()
1440 server.run_gc()
1441 vcs_process = psutil.Process(vcs_pid)
1442 mem = vcs_process.memory_info()
1443 client.tag_before('vcsserver.rss', mem.rss)
1444 client.tag_before('vcsserver.vms', mem.vms)
1445
1446 test_process = psutil.Process()
1447 mem = test_process.memory_info()
1448 client.tag_before('test.rss', mem.rss)
1449 client.tag_before('test.vms', mem.vms)
1450
1451 client.tag_before('time', time.time())
1452
1453 @request.addfinalizer
1454 def send_stats():
1455 client.tag_after('time', time.time())
1456 with server:
1457 gc_stats = server.run_gc()
1458 for tag, value in gc_stats.items():
1459 client.tag_after(tag, value)
1460 mem = vcs_process.memory_info()
1461 client.tag_after('vcsserver.rss', mem.rss)
1462 client.tag_after('vcsserver.vms', mem.vms)
1463
1464 mem = test_process.memory_info()
1465 client.tag_after('test.rss', mem.rss)
1466 client.tag_after('test.vms', mem.vms)
1467
1468 client.collect({
1469 'message': "Finished",
1470 })
1471 client.send_stats()
1472
1473 return client
1474
1475
1476 class AppenlightClient():
1400 class AppenlightClient(object):
1477 1401
1478 1402 url_template = '{url}?protocol_version=0.5'
1479 1403
@@ -96,7 +96,7 b' def test_uses_persistent_http_connection'
96 96
97 97
98 98 def test_repo_maker_uses_session_for_classmethods(stub_session_factory):
99 repo_maker = client_http.RepoMaker(
99 repo_maker = client_http.RemoteVCSMaker(
100 100 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
101 101 repo_maker.example_call()
102 102 stub_session_factory().post.assert_called_with(
@@ -105,7 +105,7 b' def test_repo_maker_uses_session_for_cla'
105 105
106 106 def test_repo_maker_uses_session_for_instance_methods(
107 107 stub_session_factory, config):
108 repo_maker = client_http.RepoMaker(
108 repo_maker = client_http.RemoteVCSMaker(
109 109 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_factory)
110 110 repo = repo_maker('stub_path', 'stub_repo_id', config)
111 111 repo.example_call()
@@ -125,7 +125,7 b' def test_connect_passes_in_the_same_sess'
125 125
126 126 def test_repo_maker_uses_session_that_throws_error(
127 127 stub_session_failing_factory, config):
128 repo_maker = client_http.RepoMaker(
128 repo_maker = client_http.RemoteVCSMaker(
129 129 'server_and_port', 'endpoint', 'test_dummy_scm', stub_session_failing_factory)
130 130 repo = repo_maker('stub_path', 'stub_repo_id', config)
131 131
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now