##// END OF EJS Templates
core: various fixes of bytes vs str usage based on rhodecode-ce tests outputs
super-admin -
r1070:0eb4128e python3
parent child Browse files
Show More
@@ -1,134 +1,135 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17 import os
18 18 import sys
19 19 import traceback
20 20 import logging
21 21 import urllib.parse
22 22
23 23 from vcsserver.lib.rc_cache import region_meta
24 24
25 25 from vcsserver import exceptions
26 26 from vcsserver.exceptions import NoContentException
27 27 from vcsserver.hgcompat import (archival)
28 from vcsserver.str_utils import safe_bytes
28 29
29 30 log = logging.getLogger(__name__)
30 31
31 32
class RepoFactory(object):
    """
    Base factory producing repository instances for a concrete backend.

    Concrete subclasses set :attr:`repo_type` and implement the repo
    creation hooks; instances of the `repo` object are cached per
    :term:`call context` via the dogpile cache region.
    """
    # backend identifier, set by concrete subclasses
    repo_type = None

    def __init__(self):
        self._cache_region = region_meta.dogpile_cache_regions['repo_object']

    def _create_config(self, path, config):
        # base implementation ignores inputs and yields an empty config
        return {}

    def _create_repo(self, wire, create):
        raise NotImplementedError()

    def repo(self, wire, create=False):
        raise NotImplementedError()
53 54
54 55
def obfuscate_qs(query_string):
    """
    Return *query_string* with sensitive parameter values masked.

    ``auth_token`` and ``api_key`` values are replaced by ``*****``;
    blank values are preserved as bare keys. ``None`` passes through.
    """
    if query_string is None:
        return None

    sanitized = []
    for key, value in urllib.parse.parse_qsl(query_string, keep_blank_values=True):
        if key in ('auth_token', 'api_key'):
            value = "*****"
        sanitized.append((key, value))

    parts = []
    for key, value in sanitized:
        parts.append('{}={}'.format(key, value) if value else key)
    return '&'.join(parts)
67 68
68 69
def raise_from_original(new_type, org_exc: Exception):
    """
    Re-raise the currently handled exception as *new_type*.

    The replacement exception keeps the original args and traceback; a
    formatted copy of the traceback is attached as ``_org_exc_tb``.
    """
    __, exc_value, exc_traceback = sys.exc_info()
    replacement = new_exc = new_type(*exc_value.args)

    # store the original traceback into the new exc
    new_exc._org_exc_tb = traceback.format_tb(exc_traceback)

    try:
        raise replacement.with_traceback(exc_traceback)
    finally:
        # break the reference cycle created by holding the traceback object
        del exc_traceback
84 85
85 86
class ArchiveNode(object):
    """Plain container describing one entry to be written to an archive."""

    def __init__(self, path, mode, is_link, raw_bytes):
        # archive-relative path of the entry
        self.path = path
        # unix permission bits
        self.mode = mode
        # True when the entry is a symbolic link
        self.is_link = is_link
        # content provider; presumably callable (archive_repo invokes it) — TODO confirm
        self.raw_bytes = raw_bytes
92 93
93 94
def archive_repo(walker, archive_dest_path, kind, mtime, archive_at_path,
                 archive_dir_name, commit_id, write_metadata=True, extra_metadata=None):
    """
    Create an archive (``tgz``/``tbz2``/``zip``) from files yielded by *walker*.

    walker should be a file walker, for example:
        def walker():
            for file_info in files:
                yield ArchiveNode(fn, mode, is_link, ctx[fn].data)

    :param archive_dest_path: filesystem destination of the archive
    :param kind: one of ``tgz``, ``tbz2``, ``zip``
    :param mtime: modification time stamped onto archive members
    :param write_metadata: append a ``.archival.txt`` member with commit info
    :raises: archive exception when *kind* is not supported
    """
    extra_metadata = extra_metadata or {}

    if kind == "tgz":
        archiver = archival.tarit(archive_dest_path, mtime, "gz")
    elif kind == "tbz2":
        archiver = archival.tarit(archive_dest_path, mtime, "bz2")
    elif kind == 'zip':
        archiver = archival.zipit(archive_dest_path, mtime)
    else:
        raise exceptions.ArchiveException()(
            'Remote does not support: "%s" archive type.' % kind)

    for f in walker(commit_id, archive_at_path):
        # paths are handled as bytes end-to-end in the py3 conversion
        f_path = os.path.join(safe_bytes(archive_dir_name), f.path.lstrip(b'/'))
        try:
            archiver.addfile(f_path, f.mode, f.is_link, f.raw_bytes())
        except NoContentException:
            # NOTE(marcink): this is a special case for SVN so we can create "empty"
            # directories which arent supported by archiver
            # BUG FIX: content must be bytes (was str '') to stay consistent
            # with the bytes-based archiver API used everywhere else here
            archiver.addfile(os.path.join(f_path, b'.dir'), f.mode, f.is_link, b'')

    if write_metadata:
        metadata = dict([
            ('commit_id', commit_id),
            ('mtime', mtime),
        ])
        metadata.update(extra_metadata)

        meta = [safe_bytes(f"{f_name}:{value}") for f_name, value in metadata.items()]
        f_path = os.path.join(safe_bytes(archive_dir_name), b'.archival.txt')
        archiver.addfile(f_path, 0o644, False, b'\n'.join(meta))

    return archiver.done()
@@ -1,292 +1,292 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import re
19 19 import logging
20 20 from wsgiref.util import FileWrapper
21 21
22 22 from pyramid.config import Configurator
23 23 from pyramid.response import Response, FileIter
24 24 from pyramid.httpexceptions import (
25 25 HTTPBadRequest, HTTPNotImplemented, HTTPNotFound, HTTPForbidden,
26 26 HTTPUnprocessableEntity)
27 27
28 28 from vcsserver.lib.rc_json import json
29 29 from vcsserver.git_lfs.lib import OidHandler, LFSOidStore
30 30 from vcsserver.git_lfs.utils import safe_result, get_cython_compat_decorator
31 31 from vcsserver.str_utils import safe_int
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 GIT_LFS_CONTENT_TYPE = 'application/vnd.git-lfs' #+json ?
37 37 GIT_LFS_PROTO_PAT = re.compile(r'^/(.+)/(info/lfs/(.+))')
38 38
39 39
def write_response_error(http_exception, text=None):
    """
    Build a pyramid HTTP error response carrying a git-lfs JSON body.

    :param http_exception: pyramid ``HTTPException`` class to instantiate
    :param text: optional message, embedded as ``{"message": text}``
    :return: the instantiated exception/response object
    """
    content_type = GIT_LFS_CONTENT_TYPE + '+json'
    _exception = http_exception(content_type=content_type)
    _exception.content_type = content_type
    if text:
        # BUG FIX: webob Response.body accepts only bytes; json.dumps returns
        # str, so assigning it directly raises TypeError at runtime
        _exception.body = json.dumps({'message': text}).encode('utf-8')
    log.debug('LFS: writing response of type %s to client with text:%s',
              http_exception, text)
    return _exception
49 49
50 50
class AuthHeaderRequired(object):
    """
    Decorator to check if request has proper auth-header; requests
    without one are answered with HTTP 403.
    """

    def __call__(self, func):
        return get_cython_compat_decorator(self.__wrapper, func)

    def __wrapper(self, func, *fargs, **fkwargs):
        # by convention fargs[1] carries the pyramid request object
        request = fargs[1]
        if not request.authorization:
            return write_response_error(HTTPForbidden)
        return func(*fargs[1:], **fkwargs)
65 65
66 66
67 67 # views
68 68
def lfs_objects(request):
    """Deprecated LFS v1 endpoint; always answers not-implemented."""
    # indicate not supported, V1 API
    log.warning('LFS: v1 api not supported, reporting it back to client')
    message = 'LFS: v1 api not supported'
    return write_response_error(HTTPNotImplemented, message)
73 73
74 74
@AuthHeaderRequired()
def lfs_objects_batch(request):
    """
    The client sends the following information to the Batch endpoint to transfer some objects:

    operation - Should be download or upload.
    transfers - An optional Array of String identifiers for transfer
    adapters that the client has configured. If omitted, the basic
    transfer adapter MUST be assumed by the server.
    objects - An Array of objects to download.
    oid - String OID of the LFS object.
    size - Integer byte size of the LFS object. Must be at least zero.
    """
    request.response.content_type = GIT_LFS_CONTENT_TYPE + '+json'
    auth = request.authorization
    repo = request.matchdict.get('repo')
    data = request.json
    operation = data.get('operation')
    http_scheme = request.registry.git_lfs_http_scheme

    # only the two batch operations of the LFS batch API are accepted
    if operation not in ('download', 'upload'):
        log.debug('LFS: unsupported operation:%s', operation)
        return write_response_error(
            HTTPBadRequest, 'unsupported operation mode: `%s`' % operation)

    if 'objects' not in data:
        log.debug('LFS: missing objects data')
        return write_response_error(
            HTTPBadRequest, 'missing objects data')

    log.debug('LFS: handling operation of type: %s', operation)

    objects = []
    for o in data['objects']:
        try:
            oid = o['oid']
            obj_size = o['size']
        except KeyError:
            # any malformed object entry aborts the whole batch
            log.exception('LFS, failed to extract data')
            return write_response_error(
                HTTPBadRequest, 'unsupported data in objects')

        obj_data = {'oid': oid}

        # hrefs the client will use for the actual transfer / verification
        obj_href = request.route_url('lfs_objects_oid', repo=repo, oid=oid,
                                     _scheme=http_scheme)
        obj_verify_href = request.route_url('lfs_objects_verify', repo=repo,
                                            _scheme=http_scheme)
        store = LFSOidStore(
            oid, repo, store_location=request.registry.git_lfs_store_path)
        handler = OidHandler(
            store, repo, auth, oid, obj_size, obj_data,
            obj_href, obj_verify_href)

        # this verifies also OIDs
        actions, errors = handler.exec_operation(operation)
        if errors:
            log.warning('LFS: got following errors: %s', errors)
            obj_data['errors'] = errors

        if actions:
            obj_data['actions'] = actions

        obj_data['size'] = obj_size
        obj_data['authenticated'] = True
        objects.append(obj_data)

    result = {'objects': objects, 'transfer': 'basic'}
    log.debug('LFS Response %s', safe_result(result))

    return result
146 146
147 147
def lfs_objects_oid_upload(request):
    """
    Stream the raw request body into the LFS store under the given oid.

    The WSGI input is consumed in fixed-size chunks so arbitrarily large
    objects are never fully buffered in memory.
    """
    request.response.content_type = GIT_LFS_CONTENT_TYPE + '+json'
    repo = request.matchdict.get('repo')
    oid = request.matchdict.get('oid')
    store = LFSOidStore(
        oid, repo, store_location=request.registry.git_lfs_store_path)
    engine = store.get_engine(mode='wb')
    log.debug('LFS: starting chunked write of LFS oid: %s to storage', oid)

    body = request.environ['wsgi.input']

    with engine as f:
        blksize = 64 * 1024  # 64kb
        while True:
            # read in chunks as stream comes in from Gunicorn
            # this is a specific Gunicorn support function.
            # might work differently on waitress
            chunk = body.read(blksize)
            if not chunk:
                break
            f.write(chunk)

    return {'upload': 'ok'}
171 171
172 172
def lfs_objects_oid_download(request):
    """
    Serve a stored LFS object as a streamed ``application/octet-stream``
    response; answers 404 when the oid is not present in the store.
    """
    repo = request.matchdict.get('repo')
    oid = request.matchdict.get('oid')

    store = LFSOidStore(
        oid, repo, store_location=request.registry.git_lfs_store_path)
    if not store.has_oid():
        log.debug('LFS: oid %s does not exists in store', oid)
        return write_response_error(
            HTTPNotFound, 'requested file with oid `%s` not found in store' % oid)

    # TODO(marcink): support range header ?
    # Range: bytes=0-, `bytes=(\d+)\-.*`

    # NOTE(review): the handle is passed to FileIter; assumes FileIter takes
    # ownership and closes it when the response is drained — TODO confirm
    f = open(store.oid_path, 'rb')
    response = Response(
        content_type='application/octet-stream', app_iter=FileIter(f))
    response.headers.add('X-RC-LFS-Response-Oid', str(oid))
    return response
192 192
193 193
def lfs_objects_verify(request):
    """
    Verify that an uploaded oid exists in the store and that its stored
    size matches the size the client reports.
    """
    request.response.content_type = GIT_LFS_CONTENT_TYPE + '+json'
    repo = request.matchdict.get('repo')

    payload = request.json
    oid = payload.get('oid')
    size = safe_int(payload.get('size'))

    # guard: both fields are mandatory
    if not (oid and size):
        return write_response_error(
            HTTPBadRequest, 'missing oid and size in request data')

    store = LFSOidStore(
        oid, repo, store_location=request.registry.git_lfs_store_path)
    if not store.has_oid():
        log.debug('LFS: oid %s does not exists in store', oid)
        return write_response_error(
            HTTPNotFound, 'oid `%s` does not exists in store' % oid)

    store_size = store.size_oid()
    if store_size != size:
        msg = 'requested file size mismatch store size:%s requested:%s' % (
            store_size, size)
        return write_response_error(HTTPUnprocessableEntity, msg)

    return {'message': {'size': 'ok', 'in_store': 'ok'}}
221 221
222 222
def lfs_objects_lock(request):
    """LFS locking API endpoint; locking is not implemented server side."""
    return write_response_error(
        HTTPNotImplemented, 'GIT LFS locking api not supported')
226 226
227 227
def not_found(request):
    """Fallback view for unmatched LFS API paths."""
    return write_response_error(
        HTTPNotFound, 'request path not found')
231 231
232 232
def lfs_disabled(request):
    """Catch-all view used when LFS support is switched off for a repo."""
    return write_response_error(
        HTTPNotImplemented, 'GIT LFS disabled for this repo')
236 236
237 237
def git_lfs_app(config):
    """
    Register all git-lfs protocol routes and views on the given pyramid
    configurator. Route registration order is kept as-is.
    """

    # v1 API deprecation endpoint
    config.add_route('lfs_objects',
                     '/{repo:.*?[^/]}/info/lfs/objects')
    config.add_view(lfs_objects, route_name='lfs_objects',
                    request_method='POST', renderer='json')

    # locking API
    config.add_route('lfs_objects_lock',
                     '/{repo:.*?[^/]}/info/lfs/locks')
    config.add_view(lfs_objects_lock, route_name='lfs_objects_lock',
                    request_method=('POST', 'GET'), renderer='json')

    config.add_route('lfs_objects_lock_verify',
                     '/{repo:.*?[^/]}/info/lfs/locks/verify')
    config.add_view(lfs_objects_lock, route_name='lfs_objects_lock_verify',
                    request_method=('POST', 'GET'), renderer='json')

    # batch API
    config.add_route('lfs_objects_batch',
                     '/{repo:.*?[^/]}/info/lfs/objects/batch')
    config.add_view(lfs_objects_batch, route_name='lfs_objects_batch',
                    request_method='POST', renderer='json')

    # oid upload/download API
    config.add_route('lfs_objects_oid',
                     '/{repo:.*?[^/]}/info/lfs/objects/{oid}')
    config.add_view(lfs_objects_oid_upload, route_name='lfs_objects_oid',
                    request_method='PUT', renderer='json')
    config.add_view(lfs_objects_oid_download, route_name='lfs_objects_oid',
                    request_method='GET', renderer='json')

    # verification API
    config.add_route('lfs_objects_verify',
                     '/{repo:.*?[^/]}/info/lfs/verify')
    config.add_view(lfs_objects_verify, route_name='lfs_objects_verify',
                    request_method='POST', renderer='json')

    # not found handler for API
    config.add_notfound_view(not_found, renderer='json')
279 279
280 280
def create_app(git_lfs_enabled, git_lfs_store_path, git_lfs_http_scheme):
    """
    Build the WSGI app serving the git-lfs protocol, or a stub app that
    reports LFS as disabled when *git_lfs_enabled* is falsy.
    """
    config = Configurator()
    if not git_lfs_enabled:
        # not found handler for API, reporting disabled LFS support
        config.add_notfound_view(lfs_disabled, renderer='json')
        return config.make_wsgi_app()

    config.include(git_lfs_app)
    config.registry.git_lfs_store_path = git_lfs_store_path
    config.registry.git_lfs_http_scheme = git_lfs_http_scheme
    return config.make_wsgi_app()
@@ -1,741 +1,741 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import io
19 19 import os
20 20 import sys
21 21 import base64
22 22 import locale
23 23 import logging
24 24 import uuid
25 25 import time
26 26 import wsgiref.util
27 27 import traceback
28 28 import tempfile
29 29 import psutil
30 30
31 31 from itertools import chain
32 32
33 33 import msgpack
34 34 import configparser
35 35
36 36 from pyramid.config import Configurator
37 37 from pyramid.wsgi import wsgiapp
38 38 from pyramid.response import Response
39 39
40 40 from vcsserver.lib.rc_json import json
41 41 from vcsserver.config.settings_maker import SettingsMaker
42 from vcsserver.str_utils import safe_int
42 from vcsserver.str_utils import safe_int, safe_bytes, safe_str
43 43 from vcsserver.lib.statsd_client import StatsdClient
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47 # due to Mercurial/glibc2.27 problems we need to detect if locale settings are
48 48 # causing problems and "fix" it in case they do and fallback to LC_ALL = C
49 49
50 50 try:
51 51 locale.setlocale(locale.LC_ALL, '')
52 52 except locale.Error as e:
53 53 log.error(
54 54 'LOCALE ERROR: failed to set LC_ALL, fallback to LC_ALL=C, org error: %s', e)
55 55 os.environ['LC_ALL'] = 'C'
56 56
57 57
58 58 import vcsserver
59 59 from vcsserver import remote_wsgi, scm_app, settings, hgpatches
60 60 from vcsserver.git_lfs.app import GIT_LFS_CONTENT_TYPE, GIT_LFS_PROTO_PAT
61 61 from vcsserver.echo_stub import remote_wsgi as remote_wsgi_stub
62 62 from vcsserver.echo_stub.echo_app import EchoApp
63 63 from vcsserver.exceptions import HTTPRepoLocked, HTTPRepoBranchProtected
64 64 from vcsserver.lib.exc_tracking import store_exception
65 65 from vcsserver.server import VcsServer
66 66
67 67 strict_vcs = True
68 68
69 69 git_import_err = None
70 70 try:
71 71 from vcsserver.remote.git import GitFactory, GitRemote
72 72 except ImportError as e:
73 73 GitFactory = None
74 74 GitRemote = None
75 75 git_import_err = e
76 76 if strict_vcs:
77 77 raise
78 78
79 79
80 80 hg_import_err = None
81 81 try:
82 82 from vcsserver.remote.hg import MercurialFactory, HgRemote
83 83 except ImportError as e:
84 84 MercurialFactory = None
85 85 HgRemote = None
86 86 hg_import_err = e
87 87 if strict_vcs:
88 88 raise
89 89
90 90
91 91 svn_import_err = None
92 92 try:
93 93 from vcsserver.remote.svn import SubversionFactory, SvnRemote
94 94 except ImportError as e:
95 95 SubversionFactory = None
96 96 SvnRemote = None
97 97 svn_import_err = e
98 98 if strict_vcs:
99 99 raise
100 100
101 101
102 102 def _is_request_chunked(environ):
103 103 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
104 104 return stream
105 105
106 106
def log_max_fd():
    """Best-effort logging of the hard limit on open file descriptors."""
    try:
        _soft, maxfd = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
        log.info('Max file descriptors value: %s', maxfd)
    except Exception:
        # purely informational; never break startup over it
        pass
113 113
114 114
class VCS(object):
    """
    Container wiring up the available VCS backend remotes (git, hg, svn)
    plus the generic server remote, configuring locale first.
    """

    def __init__(self, locale_conf=None, cache_config=None):
        self.locale = locale_conf
        self.cache_config = cache_config
        self._configure_locale()

        log_max_fd()

        # each backend is optional: a failed import leaves the factory None
        # and the corresponding remote attribute unset (see module top)
        if GitFactory and GitRemote:
            git_factory = GitFactory()
            self._git_remote = GitRemote(git_factory)
        else:
            log.error("Git client import failed: %s", git_import_err)

        if MercurialFactory and HgRemote:
            hg_factory = MercurialFactory()
            self._hg_remote = HgRemote(hg_factory)
        else:
            log.error("Mercurial client import failed: %s", hg_import_err)

        if SubversionFactory and SvnRemote:
            svn_factory = SubversionFactory()

            # hg factory is used for svn url validation
            hg_factory = MercurialFactory()
            self._svn_remote = SvnRemote(svn_factory, hg_factory=hg_factory)
        else:
            log.error("Subversion client import failed: %s", svn_import_err)

        self._vcsserver = VcsServer()

    def _configure_locale(self):
        # explicit locale from config wins; otherwise the environment decides
        if self.locale:
            log.info('Settings locale: `LC_ALL` to %s', self.locale)
        else:
            log.info('Configuring locale subsystem based on environment variables')
        try:
            # If self.locale is the empty string, then the locale
            # module will use the environment variables. See the
            # documentation of the package `locale`.
            locale.setlocale(locale.LC_ALL, self.locale)

            language_code, encoding = locale.getlocale()
            log.info(
                'Locale set to language code "%s" with encoding "%s".',
                language_code, encoding)
        except locale.Error:
            log.exception('Cannot set locale, not configuring the locale system')
163 163
164 164
class WsgiProxy(object):
    """
    WSGI wrapper speaking a msgpack-framed protocol: the request body is
    one msgpack payload describing the call; the response is a stream of
    msgpack frames (error, status, headers, then data chunks).
    """

    def __init__(self, wsgi):
        # wrapped object exposing .handle(environment, input_data, *args, **kwargs)
        self.wsgi = wsgi

    def __call__(self, environ, start_response):
        input_data = environ['wsgi.input'].read()
        input_data = msgpack.unpackb(input_data)

        error = None
        try:
            data, status, headers = self.wsgi.handle(
                input_data['environment'], input_data['input_data'],
                *input_data['args'], **input_data['kwargs'])
        except Exception as e:
            # failures are serialized into the response instead of raised
            data, status, headers = [], None, None
            error = {
                'message': str(e),
                '_vcs_kind': getattr(e, '_vcs_kind', None)
            }

        start_response(200, {})
        return self._iterator(error, status, headers, data)

    def _iterator(self, error, status, headers, data):
        # protocol: first three frames are error/status/headers,
        # followed by the actual data frames
        initial_data = [
            error,
            status,
            headers,
        ]

        for d in chain(initial_data, data):
            yield msgpack.packb(d)
197 197
198 198
def not_found(request):
    """JSON payload returned for unmatched routes."""
    return {'status': '404 NOT FOUND'}
201 201
202 202
class VCSViewPredicate(object):
    """Pyramid view predicate selecting views by available VCS remotes."""

    def __init__(self, val, config):
        self.remotes = val

    def text(self):
        return 'vcs view method = %s' % (list(self.remotes.keys()),)

    phash = text

    def __call__(self, context, request):
        """
        View predicate that returns true if given backend is supported by
        defined remotes.
        """
        return request.matchdict.get('backend') in self.remotes
219 219
220 220
221 221 class HTTPApplication(object):
222 222 ALLOWED_EXCEPTIONS = ('KeyError', 'URLError')
223 223
224 224 remote_wsgi = remote_wsgi
225 225 _use_echo_app = False
226 226
    def __init__(self, settings=None, global_config=None):
        """
        Build the pyramid application: statsd first, cache region include,
        backend remotes, optional echo-app test mode, then settings and
        route/view wiring.
        """
        self.config = Configurator(settings=settings)
        # Init our statsd at very start
        self.config.registry.statsd = StatsdClient.statsd

        self.global_config = global_config
        self.config.include('vcsserver.lib.rc_cache')

        settings_locale = settings.get('locale', '') or 'en_US.UTF-8'
        vcs = VCS(locale_conf=settings_locale, cache_config=settings)
        # backend name -> remote dispatcher used by the rpc views
        self._remotes = {
            'hg': vcs._hg_remote,
            'git': vcs._git_remote,
            'svn': vcs._svn_remote,
            'server': vcs._vcsserver,
        }
        if settings.get('dev.use_echo_app', 'false').lower() == 'true':
            # test/dev mode: replace real VCS wsgi apps with an echo stub
            self._use_echo_app = True
            log.warning("Using EchoApp for VCS operations.")
            self.remote_wsgi = remote_wsgi_stub

        self._configure_settings(global_config, settings)

        self._configure()
252 252
    def _configure_settings(self, global_config, app_settings):
        """
        Configure the settings module.

        Merges global ini config with app settings, applies the optional
        git executable / binary dir overrides, and publishes the merged
        result on the vcsserver package for other modules.
        """
        settings_merged = global_config.copy()
        settings_merged.update(app_settings)

        git_path = app_settings.get('git_path', None)
        if git_path:
            settings.GIT_EXECUTABLE = git_path
        binary_dir = app_settings.get('core.binary_dir', None)
        if binary_dir:
            settings.BINARY_DIR = binary_dir

        # Store the settings to make them available to other modules.
        vcsserver.PYRAMID_SETTINGS = settings_merged
        vcsserver.CONFIG = settings_merged
270 270
    def _configure(self):
        """
        Register renderers, routes, views, predicates and tweens on the
        pyramid configurator. Registration order is kept as-is.
        """
        self.config.add_renderer(name='msgpack', factory=self._msgpack_renderer_factory)

        self.config.add_route('service', '/_service')
        self.config.add_route('status', '/status')
        self.config.add_route('hg_proxy', '/proxy/hg')
        self.config.add_route('git_proxy', '/proxy/git')

        # rpc methods
        self.config.add_route('vcs', '/{backend}')

        # streaming rpc remote methods
        self.config.add_route('vcs_stream', '/{backend}/stream')

        # vcs operations clone/push as streaming
        self.config.add_route('stream_git', '/stream/git/*repo_name')
        self.config.add_route('stream_hg', '/stream/hg/*repo_name')

        self.config.add_view(self.status_view, route_name='status', renderer='json')
        self.config.add_view(self.service_view, route_name='service', renderer='msgpack')

        self.config.add_view(self.hg_proxy(), route_name='hg_proxy')
        self.config.add_view(self.git_proxy(), route_name='git_proxy')
        # vcs_view predicate restricts dispatch to configured backends
        self.config.add_view(self.vcs_view, route_name='vcs', renderer='msgpack',
                             vcs_view=self._remotes)
        self.config.add_view(self.vcs_stream_view, route_name='vcs_stream',
                             vcs_view=self._remotes)

        self.config.add_view(self.hg_stream(), route_name='stream_hg')
        self.config.add_view(self.git_stream(), route_name='stream_git')

        self.config.add_view_predicate('vcs_view', VCSViewPredicate)

        self.config.add_notfound_view(not_found, renderer='json')

        # catch-all exception view translating errors for the client
        self.config.add_view(self.handle_vcs_exception, context=Exception)

        self.config.add_tween(
            'vcsserver.tweens.request_wrapper.RequestWrapperTween',
        )
        self.config.add_request_method(
            'vcsserver.lib.request_counter.get_request_counter',
            'request_count')
314 314
    def wsgi_app(self):
        """Return the fully configured pyramid WSGI application."""
        return self.config.make_wsgi_app()
317 317
    def _vcs_view_params(self, request):
        """
        Unpack the msgpack request body into the tuple
        ``(payload, remote, method, args, kwargs)`` shared by the regular
        and streaming rpc views.
        """
        remote = self._remotes[request.matchdict['backend']]
        payload = msgpack.unpackb(request.body, use_list=True)

        method = payload.get('method')
        params = payload['params']
        wire = params.get('wire')
        args = params.get('args')
        kwargs = params.get('kwargs')
        context_uid = None

        if wire:
            try:
                wire['context'] = context_uid = uuid.UUID(wire['context'])
            except KeyError:
                pass
            # wire always travels as the first positional argument
            args.insert(0, wire)
        repo_state_uid = wire.get('repo_state_uid') if wire else None

        # NOTE(marcink): trading complexity for slight performance
        if log.isEnabledFor(logging.DEBUG):
            # methods listed here have their args elided from debug logs
            no_args_methods = [

            ]
            if method in no_args_methods:
                call_args = ''
            else:
                call_args = args[1:]

            log.debug('Method requested:`%s` with args:%s kwargs:%s context_uid: %s, repo_state_uid:%s',
                      method, call_args, kwargs, context_uid, repo_state_uid)

        statsd = request.registry.statsd
        if statsd:
            statsd.incr(
                'vcsserver_method_total', tags=[
                    "method:{}".format(method),
                ])
        return payload, remote, method, args, kwargs
357 357
    def vcs_view(self, request):
        """
        Dispatch a msgpack rpc call to the selected remote and wrap the
        result — or a serialized exception — into an id/result|error
        response payload.
        """
        payload, remote, method, args, kwargs = self._vcs_view_params(request)
        payload_id = payload.get('id')

        try:
            resp = getattr(remote, method)(*args, **kwargs)
        except Exception as e:
            exc_info = list(sys.exc_info())
            exc_type, exc_value, exc_traceback = exc_info

            # remotes may wrap the real exception; recover it if present
            org_exc = getattr(e, '_org_exc', None)
            org_exc_name = None
            org_exc_tb = ''
            if org_exc:
                org_exc_name = org_exc.__class__.__name__
                org_exc_tb = getattr(e, '_org_exc_tb', '')
                # replace our "faked" exception with our org
                exc_info[0] = org_exc.__class__
                exc_info[1] = org_exc

            should_store_exc = True
            if org_exc:
                def get_exc_fqn(_exc_obj):
                    module_name = getattr(org_exc.__class__, '__module__', 'UNKNOWN')
                    return module_name + '.' + org_exc_name

                exc_fqn = get_exc_fqn(org_exc)

                # expected lookup failures are not worth persisting
                if exc_fqn in ['mercurial.error.RepoLookupError',
                               'vcsserver.exceptions.RefNotFoundException']:
                    should_store_exc = False

            if should_store_exc:
                store_exception(id(exc_info), exc_info, request_path=request.path)

            tb_info = ''.join(
                traceback.format_exception(exc_type, exc_value, exc_traceback))

            # only whitelisted exception class names are exposed by name
            type_ = e.__class__.__name__
            if type_ not in self.ALLOWED_EXCEPTIONS:
                type_ = None

            resp = {
                'id': payload_id,
                'error': {
                    'message': str(e),
                    'traceback': tb_info,
                    'org_exc': org_exc_name,
                    'org_exc_tb': org_exc_tb,
                    'type': type_
                }
            }

            try:
                resp['error']['_vcs_kind'] = getattr(e, '_vcs_kind', None)
            except AttributeError:
                pass
        else:
            resp = {
                'id': payload_id,
                'result': resp
            }

        return resp
423 423
424 424 def vcs_stream_view(self, request):
425 425 payload, remote, method, args, kwargs = self._vcs_view_params(request)
426 426 # this method has a stream: marker we remove it here
427 427 method = method.split('stream:')[-1]
428 428 chunk_size = safe_int(payload.get('chunk_size')) or 4096
429 429
430 430 try:
431 431 resp = getattr(remote, method)(*args, **kwargs)
432 432 except Exception as e:
433 433 raise
434 434
435 435 def get_chunked_data(method_resp):
436 436 stream = io.BytesIO(method_resp)
437 437 while 1:
438 438 chunk = stream.read(chunk_size)
439 439 if not chunk:
440 440 break
441 441 yield chunk
442 442
443 443 response = Response(app_iter=get_chunked_data(resp))
444 444 response.content_type = 'application/octet-stream'
445 445
446 446 return response
447 447
448 448 def status_view(self, request):
449 449 import vcsserver
450 450 return {'status': 'OK', 'vcsserver_version': safe_str(vcsserver.__version__),
451 451 'pid': os.getpid()}
452 452
    def service_view(self, request):
        """
        Diagnostic endpoint returning server version, parsed ini sections,
        the process environment and the echoed request payload.
        """
        import vcsserver

        payload = msgpack.unpackb(request.body, use_list=True)
        server_config, app_config = {}, {}

        try:
            path = self.global_config['__file__']
            config = configparser.RawConfigParser()

            config.read(path)

            if config.has_section('server:main'):
                server_config = dict(config.items('server:main'))
            if config.has_section('app:main'):
                app_config = dict(config.items('app:main'))

        except Exception:
            # display-only data; never fail the endpoint over a bad ini file
            log.exception('Failed to read .ini file for display')

        environ = list(os.environ.items())

        resp = {
            'id': payload.get('id'),
            'result': dict(
                version=safe_str(vcsserver.__version__),
                config=server_config,
                app_config=app_config,
                environ=environ,
                payload=payload,
            )
        }
        return resp
486 486
487 487 def _msgpack_renderer_factory(self, info):
488 488 def _render(value, system):
489 489 request = system.get('request')
490 490 if request is not None:
491 491 response = request.response
492 492 ct = response.content_type
493 493 if ct == response.default_content_type:
494 494 response.content_type = 'application/x-msgpack'
495 495
496 496 return msgpack.packb(value, use_bin_type=False)
497 497 return _render
498 498
499 499 def set_env_from_config(self, environ, config):
500 500 dict_conf = {}
501 501 try:
502 502 for elem in config:
503 503 if elem[0] == 'rhodecode':
504 504 dict_conf = json.loads(elem[2])
505 505 break
506 506 except Exception:
507 507 log.exception('Failed to fetch SCM CONFIG')
508 508 return
509 509
510 510 username = dict_conf.get('username')
511 511 if username:
512 512 environ['REMOTE_USER'] = username
513 513 # mercurial specific, some extension api rely on this
514 514 environ['HGUSER'] = username
515 515
516 516 ip = dict_conf.get('ip')
517 517 if ip:
518 518 environ['REMOTE_HOST'] = ip
519 519
520 520 if _is_request_chunked(environ):
521 521 # set the compatibility flag for webob
522 522 environ['wsgi.input_terminated'] = True
523 523
524 524 def hg_proxy(self):
525 525 @wsgiapp
526 526 def _hg_proxy(environ, start_response):
527 527 app = WsgiProxy(self.remote_wsgi.HgRemoteWsgi())
528 528 return app(environ, start_response)
529 529 return _hg_proxy
530 530
531 531 def git_proxy(self):
532 532 @wsgiapp
533 533 def _git_proxy(environ, start_response):
534 534 app = WsgiProxy(self.remote_wsgi.GitRemoteWsgi())
535 535 return app(environ, start_response)
536 536 return _git_proxy
537 537
538 538 def hg_stream(self):
539 539 if self._use_echo_app:
540 540 @wsgiapp
541 541 def _hg_stream(environ, start_response):
542 542 app = EchoApp('fake_path', 'fake_name', None)
543 543 return app(environ, start_response)
544 544 return _hg_stream
545 545 else:
546 546 @wsgiapp
547 547 def _hg_stream(environ, start_response):
548 548 log.debug('http-app: handling hg stream')
549 549 repo_path = environ['HTTP_X_RC_REPO_PATH']
550 550 repo_name = environ['HTTP_X_RC_REPO_NAME']
551 551 packed_config = base64.b64decode(
552 552 environ['HTTP_X_RC_REPO_CONFIG'])
553 553 config = msgpack.unpackb(packed_config)
554 554 app = scm_app.create_hg_wsgi_app(
555 555 repo_path, repo_name, config)
556 556
557 557 # Consistent path information for hgweb
558 558 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
559 559 environ['REPO_NAME'] = repo_name
560 560 self.set_env_from_config(environ, config)
561 561
562 562 log.debug('http-app: starting app handler '
563 563 'with %s and process request', app)
564 564 return app(environ, ResponseFilter(start_response))
565 565 return _hg_stream
566 566
567 567 def git_stream(self):
568 568 if self._use_echo_app:
569 569 @wsgiapp
570 570 def _git_stream(environ, start_response):
571 571 app = EchoApp('fake_path', 'fake_name', None)
572 572 return app(environ, start_response)
573 573 return _git_stream
574 574 else:
575 575 @wsgiapp
576 576 def _git_stream(environ, start_response):
577 577 log.debug('http-app: handling git stream')
578 578 repo_path = environ['HTTP_X_RC_REPO_PATH']
579 579 repo_name = environ['HTTP_X_RC_REPO_NAME']
580 580 packed_config = base64.b64decode(
581 581 environ['HTTP_X_RC_REPO_CONFIG'])
582 582 config = msgpack.unpackb(packed_config)
583 583
584 584 environ['PATH_INFO'] = environ['HTTP_X_RC_PATH_INFO']
585 585 self.set_env_from_config(environ, config)
586 586
587 587 content_type = environ.get('CONTENT_TYPE', '')
588 588
589 589 path = environ['PATH_INFO']
590 590 is_lfs_request = GIT_LFS_CONTENT_TYPE in content_type
591 591 log.debug(
592 592 'LFS: Detecting if request `%s` is LFS server path based '
593 593 'on content type:`%s`, is_lfs:%s',
594 594 path, content_type, is_lfs_request)
595 595
596 596 if not is_lfs_request:
597 597 # fallback detection by path
598 598 if GIT_LFS_PROTO_PAT.match(path):
599 599 is_lfs_request = True
600 600 log.debug(
601 601 'LFS: fallback detection by path of: `%s`, is_lfs:%s',
602 602 path, is_lfs_request)
603 603
604 604 if is_lfs_request:
605 605 app = scm_app.create_git_lfs_wsgi_app(
606 606 repo_path, repo_name, config)
607 607 else:
608 608 app = scm_app.create_git_wsgi_app(
609 609 repo_path, repo_name, config)
610 610
611 611 log.debug('http-app: starting app handler '
612 612 'with %s and process request', app)
613 613
614 614 return app(environ, start_response)
615 615
616 616 return _git_stream
617 617
618 618 def handle_vcs_exception(self, exception, request):
619 619 _vcs_kind = getattr(exception, '_vcs_kind', '')
620 620 if _vcs_kind == 'repo_locked':
621 621 # Get custom repo-locked status code if present.
622 622 status_code = request.headers.get('X-RC-Locked-Status-Code')
623 623 return HTTPRepoLocked(
624 624 title=exception.message, status_code=status_code)
625 625
626 626 elif _vcs_kind == 'repo_branch_protected':
627 627 # Get custom repo-branch-protected status code if present.
628 628 return HTTPRepoBranchProtected(title=exception.message)
629 629
630 630 exc_info = request.exc_info
631 631 store_exception(id(exc_info), exc_info)
632 632
633 633 traceback_info = 'unavailable'
634 634 if request.exc_info:
635 635 exc_type, exc_value, exc_tb = request.exc_info
636 636 traceback_info = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
637 637
638 638 log.error(
639 639 'error occurred handling this request for path: %s, \n tb: %s',
640 640 request.path, traceback_info)
641 641
642 642 statsd = request.registry.statsd
643 643 if statsd:
644 644 exc_type = "{}.{}".format(exception.__class__.__module__, exception.__class__.__name__)
645 645 statsd.incr('vcsserver_exception_total',
646 646 tags=["type:{}".format(exc_type)])
647 647 raise exception
648 648
649 649
class ResponseFilter(object):
    """Wrap a WSGI ``start_response`` and drop hop-by-hop headers (RFC 2616)."""

    def __init__(self, start_response):
        self._start_response = start_response

    def __call__(self, status, response_headers, exc_info=None):
        end_to_end = []
        for name, value in response_headers:
            # hop-by-hop headers must not be forwarded by a proxy
            if wsgiref.util.is_hop_by_hop(name):
                continue
            end_to_end.append((name, value))
        return self._start_response(status, tuple(end_to_end), exc_info)
660 660
661 661
def sanitize_settings_and_apply_defaults(global_config, settings):
    """Fill in defaults, coerce ini values, configure logging and cache dirs."""
    global_settings_maker = SettingsMaker(global_config)
    settings_maker = SettingsMaker(settings)

    settings_maker.make_setting('logging.autoconfigure', False, parser='bool')

    logging_conf = os.path.join(
        os.path.dirname(global_config.get('__file__')), 'logging.ini')
    settings_maker.enable_logging(logging_conf)

    # Default includes, possible to change as a user
    pyramid_includes = settings_maker.make_setting(
        'pyramid.includes', [], parser='list:newline')
    log.debug("Using the following pyramid.includes: %s", pyramid_includes)

    settings_maker.make_setting('__file__', global_config.get('__file__'))

    settings_maker.make_setting('pyramid.default_locale_name', 'en')
    settings_maker.make_setting('locale', 'en_US.UTF-8')

    settings_maker.make_setting('core.binary_dir', '')

    # save default cache dir, and use it for all backends later.
    default_cache_dir = settings_maker.make_setting(
        'cache_dir',
        default=os.path.join(tempfile.gettempdir(), 'rc_cache'),
        default_when_empty=True,
        parser='dir:ensured')

    # exception store cache
    settings_maker.make_setting(
        'exception_tracker.store_path',
        default=os.path.join(default_cache_dir, 'exc_store'),
        default_when_empty=True,
        parser='dir:ensured')

    # repo_object cache defaults, table-driven for readability
    repo_object_defaults = [
        ('rc_cache.repo_object.backend',
         'dogpile.cache.rc.file_namespace', 'string'),
        ('rc_cache.repo_object.expiration_time',
         30 * 24 * 60 * 60, 'int'),  # 30 days
        ('rc_cache.repo_object.arguments.filename',
         os.path.join(default_cache_dir, 'vcsserver_cache_repo_object.db'),
         'string'),
    ]
    for key, default, parser in repo_object_defaults:
        settings_maker.make_setting(key, default=default, parser=parser)

    # statsd
    settings_maker.make_setting('statsd.enabled', False, parser='bool')
    settings_maker.make_setting('statsd.statsd_host', 'statsd-exporter', parser='string')
    settings_maker.make_setting('statsd.statsd_port', 9125, parser='int')
    settings_maker.make_setting('statsd.statsd_prefix', '')
    settings_maker.make_setting('statsd.statsd_ipv6', False, parser='bool')

    settings_maker.env_expand()
719 719
720 720
def main(global_config, **settings):
    """Paste/WSGI entry point: patch mercurial, sanitize settings, build the app."""
    start_time = time.time()
    log.info('Pyramid app config starting')

    if MercurialFactory:
        # only patch when mercurial support is importable
        hgpatches.patch_largefiles_capabilities()
        hgpatches.patch_subrepo_type_mapping()

    # Fill in and sanitize the defaults & do ENV expansion
    sanitize_settings_and_apply_defaults(global_config, settings)

    # init and bootstrap StatsdClient
    StatsdClient.setup(settings)

    pyramid_app = HTTPApplication(
        settings=settings, global_config=global_config).wsgi_app()
    elapsed = time.time() - start_time
    log.info('Pyramid app `%s` created and configured in %.2fs',
             getattr(pyramid_app, 'func_name', 'pyramid_app'), elapsed)
    return pyramid_app
740 740
741 741
@@ -1,1317 +1,1327 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import collections
19 19 import logging
20 20 import os
21 21 import posixpath as vcspath
22 22 import re
23 23 import stat
24 24 import traceback
25 25 import urllib.request, urllib.parse, urllib.error
26 26 import urllib.request, urllib.error, urllib.parse
27 27 from functools import wraps
28 28
29 29 import more_itertools
30 30 import pygit2
31 31 from pygit2 import Repository as LibGit2Repo
32 32 from pygit2 import index as LibGit2Index
33 33 from dulwich import index, objects
34 34 from dulwich.client import HttpGitClient, LocalGitClient
35 35 from dulwich.errors import (
36 36 NotGitRepository, ChecksumMismatch, WrongObjectException,
37 37 MissingCommitError, ObjectMissing, HangupException,
38 38 UnexpectedCommandError)
39 39 from dulwich.repo import Repo as DulwichRepo
40 40 from dulwich.server import update_server_info
41 41
42 42 from vcsserver import exceptions, settings, subprocessio
43 43 from vcsserver.str_utils import safe_str, safe_int, safe_bytes
44 44 from vcsserver.base import RepoFactory, obfuscate_qs, ArchiveNode, archive_repo
45 45 from vcsserver.hgcompat import (
46 46 hg_url as url_parser, httpbasicauthhandler, httpdigestauthhandler)
47 47 from vcsserver.git_lfs.lib import LFSOidStore
48 48 from vcsserver.vcs_base import RemoteBase
49 49
50 50 DIR_STAT = stat.S_IFDIR
51 51 FILE_MODE = stat.S_IFMT
52 52 GIT_LINK = objects.S_IFGITLINK
53 PEELED_REF_MARKER = '^{}'
53 PEELED_REF_MARKER = b'^{}'
54 54
55 55
56 56 log = logging.getLogger(__name__)
57 57
58 58
def reraise_safe_exceptions(func):
    """Converts Dulwich exceptions to something neutral."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (ChecksumMismatch, WrongObjectException,
                MissingCommitError, ObjectMissing) as e:
            raise exceptions.LookupException(org_exc=e)(safe_str(e))
        except (HangupException, UnexpectedCommandError) as e:
            raise exceptions.VcsException(org_exc=e)(safe_str(e))
        except Exception:
            # NOTE(marcink): because of how dulwich handles some exceptions
            # (KeyError on empty repos), we cannot track this and catch all
            # exceptions, it's an exceptions from other handlers
            raise
    return wrapper
81 81
82 82
class Repo(DulwichRepo):
    """
    A wrapper for dulwich Repo class.

    Since dulwich is sometimes keeping .idx file descriptors open, it leads to
    "Too many open files" error. We need to close all opened file descriptors
    once the repo object is destroyed.
    """

    def __del__(self):
        # close() is only safe after the object store was initialized
        if hasattr(self, 'object_store'):
            self.close()
94 94
95 95
class Repository(LibGit2Repo):
    """Context-manager wrapper over the libgit2 repository object."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # release libgit2 native resources deterministically
        self.free()
103 103
104 104
class GitFactory(RepoFactory):
    """Creates dulwich- or libgit2-backed repository objects."""
    repo_type = 'git'

    def _create_repo(self, wire, create, use_libgit2=False):
        if use_libgit2:
            return Repository(wire['path'])
        repo_path = safe_str(wire['path'], to_encoding=settings.WIRE_ENCODING)
        return Repo(repo_path)

    def repo(self, wire, create=False, use_libgit2=False):
        """
        Get a repository instance for the given path.
        """
        return self._create_repo(wire, create, use_libgit2)

    def repo_libgit2(self, wire):
        """Shortcut for a libgit2-backed repository instance."""
        return self.repo(wire, use_libgit2=True)
123 123
124 124
125 125 class GitRemote(RemoteBase):
126 126
127 127 def __init__(self, factory):
128 128 self._factory = factory
129 129 self._bulk_methods = {
130 130 "date": self.date,
131 131 "author": self.author,
132 132 "branch": self.branch,
133 133 "message": self.message,
134 134 "parents": self.parents,
135 135 "_commit": self.revision,
136 136 }
137 137
138 138 def _wire_to_config(self, wire):
139 139 if 'config' in wire:
140 140 return dict([(x[0] + '_' + x[1], x[2]) for x in wire['config']])
141 141 return {}
142 142
143 143 def _remote_conf(self, config):
144 144 params = [
145 145 '-c', 'core.askpass=""',
146 146 ]
147 147 ssl_cert_dir = config.get('vcs_ssl_dir')
148 148 if ssl_cert_dir:
149 149 params.extend(['-c', 'http.sslCAinfo={}'.format(ssl_cert_dir)])
150 150 return params
151 151
152 152 @reraise_safe_exceptions
153 153 def discover_git_version(self):
154 154 stdout, _ = self.run_git_command(
155 155 {}, ['--version'], _bare=True, _safe=True)
156 156 prefix = b'git version'
157 157 if stdout.startswith(prefix):
158 158 stdout = stdout[len(prefix):]
159 return stdout.strip()
159 return safe_str(stdout.strip())
160 160
161 161 @reraise_safe_exceptions
162 162 def is_empty(self, wire):
163 163 repo_init = self._factory.repo_libgit2(wire)
164 164 with repo_init as repo:
165 165
166 166 try:
167 167 has_head = repo.head.name
168 168 if has_head:
169 169 return False
170 170
171 171 # NOTE(marcink): check again using more expensive method
172 172 return repo.is_empty
173 173 except Exception:
174 174 pass
175 175
176 176 return True
177 177
178 178 @reraise_safe_exceptions
179 179 def assert_correct_path(self, wire):
180 180 cache_on, context_uid, repo_id = self._cache_on(wire)
181 181 region = self._region(wire)
182 182
183 183 @region.conditional_cache_on_arguments(condition=cache_on)
184 184 def _assert_correct_path(_context_uid, _repo_id):
185 185 try:
186 186 repo_init = self._factory.repo_libgit2(wire)
187 187 with repo_init as repo:
188 188 pass
189 189 except pygit2.GitError:
190 190 path = wire.get('path')
191 191 tb = traceback.format_exc()
192 192 log.debug("Invalid Git path `%s`, tb: %s", path, tb)
193 193 return False
194 194
195 195 return True
196 196 return _assert_correct_path(context_uid, repo_id)
197 197
198 198 @reraise_safe_exceptions
199 199 def bare(self, wire):
200 200 repo_init = self._factory.repo_libgit2(wire)
201 201 with repo_init as repo:
202 202 return repo.is_bare
203 203
204 204 @reraise_safe_exceptions
205 205 def blob_as_pretty_string(self, wire, sha):
206 206 repo_init = self._factory.repo_libgit2(wire)
207 207 with repo_init as repo:
208 208 blob_obj = repo[sha]
209 209 blob = blob_obj.data
210 210 return blob
211 211
212 212 @reraise_safe_exceptions
213 213 def blob_raw_length(self, wire, sha):
214 214 cache_on, context_uid, repo_id = self._cache_on(wire)
215 215 region = self._region(wire)
216 216
217 217 @region.conditional_cache_on_arguments(condition=cache_on)
218 218 def _blob_raw_length(_repo_id, _sha):
219 219
220 220 repo_init = self._factory.repo_libgit2(wire)
221 221 with repo_init as repo:
222 222 blob = repo[sha]
223 223 return blob.size
224 224
225 225 return _blob_raw_length(repo_id, sha)
226 226
227 227 def _parse_lfs_pointer(self, raw_content):
228 228 spec_string = b'version https://git-lfs.github.com/spec'
229 229 if raw_content and raw_content.startswith(spec_string):
230 230
231 231 pattern = re.compile(rb"""
232 232 (?:\n)?
233 233 ^version[ ]https://git-lfs\.github\.com/spec/(?P<spec_ver>v\d+)\n
234 234 ^oid[ ] sha256:(?P<oid_hash>[0-9a-f]{64})\n
235 235 ^size[ ](?P<oid_size>[0-9]+)\n
236 236 (?:\n)?
237 237 """, re.VERBOSE | re.MULTILINE)
238 238 match = pattern.match(raw_content)
239 239 if match:
240 240 return match.groupdict()
241 241
242 242 return {}
243 243
244 244 @reraise_safe_exceptions
245 245 def is_large_file(self, wire, commit_id):
246 246 cache_on, context_uid, repo_id = self._cache_on(wire)
247 247 region = self._region(wire)
248 248
249 249 @region.conditional_cache_on_arguments(condition=cache_on)
250 250 def _is_large_file(_repo_id, _sha):
251 251 repo_init = self._factory.repo_libgit2(wire)
252 252 with repo_init as repo:
253 253 blob = repo[commit_id]
254 254 if blob.is_binary:
255 255 return {}
256 256
257 257 return self._parse_lfs_pointer(blob.data)
258 258
259 259 return _is_large_file(repo_id, commit_id)
260 260
261 261 @reraise_safe_exceptions
262 262 def is_binary(self, wire, tree_id):
263 263 cache_on, context_uid, repo_id = self._cache_on(wire)
264 264 region = self._region(wire)
265 265
266 266 @region.conditional_cache_on_arguments(condition=cache_on)
267 267 def _is_binary(_repo_id, _tree_id):
268 268 repo_init = self._factory.repo_libgit2(wire)
269 269 with repo_init as repo:
270 270 blob_obj = repo[tree_id]
271 271 return blob_obj.is_binary
272 272
273 273 return _is_binary(repo_id, tree_id)
274 274
275 275 @reraise_safe_exceptions
276 276 def in_largefiles_store(self, wire, oid):
277 277 conf = self._wire_to_config(wire)
278 278 repo_init = self._factory.repo_libgit2(wire)
279 279 with repo_init as repo:
280 280 repo_name = repo.path
281 281
282 282 store_location = conf.get('vcs_git_lfs_store_location')
283 283 if store_location:
284 284
285 285 store = LFSOidStore(
286 286 oid=oid, repo=repo_name, store_location=store_location)
287 287 return store.has_oid()
288 288
289 289 return False
290 290
291 291 @reraise_safe_exceptions
292 292 def store_path(self, wire, oid):
293 293 conf = self._wire_to_config(wire)
294 294 repo_init = self._factory.repo_libgit2(wire)
295 295 with repo_init as repo:
296 296 repo_name = repo.path
297 297
298 298 store_location = conf.get('vcs_git_lfs_store_location')
299 299 if store_location:
300 300 store = LFSOidStore(
301 301 oid=oid, repo=repo_name, store_location=store_location)
302 302 return store.oid_path
303 303 raise ValueError('Unable to fetch oid with path {}'.format(oid))
304 304
305 305 @reraise_safe_exceptions
306 306 def bulk_request(self, wire, rev, pre_load):
307 307 cache_on, context_uid, repo_id = self._cache_on(wire)
308 308 region = self._region(wire)
309 309
310 310 @region.conditional_cache_on_arguments(condition=cache_on)
311 311 def _bulk_request(_repo_id, _rev, _pre_load):
312 312 result = {}
313 313 for attr in pre_load:
314 314 try:
315 315 method = self._bulk_methods[attr]
316 316 args = [wire, rev]
317 317 result[attr] = method(*args)
318 318 except KeyError as e:
319 319 raise exceptions.VcsException(e)(
320 320 "Unknown bulk attribute: %s" % attr)
321 321 return result
322 322
323 323 return _bulk_request(repo_id, rev, sorted(pre_load))
324 324
325 325 def _build_opener(self, url):
326 326 handlers = []
327 327 url_obj = url_parser(url)
328 328 _, authinfo = url_obj.authinfo()
329 329
330 330 if authinfo:
331 331 # create a password manager
332 332 passmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
333 333 passmgr.add_password(*authinfo)
334 334
335 335 handlers.extend((httpbasicauthhandler(passmgr),
336 336 httpdigestauthhandler(passmgr)))
337 337
338 338 return urllib.request.build_opener(*handlers)
339 339
340 340 def _type_id_to_name(self, type_id: int):
341 341 return {
342 342 1: 'commit',
343 343 2: 'tree',
344 344 3: 'blob',
345 345 4: 'tag'
346 346 }[type_id]
347 347
348 348 @reraise_safe_exceptions
349 349 def check_url(self, url, config):
350 350 url_obj = url_parser(url)
351 351 test_uri, _ = url_obj.authinfo()
352 352 url_obj.passwd = '*****' if url_obj.passwd else url_obj.passwd
353 353 url_obj.query = obfuscate_qs(url_obj.query)
354 354 cleaned_uri = str(url_obj)
355 355 log.info("Checking URL for remote cloning/import: %s", cleaned_uri)
356 356
357 357 if not test_uri.endswith('info/refs'):
358 358 test_uri = test_uri.rstrip('/') + '/info/refs'
359 359
360 360 o = self._build_opener(url)
361 361 o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
362 362
363 363 q = {"service": 'git-upload-pack'}
364 364 qs = '?%s' % urllib.parse.urlencode(q)
365 365 cu = "%s%s" % (test_uri, qs)
366 366 req = urllib.request.Request(cu, None, {})
367 367
368 368 try:
369 369 log.debug("Trying to open URL %s", cleaned_uri)
370 370 resp = o.open(req)
371 371 if resp.code != 200:
372 372 raise exceptions.URLError()('Return Code is not 200')
373 373 except Exception as e:
374 374 log.warning("URL cannot be opened: %s", cleaned_uri, exc_info=True)
375 375 # means it cannot be cloned
376 376 raise exceptions.URLError(e)("[%s] org_exc: %s" % (cleaned_uri, e))
377 377
378 378 # now detect if it's proper git repo
379 379 gitdata = resp.read()
380 380 if 'service=git-upload-pack' in gitdata:
381 381 pass
382 382 elif re.findall(r'[0-9a-fA-F]{40}\s+refs', gitdata):
383 383 # old style git can return some other format !
384 384 pass
385 385 else:
386 386 raise exceptions.URLError()(
387 387 "url [%s] does not look like an git" % (cleaned_uri,))
388 388
389 389 return True
390 390
391 391 @reraise_safe_exceptions
392 392 def clone(self, wire, url, deferred, valid_refs, update_after_clone):
393 393 # TODO(marcink): deprecate this method. Last i checked we don't use it anymore
394 394 remote_refs = self.pull(wire, url, apply_refs=False)
395 395 repo = self._factory.repo(wire)
396 396 if isinstance(valid_refs, list):
397 397 valid_refs = tuple(valid_refs)
398 398
399 399 for k in remote_refs:
400 400 # only parse heads/tags and skip so called deferred tags
401 401 if k.startswith(valid_refs) and not k.endswith(deferred):
402 402 repo[k] = remote_refs[k]
403 403
404 404 if update_after_clone:
405 405 # we want to checkout HEAD
406 406 repo["HEAD"] = remote_refs["HEAD"]
407 407 index.build_index_from_tree(repo.path, repo.index_path(),
408 408 repo.object_store, repo["HEAD"].tree)
409 409
410 410 @reraise_safe_exceptions
411 411 def branch(self, wire, commit_id):
412 412 cache_on, context_uid, repo_id = self._cache_on(wire)
413 413 region = self._region(wire)
414 414 @region.conditional_cache_on_arguments(condition=cache_on)
415 415 def _branch(_context_uid, _repo_id, _commit_id):
416 416 regex = re.compile('^refs/heads')
417 417
418 418 def filter_with(ref):
419 419 return regex.match(ref[0]) and ref[1] == _commit_id
420 420
421 421 branches = list(filter(filter_with, list(self.get_refs(wire).items())))
422 422 return [x[0].split('refs/heads/')[-1] for x in branches]
423 423
424 424 return _branch(context_uid, repo_id, commit_id)
425 425
426 426 @reraise_safe_exceptions
427 427 def commit_branches(self, wire, commit_id):
428 428 cache_on, context_uid, repo_id = self._cache_on(wire)
429 429 region = self._region(wire)
430 430 @region.conditional_cache_on_arguments(condition=cache_on)
431 431 def _commit_branches(_context_uid, _repo_id, _commit_id):
432 432 repo_init = self._factory.repo_libgit2(wire)
433 433 with repo_init as repo:
434 434 branches = [x for x in repo.branches.with_commit(_commit_id)]
435 435 return branches
436 436
437 437 return _commit_branches(context_uid, repo_id, commit_id)
438 438
439 439 @reraise_safe_exceptions
440 440 def add_object(self, wire, content):
441 441 repo_init = self._factory.repo_libgit2(wire)
442 442 with repo_init as repo:
443 443 blob = objects.Blob()
444 444 blob.set_raw_string(content)
445 445 repo.object_store.add_object(blob)
446 446 return blob.id
447 447
448 448 # TODO: this is quite complex, check if that can be simplified
449 449 @reraise_safe_exceptions
450 450 def commit(self, wire, commit_data, branch, commit_tree, updated, removed):
451 451 # Defines the root tree
452 452 class _Root(object):
453 453 def __repr__(self):
454 454 return 'ROOT TREE'
455 455 ROOT = _Root()
456 456
457 457 repo = self._factory.repo(wire)
458 458 object_store = repo.object_store
459 459
460 460 # Create tree and populates it with blobs
461 461
462 462 if commit_tree and repo[commit_tree]:
463 463 git_commit = repo[commit_data['parents'][0]]
464 464 commit_tree = repo[git_commit.tree] # root tree
465 465 else:
466 466 commit_tree = objects.Tree()
467 467
468 468 for node in updated:
469 469 # Compute subdirs if needed
470 470 dirpath, nodename = vcspath.split(node['path'])
471 471 dirnames = list(map(safe_str, dirpath and dirpath.split('/') or []))
472 472 parent = commit_tree
473 473 ancestors = [('', parent)]
474 474
475 475 # Tries to dig for the deepest existing tree
476 476 while dirnames:
477 477 curdir = dirnames.pop(0)
478 478 try:
479 479 dir_id = parent[curdir][1]
480 480 except KeyError:
481 481 # put curdir back into dirnames and stops
482 482 dirnames.insert(0, curdir)
483 483 break
484 484 else:
485 485 # If found, updates parent
486 486 parent = repo[dir_id]
487 487 ancestors.append((curdir, parent))
488 488 # Now parent is deepest existing tree and we need to create
489 489 # subtrees for dirnames (in reverse order)
490 490 # [this only applies for nodes from added]
491 491 new_trees = []
492 492
493 493 blob = objects.Blob.from_string(node['content'])
494 494
495 495 if dirnames:
496 496 # If there are trees which should be created we need to build
497 497 # them now (in reverse order)
498 498 reversed_dirnames = list(reversed(dirnames))
499 499 curtree = objects.Tree()
500 500 curtree[node['node_path']] = node['mode'], blob.id
501 501 new_trees.append(curtree)
502 502 for dirname in reversed_dirnames[:-1]:
503 503 newtree = objects.Tree()
504 504 newtree[dirname] = (DIR_STAT, curtree.id)
505 505 new_trees.append(newtree)
506 506 curtree = newtree
507 507 parent[reversed_dirnames[-1]] = (DIR_STAT, curtree.id)
508 508 else:
509 509 parent.add(name=node['node_path'], mode=node['mode'], hexsha=blob.id)
510 510
511 511 new_trees.append(parent)
512 512 # Update ancestors
513 513 reversed_ancestors = reversed(
514 514 [(a[1], b[1], b[0]) for a, b in zip(ancestors, ancestors[1:])])
515 515 for parent, tree, path in reversed_ancestors:
516 516 parent[path] = (DIR_STAT, tree.id)
517 517 object_store.add_object(tree)
518 518
519 519 object_store.add_object(blob)
520 520 for tree in new_trees:
521 521 object_store.add_object(tree)
522 522
523 523 for node_path in removed:
524 524 paths = node_path.split('/')
525 525 tree = commit_tree # start with top-level
526 526 trees = [{'tree': tree, 'path': ROOT}]
527 527 # Traverse deep into the forest...
528 528 # resolve final tree by iterating the path.
529 529 # e.g a/b/c.txt will get
530 530 # - root as tree then
531 531 # - 'a' as tree,
532 532 # - 'b' as tree,
533 533 # - stop at c as blob.
534 534 for path in paths:
535 535 try:
536 536 obj = repo[tree[path][1]]
537 537 if isinstance(obj, objects.Tree):
538 538 trees.append({'tree': obj, 'path': path})
539 539 tree = obj
540 540 except KeyError:
541 541 break
542 542 #PROBLEM:
543 543 """
544 544 We're not editing same reference tree object
545 545 """
546 546 # Cut down the blob and all rotten trees on the way back...
547 547 for path, tree_data in reversed(list(zip(paths, trees))):
548 548 tree = tree_data['tree']
549 549 tree.__delitem__(path)
550 550 # This operation edits the tree, we need to mark new commit back
551 551
552 552 if len(tree) > 0:
553 553 # This tree still has elements - don't remove it or any
554 554 # of it's parents
555 555 break
556 556
557 557 object_store.add_object(commit_tree)
558 558
559 559 # Create commit
560 560 commit = objects.Commit()
561 561 commit.tree = commit_tree.id
562 bytes_keys = [
563 'author',
564 'committer',
565 'message',
566 'encoding'
567 ]
568
562 569 for k, v in commit_data.items():
570 if k in bytes_keys:
571 v = safe_bytes(v)
563 572 setattr(commit, k, v)
573
564 574 object_store.add_object(commit)
565 575
566 self.create_branch(wire, branch, commit.id)
576 self.create_branch(wire, branch, safe_str(commit.id))
567 577
568 578 # dulwich set-ref
569 ref = 'refs/heads/%s' % branch
570 repo.refs[ref] = commit.id
579 repo.refs[safe_bytes(f'refs/heads/{branch}')] = commit.id
571 580
572 581 return commit.id
573 582
574 583 @reraise_safe_exceptions
575 584 def pull(self, wire, url, apply_refs=True, refs=None, update_after=False):
576 585 if url != 'default' and '://' not in url:
577 586 client = LocalGitClient(url)
578 587 else:
579 588 url_obj = url_parser(url)
580 589 o = self._build_opener(url)
581 590 url, _ = url_obj.authinfo()
582 591 client = HttpGitClient(base_url=url, opener=o)
583 592 repo = self._factory.repo(wire)
584 593
585 594 determine_wants = repo.object_store.determine_wants_all
586 595 if refs:
587 596 def determine_wants_requested(references):
588 597 return [references[r] for r in references if r in refs]
589 598 determine_wants = determine_wants_requested
590 599
591 600 try:
592 601 remote_refs = client.fetch(
593 602 path=url, target=repo, determine_wants=determine_wants)
594 603 except NotGitRepository as e:
595 604 log.warning(
596 605 'Trying to fetch from "%s" failed, not a Git repository.', url)
597 606 # Exception can contain unicode which we convert
598 607 raise exceptions.AbortException(e)(repr(e))
599 608
600 609 # mikhail: client.fetch() returns all the remote refs, but fetches only
601 610 # refs filtered by `determine_wants` function. We need to filter result
602 611 # as well
603 612 if refs:
604 613 remote_refs = {k: remote_refs[k] for k in remote_refs if k in refs}
605 614
606 615 if apply_refs:
607 616 # TODO: johbo: Needs proper test coverage with a git repository
608 617 # that contains a tag object, so that we would end up with
609 618 # a peeled ref at this point.
610 619 for k in remote_refs:
611 620 if k.endswith(PEELED_REF_MARKER):
612 621 log.debug("Skipping peeled reference %s", k)
613 622 continue
614 623 repo[k] = remote_refs[k]
615 624
616 625 if refs and not update_after:
617 626 # mikhail: explicitly set the head to the last ref.
618 627 repo["HEAD"] = remote_refs[refs[-1]]
619 628
620 629 if update_after:
621 630 # we want to checkout HEAD
622 631 repo["HEAD"] = remote_refs["HEAD"]
623 632 index.build_index_from_tree(repo.path, repo.index_path(),
624 633 repo.object_store, repo["HEAD"].tree)
625 634 return remote_refs
626 635
627 636 @reraise_safe_exceptions
628 637 def sync_fetch(self, wire, url, refs=None, all_refs=False):
629 638 repo = self._factory.repo(wire)
630 639 if refs and not isinstance(refs, (list, tuple)):
631 640 refs = [refs]
632 641
633 642 config = self._wire_to_config(wire)
634 643 # get all remote refs we'll use to fetch later
635 644 cmd = ['ls-remote']
636 645 if not all_refs:
637 646 cmd += ['--heads', '--tags']
638 647 cmd += [url]
639 648 output, __ = self.run_git_command(
640 649 wire, cmd, fail_on_stderr=False,
641 650 _copts=self._remote_conf(config),
642 651 extra_env={'GIT_TERMINAL_PROMPT': '0'})
643 652
644 653 remote_refs = collections.OrderedDict()
645 654 fetch_refs = []
646 655
647 656 for ref_line in output.splitlines():
648 sha, ref = ref_line.split('\t')
657 sha, ref = ref_line.split(b'\t')
649 658 sha = sha.strip()
650 659 if ref in remote_refs:
651 660 # duplicate, skip
652 661 continue
653 662 if ref.endswith(PEELED_REF_MARKER):
654 663 log.debug("Skipping peeled reference %s", ref)
655 664 continue
656 665 # don't sync HEAD
657 if ref in ['HEAD']:
666 if ref in [b'HEAD']:
658 667 continue
659 668
660 669 remote_refs[ref] = sha
661 670
662 671 if refs and sha in refs:
663 672 # we filter fetch using our specified refs
664 fetch_refs.append('{}:{}'.format(ref, ref))
673 fetch_refs.append(f'{safe_str(ref)}:{safe_str(ref)}')
665 674 elif not refs:
666 fetch_refs.append('{}:{}'.format(ref, ref))
675 fetch_refs.append(f'{safe_str(ref)}:{safe_str(ref)}')
667 676 log.debug('Finished obtaining fetch refs, total: %s', len(fetch_refs))
668 677
669 678 if fetch_refs:
670 679 for chunk in more_itertools.chunked(fetch_refs, 1024 * 4):
671 680 fetch_refs_chunks = list(chunk)
672 681 log.debug('Fetching %s refs from import url', len(fetch_refs_chunks))
673 682 self.run_git_command(
674 683 wire, ['fetch', url, '--force', '--prune', '--'] + fetch_refs_chunks,
675 684 fail_on_stderr=False,
676 685 _copts=self._remote_conf(config),
677 686 extra_env={'GIT_TERMINAL_PROMPT': '0'})
678 687
679 688 return remote_refs
680 689
681 690 @reraise_safe_exceptions
682 691 def sync_push(self, wire, url, refs=None):
683 692 if not self.check_url(url, wire):
684 693 return
685 694 config = self._wire_to_config(wire)
686 695 self._factory.repo(wire)
687 696 self.run_git_command(
688 697 wire, ['push', url, '--mirror'], fail_on_stderr=False,
689 698 _copts=self._remote_conf(config),
690 699 extra_env={'GIT_TERMINAL_PROMPT': '0'})
691 700
692 701 @reraise_safe_exceptions
693 702 def get_remote_refs(self, wire, url):
694 703 repo = Repo(url)
695 704 return repo.get_refs()
696 705
697 706 @reraise_safe_exceptions
698 707 def get_description(self, wire):
699 708 repo = self._factory.repo(wire)
700 709 return repo.get_description()
701 710
702 711 @reraise_safe_exceptions
703 712 def get_missing_revs(self, wire, rev1, rev2, path2):
704 713 repo = self._factory.repo(wire)
705 714 LocalGitClient(thin_packs=False).fetch(path2, repo)
706 715
707 716 wire_remote = wire.copy()
708 717 wire_remote['path'] = path2
709 718 repo_remote = self._factory.repo(wire_remote)
710 719 LocalGitClient(thin_packs=False).fetch(wire["path"], repo_remote)
711 720
712 721 revs = [
713 722 x.commit.id
714 723 for x in repo_remote.get_walker(include=[rev2], exclude=[rev1])]
715 724 return revs
716 725
717 726 @reraise_safe_exceptions
718 727 def get_object(self, wire, sha, maybe_unreachable=False):
719 728 cache_on, context_uid, repo_id = self._cache_on(wire)
720 729 region = self._region(wire)
721 730
722 731 @region.conditional_cache_on_arguments(condition=cache_on)
723 732 def _get_object(_context_uid, _repo_id, _sha):
724 733 repo_init = self._factory.repo_libgit2(wire)
725 734 with repo_init as repo:
726 735
727 736 missing_commit_err = 'Commit {} does not exist for `{}`'.format(sha, wire['path'])
728 737 try:
729 738 commit = repo.revparse_single(sha)
730 739 except KeyError:
731 740 # NOTE(marcink): KeyError doesn't give us any meaningful information
732 741 # here, we instead give something more explicit
733 742 e = exceptions.RefNotFoundException('SHA: %s not found', sha)
734 743 raise exceptions.LookupException(e)(missing_commit_err)
735 744 except ValueError as e:
736 745 raise exceptions.LookupException(e)(missing_commit_err)
737 746
738 747 is_tag = False
739 748 if isinstance(commit, pygit2.Tag):
740 749 commit = repo.get(commit.target)
741 750 is_tag = True
742 751
743 752 check_dangling = True
744 753 if is_tag:
745 754 check_dangling = False
746 755
747 756 if check_dangling and maybe_unreachable:
748 757 check_dangling = False
749 758
750 759 # we used a reference and it parsed means we're not having a dangling commit
751 760 if sha != commit.hex:
752 761 check_dangling = False
753 762
754 763 if check_dangling:
755 764 # check for dangling commit
756 765 for branch in repo.branches.with_commit(commit.hex):
757 766 if branch:
758 767 break
759 768 else:
760 769 # NOTE(marcink): Empty error doesn't give us any meaningful information
761 770 # here, we instead give something more explicit
762 771 e = exceptions.RefNotFoundException('SHA: %s not found in branches', sha)
763 772 raise exceptions.LookupException(e)(missing_commit_err)
764 773
765 774 commit_id = commit.hex
766 775 type_id = commit.type
767 776
768 777 return {
769 778 'id': commit_id,
770 779 'type': self._type_id_to_name(type_id),
771 780 'commit_id': commit_id,
772 781 'idx': 0
773 782 }
774 783
775 784 return _get_object(context_uid, repo_id, sha)
776 785
777 786 @reraise_safe_exceptions
778 787 def get_refs(self, wire):
779 788 cache_on, context_uid, repo_id = self._cache_on(wire)
780 789 region = self._region(wire)
781 790
782 791 @region.conditional_cache_on_arguments(condition=cache_on)
783 792 def _get_refs(_context_uid, _repo_id):
784 793
785 794 repo_init = self._factory.repo_libgit2(wire)
786 795 with repo_init as repo:
787 796 regex = re.compile('^refs/(heads|tags)/')
788 797 return {x.name: x.target.hex for x in
789 798 [ref for ref in repo.listall_reference_objects() if regex.match(ref.name)]}
790 799
791 800 return _get_refs(context_uid, repo_id)
792 801
793 802 @reraise_safe_exceptions
794 803 def get_branch_pointers(self, wire):
795 804 cache_on, context_uid, repo_id = self._cache_on(wire)
796 805 region = self._region(wire)
797 806
798 807 @region.conditional_cache_on_arguments(condition=cache_on)
799 808 def _get_branch_pointers(_context_uid, _repo_id):
800 809
801 810 repo_init = self._factory.repo_libgit2(wire)
802 811 regex = re.compile('^refs/heads')
803 812 with repo_init as repo:
804 813 branches = [ref for ref in repo.listall_reference_objects() if regex.match(ref.name)]
805 814 return {x.target.hex: x.shorthand for x in branches}
806 815
807 816 return _get_branch_pointers(context_uid, repo_id)
808 817
809 818 @reraise_safe_exceptions
810 819 def head(self, wire, show_exc=True):
811 820 cache_on, context_uid, repo_id = self._cache_on(wire)
812 821 region = self._region(wire)
813 822
814 823 @region.conditional_cache_on_arguments(condition=cache_on)
815 824 def _head(_context_uid, _repo_id, _show_exc):
816 825 repo_init = self._factory.repo_libgit2(wire)
817 826 with repo_init as repo:
818 827 try:
819 828 return repo.head.peel().hex
820 829 except Exception:
821 830 if show_exc:
822 831 raise
823 832 return _head(context_uid, repo_id, show_exc)
824 833
825 834 @reraise_safe_exceptions
826 835 def init(self, wire):
827 836 repo_path = safe_str(wire['path'])
828 837 self.repo = Repo.init(repo_path)
829 838
830 839 @reraise_safe_exceptions
831 840 def init_bare(self, wire):
832 841 repo_path = safe_str(wire['path'])
833 842 self.repo = Repo.init_bare(repo_path)
834 843
835 844 @reraise_safe_exceptions
836 845 def revision(self, wire, rev):
837 846
838 847 cache_on, context_uid, repo_id = self._cache_on(wire)
839 848 region = self._region(wire)
840 849
841 850 @region.conditional_cache_on_arguments(condition=cache_on)
842 851 def _revision(_context_uid, _repo_id, _rev):
843 852 repo_init = self._factory.repo_libgit2(wire)
844 853 with repo_init as repo:
845 854 commit = repo[rev]
846 855 obj_data = {
847 856 'id': commit.id.hex,
848 857 }
849 858 # tree objects itself don't have tree_id attribute
850 859 if hasattr(commit, 'tree_id'):
851 860 obj_data['tree'] = commit.tree_id.hex
852 861
853 862 return obj_data
854 863 return _revision(context_uid, repo_id, rev)
855 864
856 865 @reraise_safe_exceptions
857 866 def date(self, wire, commit_id):
858 867 cache_on, context_uid, repo_id = self._cache_on(wire)
859 868 region = self._region(wire)
860 869
861 870 @region.conditional_cache_on_arguments(condition=cache_on)
862 871 def _date(_repo_id, _commit_id):
863 872 repo_init = self._factory.repo_libgit2(wire)
864 873 with repo_init as repo:
865 874 commit = repo[commit_id]
866 875
867 876 if hasattr(commit, 'commit_time'):
868 877 commit_time, commit_time_offset = commit.commit_time, commit.commit_time_offset
869 878 else:
870 879 commit = commit.get_object()
871 880 commit_time, commit_time_offset = commit.commit_time, commit.commit_time_offset
872 881
873 882 # TODO(marcink): check dulwich difference of offset vs timezone
874 883 return [commit_time, commit_time_offset]
875 884 return _date(repo_id, commit_id)
876 885
877 886 @reraise_safe_exceptions
878 887 def author(self, wire, commit_id):
879 888 cache_on, context_uid, repo_id = self._cache_on(wire)
880 889 region = self._region(wire)
881 890
882 891 @region.conditional_cache_on_arguments(condition=cache_on)
883 892 def _author(_repo_id, _commit_id):
884 893 repo_init = self._factory.repo_libgit2(wire)
885 894 with repo_init as repo:
886 895 commit = repo[commit_id]
887 896
888 897 if hasattr(commit, 'author'):
889 898 author = commit.author
890 899 else:
891 900 author = commit.get_object().author
892 901
893 902 if author.email:
894 903 return "{} <{}>".format(author.name, author.email)
895 904
896 905 try:
897 906 return "{}".format(author.name)
898 907 except Exception:
899 908 return "{}".format(safe_str(author.raw_name))
900 909
901 910 return _author(repo_id, commit_id)
902 911
903 912 @reraise_safe_exceptions
904 913 def message(self, wire, commit_id):
905 914 cache_on, context_uid, repo_id = self._cache_on(wire)
906 915 region = self._region(wire)
907 916 @region.conditional_cache_on_arguments(condition=cache_on)
908 917 def _message(_repo_id, _commit_id):
909 918 repo_init = self._factory.repo_libgit2(wire)
910 919 with repo_init as repo:
911 920 commit = repo[commit_id]
912 921 return commit.message
913 922 return _message(repo_id, commit_id)
914 923
915 924 @reraise_safe_exceptions
916 925 def parents(self, wire, commit_id):
917 926 cache_on, context_uid, repo_id = self._cache_on(wire)
918 927 region = self._region(wire)
928
919 929 @region.conditional_cache_on_arguments(condition=cache_on)
920 930 def _parents(_repo_id, _commit_id):
921 931 repo_init = self._factory.repo_libgit2(wire)
922 932 with repo_init as repo:
923 933 commit = repo[commit_id]
924 934 if hasattr(commit, 'parent_ids'):
925 935 parent_ids = commit.parent_ids
926 936 else:
927 937 parent_ids = commit.get_object().parent_ids
928 938
929 939 return [x.hex for x in parent_ids]
930 940 return _parents(repo_id, commit_id)
931 941
932 942 @reraise_safe_exceptions
933 943 def children(self, wire, commit_id):
934 944 cache_on, context_uid, repo_id = self._cache_on(wire)
935 945 region = self._region(wire)
936 946
937 947 @region.conditional_cache_on_arguments(condition=cache_on)
938 948 def _children(_repo_id, _commit_id):
939 949 output, __ = self.run_git_command(
940 950 wire, ['rev-list', '--all', '--children'])
941 951
942 952 child_ids = []
943 953 pat = re.compile(r'^%s' % commit_id)
944 954 for l in output.splitlines():
945 955 if pat.match(l):
946 956 found_ids = l.split(' ')[1:]
947 957 child_ids.extend(found_ids)
948 958
949 959 return child_ids
950 960 return _children(repo_id, commit_id)
951 961
952 962 @reraise_safe_exceptions
953 963 def set_refs(self, wire, key, value):
954 964 repo_init = self._factory.repo_libgit2(wire)
955 965 with repo_init as repo:
956 966 repo.references.create(key, value, force=True)
957 967
958 968 @reraise_safe_exceptions
959 969 def create_branch(self, wire, branch_name, commit_id, force=False):
960 970 repo_init = self._factory.repo_libgit2(wire)
961 971 with repo_init as repo:
962 972 commit = repo[commit_id]
963 973
964 974 if force:
965 975 repo.branches.local.create(branch_name, commit, force=force)
966 976 elif not repo.branches.get(branch_name):
967 977 # create only if that branch isn't existing
968 978 repo.branches.local.create(branch_name, commit, force=force)
969 979
970 980 @reraise_safe_exceptions
971 981 def remove_ref(self, wire, key):
972 982 repo_init = self._factory.repo_libgit2(wire)
973 983 with repo_init as repo:
974 984 repo.references.delete(key)
975 985
976 986 @reraise_safe_exceptions
977 987 def tag_remove(self, wire, tag_name):
978 988 repo_init = self._factory.repo_libgit2(wire)
979 989 with repo_init as repo:
980 990 key = 'refs/tags/{}'.format(tag_name)
981 991 repo.references.delete(key)
982 992
983 993 @reraise_safe_exceptions
984 994 def tree_changes(self, wire, source_id, target_id):
985 995 # TODO(marcink): remove this; it seems it's only used by tests
986 996 repo = self._factory.repo(wire)
987 997 source = repo[source_id].tree if source_id else None
988 998 target = repo[target_id].tree
989 999 result = repo.object_store.tree_changes(source, target)
990 1000 return list(result)
991 1001
992 1002 @reraise_safe_exceptions
993 1003 def tree_and_type_for_path(self, wire, commit_id, path):
994 1004
995 1005 cache_on, context_uid, repo_id = self._cache_on(wire)
996 1006 region = self._region(wire)
997 1007
998 1008 @region.conditional_cache_on_arguments(condition=cache_on)
999 1009 def _tree_and_type_for_path(_context_uid, _repo_id, _commit_id, _path):
1000 1010 repo_init = self._factory.repo_libgit2(wire)
1001 1011
1002 1012 with repo_init as repo:
1003 1013 commit = repo[commit_id]
1004 1014 try:
1005 1015 tree = commit.tree[path]
1006 1016 except KeyError:
1007 1017 return None, None, None
1008 1018
1009 1019 return tree.id.hex, tree.type_str, tree.filemode
1010 1020 return _tree_and_type_for_path(context_uid, repo_id, commit_id, path)
1011 1021
1012 1022 @reraise_safe_exceptions
1013 1023 def tree_items(self, wire, tree_id):
1014 1024 cache_on, context_uid, repo_id = self._cache_on(wire)
1015 1025 region = self._region(wire)
1016 1026
1017 1027 @region.conditional_cache_on_arguments(condition=cache_on)
1018 1028 def _tree_items(_repo_id, _tree_id):
1019 1029
1020 1030 repo_init = self._factory.repo_libgit2(wire)
1021 1031 with repo_init as repo:
1022 1032 try:
1023 1033 tree = repo[tree_id]
1024 1034 except KeyError:
1025 1035 raise ObjectMissing('No tree with id: {}'.format(tree_id))
1026 1036
1027 1037 result = []
1028 1038 for item in tree:
1029 1039 item_sha = item.hex
1030 1040 item_mode = item.filemode
1031 1041 item_type = item.type_str
1032 1042
1033 1043 if item_type == 'commit':
1034 1044 # NOTE(marcink): submodules we translate to 'link' for backward compat
1035 1045 item_type = 'link'
1036 1046
1037 1047 result.append((item.name, item_mode, item_sha, item_type))
1038 1048 return result
1039 1049 return _tree_items(repo_id, tree_id)
1040 1050
1041 1051 @reraise_safe_exceptions
1042 1052 def diff_2(self, wire, commit_id_1, commit_id_2, file_filter, opt_ignorews, context):
1043 1053 """
1044 1054 Old version that uses subprocess to call diff
1045 1055 """
1046 1056
1047 1057 flags = [
1048 1058 '-U%s' % context, '--patch',
1049 1059 '--binary',
1050 1060 '--find-renames',
1051 1061 '--no-indent-heuristic',
1052 1062 # '--indent-heuristic',
1053 1063 #'--full-index',
1054 1064 #'--abbrev=40'
1055 1065 ]
1056 1066
1057 1067 if opt_ignorews:
1058 1068 flags.append('--ignore-all-space')
1059 1069
1060 1070 if commit_id_1 == self.EMPTY_COMMIT:
1061 1071 cmd = ['show'] + flags + [commit_id_2]
1062 1072 else:
1063 1073 cmd = ['diff'] + flags + [commit_id_1, commit_id_2]
1064 1074
1065 1075 if file_filter:
1066 1076 cmd.extend(['--', file_filter])
1067 1077
1068 1078 diff, __ = self.run_git_command(wire, cmd)
1069 1079 # If we used 'show' command, strip first few lines (until actual diff
1070 1080 # starts)
1071 1081 if commit_id_1 == self.EMPTY_COMMIT:
1072 1082 lines = diff.splitlines()
1073 1083 x = 0
1074 1084 for line in lines:
1075 1085 if line.startswith(b'diff'):
1076 1086 break
1077 1087 x += 1
1078 1088 # Append new line just like 'diff' command do
1079 1089 diff = '\n'.join(lines[x:]) + '\n'
1080 1090 return diff
1081 1091
1082 1092 @reraise_safe_exceptions
1083 1093 def diff(self, wire, commit_id_1, commit_id_2, file_filter, opt_ignorews, context):
1084 1094 repo_init = self._factory.repo_libgit2(wire)
1085 1095 with repo_init as repo:
1086 1096 swap = True
1087 1097 flags = 0
1088 1098 flags |= pygit2.GIT_DIFF_SHOW_BINARY
1089 1099
1090 1100 if opt_ignorews:
1091 1101 flags |= pygit2.GIT_DIFF_IGNORE_WHITESPACE
1092 1102
1093 1103 if commit_id_1 == self.EMPTY_COMMIT:
1094 1104 comm1 = repo[commit_id_2]
1095 1105 diff_obj = comm1.tree.diff_to_tree(
1096 1106 flags=flags, context_lines=context, swap=swap)
1097 1107
1098 1108 else:
1099 1109 comm1 = repo[commit_id_2]
1100 1110 comm2 = repo[commit_id_1]
1101 1111 diff_obj = comm1.tree.diff_to_tree(
1102 1112 comm2.tree, flags=flags, context_lines=context, swap=swap)
1103 1113 similar_flags = 0
1104 1114 similar_flags |= pygit2.GIT_DIFF_FIND_RENAMES
1105 1115 diff_obj.find_similar(flags=similar_flags)
1106 1116
1107 1117 if file_filter:
1108 1118 for p in diff_obj:
1109 1119 if p.delta.old_file.path == file_filter:
1110 1120 return p.patch or ''
1111 1121 # no matching path == no diff
1112 1122 return ''
1113 1123 return diff_obj.patch or ''
1114 1124
1115 1125 @reraise_safe_exceptions
1116 1126 def node_history(self, wire, commit_id, path, limit):
1117 1127 cache_on, context_uid, repo_id = self._cache_on(wire)
1118 1128 region = self._region(wire)
1119 1129
1120 1130 @region.conditional_cache_on_arguments(condition=cache_on)
1121 1131 def _node_history(_context_uid, _repo_id, _commit_id, _path, _limit):
1122 1132 # optimize for n==1, rev-list is much faster for that use-case
1123 1133 if limit == 1:
1124 1134 cmd = ['rev-list', '-1', commit_id, '--', path]
1125 1135 else:
1126 1136 cmd = ['log']
1127 1137 if limit:
1128 1138 cmd.extend(['-n', str(safe_int(limit, 0))])
1129 1139 cmd.extend(['--pretty=format: %H', '-s', commit_id, '--', path])
1130 1140
1131 1141 output, __ = self.run_git_command(wire, cmd)
1132 1142 commit_ids = re.findall(rb'[0-9a-fA-F]{40}', output)
1133 1143
1134 1144 return [x for x in commit_ids]
1135 1145 return _node_history(context_uid, repo_id, commit_id, path, limit)
1136 1146
1137 1147 @reraise_safe_exceptions
1138 1148 def node_annotate_legacy(self, wire, commit_id, path):
1139 1149 # NOTE: replaced by pygit2 implementation
1140 1150 cmd = ['blame', '-l', '--root', '-r', commit_id, '--', path]
1141 1151 # -l ==> outputs long shas (and we need all 40 characters)
1142 1152 # --root ==> doesn't put '^' character for boundaries
1143 1153 # -r commit_id ==> blames for the given commit
1144 1154 output, __ = self.run_git_command(wire, cmd)
1145 1155
1146 1156 result = []
1147 1157 for i, blame_line in enumerate(output.splitlines()[:-1]):
1148 1158 line_no = i + 1
1149 1159 blame_commit_id, line = re.split(rb' ', blame_line, 1)
1150 1160 result.append((line_no, blame_commit_id, line))
1151 1161
1152 1162 return result
1153 1163
1154 1164 @reraise_safe_exceptions
1155 1165 def node_annotate(self, wire, commit_id, path):
1156 1166
1157 1167 result_libgit = []
1158 1168 repo_init = self._factory.repo_libgit2(wire)
1159 1169 with repo_init as repo:
1160 1170 commit = repo[commit_id]
1161 1171 blame_obj = repo.blame(path, newest_commit=commit_id)
1162 1172 for i, line in enumerate(commit.tree[path].data.splitlines()):
1163 1173 line_no = i + 1
1164 1174 hunk = blame_obj.for_line(line_no)
1165 1175 blame_commit_id = hunk.final_commit_id.hex
1166 1176
1167 1177 result_libgit.append((line_no, blame_commit_id, line))
1168 1178
1169 1179 return result_libgit
1170 1180
1171 1181 @reraise_safe_exceptions
1172 1182 def update_server_info(self, wire):
1173 1183 repo = self._factory.repo(wire)
1174 1184 update_server_info(repo)
1175 1185
1176 1186 @reraise_safe_exceptions
1177 1187 def get_all_commit_ids(self, wire):
1178 1188
1179 1189 cache_on, context_uid, repo_id = self._cache_on(wire)
1180 1190 region = self._region(wire)
1181 1191
1182 1192 @region.conditional_cache_on_arguments(condition=cache_on)
1183 1193 def _get_all_commit_ids(_context_uid, _repo_id):
1184 1194
1185 1195 cmd = ['rev-list', '--reverse', '--date-order', '--branches', '--tags']
1186 1196 try:
1187 1197 output, __ = self.run_git_command(wire, cmd)
1188 1198 return output.splitlines()
1189 1199 except Exception:
1190 1200 # Can be raised for empty repositories
1191 1201 return []
1192 1202
1193 1203 @region.conditional_cache_on_arguments(condition=cache_on)
1194 1204 def _get_all_commit_ids_pygit2(_context_uid, _repo_id):
1195 1205 repo_init = self._factory.repo_libgit2(wire)
1196 1206 from pygit2 import GIT_SORT_REVERSE, GIT_SORT_TIME, GIT_BRANCH_ALL
1197 1207 results = []
1198 1208 with repo_init as repo:
1199 1209 for commit in repo.walk(repo.head.target, GIT_SORT_TIME | GIT_BRANCH_ALL | GIT_SORT_REVERSE):
1200 1210 results.append(commit.id.hex)
1201 1211
1202 1212 return _get_all_commit_ids(context_uid, repo_id)
1203 1213
1204 1214 @reraise_safe_exceptions
1205 1215 def run_git_command(self, wire, cmd, **opts):
1206 1216 path = wire.get('path', None)
1207 1217
1208 1218 if path and os.path.isdir(path):
1209 1219 opts['cwd'] = path
1210 1220
1211 1221 if '_bare' in opts:
1212 1222 _copts = []
1213 1223 del opts['_bare']
1214 1224 else:
1215 1225 _copts = ['-c', 'core.quotepath=false', ]
1216 1226 safe_call = False
1217 1227 if '_safe' in opts:
1218 1228 # no exc on failure
1219 1229 del opts['_safe']
1220 1230 safe_call = True
1221 1231
1222 1232 if '_copts' in opts:
1223 1233 _copts.extend(opts['_copts'] or [])
1224 1234 del opts['_copts']
1225 1235
1226 1236 gitenv = os.environ.copy()
1227 1237 gitenv.update(opts.pop('extra_env', {}))
1228 1238 # need to clean/fix GIT_DIR!
1229 1239 if 'GIT_DIR' in gitenv:
1230 1240 del gitenv['GIT_DIR']
1231 1241 gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
1232 1242 gitenv['GIT_DISCOVERY_ACROSS_FILESYSTEM'] = '1'
1233 1243
1234 1244 cmd = [settings.GIT_EXECUTABLE] + _copts + cmd
1235 1245 _opts = {'env': gitenv, 'shell': False}
1236 1246
1237 1247 proc = None
1238 1248 try:
1239 1249 _opts.update(opts)
1240 1250 proc = subprocessio.SubprocessIOChunker(cmd, **_opts)
1241 1251
1242 1252 return b''.join(proc), b''.join(proc.stderr)
1243 1253 except OSError as err:
1244 cmd = ' '.join(cmd) # human friendly CMD
1254 cmd = ' '.join(map(safe_str, cmd)) # human friendly CMD
1245 1255 tb_err = ("Couldn't run git command (%s).\n"
1246 1256 "Original error was:%s\n"
1247 1257 "Call options:%s\n"
1248 1258 % (cmd, err, _opts))
1249 1259 log.exception(tb_err)
1250 1260 if safe_call:
1251 1261 return '', err
1252 1262 else:
1253 1263 raise exceptions.VcsException()(tb_err)
1254 1264 finally:
1255 1265 if proc:
1256 1266 proc.close()
1257 1267
1258 1268 @reraise_safe_exceptions
1259 1269 def install_hooks(self, wire, force=False):
1260 1270 from vcsserver.hook_utils import install_git_hooks
1261 1271 bare = self.bare(wire)
1262 1272 path = wire['path']
1263 1273 return install_git_hooks(path, bare, force_create=force)
1264 1274
1265 1275 @reraise_safe_exceptions
1266 1276 def get_hooks_info(self, wire):
1267 1277 from vcsserver.hook_utils import (
1268 1278 get_git_pre_hook_version, get_git_post_hook_version)
1269 1279 bare = self.bare(wire)
1270 1280 path = wire['path']
1271 1281 return {
1272 1282 'pre_version': get_git_pre_hook_version(path, bare),
1273 1283 'post_version': get_git_post_hook_version(path, bare),
1274 1284 }
1275 1285
1276 1286 @reraise_safe_exceptions
1277 1287 def set_head_ref(self, wire, head_name):
1278 1288 log.debug('Setting refs/head to `%s`', head_name)
1279 1289 cmd = ['symbolic-ref', '"HEAD"', '"refs/heads/%s"' % head_name]
1280 1290 output, __ = self.run_git_command(wire, cmd)
1281 1291 return [head_name] + output.splitlines()
1282 1292
1283 1293 @reraise_safe_exceptions
1284 1294 def archive_repo(self, wire, archive_dest_path, kind, mtime, archive_at_path,
1285 1295 archive_dir_name, commit_id):
1286 1296
1287 1297 def file_walker(_commit_id, path):
1288 1298 repo_init = self._factory.repo_libgit2(wire)
1289 1299
1290 1300 with repo_init as repo:
1291 1301 commit = repo[commit_id]
1292 1302
1293 1303 if path in ['', '/']:
1294 1304 tree = commit.tree
1295 1305 else:
1296 1306 tree = commit.tree[path.rstrip('/')]
1297 1307 tree_id = tree.id.hex
1298 1308 try:
1299 1309 tree = repo[tree_id]
1300 1310 except KeyError:
1301 1311 raise ObjectMissing('No tree with id: {}'.format(tree_id))
1302 1312
1303 1313 index = LibGit2Index.Index()
1304 1314 index.read_tree(tree)
1305 1315 file_iter = index
1306 1316
1307 1317 for fn in file_iter:
1308 1318 file_path = fn.path
1309 1319 mode = fn.mode
1310 1320 is_link = stat.S_ISLNK(mode)
1311 1321 if mode == pygit2.GIT_FILEMODE_COMMIT:
1312 1322 log.debug('Skipping path %s as a commit node', file_path)
1313 1323 continue
1314 1324 yield ArchiveNode(file_path, mode, is_link, repo[fn.hex].read_raw)
1315 1325
1316 1326 return archive_repo(file_walker, archive_dest_path, kind, mtime, archive_at_path,
1317 1327 archive_dir_name, commit_id)
@@ -1,1062 +1,1072 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import io
19 19 import logging
20 20 import stat
21 21 import urllib.request, urllib.parse, urllib.error
22 22 import urllib.request, urllib.error, urllib.parse
23 23 import traceback
24 24
25 25 from hgext import largefiles, rebase, purge
26 26
27 27 from mercurial import commands
28 28 from mercurial import unionrepo
29 29 from mercurial import verify
30 30 from mercurial import repair
31 31
32 32 import vcsserver
33 33 from vcsserver import exceptions
34 34 from vcsserver.base import RepoFactory, obfuscate_qs, raise_from_original, archive_repo, ArchiveNode
35 35 from vcsserver.hgcompat import (
36 36 archival, bin, clone, config as hgconfig, diffopts, hex, get_ctx,
37 37 hg_url as url_parser, httpbasicauthhandler, httpdigestauthhandler,
38 38 makepeer, instance, match, memctx, exchange, memfilectx, nullrev, hg_merge,
39 39 patch, peer, revrange, ui, hg_tag, Abort, LookupError, RepoError,
40 40 RepoLookupError, InterventionRequired, RequirementError,
41 41 alwaysmatcher, patternmatcher, hgutil, hgext_strip)
42 42 from vcsserver.str_utils import ascii_bytes, ascii_str, safe_str, safe_bytes
43 43 from vcsserver.vcs_base import RemoteBase
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 def make_ui_from_config(repo_config):
49 49
50 50 class LoggingUI(ui.ui):
51 51
52 52 def status(self, *msg, **opts):
53 53 str_msg = map(safe_str, msg)
54 54 log.info(' '.join(str_msg).rstrip('\n'))
55 55 #super(LoggingUI, self).status(*msg, **opts)
56 56
57 57 def warn(self, *msg, **opts):
58 58 str_msg = map(safe_str, msg)
59 59 log.warning('ui_logger:'+' '.join(str_msg).rstrip('\n'))
60 60 #super(LoggingUI, self).warn(*msg, **opts)
61 61
62 62 def error(self, *msg, **opts):
63 63 str_msg = map(safe_str, msg)
64 64 log.error('ui_logger:'+' '.join(str_msg).rstrip('\n'))
65 65 #super(LoggingUI, self).error(*msg, **opts)
66 66
67 67 def note(self, *msg, **opts):
68 68 str_msg = map(safe_str, msg)
69 69 log.info('ui_logger:'+' '.join(str_msg).rstrip('\n'))
70 70 #super(LoggingUI, self).note(*msg, **opts)
71 71
72 72 def debug(self, *msg, **opts):
73 73 str_msg = map(safe_str, msg)
74 74 log.debug('ui_logger:'+' '.join(str_msg).rstrip('\n'))
75 75 #super(LoggingUI, self).debug(*msg, **opts)
76 76
77 77 baseui = LoggingUI()
78 78
79 79 # clean the baseui object
80 80 baseui._ocfg = hgconfig.config()
81 81 baseui._ucfg = hgconfig.config()
82 82 baseui._tcfg = hgconfig.config()
83 83
84 84 for section, option, value in repo_config:
85 85 baseui.setconfig(ascii_bytes(section), ascii_bytes(option), ascii_bytes(value))
86 86
87 87 # make our hgweb quiet so it doesn't print output
88 88 baseui.setconfig(b'ui', b'quiet', b'true')
89 89
90 90 baseui.setconfig(b'ui', b'paginate', b'never')
91 91 # for better Error reporting of Mercurial
92 92 baseui.setconfig(b'ui', b'message-output', b'stderr')
93 93
94 94 # force mercurial to only use 1 thread, otherwise it may try to set a
95 95 # signal in a non-main thread, thus generating a ValueError.
96 96 baseui.setconfig(b'worker', b'numcpus', 1)
97 97
98 98 # If there is no config for the largefiles extension, we explicitly disable
99 99 # it here. This overrides settings from repositories hgrc file. Recent
100 100 # mercurial versions enable largefiles in hgrc on clone from largefile
101 101 # repo.
102 102 if not baseui.hasconfig(b'extensions', b'largefiles'):
103 103 log.debug('Explicitly disable largefiles extension for repo.')
104 104 baseui.setconfig(b'extensions', b'largefiles', b'!')
105 105
106 106 return baseui
107 107
108 108
109 109 def reraise_safe_exceptions(func):
110 110 """Decorator for converting mercurial exceptions to something neutral."""
111 111
112 112 def wrapper(*args, **kwargs):
113 113 try:
114 114 return func(*args, **kwargs)
115 115 except (Abort, InterventionRequired) as e:
116 116 raise_from_original(exceptions.AbortException(e), e)
117 117 except RepoLookupError as e:
118 118 raise_from_original(exceptions.LookupException(e), e)
119 119 except RequirementError as e:
120 120 raise_from_original(exceptions.RequirementException(e), e)
121 121 except RepoError as e:
122 122 raise_from_original(exceptions.VcsException(e), e)
123 123 except LookupError as e:
124 124 raise_from_original(exceptions.LookupException(e), e)
125 125 except Exception as e:
126 126 if not hasattr(e, '_vcs_kind'):
127 127 log.exception("Unhandled exception in hg remote call")
128 128 raise_from_original(exceptions.UnhandledException(e), e)
129 129
130 130 raise
131 131 return wrapper
132 132
133 133
134 134 class MercurialFactory(RepoFactory):
135 135 repo_type = 'hg'
136 136
137 137 def _create_config(self, config, hooks=True):
138 138 if not hooks:
139 139 hooks_to_clean = frozenset((
140 140 'changegroup.repo_size', 'preoutgoing.pre_pull',
141 141 'outgoing.pull_logger', 'prechangegroup.pre_push'))
142 142 new_config = []
143 143 for section, option, value in config:
144 144 if section == 'hooks' and option in hooks_to_clean:
145 145 continue
146 146 new_config.append((section, option, value))
147 147 config = new_config
148 148
149 149 baseui = make_ui_from_config(config)
150 150 return baseui
151 151
152 152 def _create_repo(self, wire, create):
153 153 baseui = self._create_config(wire["config"])
154 154 return instance(baseui, ascii_bytes(wire["path"]), create)
155 155
156 156 def repo(self, wire, create=False):
157 157 """
158 158 Get a repository instance for the given path.
159 159 """
160 160 return self._create_repo(wire, create)
161 161
162 162
163 163 def patch_ui_message_output(baseui):
164 164 baseui.setconfig(b'ui', b'quiet', b'false')
165 165 output = io.BytesIO()
166 166
167 167 def write(data, **unused_kwargs):
168 168 output.write(data)
169 169
170 170 baseui.status = write
171 171 baseui.write = write
172 172 baseui.warn = write
173 173 baseui.debug = write
174 174
175 175 return baseui, output
176 176
177 177
178 178 class HgRemote(RemoteBase):
179 179
180 180 def __init__(self, factory):
181 181 self._factory = factory
182 182 self._bulk_methods = {
183 183 "affected_files": self.ctx_files,
184 184 "author": self.ctx_user,
185 185 "branch": self.ctx_branch,
186 186 "children": self.ctx_children,
187 187 "date": self.ctx_date,
188 188 "message": self.ctx_description,
189 189 "parents": self.ctx_parents,
190 190 "status": self.ctx_status,
191 191 "obsolete": self.ctx_obsolete,
192 192 "phase": self.ctx_phase,
193 193 "hidden": self.ctx_hidden,
194 194 "_file_paths": self.ctx_list,
195 195 }
196 196
197 197 def _get_ctx(self, repo, ref):
198 198 return get_ctx(repo, ref)
199 199
200 200 @reraise_safe_exceptions
201 201 def discover_hg_version(self):
202 202 from mercurial import util
203 return util.version()
203 return safe_str(util.version())
204 204
205 205 @reraise_safe_exceptions
206 206 def is_empty(self, wire):
207 207 repo = self._factory.repo(wire)
208 208
209 209 try:
210 210 return len(repo) == 0
211 211 except Exception:
212 212 log.exception("failed to read object_store")
213 213 return False
214 214
215 215 @reraise_safe_exceptions
216 216 def bookmarks(self, wire):
217 217 cache_on, context_uid, repo_id = self._cache_on(wire)
218 218 region = self._region(wire)
219
219 220 @region.conditional_cache_on_arguments(condition=cache_on)
220 221 def _bookmarks(_context_uid, _repo_id):
221 222 repo = self._factory.repo(wire)
222 return dict(repo._bookmarks)
223 return {safe_str(name): ascii_str(hex(sha)) for name, sha in repo._bookmarks.items()}
223 224
224 225 return _bookmarks(context_uid, repo_id)
225 226
226 227 @reraise_safe_exceptions
227 228 def branches(self, wire, normal, closed):
228 229 cache_on, context_uid, repo_id = self._cache_on(wire)
229 230 region = self._region(wire)
231
230 232 @region.conditional_cache_on_arguments(condition=cache_on)
231 233 def _branches(_context_uid, _repo_id, _normal, _closed):
232 234 repo = self._factory.repo(wire)
233 235 iter_branches = repo.branchmap().iterbranches()
234 236 bt = {}
235 for branch_name, _heads, tip, is_closed in iter_branches:
237 for branch_name, _heads, tip_node, is_closed in iter_branches:
236 238 if normal and not is_closed:
237 bt[branch_name] = tip
239 bt[safe_str(branch_name)] = ascii_str(hex(tip_node))
238 240 if closed and is_closed:
239 bt[branch_name] = tip
241 bt[safe_str(branch_name)] = ascii_str(hex(tip_node))
240 242
241 243 return bt
242 244
243 245 return _branches(context_uid, repo_id, normal, closed)
244 246
245 247 @reraise_safe_exceptions
246 248 def bulk_request(self, wire, commit_id, pre_load):
247 249 cache_on, context_uid, repo_id = self._cache_on(wire)
248 250 region = self._region(wire)
251
249 252 @region.conditional_cache_on_arguments(condition=cache_on)
250 253 def _bulk_request(_repo_id, _commit_id, _pre_load):
251 254 result = {}
252 255 for attr in pre_load:
253 256 try:
254 257 method = self._bulk_methods[attr]
255 258 result[attr] = method(wire, commit_id)
256 259 except KeyError as e:
257 260 raise exceptions.VcsException(e)(
258 261 'Unknown bulk attribute: "%s"' % attr)
259 262 return result
260 263
261 264 return _bulk_request(repo_id, commit_id, sorted(pre_load))
262 265
263 266 @reraise_safe_exceptions
264 267 def ctx_branch(self, wire, commit_id):
265 268 cache_on, context_uid, repo_id = self._cache_on(wire)
266 269 region = self._region(wire)
270
267 271 @region.conditional_cache_on_arguments(condition=cache_on)
268 272 def _ctx_branch(_repo_id, _commit_id):
269 273 repo = self._factory.repo(wire)
270 274 ctx = self._get_ctx(repo, commit_id)
271 275 return ctx.branch()
272 276 return _ctx_branch(repo_id, commit_id)
273 277
274 278 @reraise_safe_exceptions
275 279 def ctx_date(self, wire, commit_id):
276 280 cache_on, context_uid, repo_id = self._cache_on(wire)
277 281 region = self._region(wire)
282
278 283 @region.conditional_cache_on_arguments(condition=cache_on)
279 284 def _ctx_date(_repo_id, _commit_id):
280 285 repo = self._factory.repo(wire)
281 286 ctx = self._get_ctx(repo, commit_id)
282 287 return ctx.date()
283 288 return _ctx_date(repo_id, commit_id)
284 289
285 290 @reraise_safe_exceptions
286 291 def ctx_description(self, wire, revision):
287 292 repo = self._factory.repo(wire)
288 293 ctx = self._get_ctx(repo, revision)
289 294 return ctx.description()
290 295
291 296 @reraise_safe_exceptions
292 297 def ctx_files(self, wire, commit_id):
293 298 cache_on, context_uid, repo_id = self._cache_on(wire)
294 299 region = self._region(wire)
300
295 301 @region.conditional_cache_on_arguments(condition=cache_on)
296 302 def _ctx_files(_repo_id, _commit_id):
297 303 repo = self._factory.repo(wire)
298 304 ctx = self._get_ctx(repo, commit_id)
299 305 return ctx.files()
300 306
301 307 return _ctx_files(repo_id, commit_id)
302 308
303 309 @reraise_safe_exceptions
304 310 def ctx_list(self, path, revision):
305 311 repo = self._factory.repo(path)
306 312 ctx = self._get_ctx(repo, revision)
307 313 return list(ctx)
308 314
309 315 @reraise_safe_exceptions
310 316 def ctx_parents(self, wire, commit_id):
311 317 cache_on, context_uid, repo_id = self._cache_on(wire)
312 318 region = self._region(wire)
319
313 320 @region.conditional_cache_on_arguments(condition=cache_on)
314 321 def _ctx_parents(_repo_id, _commit_id):
315 322 repo = self._factory.repo(wire)
316 323 ctx = self._get_ctx(repo, commit_id)
317 324 return [parent.hex() for parent in ctx.parents()
318 325 if not (parent.hidden() or parent.obsolete())]
319 326
320 327 return _ctx_parents(repo_id, commit_id)
321 328
322 329 @reraise_safe_exceptions
323 330 def ctx_children(self, wire, commit_id):
324 331 cache_on, context_uid, repo_id = self._cache_on(wire)
325 332 region = self._region(wire)
333
326 334 @region.conditional_cache_on_arguments(condition=cache_on)
327 335 def _ctx_children(_repo_id, _commit_id):
328 336 repo = self._factory.repo(wire)
329 337 ctx = self._get_ctx(repo, commit_id)
330 338 return [child.hex() for child in ctx.children()
331 339 if not (child.hidden() or child.obsolete())]
332 340
333 341 return _ctx_children(repo_id, commit_id)
334 342
335 343 @reraise_safe_exceptions
336 344 def ctx_phase(self, wire, commit_id):
337 345 cache_on, context_uid, repo_id = self._cache_on(wire)
338 346 region = self._region(wire)
347
339 348 @region.conditional_cache_on_arguments(condition=cache_on)
340 349 def _ctx_phase(_context_uid, _repo_id, _commit_id):
341 350 repo = self._factory.repo(wire)
342 351 ctx = self._get_ctx(repo, commit_id)
343 352 # public=0, draft=1, secret=3
344 353 return ctx.phase()
345 354 return _ctx_phase(context_uid, repo_id, commit_id)
346 355
347 356 @reraise_safe_exceptions
348 357 def ctx_obsolete(self, wire, commit_id):
349 358 cache_on, context_uid, repo_id = self._cache_on(wire)
350 359 region = self._region(wire)
360
351 361 @region.conditional_cache_on_arguments(condition=cache_on)
352 362 def _ctx_obsolete(_context_uid, _repo_id, _commit_id):
353 363 repo = self._factory.repo(wire)
354 364 ctx = self._get_ctx(repo, commit_id)
355 365 return ctx.obsolete()
356 366 return _ctx_obsolete(context_uid, repo_id, commit_id)
357 367
358 368 @reraise_safe_exceptions
359 369 def ctx_hidden(self, wire, commit_id):
360 370 cache_on, context_uid, repo_id = self._cache_on(wire)
361 371 region = self._region(wire)
372
362 373 @region.conditional_cache_on_arguments(condition=cache_on)
363 374 def _ctx_hidden(_context_uid, _repo_id, _commit_id):
364 375 repo = self._factory.repo(wire)
365 376 ctx = self._get_ctx(repo, commit_id)
366 377 return ctx.hidden()
367 378 return _ctx_hidden(context_uid, repo_id, commit_id)
368 379
369 380 @reraise_safe_exceptions
370 381 def ctx_substate(self, wire, revision):
371 382 repo = self._factory.repo(wire)
372 383 ctx = self._get_ctx(repo, revision)
373 384 return ctx.substate
374 385
375 386 @reraise_safe_exceptions
376 387 def ctx_status(self, wire, revision):
377 388 repo = self._factory.repo(wire)
378 389 ctx = self._get_ctx(repo, revision)
379 390 status = repo[ctx.p1().node()].status(other=ctx.node())
380 391 # object of status (odd, custom named tuple in mercurial) is not
381 392 # correctly serializable, we make it a list, as the underling
382 393 # API expects this to be a list
383 394 return list(status)
384 395
385 396 @reraise_safe_exceptions
386 397 def ctx_user(self, wire, revision):
387 398 repo = self._factory.repo(wire)
388 399 ctx = self._get_ctx(repo, revision)
389 400 return ctx.user()
390 401
391 402 @reraise_safe_exceptions
392 403 def check_url(self, url, config):
393 404 _proto = None
394 405 if '+' in url[:url.find('://')]:
395 406 _proto = url[0:url.find('+')]
396 407 url = url[url.find('+') + 1:]
397 408 handlers = []
398 409 url_obj = url_parser(url)
399 410 test_uri, authinfo = url_obj.authinfo()
400 411 url_obj.passwd = '*****' if url_obj.passwd else url_obj.passwd
401 412 url_obj.query = obfuscate_qs(url_obj.query)
402 413
403 414 cleaned_uri = str(url_obj)
404 415 log.info("Checking URL for remote cloning/import: %s", cleaned_uri)
405 416
406 417 if authinfo:
407 418 # create a password manager
408 419 passmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
409 420 passmgr.add_password(*authinfo)
410 421
411 422 handlers.extend((httpbasicauthhandler(passmgr),
412 423 httpdigestauthhandler(passmgr)))
413 424
414 425 o = urllib.request.build_opener(*handlers)
415 426 o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
416 427 ('Accept', 'application/mercurial-0.1')]
417 428
418 429 q = {"cmd": 'between'}
419 430 q.update({'pairs': "%s-%s" % ('0' * 40, '0' * 40)})
420 431 qs = '?%s' % urllib.parse.urlencode(q)
421 432 cu = "%s%s" % (test_uri, qs)
422 433 req = urllib.request.Request(cu, None, {})
423 434
424 435 try:
425 436 log.debug("Trying to open URL %s", cleaned_uri)
426 437 resp = o.open(req)
427 438 if resp.code != 200:
428 439 raise exceptions.URLError()('Return Code is not 200')
429 440 except Exception as e:
430 441 log.warning("URL cannot be opened: %s", cleaned_uri, exc_info=True)
431 442 # means it cannot be cloned
432 443 raise exceptions.URLError(e)("[%s] org_exc: %s" % (cleaned_uri, e))
433 444
434 445 # now check if it's a proper hg repo, but don't do it for svn
435 446 try:
436 447 if _proto == 'svn':
437 448 pass
438 449 else:
439 450 # check for pure hg repos
440 451 log.debug(
441 452 "Verifying if URL is a Mercurial repository: %s",
442 453 cleaned_uri)
443 454 ui = make_ui_from_config(config)
444 455 peer_checker = makepeer(ui, url)
445 456 peer_checker.lookup('tip')
446 457 except Exception as e:
447 458 log.warning("URL is not a valid Mercurial repository: %s",
448 459 cleaned_uri)
449 460 raise exceptions.URLError(e)(
450 461 "url [%s] does not look like an hg repo org_exc: %s"
451 462 % (cleaned_uri, e))
452 463
453 464 log.info("URL is a valid Mercurial repository: %s", cleaned_uri)
454 465 return True
455 466
456 467 @reraise_safe_exceptions
457 468 def diff(self, wire, commit_id_1, commit_id_2, file_filter, opt_git, opt_ignorews, context):
458 469 repo = self._factory.repo(wire)
459 470
460 471 if file_filter:
461 472 match_filter = match(file_filter[0], '', [file_filter[1]])
462 473 else:
463 474 match_filter = file_filter
464 475 opts = diffopts(git=opt_git, ignorews=opt_ignorews, context=context, showfunc=1)
465 476
466 477 try:
467 return "".join(patch.diff(
468 repo, node1=commit_id_1, node2=commit_id_2, match=match_filter, opts=opts))
478 diff_iter = patch.diff(
479 repo, node1=commit_id_1, node2=commit_id_2, match=match_filter, opts=opts)
480 return b"".join(diff_iter)
469 481 except RepoLookupError as e:
470 482 raise exceptions.LookupException(e)()
471 483
472 484 @reraise_safe_exceptions
473 485 def node_history(self, wire, revision, path, limit):
474 486 cache_on, context_uid, repo_id = self._cache_on(wire)
475 487 region = self._region(wire)
476 488
477 489 @region.conditional_cache_on_arguments(condition=cache_on)
478 490 def _node_history(_context_uid, _repo_id, _revision, _path, _limit):
479 491 repo = self._factory.repo(wire)
480 492
481 493 ctx = self._get_ctx(repo, revision)
482 494 fctx = ctx.filectx(safe_bytes(path))
483 495
484 496 def history_iter():
485 497 limit_rev = fctx.rev()
486 498 for obj in reversed(list(fctx.filelog())):
487 499 obj = fctx.filectx(obj)
488 500 ctx = obj.changectx()
489 501 if ctx.hidden() or ctx.obsolete():
490 502 continue
491 503
492 504 if limit_rev >= obj.rev():
493 505 yield obj
494 506
495 507 history = []
496 508 for cnt, obj in enumerate(history_iter()):
497 509 if limit and cnt >= limit:
498 510 break
499 511 history.append(hex(obj.node()))
500 512
501 513 return [x for x in history]
502 514 return _node_history(context_uid, repo_id, revision, path, limit)
503 515
504 516 @reraise_safe_exceptions
505 517 def node_history_untill(self, wire, revision, path, limit):
506 518 cache_on, context_uid, repo_id = self._cache_on(wire)
507 519 region = self._region(wire)
508 520
509 521 @region.conditional_cache_on_arguments(condition=cache_on)
510 522 def _node_history_until(_context_uid, _repo_id):
511 523 repo = self._factory.repo(wire)
512 524 ctx = self._get_ctx(repo, revision)
513 525 fctx = ctx.filectx(safe_bytes(path))
514 526
515 527 file_log = list(fctx.filelog())
516 528 if limit:
517 529 # Limit to the last n items
518 530 file_log = file_log[-limit:]
519 531
520 532 return [hex(fctx.filectx(cs).node()) for cs in reversed(file_log)]
521 533 return _node_history_until(context_uid, repo_id, revision, path, limit)
522 534
523 535 @reraise_safe_exceptions
524 536 def fctx_annotate(self, wire, revision, path):
525 537 repo = self._factory.repo(wire)
526 538 ctx = self._get_ctx(repo, revision)
527 539 fctx = ctx.filectx(safe_bytes(path))
528 540
529 541 result = []
530 542 for i, annotate_obj in enumerate(fctx.annotate(), 1):
531 543 ln_no = i
532 544 sha = hex(annotate_obj.fctx.node())
533 545 content = annotate_obj.text
534 546 result.append((ln_no, sha, content))
535 547 return result
536 548
537 549 @reraise_safe_exceptions
538 550 def fctx_node_data(self, wire, revision, path):
539 551 repo = self._factory.repo(wire)
540 552 ctx = self._get_ctx(repo, revision)
541 553 fctx = ctx.filectx(safe_bytes(path))
542 554 return fctx.data()
543 555
544 556 @reraise_safe_exceptions
545 557 def fctx_flags(self, wire, commit_id, path):
546 558 cache_on, context_uid, repo_id = self._cache_on(wire)
547 559 region = self._region(wire)
548 560
549 561 @region.conditional_cache_on_arguments(condition=cache_on)
550 562 def _fctx_flags(_repo_id, _commit_id, _path):
551 563 repo = self._factory.repo(wire)
552 564 ctx = self._get_ctx(repo, commit_id)
553 565 fctx = ctx.filectx(safe_bytes(path))
554 566 return fctx.flags()
555 567
556 568 return _fctx_flags(repo_id, commit_id, path)
557 569
558 570 @reraise_safe_exceptions
559 571 def fctx_size(self, wire, commit_id, path):
560 572 cache_on, context_uid, repo_id = self._cache_on(wire)
561 573 region = self._region(wire)
562 574
563 575 @region.conditional_cache_on_arguments(condition=cache_on)
564 576 def _fctx_size(_repo_id, _revision, _path):
565 577 repo = self._factory.repo(wire)
566 578 ctx = self._get_ctx(repo, commit_id)
567 579 fctx = ctx.filectx(safe_bytes(path))
568 580 return fctx.size()
569 581 return _fctx_size(repo_id, commit_id, path)
570 582
571 583 @reraise_safe_exceptions
572 584 def get_all_commit_ids(self, wire, name):
573 585 cache_on, context_uid, repo_id = self._cache_on(wire)
574 586 region = self._region(wire)
575 587
576 588 @region.conditional_cache_on_arguments(condition=cache_on)
577 589 def _get_all_commit_ids(_context_uid, _repo_id, _name):
578 590 repo = self._factory.repo(wire)
579 591 revs = [ascii_str(repo[x].hex()) for x in repo.filtered(b'visible').changelog.revs()]
580 592 return revs
581 593 return _get_all_commit_ids(context_uid, repo_id, name)
582 594
583 595 @reraise_safe_exceptions
584 596 def get_config_value(self, wire, section, name, untrusted=False):
585 597 repo = self._factory.repo(wire)
586 return repo.ui.config(section, name, untrusted=untrusted)
598 return repo.ui.config(ascii_bytes(section), ascii_bytes(name), untrusted=untrusted)
587 599
588 600 @reraise_safe_exceptions
589 601 def is_large_file(self, wire, commit_id, path):
590 602 cache_on, context_uid, repo_id = self._cache_on(wire)
591 603 region = self._region(wire)
592 604
593 605 @region.conditional_cache_on_arguments(condition=cache_on)
594 606 def _is_large_file(_context_uid, _repo_id, _commit_id, _path):
595 607 return largefiles.lfutil.isstandin(safe_bytes(path))
596 608
597 609 return _is_large_file(context_uid, repo_id, commit_id, path)
598 610
599 611 @reraise_safe_exceptions
600 612 def is_binary(self, wire, revision, path):
601 613 cache_on, context_uid, repo_id = self._cache_on(wire)
602 614 region = self._region(wire)
603 615
604 616 @region.conditional_cache_on_arguments(condition=cache_on)
605 617 def _is_binary(_repo_id, _sha, _path):
606 618 repo = self._factory.repo(wire)
607 619 ctx = self._get_ctx(repo, revision)
608 620 fctx = ctx.filectx(safe_bytes(path))
609 621 return fctx.isbinary()
610 622
611 623 return _is_binary(repo_id, revision, path)
612 624
613 625 @reraise_safe_exceptions
614 626 def in_largefiles_store(self, wire, sha):
615 627 repo = self._factory.repo(wire)
616 628 return largefiles.lfutil.instore(repo, sha)
617 629
618 630 @reraise_safe_exceptions
619 631 def in_user_cache(self, wire, sha):
620 632 repo = self._factory.repo(wire)
621 633 return largefiles.lfutil.inusercache(repo.ui, sha)
622 634
623 635 @reraise_safe_exceptions
624 636 def store_path(self, wire, sha):
625 637 repo = self._factory.repo(wire)
626 638 return largefiles.lfutil.storepath(repo, sha)
627 639
628 640 @reraise_safe_exceptions
629 641 def link(self, wire, sha, path):
630 642 repo = self._factory.repo(wire)
631 643 largefiles.lfutil.link(
632 644 largefiles.lfutil.usercachepath(repo.ui, sha), path)
633 645
634 646 @reraise_safe_exceptions
635 647 def localrepository(self, wire, create=False):
636 648 self._factory.repo(wire, create=create)
637 649
638 650 @reraise_safe_exceptions
639 651 def lookup(self, wire, revision, both):
640 652 cache_on, context_uid, repo_id = self._cache_on(wire)
641 653 region = self._region(wire)
642 654
643 655 @region.conditional_cache_on_arguments(condition=cache_on)
644 656 def _lookup(_context_uid, _repo_id, _revision, _both):
645 657
646 658 repo = self._factory.repo(wire)
647 659 rev = _revision
648 660 if isinstance(rev, int):
649 661 # NOTE(marcink):
650 662 # since Mercurial doesn't support negative indexes properly
651 663 # we need to shift accordingly by one to get proper index, e.g
652 664 # repo[-1] => repo[-2]
653 665 # repo[0] => repo[-1]
654 666 if rev <= 0:
655 667 rev = rev + -1
656 668 try:
657 669 ctx = self._get_ctx(repo, rev)
658 670 except (TypeError, RepoLookupError) as e:
659 671 e._org_exc_tb = traceback.format_exc()
660 672 raise exceptions.LookupException(e)(rev)
661 673 except LookupError as e:
662 674 e._org_exc_tb = traceback.format_exc()
663 675 raise exceptions.LookupException(e)(e.name)
664 676
665 677 if not both:
666 678 return ctx.hex()
667 679
668 680 ctx = repo[ctx.hex()]
669 681 return ctx.hex(), ctx.rev()
670 682
671 683 return _lookup(context_uid, repo_id, revision, both)
672 684
673 685 @reraise_safe_exceptions
674 686 def sync_push(self, wire, url):
675 687 if not self.check_url(url, wire['config']):
676 688 return
677 689
678 690 repo = self._factory.repo(wire)
679 691
680 692 # Disable any prompts for this repo
681 693 repo.ui.setconfig(b'ui', b'interactive', b'off', b'-y')
682 694
683 695 bookmarks = list(dict(repo._bookmarks).keys())
684 remote = peer(repo, {}, url)
696 remote = peer(repo, {}, safe_bytes(url))
685 697 # Disable any prompts for this remote
686 698 remote.ui.setconfig(b'ui', b'interactive', b'off', b'-y')
687 699
688 700 return exchange.push(
689 701 repo, remote, newbranch=True, bookmarks=bookmarks).cgresult
690 702
691 703 @reraise_safe_exceptions
692 704 def revision(self, wire, rev):
693 705 repo = self._factory.repo(wire)
694 706 ctx = self._get_ctx(repo, rev)
695 707 return ctx.rev()
696 708
697 709 @reraise_safe_exceptions
698 710 def rev_range(self, wire, commit_filter):
699 711 cache_on, context_uid, repo_id = self._cache_on(wire)
700 712 region = self._region(wire)
701 713
702 714 @region.conditional_cache_on_arguments(condition=cache_on)
703 715 def _rev_range(_context_uid, _repo_id, _filter):
704 716 repo = self._factory.repo(wire)
705 717 revisions = [
706 718 ascii_str(repo[rev].hex())
707 719 for rev in revrange(repo, list(map(ascii_bytes, commit_filter)))
708 720 ]
709 721 return revisions
710 722
711 723 return _rev_range(context_uid, repo_id, sorted(commit_filter))
712 724
713 725 @reraise_safe_exceptions
714 726 def rev_range_hash(self, wire, node):
715 727 repo = self._factory.repo(wire)
716 728
717 729 def get_revs(repo, rev_opt):
718 730 if rev_opt:
719 731 revs = revrange(repo, rev_opt)
720 732 if len(revs) == 0:
721 733 return (nullrev, nullrev)
722 734 return max(revs), min(revs)
723 735 else:
724 736 return len(repo) - 1, 0
725 737
726 738 stop, start = get_revs(repo, [node + ':'])
727 739 revs = [ascii_str(repo[r].hex()) for r in range(start, stop + 1)]
728 740 return revs
729 741
730 742 @reraise_safe_exceptions
731 743 def revs_from_revspec(self, wire, rev_spec, *args, **kwargs):
732 744 other_path = kwargs.pop('other_path', None)
733 745
734 746 # case when we want to compare two independent repositories
735 747 if other_path and other_path != wire["path"]:
736 748 baseui = self._factory._create_config(wire["config"])
737 749 repo = unionrepo.makeunionrepository(baseui, other_path, wire["path"])
738 750 else:
739 751 repo = self._factory.repo(wire)
740 752 return list(repo.revs(rev_spec, *args))
741 753
742 754 @reraise_safe_exceptions
743 755 def verify(self, wire,):
744 756 repo = self._factory.repo(wire)
745 757 baseui = self._factory._create_config(wire['config'])
746 758
747 759 baseui, output = patch_ui_message_output(baseui)
748 760
749 761 repo.ui = baseui
750 762 verify.verify(repo)
751 763 return output.getvalue()
752 764
753 765 @reraise_safe_exceptions
754 766 def hg_update_cache(self, wire,):
755 767 repo = self._factory.repo(wire)
756 768 baseui = self._factory._create_config(wire['config'])
757 769 baseui, output = patch_ui_message_output(baseui)
758 770
759 771 repo.ui = baseui
760 772 with repo.wlock(), repo.lock():
761 773 repo.updatecaches(full=True)
762 774
763 775 return output.getvalue()
764 776
765 777 @reraise_safe_exceptions
766 778 def hg_rebuild_fn_cache(self, wire,):
767 779 repo = self._factory.repo(wire)
768 780 baseui = self._factory._create_config(wire['config'])
769 781 baseui, output = patch_ui_message_output(baseui)
770 782
771 783 repo.ui = baseui
772 784
773 785 repair.rebuildfncache(baseui, repo)
774 786
775 787 return output.getvalue()
776 788
777 789 @reraise_safe_exceptions
778 790 def tags(self, wire):
779 791 cache_on, context_uid, repo_id = self._cache_on(wire)
780 792 region = self._region(wire)
781 793
782 794 @region.conditional_cache_on_arguments(condition=cache_on)
783 795 def _tags(_context_uid, _repo_id):
784 796 repo = self._factory.repo(wire)
785 return repo.tags()
797 return {safe_str(name): ascii_str(hex(sha)) for name, sha in repo.tags().items()}
786 798
787 799 return _tags(context_uid, repo_id)
788 800
789 801 @reraise_safe_exceptions
790 802 def update(self, wire, node=None, clean=False):
791 803 repo = self._factory.repo(wire)
792 804 baseui = self._factory._create_config(wire['config'])
793 805 commands.update(baseui, repo, node=node, clean=clean)
794 806
795 807 @reraise_safe_exceptions
796 808 def identify(self, wire):
797 809 repo = self._factory.repo(wire)
798 810 baseui = self._factory._create_config(wire['config'])
799 811 output = io.BytesIO()
800 812 baseui.write = output.write
801 813 # This is required to get a full node id
802 814 baseui.debugflag = True
803 815 commands.identify(baseui, repo, id=True)
804 816
805 817 return output.getvalue()
806 818
807 819 @reraise_safe_exceptions
808 820 def heads(self, wire, branch=None):
809 821 repo = self._factory.repo(wire)
810 822 baseui = self._factory._create_config(wire['config'])
811 823 output = io.BytesIO()
812 824
813 825 def write(data, **unused_kwargs):
814 826 output.write(data)
815 827
816 828 baseui.write = write
817 829 if branch:
818 args = [branch]
830 args = [safe_bytes(branch)]
819 831 else:
820 832 args = []
821 commands.heads(baseui, repo, template='{node} ', *args)
833 commands.heads(baseui, repo, template=b'{node} ', *args)
822 834
823 835 return output.getvalue()
824 836
825 837 @reraise_safe_exceptions
826 838 def ancestor(self, wire, revision1, revision2):
827 839 repo = self._factory.repo(wire)
828 840 changelog = repo.changelog
829 841 lookup = repo.lookup
830 842 a = changelog.ancestor(lookup(revision1), lookup(revision2))
831 843 return hex(a)
832 844
833 845 @reraise_safe_exceptions
834 846 def clone(self, wire, source, dest, update_after_clone=False, hooks=True):
835 847 baseui = self._factory._create_config(wire["config"], hooks=hooks)
836 clone(baseui, source, dest, noupdate=not update_after_clone)
848 clone(baseui, safe_bytes(source), safe_bytes(dest), noupdate=not update_after_clone)
837 849
838 850 @reraise_safe_exceptions
839 851 def commitctx(self, wire, message, parents, commit_time, commit_timezone, user, files, extra, removed, updated):
840 852
841 853 repo = self._factory.repo(wire)
842 854 baseui = self._factory._create_config(wire['config'])
843 publishing = baseui.configbool('phases', 'publish')
844 if publishing:
845 new_commit = 'public'
846 else:
847 new_commit = 'draft'
855 publishing = baseui.configbool(b'phases', b'publish')
848 856
849 def _filectxfn(_repo, ctx, path):
857 def _filectxfn(_repo, ctx, path: bytes):
850 858 """
851 859 Marks given path as added/changed/removed in a given _repo. This is
852 860 for internal mercurial commit function.
853 861 """
854 862
855 863 # check if this path is removed
856 if path in removed:
864 if safe_str(path) in removed:
857 865 # returning None is a way to mark node for removal
858 866 return None
859 867
860 868 # check if this path is added
861 869 for node in updated:
862 if node['path'] == path:
870 if safe_bytes(node['path']) == path:
863 871 return memfilectx(
864 872 _repo,
865 873 changectx=ctx,
866 path=node['path'],
867 data=node['content'],
874 path=safe_bytes(node['path']),
875 data=safe_bytes(node['content']),
868 876 islink=False,
869 877 isexec=bool(node['mode'] & stat.S_IXUSR),
870 878 copysource=False)
879 abort_exc = exceptions.AbortException()
880 raise abort_exc(f"Given path haven't been marked as added, changed or removed ({path})")
871 881
872 raise exceptions.AbortException()(
873 "Given path haven't been marked as added, "
874 "changed or removed (%s)" % path)
875
876 with repo.ui.configoverride({('phases', 'new-commit'): new_commit}):
877
882 if publishing:
883 new_commit_phase = b'public'
884 else:
885 new_commit_phase = b'draft'
886 with repo.ui.configoverride({(b'phases', b'new-commit'): new_commit_phase}):
887 kwargs = {safe_bytes(k): safe_bytes(v) for k, v in extra.items()}
878 888 commit_ctx = memctx(
879 889 repo=repo,
880 890 parents=parents,
881 text=message,
882 files=files,
891 text=safe_bytes(message),
892 files=[safe_bytes(x) for x in files],
883 893 filectxfn=_filectxfn,
884 user=user,
894 user=safe_bytes(user),
885 895 date=(commit_time, commit_timezone),
886 extra=extra)
896 extra=kwargs)
887 897
888 898 n = repo.commitctx(commit_ctx)
889 899 new_id = hex(n)
890 900
891 901 return new_id
892 902
893 903 @reraise_safe_exceptions
894 904 def pull(self, wire, url, commit_ids=None):
895 905 repo = self._factory.repo(wire)
896 906 # Disable any prompts for this repo
897 907 repo.ui.setconfig(b'ui', b'interactive', b'off', b'-y')
898 908
899 remote = peer(repo, {}, url)
909 remote = peer(repo, {}, safe_bytes(url))
900 910 # Disable any prompts for this remote
901 911 remote.ui.setconfig(b'ui', b'interactive', b'off', b'-y')
902 912
903 913 if commit_ids:
904 914 commit_ids = [bin(commit_id) for commit_id in commit_ids]
905 915
906 916 return exchange.pull(
907 917 repo, remote, heads=commit_ids, force=None).cgresult
908 918
909 919 @reraise_safe_exceptions
910 920 def pull_cmd(self, wire, source, bookmark=None, branch=None, revision=None, hooks=True):
911 921 repo = self._factory.repo(wire)
912 922 baseui = self._factory._create_config(wire['config'], hooks=hooks)
913 923
914 924 # Mercurial internally has a lot of logic that checks ONLY if
915 925 # option is defined, we just pass those if they are defined then
916 926 opts = {}
917 927 if bookmark:
918 928 opts['bookmark'] = bookmark
919 929 if branch:
920 930 opts['branch'] = branch
921 931 if revision:
922 932 opts['rev'] = revision
923 933
924 934 commands.pull(baseui, repo, source, **opts)
925 935
926 936 @reraise_safe_exceptions
927 937 def push(self, wire, revisions, dest_path, hooks=True, push_branches=False):
928 938 repo = self._factory.repo(wire)
929 939 baseui = self._factory._create_config(wire['config'], hooks=hooks)
930 940 commands.push(baseui, repo, dest=dest_path, rev=revisions,
931 941 new_branch=push_branches)
932 942
933 943 @reraise_safe_exceptions
934 944 def strip(self, wire, revision, update, backup):
935 945 repo = self._factory.repo(wire)
936 946 ctx = self._get_ctx(repo, revision)
937 947 hgext_strip(
938 948 repo.baseui, repo, ctx.node(), update=update, backup=backup)
939 949
940 950 @reraise_safe_exceptions
941 951 def get_unresolved_files(self, wire):
942 952 repo = self._factory.repo(wire)
943 953
944 954 log.debug('Calculating unresolved files for repo: %s', repo)
945 955 output = io.BytesIO()
946 956
947 957 def write(data, **unused_kwargs):
948 958 output.write(data)
949 959
950 960 baseui = self._factory._create_config(wire['config'])
951 961 baseui.write = write
952 962
953 963 commands.resolve(baseui, repo, list=True)
954 964 unresolved = output.getvalue().splitlines(0)
955 965 return unresolved
956 966
957 967 @reraise_safe_exceptions
958 968 def merge(self, wire, revision):
959 969 repo = self._factory.repo(wire)
960 970 baseui = self._factory._create_config(wire['config'])
961 971 repo.ui.setconfig(b'ui', b'merge', b'internal:dump')
962 972
963 973 # In case of sub repositories are used mercurial prompts the user in
964 974 # case of merge conflicts or different sub repository sources. By
965 975 # setting the interactive flag to `False` mercurial doesn't prompt the
966 976 # used but instead uses a default value.
967 977 repo.ui.setconfig(b'ui', b'interactive', False)
968 978 commands.merge(baseui, repo, rev=revision)
969 979
970 980 @reraise_safe_exceptions
971 981 def merge_state(self, wire):
972 982 repo = self._factory.repo(wire)
973 983 repo.ui.setconfig(b'ui', b'merge', b'internal:dump')
974 984
975 985 # In case of sub repositories are used mercurial prompts the user in
976 986 # case of merge conflicts or different sub repository sources. By
977 987 # setting the interactive flag to `False` mercurial doesn't prompt the
978 988 # used but instead uses a default value.
979 989 repo.ui.setconfig(b'ui', b'interactive', False)
980 990 ms = hg_merge.mergestate(repo)
981 991 return [x for x in ms.unresolved()]
982 992
983 993 @reraise_safe_exceptions
984 994 def commit(self, wire, message, username, close_branch=False):
985 995 repo = self._factory.repo(wire)
986 996 baseui = self._factory._create_config(wire['config'])
987 997 repo.ui.setconfig(b'ui', b'username', username)
988 998 commands.commit(baseui, repo, message=message, close_branch=close_branch)
989 999
990 1000 @reraise_safe_exceptions
991 1001 def rebase(self, wire, source=None, dest=None, abort=False):
992 1002 repo = self._factory.repo(wire)
993 1003 baseui = self._factory._create_config(wire['config'])
994 1004 repo.ui.setconfig(b'ui', b'merge', b'internal:dump')
995 1005 # In case of sub repositories are used mercurial prompts the user in
996 1006 # case of merge conflicts or different sub repository sources. By
997 1007 # setting the interactive flag to `False` mercurial doesn't prompt the
998 1008 # used but instead uses a default value.
999 1009 repo.ui.setconfig(b'ui', b'interactive', False)
1000 1010 rebase.rebase(baseui, repo, base=source, dest=dest, abort=abort, keep=not abort)
1001 1011
1002 1012 @reraise_safe_exceptions
1003 1013 def tag(self, wire, name, revision, message, local, user, tag_time, tag_timezone):
1004 1014 repo = self._factory.repo(wire)
1005 1015 ctx = self._get_ctx(repo, revision)
1006 1016 node = ctx.node()
1007 1017
1008 1018 date = (tag_time, tag_timezone)
1009 1019 try:
1010 1020 hg_tag.tag(repo, name, node, message, local, user, date)
1011 1021 except Abort as e:
1012 1022 log.exception("Tag operation aborted")
1013 1023 # Exception can contain unicode which we convert
1014 1024 raise exceptions.AbortException(e)(repr(e))
1015 1025
1016 1026 @reraise_safe_exceptions
1017 1027 def bookmark(self, wire, bookmark, revision=None):
1018 1028 repo = self._factory.repo(wire)
1019 1029 baseui = self._factory._create_config(wire['config'])
1020 1030 commands.bookmark(baseui, repo, bookmark, rev=revision, force=True)
1021 1031
1022 1032 @reraise_safe_exceptions
1023 1033 def install_hooks(self, wire, force=False):
1024 1034 # we don't need any special hooks for Mercurial
1025 1035 pass
1026 1036
1027 1037 @reraise_safe_exceptions
1028 1038 def get_hooks_info(self, wire):
1029 1039 return {
1030 1040 'pre_version': vcsserver.__version__,
1031 1041 'post_version': vcsserver.__version__,
1032 1042 }
1033 1043
1034 1044 @reraise_safe_exceptions
1035 1045 def set_head_ref(self, wire, head_name):
1036 1046 pass
1037 1047
1038 1048 @reraise_safe_exceptions
1039 1049 def archive_repo(self, wire, archive_dest_path, kind, mtime, archive_at_path,
1040 1050 archive_dir_name, commit_id):
1041 1051
1042 1052 def file_walker(_commit_id, path):
1043 1053 repo = self._factory.repo(wire)
1044 1054 ctx = repo[_commit_id]
1045 1055 is_root = path in ['', '/']
1046 1056 if is_root:
1047 1057 matcher = alwaysmatcher(badfn=None)
1048 1058 else:
1049 1059 matcher = patternmatcher('', [(b'glob', path+'/**', b'')], badfn=None)
1050 1060 file_iter = ctx.manifest().walk(matcher)
1051 1061
1052 1062 for fn in file_iter:
1053 1063 file_path = fn
1054 1064 flags = ctx.flags(fn)
1055 1065 mode = b'x' in flags and 0o755 or 0o644
1056 1066 is_link = b'l' in flags
1057 1067
1058 1068 yield ArchiveNode(file_path, mode, is_link, ctx[fn].data)
1059 1069
1060 1070 return archive_repo(file_walker, archive_dest_path, kind, mtime, archive_at_path,
1061 1071 archive_dir_name, commit_id)
1062 1072
@@ -1,864 +1,864 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18
19 19 import os
20 20 import subprocess
21 21 from urllib.error import URLError
22 22 import urllib.parse
23 23 import logging
24 24 import posixpath as vcspath
25 25 import io
26 26 import urllib.request
27 27 import urllib.parse
28 28 import urllib.error
29 29 import traceback
30 30
31 31 import svn.client
32 32 import svn.core
33 33 import svn.delta
34 34 import svn.diff
35 35 import svn.fs
36 36 import svn.repos
37 37
38 38 from vcsserver import svn_diff, exceptions, subprocessio, settings
39 39 from vcsserver.base import RepoFactory, raise_from_original, ArchiveNode, archive_repo
40 40 from vcsserver.exceptions import NoContentException
41 41 from vcsserver.str_utils import safe_str
42 42 from vcsserver.vcs_base import RemoteBase
43 43 from vcsserver.lib.svnremoterepo import svnremoterepo
44 44 log = logging.getLogger(__name__)
45 45
46 46
47 47 svn_compatible_versions_map = {
48 48 'pre-1.4-compatible': '1.3',
49 49 'pre-1.5-compatible': '1.4',
50 50 'pre-1.6-compatible': '1.5',
51 51 'pre-1.8-compatible': '1.7',
52 52 'pre-1.9-compatible': '1.8',
53 53 }
54 54
55 55 current_compatible_version = '1.14'
56 56
57 57
58 58 def reraise_safe_exceptions(func):
59 59 """Decorator for converting svn exceptions to something neutral."""
60 60 def wrapper(*args, **kwargs):
61 61 try:
62 62 return func(*args, **kwargs)
63 63 except Exception as e:
64 64 if not hasattr(e, '_vcs_kind'):
65 65 log.exception("Unhandled exception in svn remote call")
66 66 raise_from_original(exceptions.UnhandledException(e))
67 67 raise
68 68 return wrapper
69 69
70 70
71 71 class SubversionFactory(RepoFactory):
72 72 repo_type = 'svn'
73 73
74 74 def _create_repo(self, wire, create, compatible_version):
75 75 path = svn.core.svn_path_canonicalize(wire['path'])
76 76 if create:
77 77 fs_config = {'compatible-version': current_compatible_version}
78 78 if compatible_version:
79 79
80 80 compatible_version_string = \
81 81 svn_compatible_versions_map.get(compatible_version) \
82 82 or compatible_version
83 83 fs_config['compatible-version'] = compatible_version_string
84 84
85 85 log.debug('Create SVN repo with config "%s"', fs_config)
86 86 repo = svn.repos.create(path, "", "", None, fs_config)
87 87 else:
88 88 repo = svn.repos.open(path)
89 89
90 90 log.debug('Got SVN object: %s', repo)
91 91 return repo
92 92
93 93 def repo(self, wire, create=False, compatible_version=None):
94 94 """
95 95 Get a repository instance for the given path.
96 96 """
97 97 return self._create_repo(wire, create, compatible_version)
98 98
99 99
100 100 NODE_TYPE_MAPPING = {
101 101 svn.core.svn_node_file: 'file',
102 102 svn.core.svn_node_dir: 'dir',
103 103 }
104 104
105 105
106 106 class SvnRemote(RemoteBase):
107 107
108 108 def __init__(self, factory, hg_factory=None):
109 109 self._factory = factory
110 110
111 111 @reraise_safe_exceptions
112 112 def discover_svn_version(self):
113 113 try:
114 114 import svn.core
115 115 svn_ver = svn.core.SVN_VERSION
116 116 except ImportError:
117 117 svn_ver = None
118 return svn_ver
118 return safe_str(svn_ver)
119 119
120 120 @reraise_safe_exceptions
121 121 def is_empty(self, wire):
122 122
123 123 try:
124 124 return self.lookup(wire, -1) == 0
125 125 except Exception:
126 126 log.exception("failed to read object_store")
127 127 return False
128 128
129 129 def check_url(self, url):
130 130
131 131 # uuid function get's only valid UUID from proper repo, else
132 132 # throws exception
133 133 username, password, src_url = self.get_url_and_credentials(url)
134 134 try:
135 135 svnremoterepo(username, password, src_url).svn().uuid
136 136 except Exception:
137 137 tb = traceback.format_exc()
138 138 log.debug("Invalid Subversion url: `%s`, tb: %s", url, tb)
139 139 raise URLError(
140 140 '"%s" is not a valid Subversion source url.' % (url, ))
141 141 return True
142 142
143 143 def is_path_valid_repository(self, wire, path):
144 144
145 145 # NOTE(marcink): short circuit the check for SVN repo
146 146 # the repos.open might be expensive to check, but we have one cheap
147 147 # pre condition that we can use, to check for 'format' file
148 148
149 149 if not os.path.isfile(os.path.join(path, 'format')):
150 150 return False
151 151
152 152 try:
153 153 svn.repos.open(path)
154 154 except svn.core.SubversionException:
155 155 tb = traceback.format_exc()
156 156 log.debug("Invalid Subversion path `%s`, tb: %s", path, tb)
157 157 return False
158 158 return True
159 159
160 160 @reraise_safe_exceptions
161 161 def verify(self, wire,):
162 162 repo_path = wire['path']
163 163 if not self.is_path_valid_repository(wire, repo_path):
164 164 raise Exception(
165 165 "Path %s is not a valid Subversion repository." % repo_path)
166 166
167 167 cmd = ['svnadmin', 'info', repo_path]
168 168 stdout, stderr = subprocessio.run_command(cmd)
169 169 return stdout
170 170
171 171 def lookup(self, wire, revision):
172 172 if revision not in [-1, None, 'HEAD']:
173 173 raise NotImplementedError
174 174 repo = self._factory.repo(wire)
175 175 fs_ptr = svn.repos.fs(repo)
176 176 head = svn.fs.youngest_rev(fs_ptr)
177 177 return head
178 178
179 179 def lookup_interval(self, wire, start_ts, end_ts):
180 180 repo = self._factory.repo(wire)
181 181 fsobj = svn.repos.fs(repo)
182 182 start_rev = None
183 183 end_rev = None
184 184 if start_ts:
185 185 start_ts_svn = apr_time_t(start_ts)
186 186 start_rev = svn.repos.dated_revision(repo, start_ts_svn) + 1
187 187 else:
188 188 start_rev = 1
189 189 if end_ts:
190 190 end_ts_svn = apr_time_t(end_ts)
191 191 end_rev = svn.repos.dated_revision(repo, end_ts_svn)
192 192 else:
193 193 end_rev = svn.fs.youngest_rev(fsobj)
194 194 return start_rev, end_rev
195 195
196 196 def revision_properties(self, wire, revision):
197 197
198 198 cache_on, context_uid, repo_id = self._cache_on(wire)
199 199 region = self._region(wire)
200 200 @region.conditional_cache_on_arguments(condition=cache_on)
201 201 def _revision_properties(_repo_id, _revision):
202 202 repo = self._factory.repo(wire)
203 203 fs_ptr = svn.repos.fs(repo)
204 204 return svn.fs.revision_proplist(fs_ptr, revision)
205 205 return _revision_properties(repo_id, revision)
206 206
207 207 def revision_changes(self, wire, revision):
208 208
209 209 repo = self._factory.repo(wire)
210 210 fsobj = svn.repos.fs(repo)
211 211 rev_root = svn.fs.revision_root(fsobj, revision)
212 212
213 213 editor = svn.repos.ChangeCollector(fsobj, rev_root)
214 214 editor_ptr, editor_baton = svn.delta.make_editor(editor)
215 215 base_dir = ""
216 216 send_deltas = False
217 217 svn.repos.replay2(
218 218 rev_root, base_dir, svn.core.SVN_INVALID_REVNUM, send_deltas,
219 219 editor_ptr, editor_baton, None)
220 220
221 221 added = []
222 222 changed = []
223 223 removed = []
224 224
225 225 # TODO: CHANGE_ACTION_REPLACE: Figure out where it belongs
226 226 for path, change in editor.changes.items():
227 227 # TODO: Decide what to do with directory nodes. Subversion can add
228 228 # empty directories.
229 229
230 230 if change.item_kind == svn.core.svn_node_dir:
231 231 continue
232 232 if change.action in [svn.repos.CHANGE_ACTION_ADD]:
233 233 added.append(path)
234 234 elif change.action in [svn.repos.CHANGE_ACTION_MODIFY,
235 235 svn.repos.CHANGE_ACTION_REPLACE]:
236 236 changed.append(path)
237 237 elif change.action in [svn.repos.CHANGE_ACTION_DELETE]:
238 238 removed.append(path)
239 239 else:
240 240 raise NotImplementedError(
241 241 "Action %s not supported on path %s" % (
242 242 change.action, path))
243 243
244 244 changes = {
245 245 'added': added,
246 246 'changed': changed,
247 247 'removed': removed,
248 248 }
249 249 return changes
250 250
251 251 @reraise_safe_exceptions
252 252 def node_history(self, wire, path, revision, limit):
253 253 cache_on, context_uid, repo_id = self._cache_on(wire)
254 254 region = self._region(wire)
255 255 @region.conditional_cache_on_arguments(condition=cache_on)
256 256 def _assert_correct_path(_context_uid, _repo_id, _path, _revision, _limit):
257 257 cross_copies = False
258 258 repo = self._factory.repo(wire)
259 259 fsobj = svn.repos.fs(repo)
260 260 rev_root = svn.fs.revision_root(fsobj, revision)
261 261
262 262 history_revisions = []
263 263 history = svn.fs.node_history(rev_root, path)
264 264 history = svn.fs.history_prev(history, cross_copies)
265 265 while history:
266 266 __, node_revision = svn.fs.history_location(history)
267 267 history_revisions.append(node_revision)
268 268 if limit and len(history_revisions) >= limit:
269 269 break
270 270 history = svn.fs.history_prev(history, cross_copies)
271 271 return history_revisions
272 272 return _assert_correct_path(context_uid, repo_id, path, revision, limit)
273 273
274 274 def node_properties(self, wire, path, revision):
275 275 cache_on, context_uid, repo_id = self._cache_on(wire)
276 276 region = self._region(wire)
277 277 @region.conditional_cache_on_arguments(condition=cache_on)
278 278 def _node_properties(_repo_id, _path, _revision):
279 279 repo = self._factory.repo(wire)
280 280 fsobj = svn.repos.fs(repo)
281 281 rev_root = svn.fs.revision_root(fsobj, revision)
282 282 return svn.fs.node_proplist(rev_root, path)
283 283 return _node_properties(repo_id, path, revision)
284 284
285 285 def file_annotate(self, wire, path, revision):
286 286 abs_path = 'file://' + urllib.request.pathname2url(
287 287 vcspath.join(wire['path'], path))
288 288 file_uri = svn.core.svn_path_canonicalize(abs_path)
289 289
290 290 start_rev = svn_opt_revision_value_t(0)
291 291 peg_rev = svn_opt_revision_value_t(revision)
292 292 end_rev = peg_rev
293 293
294 294 annotations = []
295 295
296 296 def receiver(line_no, revision, author, date, line, pool):
297 297 annotations.append((line_no, revision, line))
298 298
299 299 # TODO: Cannot use blame5, missing typemap function in the swig code
300 300 try:
301 301 svn.client.blame2(
302 302 file_uri, peg_rev, start_rev, end_rev,
303 303 receiver, svn.client.create_context())
304 304 except svn.core.SubversionException as exc:
305 305 log.exception("Error during blame operation.")
306 306 raise Exception(
307 307 "Blame not supported or file does not exist at path %s. "
308 308 "Error %s." % (path, exc))
309 309
310 310 return annotations
311 311
312 312 def get_node_type(self, wire, path, revision=None):
313 313
314 314 cache_on, context_uid, repo_id = self._cache_on(wire)
315 315 region = self._region(wire)
316 316 @region.conditional_cache_on_arguments(condition=cache_on)
317 317 def _get_node_type(_repo_id, _path, _revision):
318 318 repo = self._factory.repo(wire)
319 319 fs_ptr = svn.repos.fs(repo)
320 320 if _revision is None:
321 321 _revision = svn.fs.youngest_rev(fs_ptr)
322 322 root = svn.fs.revision_root(fs_ptr, _revision)
323 323 node = svn.fs.check_path(root, path)
324 324 return NODE_TYPE_MAPPING.get(node, None)
325 325 return _get_node_type(repo_id, path, revision)
326 326
327 327 def get_nodes(self, wire, path, revision=None):
328 328
329 329 cache_on, context_uid, repo_id = self._cache_on(wire)
330 330 region = self._region(wire)
331 331 @region.conditional_cache_on_arguments(condition=cache_on)
332 332 def _get_nodes(_repo_id, _path, _revision):
333 333 repo = self._factory.repo(wire)
334 334 fsobj = svn.repos.fs(repo)
335 335 if _revision is None:
336 336 _revision = svn.fs.youngest_rev(fsobj)
337 337 root = svn.fs.revision_root(fsobj, _revision)
338 338 entries = svn.fs.dir_entries(root, path)
339 339 result = []
340 340 for entry_path, entry_info in entries.items():
341 341 result.append(
342 342 (entry_path, NODE_TYPE_MAPPING.get(entry_info.kind, None)))
343 343 return result
344 344 return _get_nodes(repo_id, path, revision)
345 345
346 346 def get_file_content(self, wire, path, rev=None):
347 347 repo = self._factory.repo(wire)
348 348 fsobj = svn.repos.fs(repo)
349 349 if rev is None:
350 350 rev = svn.fs.youngest_revision(fsobj)
351 351 root = svn.fs.revision_root(fsobj, rev)
352 352 content = svn.core.Stream(svn.fs.file_contents(root, path))
353 353 return content.read()
354 354
355 355 def get_file_size(self, wire, path, revision=None):
356 356
357 357 cache_on, context_uid, repo_id = self._cache_on(wire)
358 358 region = self._region(wire)
359 359
360 360 @region.conditional_cache_on_arguments(condition=cache_on)
361 361 def _get_file_size(_repo_id, _path, _revision):
362 362 repo = self._factory.repo(wire)
363 363 fsobj = svn.repos.fs(repo)
364 364 if _revision is None:
365 365 _revision = svn.fs.youngest_revision(fsobj)
366 366 root = svn.fs.revision_root(fsobj, _revision)
367 367 size = svn.fs.file_length(root, path)
368 368 return size
369 369 return _get_file_size(repo_id, path, revision)
370 370
371 371 def create_repository(self, wire, compatible_version=None):
372 372 log.info('Creating Subversion repository in path "%s"', wire['path'])
373 373 self._factory.repo(wire, create=True,
374 374 compatible_version=compatible_version)
375 375
376 376 def get_url_and_credentials(self, src_url):
377 377 obj = urllib.parse.urlparse(src_url)
378 378 username = obj.username or None
379 379 password = obj.password or None
380 380 return username, password, src_url
381 381
382 382 def import_remote_repository(self, wire, src_url):
383 383 repo_path = wire['path']
384 384 if not self.is_path_valid_repository(wire, repo_path):
385 385 raise Exception(
386 386 "Path %s is not a valid Subversion repository." % repo_path)
387 387
388 388 username, password, src_url = self.get_url_and_credentials(src_url)
389 389 rdump_cmd = ['svnrdump', 'dump', '--non-interactive',
390 390 '--trust-server-cert-failures=unknown-ca']
391 391 if username and password:
392 392 rdump_cmd += ['--username', username, '--password', password]
393 393 rdump_cmd += [src_url]
394 394
395 395 rdump = subprocess.Popen(
396 396 rdump_cmd,
397 397 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
398 398 load = subprocess.Popen(
399 399 ['svnadmin', 'load', repo_path], stdin=rdump.stdout)
400 400
401 401 # TODO: johbo: This can be a very long operation, might be better
402 402 # to track some kind of status and provide an api to check if the
403 403 # import is done.
404 404 rdump.wait()
405 405 load.wait()
406 406
407 407 log.debug('Return process ended with code: %s', rdump.returncode)
408 408 if rdump.returncode != 0:
409 409 errors = rdump.stderr.read()
410 410 log.error('svnrdump dump failed: statuscode %s: message: %s', rdump.returncode, errors)
411 411
412 412 reason = 'UNKNOWN'
413 413 if b'svnrdump: E230001:' in errors:
414 414 reason = 'INVALID_CERTIFICATE'
415 415
416 416 if reason == 'UNKNOWN':
417 417 reason = 'UNKNOWN:{}'.format(safe_str(errors))
418 418
419 419 raise Exception(
420 420 'Failed to dump the remote repository from %s. Reason:%s' % (
421 421 src_url, reason))
422 422 if load.returncode != 0:
423 423 raise Exception(
424 424 'Failed to load the dump of remote repository from %s.' %
425 425 (src_url, ))
426 426
427 427 def commit(self, wire, message, author, timestamp, updated, removed):
428 428 assert isinstance(message, str)
429 429 assert isinstance(author, str)
430 430
431 431 repo = self._factory.repo(wire)
432 432 fsobj = svn.repos.fs(repo)
433 433
434 434 rev = svn.fs.youngest_rev(fsobj)
435 435 txn = svn.repos.fs_begin_txn_for_commit(repo, rev, author, message)
436 436 txn_root = svn.fs.txn_root(txn)
437 437
438 438 for node in updated:
439 439 TxnNodeProcessor(node, txn_root).update()
440 440 for node in removed:
441 441 TxnNodeProcessor(node, txn_root).remove()
442 442
443 443 commit_id = svn.repos.fs_commit_txn(repo, txn)
444 444
445 445 if timestamp:
446 446 apr_time = apr_time_t(timestamp)
447 447 ts_formatted = svn.core.svn_time_to_cstring(apr_time)
448 448 svn.fs.change_rev_prop(fsobj, commit_id, 'svn:date', ts_formatted)
449 449
450 450 log.debug('Committed revision "%s" to "%s".', commit_id, wire['path'])
451 451 return commit_id
452 452
453 453 def diff(self, wire, rev1, rev2, path1=None, path2=None,
454 454 ignore_whitespace=False, context=3):
455 455
456 456 wire.update(cache=False)
457 457 repo = self._factory.repo(wire)
458 458 diff_creator = SvnDiffer(
459 459 repo, rev1, path1, rev2, path2, ignore_whitespace, context)
460 460 try:
461 461 return diff_creator.generate_diff()
462 462 except svn.core.SubversionException as e:
463 463 log.exception(
464 464 "Error during diff operation operation. "
465 465 "Path might not exist %s, %s" % (path1, path2))
466 466 return ""
467 467
468 468 @reraise_safe_exceptions
469 469 def is_large_file(self, wire, path):
470 470 return False
471 471
472 472 @reraise_safe_exceptions
473 473 def is_binary(self, wire, rev, path):
474 474 cache_on, context_uid, repo_id = self._cache_on(wire)
475 475
476 476 region = self._region(wire)
477 477 @region.conditional_cache_on_arguments(condition=cache_on)
478 478 def _is_binary(_repo_id, _rev, _path):
479 479 raw_bytes = self.get_file_content(wire, path, rev)
480 480 return raw_bytes and '\0' in raw_bytes
481 481
482 482 return _is_binary(repo_id, rev, path)
483 483
484 484 @reraise_safe_exceptions
485 485 def run_svn_command(self, wire, cmd, **opts):
486 486 path = wire.get('path', None)
487 487
488 488 if path and os.path.isdir(path):
489 489 opts['cwd'] = path
490 490
491 491 safe_call = opts.pop('_safe', False)
492 492
493 493 svnenv = os.environ.copy()
494 494 svnenv.update(opts.pop('extra_env', {}))
495 495
496 496 _opts = {'env': svnenv, 'shell': False}
497 497
498 498 try:
499 499 _opts.update(opts)
500 500 proc = subprocessio.SubprocessIOChunker(cmd, **_opts)
501 501
502 502 return b''.join(proc), b''.join(proc.stderr)
503 503 except OSError as err:
504 504 if safe_call:
505 505 return '', safe_str(err).strip()
506 506 else:
507 cmd = ' '.join(cmd) # human friendly CMD
507 cmd = ' '.join(map(safe_str, cmd)) # human friendly CMD
508 508 tb_err = ("Couldn't run svn command (%s).\n"
509 509 "Original error was:%s\n"
510 510 "Call options:%s\n"
511 511 % (cmd, err, _opts))
512 512 log.exception(tb_err)
513 513 raise exceptions.VcsException()(tb_err)
514 514
515 515 @reraise_safe_exceptions
516 516 def install_hooks(self, wire, force=False):
517 517 from vcsserver.hook_utils import install_svn_hooks
518 518 repo_path = wire['path']
519 519 binary_dir = settings.BINARY_DIR
520 520 executable = None
521 521 if binary_dir:
522 522 executable = os.path.join(binary_dir, 'python')
523 523 return install_svn_hooks(
524 524 repo_path, executable=executable, force_create=force)
525 525
526 526 @reraise_safe_exceptions
527 527 def get_hooks_info(self, wire):
528 528 from vcsserver.hook_utils import (
529 529 get_svn_pre_hook_version, get_svn_post_hook_version)
530 530 repo_path = wire['path']
531 531 return {
532 532 'pre_version': get_svn_pre_hook_version(repo_path),
533 533 'post_version': get_svn_post_hook_version(repo_path),
534 534 }
535 535
536 536 @reraise_safe_exceptions
537 537 def set_head_ref(self, wire, head_name):
538 538 pass
539 539
540 540 @reraise_safe_exceptions
541 541 def archive_repo(self, wire, archive_dest_path, kind, mtime, archive_at_path,
542 542 archive_dir_name, commit_id):
543 543
544 544 def walk_tree(root, root_dir, _commit_id):
545 545 """
546 546 Special recursive svn repo walker
547 547 """
548 548
549 549 filemode_default = 0o100644
550 550 filemode_executable = 0o100755
551 551
552 552 file_iter = svn.fs.dir_entries(root, root_dir)
553 553 for f_name in file_iter:
554 554 f_type = NODE_TYPE_MAPPING.get(file_iter[f_name].kind, None)
555 555
556 556 if f_type == 'dir':
557 557 # return only DIR, and then all entries in that dir
558 558 yield os.path.join(root_dir, f_name), {'mode': filemode_default}, f_type
559 559 new_root = os.path.join(root_dir, f_name)
560 560 for _f_name, _f_data, _f_type in walk_tree(root, new_root, _commit_id):
561 561 yield _f_name, _f_data, _f_type
562 562 else:
563 563 f_path = os.path.join(root_dir, f_name).rstrip('/')
564 564 prop_list = svn.fs.node_proplist(root, f_path)
565 565
566 566 f_mode = filemode_default
567 567 if prop_list.get('svn:executable'):
568 568 f_mode = filemode_executable
569 569
570 570 f_is_link = False
571 571 if prop_list.get('svn:special'):
572 572 f_is_link = True
573 573
574 574 data = {
575 575 'is_link': f_is_link,
576 576 'mode': f_mode,
577 577 'content_stream': svn.core.Stream(svn.fs.file_contents(root, f_path)).read
578 578 }
579 579
580 580 yield f_path, data, f_type
581 581
582 582 def file_walker(_commit_id, path):
583 583 repo = self._factory.repo(wire)
584 584 root = svn.fs.revision_root(svn.repos.fs(repo), int(commit_id))
585 585
586 586 def no_content():
587 587 raise NoContentException()
588 588
589 589 for f_name, f_data, f_type in walk_tree(root, path, _commit_id):
590 590 file_path = f_name
591 591
592 592 if f_type == 'dir':
593 593 mode = f_data['mode']
594 594 yield ArchiveNode(file_path, mode, False, no_content)
595 595 else:
596 596 mode = f_data['mode']
597 597 is_link = f_data['is_link']
598 598 data_stream = f_data['content_stream']
599 599 yield ArchiveNode(file_path, mode, is_link, data_stream)
600 600
601 601 return archive_repo(file_walker, archive_dest_path, kind, mtime, archive_at_path,
602 602 archive_dir_name, commit_id)
603 603
604 604
class SvnDiffer(object):
    """
    Utility to create git-style unified diffs from a Subversion repository.

    Compares ``src_path`` at ``src_rev`` against ``tgt_path`` at ``tgt_rev``
    using the svn fs API and renders the result into a text buffer.
    """

    # Set per-node while generating; when True the textual hunk body is skipped.
    binary_content = False

    def __init__(
            self, repo, src_rev, src_path, tgt_rev, tgt_path,
            ignore_whitespace, context):
        # :param repo: an opened svn repository object (svn.repos)
        # :param src_rev/tgt_rev: integer revision numbers to compare
        # :param src_path/tgt_path: paths inside the repository; empty target
        #     path defaults to repo root, empty source path mirrors the target
        # :param ignore_whitespace: suppress whitespace-only changes
        # :param context: number of context lines for the unified diff
        self.repo = repo
        self.ignore_whitespace = ignore_whitespace
        self.context = context

        fsobj = svn.repos.fs(repo)

        self.tgt_rev = tgt_rev
        self.tgt_path = tgt_path or ''
        self.tgt_root = svn.fs.revision_root(fsobj, tgt_rev)
        self.tgt_kind = svn.fs.check_path(self.tgt_root, self.tgt_path)

        self.src_rev = src_rev
        self.src_path = src_path or self.tgt_path
        self.src_root = svn.fs.revision_root(fsobj, src_rev)
        self.src_kind = svn.fs.check_path(self.src_root, self.src_path)

        self._validate()

    def _validate(self):
        # Reject comparing a file against a directory (or vice versa); a
        # missing node (svn_node_none) on either side is fine (add/delete).
        if (self.tgt_kind != svn.core.svn_node_none and
                self.src_kind != svn.core.svn_node_none and
                self.src_kind != self.tgt_kind):
            # TODO: johbo: proper error handling
            raise Exception(
                "Source and target are not compatible for diff generation. "
                "Source type: %s, target type: %s" %
                (self.src_kind, self.tgt_kind))

    def generate_diff(self):
        """Return the full diff between the configured revisions as a string."""
        # NOTE(review): buffer is io.StringIO (text) while _svn_readlines
        # yields raw stream content — on python3 the svn bindings return
        # bytes there, so writelines(udiff) mixes types unless
        # svn_diff.unified_diff normalizes. Confirm bytes vs str handling.
        buf = io.StringIO()
        if self.tgt_kind == svn.core.svn_node_dir:
            self._generate_dir_diff(buf)
        else:
            self._generate_file_diff(buf)
        return buf.getvalue()

    def _generate_dir_diff(self, buf):
        # Replay the delta between the two trees through a recording editor,
        # then emit one per-file diff for every changed path.
        editor = DiffChangeEditor()
        editor_ptr, editor_baton = svn.delta.make_editor(editor)
        svn.repos.dir_delta2(
            self.src_root,
            self.src_path,
            '',  # src_entry
            self.tgt_root,
            self.tgt_path,
            editor_ptr, editor_baton,
            authorization_callback_allow_all,
            False,  # text_deltas
            svn.core.svn_depth_infinity,  # depth
            False,  # entry_props
            False,  # ignore_ancestry
        )

        # sorted() gives a deterministic file order in the combined diff
        for path, __, change in sorted(editor.changes):
            self._generate_node_diff(
                buf, change, path, self.tgt_path, path, self.src_path)

    def _generate_file_diff(self, buf):
        # Single-file compare: derive add/delete from which side is missing;
        # change=None means content modification.
        change = None
        if self.src_kind == svn.core.svn_node_none:
            change = "add"
        elif self.tgt_kind == svn.core.svn_node_none:
            change = "delete"
        tgt_base, tgt_path = vcspath.split(self.tgt_path)
        src_base, src_path = vcspath.split(self.src_path)
        self._generate_node_diff(
            buf, change, tgt_path, tgt_base, src_path, src_base)

    def _generate_node_diff(
            self, buf, change, tgt_path, tgt_base, src_path, src_base):
        """Write the git-style diff of one node into `buf`."""

        if self.src_rev == self.tgt_rev and tgt_base == src_base:
            # makes consistent behaviour with git/hg to return empty diff if
            # we compare same revisions
            return

        tgt_full_path = vcspath.join(tgt_base, tgt_path)
        src_full_path = vcspath.join(src_base, src_path)

        self.binary_content = False
        mime_type = self._get_mime_type(tgt_full_path)

        if mime_type and not mime_type.startswith('text'):
            self.binary_content = True
            buf.write("=" * 67 + '\n')
            buf.write("Cannot display: file marked as a binary type.\n")
            buf.write("svn:mime-type = %s\n" % mime_type)
        buf.write("Index: %s\n" % (tgt_path, ))
        buf.write("=" * 67 + '\n')
        buf.write("diff --git a/%(tgt_path)s b/%(tgt_path)s\n" % {
            'tgt_path': tgt_path})

        if change == 'add':
            # TODO: johbo: SVN is missing a zero here compared to git
            buf.write("new file mode 10644\n")

            #TODO(marcink): intro to binary detection of svn patches
            # if self.binary_content:
            #     buf.write('GIT binary patch\n')

            buf.write("--- /dev/null\t(revision 0)\n")
            src_lines = []
        else:
            if change == 'delete':
                buf.write("deleted file mode 10644\n")

            #TODO(marcink): intro to binary detection of svn patches
            # if self.binary_content:
            #     buf.write('GIT binary patch\n')

            buf.write("--- a/%s\t(revision %s)\n" % (
                src_path, self.src_rev))
            src_lines = self._svn_readlines(self.src_root, src_full_path)

        if change == 'delete':
            buf.write("+++ /dev/null\t(revision %s)\n" % (self.tgt_rev, ))
            tgt_lines = []
        else:
            buf.write("+++ b/%s\t(revision %s)\n" % (
                tgt_path, self.tgt_rev))
            tgt_lines = self._svn_readlines(self.tgt_root, tgt_full_path)

        if not self.binary_content:
            udiff = svn_diff.unified_diff(
                src_lines, tgt_lines, context=self.context,
                ignore_blank_lines=self.ignore_whitespace,
                ignore_case=False,
                ignore_space_changes=self.ignore_whitespace)
            buf.writelines(udiff)

    def _get_mime_type(self, path):
        """Return the svn:mime-type property of `path`, preferring the target root."""
        try:
            mime_type = svn.fs.node_prop(
                self.tgt_root, path, svn.core.SVN_PROP_MIME_TYPE)
        except svn.core.SubversionException:
            # path may only exist on the source side (e.g. deletions)
            mime_type = svn.fs.node_prop(
                self.src_root, path, svn.core.SVN_PROP_MIME_TYPE)
        return mime_type

    def _svn_readlines(self, fs_root, node_path):
        # Returns the node content split into lines (keeping line endings);
        # empty list for binary content, directories, or missing nodes.
        # NOTE(review): on python3 the svn stream read() returns bytes, so
        # these lines are bytes — verify the consumer copes with that.
        if self.binary_content:
            return []
        node_kind = svn.fs.check_path(fs_root, node_path)
        if node_kind not in (
                svn.core.svn_node_file, svn.core.svn_node_symlink):
            return []
        content = svn.core.Stream(
            svn.fs.file_contents(fs_root, node_path)).read()
        return content.splitlines(True)
764 764
765 765
class DiffChangeEditor(svn.delta.Editor):
    """
    Delta editor that records which paths changed between two revisions.

    Each observed change is collected into ``self.changes`` as a
    ``(path, kind, action)`` tuple, where kind is ``'file'`` or ``None``
    and action is one of ``'add'``, ``'change'`` or ``'delete'``.
    """

    def __init__(self):
        self.changes = []

    def delete_entry(self, path, revision, parent_baton, pool=None):
        self.changes += [(path, None, 'delete')]

    def add_file(
            self, path, parent_baton, copyfrom_path, copyfrom_revision,
            file_pool=None):
        self.changes += [(path, 'file', 'add')]

    def open_file(self, path, parent_baton, base_revision, file_pool=None):
        self.changes += [(path, 'file', 'change')]
784 784
785 785
def authorization_callback_allow_all(root, path, pool):
    """Authz callback for the svn delta APIs that grants access to every path."""
    return True
788 788
789 789
class TxnNodeProcessor(object):
    """
    Applies the change of a single node to a transaction root.

    Knows how to add, update or remove one node inside an open svn
    transaction; exists to back the `SvnRemote.commit` method.
    """

    def __init__(self, node, txn_root):
        # `node` is a dict with at least a 'path' key; 'content' and
        # 'properties' are consulted by update().
        assert isinstance(node['path'], str)

        self.node = node
        self.txn_root = txn_root

    def update(self):
        """Create/refresh the node: parents first, then the file, then its data."""
        self._ensure_parent_dirs()
        self._add_file_if_node_does_not_exist()
        self._update_file_content()
        self._update_file_properties()

    def remove(self):
        """Delete the node from the transaction root."""
        svn.fs.delete(self.txn_root, self.node['path'])
        # TODO: Clean up directory if empty

    def _ensure_parent_dirs(self):
        # Walk up from the node until an existing ancestor is found, then
        # create the missing directories outermost-first.
        missing = []
        parent = vcspath.dirname(self.node['path'])
        while not self._svn_path_exists(parent):
            missing.append(parent)
            parent = vcspath.dirname(parent)

        for dir_path in reversed(missing):
            log.debug('Creating missing directory "%s"', dir_path)
            svn.fs.make_dir(self.txn_root, dir_path)

    def _svn_path_exists(self, path):
        return svn.fs.check_path(self.txn_root, path) != svn.core.svn_node_none

    def _add_file_if_node_does_not_exist(self):
        if svn.fs.check_path(self.txn_root, self.node['path']) == svn.core.svn_node_none:
            svn.fs.make_file(self.txn_root, self.node['path'])

    def _update_file_content(self):
        # NOTE(review): py3 svn bindings may expect bytes for the text
        # delta payload — confirm the str contract still holds.
        assert isinstance(self.node['content'], str)
        handler, baton = svn.fs.apply_textdelta(
            self.txn_root, self.node['path'], None, None)
        svn.delta.svn_txdelta_send_string(self.node['content'], handler, baton)

    def _update_file_properties(self):
        for prop_name, prop_value in self.node.get('properties', {}).items():
            svn.fs.change_node_prop(
                self.txn_root, self.node['path'], prop_name, prop_value)
846 846
847 847
def apr_time_t(timestamp):
    """
    Convert a Python timestamp into APR timestamp type apr_time_t
    (microseconds since the epoch, as a float).
    """
    microseconds_per_second = 1e6
    return timestamp * microseconds_per_second
853 853
854 854
def svn_opt_revision_value_t(num):
    """
    Build a `svn_opt_revision_t` that pins revision number `num`.

    The number is wrapped in a `svn_opt_revision_value_t` and attached to a
    revision structure whose kind is `svn_opt_revision_number`.
    """
    rev_value = svn.core.svn_opt_revision_value_t()
    rev_value.number = num

    rev = svn.core.svn_opt_revision_t()
    rev.kind = svn.core.svn_opt_revision_number
    rev.value = rev_value
    return rev
@@ -1,561 +1,563 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
input, output, error streams into a meaningful, non-blocking, concurrent
stream processor exposing the output data as an iterator fitting to be a
return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import collections
27 27 import logging
28 28 import subprocess
29 29 import threading
30 30
31 from vcsserver.str_utils import safe_str
32
31 33 log = logging.getLogger(__name__)
32 34
33 35
class StreamFeeder(threading.Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to feed data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.

    :param source: a ``str``/``bytes``/``bytearray``, an integer file
        descriptor, or a readable file-like object.
    :raises TypeError: if `source` is none of the above.
    """

    def __init__(self, source):
        super().__init__()
        self.daemon = True
        filelike = False
        self.bytes = b''
        if isinstance(source, (str, bytes, bytearray)):  # string-like
            # bug fix: on python3 ``bytes(str)`` raises TypeError without an
            # encoding, so text sources must be encoded explicitly.
            if isinstance(source, str):
                self.bytes = source.encode()
            else:
                self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if isinstance(source, int):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                source = os.fdopen(source, 'rb', 16384)
            # let's see if source is file-like by now
            filelike = hasattr(source, 'read')
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        """Pump all source data into the write end of the pipe, then close it."""
        writer = self.writeiface
        try:
            if self.bytes:
                os.write(writer, self.bytes)
            else:
                s = self.source

                while 1:
                    _bytes = s.read(4096)
                    if not _bytes:
                        break
                    os.write(writer, _bytes)

        finally:
            # closing the writer signals EOF to whoever reads self.output
            os.close(writer)

    @property
    def output(self):
        """File descriptor of the read end of the pipe."""
        return self.readiface
81 83
82 84
class InputStreamChunker(threading.Thread):
    """
    Background reader that slices `source` into chunks and appends them to
    the `target` deque.

    Reading pauses (via the `keep_reading` event) once more than
    ``buffer_size / chunk_size + 1`` chunks are buffered, and resumes when
    the consumer signals it has caught up. `EOF` is set when the source is
    exhausted; `data_added` pulses whenever a new chunk lands.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        super().__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = threading.Event()
        self.data_added.clear()

        self.keep_reading = threading.Event()
        self.keep_reading.set()

        self.EOF = threading.Event()
        self.EOF.clear()

        self.go = threading.Event()
        self.go.set()

    def stop(self):
        """Ask the reader to quit and mark the stream as finished."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except BaseException:
            pass

    def run(self):
        src = self.source
        out = self.target
        read_size = self.chunk_size
        max_chunks = self.chunk_count_max
        keep_reading = self.keep_reading
        data_added = self.data_added
        go = self.go

        try:
            chunk = src.read(read_size)
        except ValueError:
            chunk = ''

        timeout_input = 20
        while chunk and go.is_set():
            if len(out) > max_chunks:
                # buffer full: pause until the consumer drains some chunks
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                if len(out) > max_chunks + timeout_input:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            out.append(chunk)
            data_added.set()

            try:
                chunk = src.read(read_size)
            except ValueError:  # probably "I/O operation on closed file"
                chunk = ''

        self.EOF.set()
        data_added.set()  # for cases when done but there was no input.
151 153
152 154
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    ``__next__()`` polls until data is ready or the worker reports EOF, and
    raises StopIteration after the last buffered chunk has been yielded.
    """

    def __init__(self, name, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        # :param name: label used only for __str__/debugging
        # :param source: readable file-like the worker thread drains
        # :param starting_values: chunks pre-seeded into the queue
        # :param bottomless: bound the deque so old chunks get discarded
        starting_values = starting_values or []
        self.name = name
        self.buffer_size = buffer_size
        self.chunk_size = chunk_size

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data_queue = collections.deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data_queue, buffer_size, chunk_size)
        if starting_values:
            # pre-seeded chunks already count as available data
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################
    def __str__(self):
        return f'BufferedGenerator(name={self.name} chunk: {self.chunk_size} on buffer: {self.buffer_size})'

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next chunk as ``bytes``; raise StopIteration at EOF."""
        # Poll in 0.2s intervals until a chunk is buffered or EOF is flagged.
        while not self.length and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)

        if self.length:
            # consumer drained a chunk -> let the paused reader continue
            self.worker.keep_reading.set()
            return bytes(self.data_queue.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # Only raise while the stream is still producing; no-op after EOF.
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        # NOTE(review): __init__ already starts the worker thread; calling
        # this again raises RuntimeError (threads start once) — confirm intent.
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        """Stop the worker and drain the generator protocol quietly."""
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute 'w';
        # accessing this property raises AttributeError. Looks like stale
        # code from an earlier worker implementation — confirm before use.
        return self.worker.w

    @property
    def data_added_event(self):
        # Event pulsed by the worker whenever a chunk is appended.
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the worker waits for the consumer to drain the buffer.
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data_queue)

    def prepend(self, x):
        # Push a chunk back to the front of the queue (e.g. after peeking).
        self.data_queue.appendleft(x)

    def append(self, x):
        self.data_queue.append(x)

    def extend(self, o):
        self.data_queue.extend(o)

    def __getitem__(self, i):
        return self.data_queue[i]
293 295
294 296
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
             args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.



    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not str)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    OSError family of exceptions

    Example usage:
    #    try:
    #        answer = SubprocessIOChunker(
    #            cmd,
    #            input,
    #            buffer_size = 65536,
    #            chunk_size = 4096
    #            )
    #    except (OSError) as e:
    #        print str(e)
    #        raise e
    #
    #    return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    # lifecycle flags / captured output on early failure
    _closed = False
    _stdout = None
    _stderr = None

    def __init__(self, cmd, input_stream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param input_stream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.

        :raises OSError: when the child exits non-zero or writes to stderr
            (depending on the fail_* flags).
        """

        kwargs['shell'] = kwargs.get('shell', True)

        starting_values = starting_values or []
        if input_stream:
            # feed stdin from a background thread so we never block on it
            input_streamer = StreamFeeder(input_stream)
            input_streamer.start()
            input_stream = input_streamer.output
            self._close_input_fd = input_stream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code
        self.cmd = cmd

        _p = subprocess.Popen(cmd, bufsize=-1, stdin=input_stream, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              **kwargs)
        self.process = _p

        # stdout buffered normally; stderr "bottomless" with tiny chunks so
        # any error text shows up quickly
        bg_out = BufferedGenerator('stdout', _p.stdout, buffer_size, chunk_size, starting_values)
        bg_err = BufferedGenerator('stderr', _p.stderr, 10240, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(0.2)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        return_code = _p.poll()
        ret_code_ok = return_code in [None, 0]
        ret_code_fail = return_code is not None and return_code != 0
        if (
                (ret_code_fail and fail_on_return_code) or
                (ret_code_ok and fail_on_stderr and bg_err.length)
        ):

            try:
                _p.terminate()
            except Exception:
                pass

            # capture both streams fully so error reporting has the text
            bg_out.stop()
            out = b''.join(bg_out)
            self._stdout = out

            bg_err.stop()
            err = b''.join(bg_err)
            self._stderr = err

            # code from https://github.com/schacon/grack/pull/7
            if err.strip() == b'fatal: The remote end hung up unexpectedly' and out.startswith(b'0034shallow '):
                bg_out = iter([out])
                _p = None
            elif err and fail_on_stderr:
                text_err = err.decode()
                raise OSError(
                    "Subprocess exited due to an error:\n{}".format(text_err))

            if ret_code_fail and fail_on_return_code:
                text_err = err.decode()
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    text_err = out.decode()
                raise OSError(
                    "Subprocess exited with non 0 ret code:{}: stderr:{}".format(return_code, text_err))

        self.stdout = bg_out
        self.stderr = bg_err
        self.inputstream = input_stream

    def __str__(self):
        proc = getattr(self, 'process', 'NO_PROCESS')
        return f'SubprocessIOChunker: {proc}'

    def __iter__(self):
        return self

    def __next__(self):
        """Yield the next stdout chunk; raise OSError on a failed exit code."""
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = next(self.stdout)
        except StopIteration as e:
            stop_iteration = e

        if self.process:
            return_code = self.process.poll()
            ret_code_fail = return_code is not None and return_code != 0
            if ret_code_fail and self._fail_on_return_code:
                self.stop_streams()
                err = self.get_stderr()
                raise OSError(
                    "Subprocess exited (exit_code:{}) due to an error during iteration:\n{}".format(return_code, err))

        # re-raise the deferred StopIteration only after the exit-code check
        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, exc_type, value=None, traceback=None):
        if self.stdout.length or not self.stdout.done_reading:
            raise exc_type(value)

    def close(self):
        """Terminate the child and release all pipes; idempotent."""
        if self._closed:
            return

        try:
            self.process.terminate()
        except Exception:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.stdout.close()
        except Exception:
            pass
        try:
            self.stderr.close()
        except Exception:
            pass
        try:
            os.close(self.inputstream)
        except Exception:
            pass

        self._closed = True

    def stop_streams(self):
        # tolerate bg_out having been swapped for a plain iterator in __init__
        getattr(self.stdout, 'stop', lambda: None)()
        getattr(self.stderr, 'stop', lambda: None)()

    def get_stdout(self):
        # prefer output captured during an early failure in __init__
        if self._stdout:
            return self._stdout
        else:
            return b''.join(self.stdout)

    def get_stderr(self):
        if self._stderr:
            return self._stderr
        else:
            return b''.join(self.stderr)
533 535
534 536
def run_command(arguments, env=None):
    """
    Run the specified command and return the stdout.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    :param env: optional mapping of environment variables for the child

    :returns: tuple of ``(stdout_bytes, stderr_bytes)``
    :raises Exception: if the command could not be run; the original
        ``OSError`` is attached as the exception's ``__cause__``.
    """

    cmd = arguments
    log.debug('Running subprocessio command %s', cmd)
    proc = None
    try:
        _opts = {'shell': False, 'fail_on_stderr': False}
        if env:
            _opts.update({'env': env})
        proc = SubprocessIOChunker(cmd, **_opts)
        return b''.join(proc), b''.join(proc.stderr)
    except OSError as err:
        cmd = ' '.join(map(safe_str, cmd))  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was:%s\n" % (cmd, err))
        log.exception(tb_err)
        # bug fix: chain the original OSError so the root cause survives
        raise Exception(tb_err) from err
    finally:
        if proc:
            proc.close()
561 563
General Comments 0
You need to be logged in to leave comments. Login now