subprocessio: don't use __del__ to close the buffers and readers. Instead use a finally block....
dan
r799:825a2f59 default
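The commit message above describes the subprocessio change: buffer and reader cleanup moves out of __del__ (which only runs when the garbage collector gets to the object, if ever) into an explicit finally block. A minimal sketch of that pattern, with hypothetical buffer/reader attributes standing in for the real subprocessio streams (which are not shown in this file):

class OutputStream(object):
    # Sketch only: `buffer_` and `reader` are assumed stand-ins for the
    # subprocessio pair; the actual attribute names may differ.
    def __init__(self, buffer_, reader):
        self.buffer = buffer_
        self.reader = reader

    def read_all(self):
        try:
            return self.reader.read()
        finally:
            # deterministic cleanup, even when read() raises
            self.buffer.close()
            self.reader.close()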
@@ -1,1173 +1,1177 b''
# RhodeCode VCSServer provides access to different vcs backends via network.
# Copyright (C) 2014-2019 RhodeCode GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import collections
import logging
import os
import posixpath as vcspath
import re
import stat
import traceback
import urllib
import urllib2
from functools import wraps

import more_itertools
import pygit2
from pygit2 import Repository as LibGit2Repo
from dulwich import index, objects
from dulwich.client import HttpGitClient, LocalGitClient
from dulwich.errors import (
    NotGitRepository, ChecksumMismatch, WrongObjectException,
    MissingCommitError, ObjectMissing, HangupException,
    UnexpectedCommandError)
from dulwich.repo import Repo as DulwichRepo
from dulwich.server import update_server_info

from vcsserver import exceptions, settings, subprocessio
from vcsserver.utils import safe_str, safe_int
from vcsserver.base import RepoFactory, obfuscate_qs
from vcsserver.hgcompat import (
    hg_url as url_parser, httpbasicauthhandler, httpdigestauthhandler)
from vcsserver.git_lfs.lib import LFSOidStore
from vcsserver.vcs_base import RemoteBase

DIR_STAT = stat.S_IFDIR
FILE_MODE = stat.S_IFMT
GIT_LINK = objects.S_IFGITLINK
PEELED_REF_MARKER = '^{}'


log = logging.getLogger(__name__)


def str_to_dulwich(value):
    """
    Dulwich 0.10.1a requires `unicode` objects to be passed in.
    """
    return value.decode(settings.WIRE_ENCODING)


def reraise_safe_exceptions(func):
    """Converts Dulwich exceptions to something neutral."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (ChecksumMismatch, WrongObjectException, MissingCommitError, ObjectMissing,) as e:
            exc = exceptions.LookupException(org_exc=e)
            raise exc(safe_str(e))
        except (HangupException, UnexpectedCommandError) as e:
            exc = exceptions.VcsException(org_exc=e)
            raise exc(safe_str(e))
        except Exception as e:
            # NOTE(marcink): because of how dulwich handles some exceptions
            # (KeyError on empty repos), we cannot reliably catch all
            # exceptions here; the rest come from other handlers
            #if not hasattr(e, '_vcs_kind'):
            #log.exception("Unhandled exception in git remote call")
            #raise_from_original(exceptions.UnhandledException)
            raise
    return wrapper


class Repo(DulwichRepo):
    """
    A wrapper for the dulwich Repo class.

    Since dulwich sometimes keeps .idx file descriptors open, it leads to
    "Too many open files" errors. We need to close all opened file descriptors
    once the repo object is destroyed.
    """
    def __del__(self):
        if hasattr(self, 'object_store'):
            self.close()
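Note that the __del__ hook above only fires when the interpreter collects the object, so the descriptors are released at an unpredictable time. Where deterministic cleanup matters, a dulwich repo can also be closed explicitly; a small sketch with a placeholder path:

from contextlib import closing

from dulwich.repo import Repo as DulwichRepo

with closing(DulwichRepo('/tmp/some-repo')) as repo:  # placeholder path
    refs = repo.get_refs()
# close() has run here, regardless of exceptions inside the block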


class Repository(LibGit2Repo):

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.free()


class GitFactory(RepoFactory):
    repo_type = 'git'

    def _create_repo(self, wire, create, use_libgit2=False):
        if use_libgit2:
            return Repository(wire['path'])
        else:
            repo_path = str_to_dulwich(wire['path'])
            return Repo(repo_path)

    def repo(self, wire, create=False, use_libgit2=False):
        """
        Get a repository instance for the given path.
        """
        return self._create_repo(wire, create, use_libgit2)

    def repo_libgit2(self, wire):
        return self.repo(wire, use_libgit2=True)


class GitRemote(RemoteBase):

    def __init__(self, factory):
        self._factory = factory
        self._bulk_methods = {
            "date": self.date,
            "author": self.author,
            "branch": self.branch,
            "message": self.message,
            "parents": self.parents,
            "_commit": self.revision,
        }

    def _wire_to_config(self, wire):
        if 'config' in wire:
            return dict([(x[0] + '_' + x[1], x[2]) for x in wire['config']])
        return {}
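_wire_to_config flattens (section, key, value) triples into section_key lookups. An illustration with hypothetical values (the triple layout is inferred from the comprehension above; both resulting keys are read later in this file):

wire = {'config': [
    ('vcs', 'ssl_dir', '/etc/ssl/certs'),
    ('vcs_git_lfs', 'store_location', '/var/lfs'),
]}
conf = dict((x[0] + '_' + x[1], x[2]) for x in wire['config'])
assert conf['vcs_ssl_dir'] == '/etc/ssl/certs'
assert conf['vcs_git_lfs_store_location'] == '/var/lfs'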

    def _remote_conf(self, config):
        params = [
            '-c', 'core.askpass=""',
        ]
        ssl_cert_dir = config.get('vcs_ssl_dir')
        if ssl_cert_dir:
            params.extend(['-c', 'http.sslCAinfo={}'.format(ssl_cert_dir)])
        return params

    @reraise_safe_exceptions
    def discover_git_version(self):
        stdout, _ = self.run_git_command(
            {}, ['--version'], _bare=True, _safe=True)
        prefix = 'git version'
        if stdout.startswith(prefix):
            stdout = stdout[len(prefix):]
        return stdout.strip()

    @reraise_safe_exceptions
    def is_empty(self, wire):
        repo_init = self._factory.repo_libgit2(wire)
        with repo_init as repo:

            try:
                has_head = repo.head.name
                if has_head:
                    return False

                # NOTE(marcink): check again using a more expensive method
                return repo.is_empty
            except Exception:
                pass

            return True

    @reraise_safe_exceptions
    def assert_correct_path(self, wire):
        cache_on, context_uid, repo_id = self._cache_on(wire)
        @self.region.conditional_cache_on_arguments(condition=cache_on)
        def _assert_correct_path(_context_uid, _repo_id):
            try:
                repo_init = self._factory.repo_libgit2(wire)
                with repo_init as repo:
                    pass
            except pygit2.GitError:
                path = wire.get('path')
                tb = traceback.format_exc()
                log.debug("Invalid Git path `%s`, tb: %s", path, tb)
                return False

            return True
        return _assert_correct_path(context_uid, repo_id)

    @reraise_safe_exceptions
    def bare(self, wire):
        repo_init = self._factory.repo_libgit2(wire)
        with repo_init as repo:
            return repo.is_bare

    @reraise_safe_exceptions
    def blob_as_pretty_string(self, wire, sha):
        repo_init = self._factory.repo_libgit2(wire)
        with repo_init as repo:
            blob_obj = repo[sha]
            blob = blob_obj.data
            return blob

    @reraise_safe_exceptions
    def blob_raw_length(self, wire, sha):
        cache_on, context_uid, repo_id = self._cache_on(wire)
        @self.region.conditional_cache_on_arguments(condition=cache_on)
        def _blob_raw_length(_repo_id, _sha):

            repo_init = self._factory.repo_libgit2(wire)
            with repo_init as repo:
                blob = repo[sha]
                return blob.size

        return _blob_raw_length(repo_id, sha)

    def _parse_lfs_pointer(self, raw_content):

        spec_string = 'version https://git-lfs.github.com/spec'
        if raw_content and raw_content.startswith(spec_string):
            pattern = re.compile(r"""
            (?:\n)?
            ^version[ ]https://git-lfs\.github\.com/spec/(?P<spec_ver>v\d+)\n
            ^oid[ ]sha256:(?P<oid_hash>[0-9a-f]{64})\n
            ^size[ ](?P<oid_size>[0-9]+)\n
            (?:\n)?
            """, re.VERBOSE | re.MULTILINE)
            match = pattern.match(raw_content)
            if match:
                return match.groupdict()

        return {}
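The pointer format parsed above is fixed by the LFS spec, so the regex can be exercised directly with a fabricated blob (runnable standalone; the oid and size values are made up):

import re

pattern = re.compile(r"""
(?:\n)?
^version[ ]https://git-lfs\.github\.com/spec/(?P<spec_ver>v\d+)\n
^oid[ ]sha256:(?P<oid_hash>[0-9a-f]{64})\n
^size[ ](?P<oid_size>[0-9]+)\n
(?:\n)?
""", re.VERBOSE | re.MULTILINE)

raw = ("version https://git-lfs.github.com/spec/v1\n"
       "oid sha256:" + "a" * 64 + "\n"
       "size 12345\n")

assert pattern.match(raw).groupdict() == {
    'spec_ver': 'v1', 'oid_hash': 'a' * 64, 'oid_size': '12345'}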

    @reraise_safe_exceptions
    def is_large_file(self, wire, commit_id):
        cache_on, context_uid, repo_id = self._cache_on(wire)

        @self.region.conditional_cache_on_arguments(condition=cache_on)
        def _is_large_file(_repo_id, _sha):
            repo_init = self._factory.repo_libgit2(wire)
            with repo_init as repo:
                blob = repo[commit_id]
                if blob.is_binary:
                    return {}

                return self._parse_lfs_pointer(blob.data)

        return _is_large_file(repo_id, commit_id)

    @reraise_safe_exceptions
    def is_binary(self, wire, tree_id):
        cache_on, context_uid, repo_id = self._cache_on(wire)

        @self.region.conditional_cache_on_arguments(condition=cache_on)
        def _is_binary(_repo_id, _tree_id):
            repo_init = self._factory.repo_libgit2(wire)
            with repo_init as repo:
                blob_obj = repo[tree_id]
                return blob_obj.is_binary

        return _is_binary(repo_id, tree_id)

    @reraise_safe_exceptions
    def in_largefiles_store(self, wire, oid):
        conf = self._wire_to_config(wire)
        repo_init = self._factory.repo_libgit2(wire)
        with repo_init as repo:
            repo_name = repo.path

        store_location = conf.get('vcs_git_lfs_store_location')
        if store_location:

            store = LFSOidStore(
                oid=oid, repo=repo_name, store_location=store_location)
            return store.has_oid()

        return False

    @reraise_safe_exceptions
    def store_path(self, wire, oid):
        conf = self._wire_to_config(wire)
        repo_init = self._factory.repo_libgit2(wire)
        with repo_init as repo:
            repo_name = repo.path

        store_location = conf.get('vcs_git_lfs_store_location')
        if store_location:
            store = LFSOidStore(
                oid=oid, repo=repo_name, store_location=store_location)
            return store.oid_path
        raise ValueError('Unable to fetch oid with path {}'.format(oid))

    @reraise_safe_exceptions
    def bulk_request(self, wire, rev, pre_load):
        cache_on, context_uid, repo_id = self._cache_on(wire)
        @self.region.conditional_cache_on_arguments(condition=cache_on)
        def _bulk_request(_repo_id, _rev, _pre_load):
            result = {}
            for attr in pre_load:
                try:
                    method = self._bulk_methods[attr]
                    args = [wire, rev]
                    result[attr] = method(*args)
                except KeyError as e:
                    raise exceptions.VcsException(e)(
                        "Unknown bulk attribute: %s" % attr)
            return result

        return _bulk_request(repo_id, rev, sorted(pre_load))

    def _build_opener(self, url):
        handlers = []
        url_obj = url_parser(url)
        _, authinfo = url_obj.authinfo()

        if authinfo:
            # create a password manager
            passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            passmgr.add_password(*authinfo)

            handlers.extend((httpbasicauthhandler(passmgr),
                             httpdigestauthhandler(passmgr)))

        return urllib2.build_opener(*handlers)

    def _type_id_to_name(self, type_id):
        return {
            1: b'commit',
            2: b'tree',
            3: b'blob',
            4: b'tag'
        }[type_id]

    @reraise_safe_exceptions
    def check_url(self, url, config):
        url_obj = url_parser(url)
        test_uri, _ = url_obj.authinfo()
        url_obj.passwd = '*****' if url_obj.passwd else url_obj.passwd
        url_obj.query = obfuscate_qs(url_obj.query)
        cleaned_uri = str(url_obj)
        log.info("Checking URL for remote cloning/import: %s", cleaned_uri)

        if not test_uri.endswith('info/refs'):
            test_uri = test_uri.rstrip('/') + '/info/refs'

        o = self._build_opener(url)
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git

        q = {"service": 'git-upload-pack'}
        qs = '?%s' % urllib.urlencode(q)
        cu = "%s%s" % (test_uri, qs)
        req = urllib2.Request(cu, None, {})

        try:
            log.debug("Trying to open URL %s", cleaned_uri)
            resp = o.open(req)
            if resp.code != 200:
                raise exceptions.URLError()('Return Code is not 200')
        except Exception as e:
            log.warning("URL cannot be opened: %s", cleaned_uri, exc_info=True)
            # means it cannot be cloned
            raise exceptions.URLError(e)("[%s] org_exc: %s" % (cleaned_uri, e))

        # now detect if it's a proper git repo
        gitdata = resp.read()
        if 'service=git-upload-pack' in gitdata:
            pass
        elif re.findall(r'[0-9a-fA-F]{40}\s+refs', gitdata):
            # old-style git can return some other format!
            pass
        else:
            raise exceptions.URLError()(
                "url [%s] does not look like a git repository" % (cleaned_uri,))

        return True
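check_url probes the smart-HTTP endpoint the same way a git client would. The URL it builds, shown for a hypothetical remote (urllib here is the Python 2 module used throughout this file):

import urllib

test_uri = 'https://example.com/repo.git/info/refs'  # hypothetical remote
qs = '?%s' % urllib.urlencode({"service": 'git-upload-pack'})
cu = "%s%s" % (test_uri, qs)
assert cu == 'https://example.com/repo.git/info/refs?service=git-upload-pack'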

    @reraise_safe_exceptions
    def clone(self, wire, url, deferred, valid_refs, update_after_clone):
        # TODO(marcink): deprecate this method. Last I checked we don't use it anymore
        remote_refs = self.pull(wire, url, apply_refs=False)
        repo = self._factory.repo(wire)
        if isinstance(valid_refs, list):
            valid_refs = tuple(valid_refs)

        for k in remote_refs:
            # only parse heads/tags and skip so-called deferred tags
            if k.startswith(valid_refs) and not k.endswith(deferred):
                repo[k] = remote_refs[k]

        if update_after_clone:
            # we want to checkout HEAD
            repo["HEAD"] = remote_refs["HEAD"]
            index.build_index_from_tree(repo.path, repo.index_path(),
                                        repo.object_store, repo["HEAD"].tree)

    @reraise_safe_exceptions
    def branch(self, wire, commit_id):
        cache_on, context_uid, repo_id = self._cache_on(wire)
        @self.region.conditional_cache_on_arguments(condition=cache_on)
        def _branch(_context_uid, _repo_id, _commit_id):
            regex = re.compile('^refs/heads')

            def filter_with(ref):
                return regex.match(ref[0]) and ref[1] == _commit_id

            branches = filter(filter_with, self.get_refs(wire).items())
            return [x[0].split('refs/heads/')[-1] for x in branches]

        return _branch(context_uid, repo_id, commit_id)

    @reraise_safe_exceptions
    def commit_branches(self, wire, commit_id):
        cache_on, context_uid, repo_id = self._cache_on(wire)
        @self.region.conditional_cache_on_arguments(condition=cache_on)
        def _commit_branches(_context_uid, _repo_id, _commit_id):
            repo_init = self._factory.repo_libgit2(wire)
            with repo_init as repo:
                branches = [x for x in repo.branches.with_commit(_commit_id)]
                return branches

        return _commit_branches(context_uid, repo_id, commit_id)

    @reraise_safe_exceptions
    def add_object(self, wire, content):
        repo_init = self._factory.repo_libgit2(wire)
        with repo_init as repo:
            blob = objects.Blob()
            blob.set_raw_string(content)
            repo.object_store.add_object(blob)
            return blob.id

    # TODO: this is quite complex, check if that can be simplified
    @reraise_safe_exceptions
    def commit(self, wire, commit_data, branch, commit_tree, updated, removed):
        repo = self._factory.repo(wire)
        object_store = repo.object_store

        # Create the tree and populate it with blobs
        commit_tree = commit_tree and repo[commit_tree] or objects.Tree()

        for node in updated:
            # Compute subdirs if needed
            dirpath, nodename = vcspath.split(node['path'])
            dirnames = map(safe_str, dirpath and dirpath.split('/') or [])
            parent = commit_tree
            ancestors = [('', parent)]

            # Try to dig for the deepest existing tree
            while dirnames:
                curdir = dirnames.pop(0)
                try:
                    dir_id = parent[curdir][1]
                except KeyError:
                    # put curdir back into dirnames and stop
                    dirnames.insert(0, curdir)
                    break
                else:
                    # If found, update parent
                    parent = repo[dir_id]
                    ancestors.append((curdir, parent))
            # Now parent is the deepest existing tree and we need to create
            # subtrees for dirnames (in reverse order)
            # [this only applies for nodes from added]
            new_trees = []

            blob = objects.Blob.from_string(node['content'])

            if dirnames:
                # If there are trees which should be created we need to build
                # them now (in reverse order)
                reversed_dirnames = list(reversed(dirnames))
                curtree = objects.Tree()
                curtree[node['node_path']] = node['mode'], blob.id
                new_trees.append(curtree)
                for dirname in reversed_dirnames[:-1]:
                    newtree = objects.Tree()
                    newtree[dirname] = (DIR_STAT, curtree.id)
                    new_trees.append(newtree)
                    curtree = newtree
                parent[reversed_dirnames[-1]] = (DIR_STAT, curtree.id)
            else:
                parent.add(name=node['node_path'], mode=node['mode'], hexsha=blob.id)

            new_trees.append(parent)
            # Update ancestors
            reversed_ancestors = reversed(
                [(a[1], b[1], b[0]) for a, b in zip(ancestors, ancestors[1:])])
            for parent, tree, path in reversed_ancestors:
                parent[path] = (DIR_STAT, tree.id)
                object_store.add_object(tree)

            object_store.add_object(blob)
            for tree in new_trees:
                object_store.add_object(tree)

        for node_path in removed:
            paths = node_path.split('/')
            tree = commit_tree
            trees = [tree]
            # Traverse deep into the forest...
            for path in paths:
                try:
                    obj = repo[tree[path][1]]
                    if isinstance(obj, objects.Tree):
                        trees.append(obj)
                        tree = obj
                except KeyError:
                    break
            # Cut down the blob and all rotten trees on the way back...
            for path, tree in reversed(zip(paths, trees)):
                del tree[path]
                if tree:
                    # This tree still has elements - don't remove it or any
                    # of its parents
                    break

        object_store.add_object(commit_tree)

        # Create commit
        commit = objects.Commit()
        commit.tree = commit_tree.id
        for k, v in commit_data.iteritems():
            setattr(commit, k, v)
        object_store.add_object(commit)

        self.create_branch(wire, branch, commit.id)

        # dulwich set-ref
        ref = 'refs/heads/%s' % branch
        repo.refs[ref] = commit.id

        return commit.id

    @reraise_safe_exceptions
    def pull(self, wire, url, apply_refs=True, refs=None, update_after=False):
        if url != 'default' and '://' not in url:
            client = LocalGitClient(url)
        else:
            url_obj = url_parser(url)
            o = self._build_opener(url)
            url, _ = url_obj.authinfo()
            client = HttpGitClient(base_url=url, opener=o)
        repo = self._factory.repo(wire)

        determine_wants = repo.object_store.determine_wants_all
        if refs:
            def determine_wants_requested(references):
                return [references[r] for r in references if r in refs]
            determine_wants = determine_wants_requested

        try:
            remote_refs = client.fetch(
                path=url, target=repo, determine_wants=determine_wants)
        except NotGitRepository as e:
            log.warning(
                'Trying to fetch from "%s" failed, not a Git repository.', url)
            # Exception can contain unicode which we convert
            raise exceptions.AbortException(e)(repr(e))

        # mikhail: client.fetch() returns all the remote refs, but fetches only
        # the refs filtered by the `determine_wants` function. We need to filter
        # the result as well
        if refs:
            remote_refs = {k: remote_refs[k] for k in remote_refs if k in refs}

        if apply_refs:
            # TODO: johbo: Needs proper test coverage with a git repository
            # that contains a tag object, so that we would end up with
            # a peeled ref at this point.
            for k in remote_refs:
                if k.endswith(PEELED_REF_MARKER):
                    log.debug("Skipping peeled reference %s", k)
                    continue
                repo[k] = remote_refs[k]

            if refs and not update_after:
                # mikhail: explicitly set the head to the last ref.
                repo['HEAD'] = remote_refs[refs[-1]]

        if update_after:
            # we want to checkout HEAD
            repo["HEAD"] = remote_refs["HEAD"]
            index.build_index_from_tree(repo.path, repo.index_path(),
                                        repo.object_store, repo["HEAD"].tree)
        return remote_refs
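The determine_wants_requested closure above turns the refs dict a dulwich client reports into the list of object ids to fetch. With plain dicts it behaves like this (fabricated sha values):

refs = ['refs/heads/stable']
references = {
    'refs/heads/master': '1' * 40,
    'refs/heads/stable': '2' * 40,
}

def determine_wants_requested(references):
    return [references[r] for r in references if r in refs]

assert determine_wants_requested(references) == ['2' * 40]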

    @reraise_safe_exceptions
    def sync_fetch(self, wire, url, refs=None, all_refs=False):
        repo = self._factory.repo(wire)
        if refs and not isinstance(refs, (list, tuple)):
            refs = [refs]

        config = self._wire_to_config(wire)
        # get all remote refs we'll use to fetch later
        cmd = ['ls-remote']
        if not all_refs:
            cmd += ['--heads', '--tags']
        cmd += [url]
        output, __ = self.run_git_command(
            wire, cmd, fail_on_stderr=False,
            _copts=self._remote_conf(config),
            extra_env={'GIT_TERMINAL_PROMPT': '0'})

        remote_refs = collections.OrderedDict()
        fetch_refs = []

        for ref_line in output.splitlines():
            sha, ref = ref_line.split('\t')
            sha = sha.strip()
            if ref in remote_refs:
                # duplicate, skip
                continue
            if ref.endswith(PEELED_REF_MARKER):
                log.debug("Skipping peeled reference %s", ref)
                continue
            # don't sync HEAD
            if ref in ['HEAD']:
                continue

            remote_refs[ref] = sha

            if refs and sha in refs:
                # we filter fetch using our specified refs
                fetch_refs.append('{}:{}'.format(ref, ref))
            elif not refs:
                fetch_refs.append('{}:{}'.format(ref, ref))
        log.debug('Finished obtaining fetch refs, total: %s', len(fetch_refs))

        if fetch_refs:
            for chunk in more_itertools.chunked(fetch_refs, 1024 * 4):
                fetch_refs_chunks = list(chunk)
                log.debug('Fetching %s refs from import url', len(fetch_refs_chunks))
                _out, _err = self.run_git_command(
                    wire, ['fetch', url, '--force', '--prune', '--'] + fetch_refs_chunks,
                    fail_on_stderr=False,
                    _copts=self._remote_conf(config),
                    extra_env={'GIT_TERMINAL_PROMPT': '0'})

        return remote_refs
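more_itertools.chunked bounds the argument list handed to each `git fetch` call above: it yields successive lists of at most 4096 refspecs. A standalone check with fabricated refspecs:

import more_itertools

refspecs = ['refs/heads/b{0}:refs/heads/b{0}'.format(i) for i in range(10000)]
chunks = list(more_itertools.chunked(refspecs, 1024 * 4))
assert [len(c) for c in chunks] == [4096, 4096, 1808]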

    @reraise_safe_exceptions
    def sync_push(self, wire, url, refs=None):
        if not self.check_url(url, wire):
            return
        config = self._wire_to_config(wire)
        self._factory.repo(wire)
        self.run_git_command(
            wire, ['push', url, '--mirror'], fail_on_stderr=False,
            _copts=self._remote_conf(config),
            extra_env={'GIT_TERMINAL_PROMPT': '0'})

    @reraise_safe_exceptions
    def get_remote_refs(self, wire, url):
        repo = Repo(url)
        return repo.get_refs()

    @reraise_safe_exceptions
    def get_description(self, wire):
        repo = self._factory.repo(wire)
        return repo.get_description()

    @reraise_safe_exceptions
    def get_missing_revs(self, wire, rev1, rev2, path2):
        repo = self._factory.repo(wire)
        LocalGitClient(thin_packs=False).fetch(path2, repo)

        wire_remote = wire.copy()
        wire_remote['path'] = path2
        repo_remote = self._factory.repo(wire_remote)
        LocalGitClient(thin_packs=False).fetch(wire["path"], repo_remote)

        revs = [
            x.commit.id
            for x in repo_remote.get_walker(include=[rev2], exclude=[rev1])]
        return revs

    @reraise_safe_exceptions
    def get_object(self, wire, sha):
        cache_on, context_uid, repo_id = self._cache_on(wire)
        @self.region.conditional_cache_on_arguments(condition=cache_on)
        def _get_object(_context_uid, _repo_id, _sha):
            repo_init = self._factory.repo_libgit2(wire)
            with repo_init as repo:

                missing_commit_err = 'Commit {} does not exist for `{}`'.format(sha, wire['path'])
                try:
                    commit = repo.revparse_single(sha)
                except (KeyError, ValueError) as e:
                    raise exceptions.LookupException(e)(missing_commit_err)

                is_tag = False
                if isinstance(commit, pygit2.Tag):
                    commit = repo.get(commit.target)
                    is_tag = True

                check_dangling = True
                if is_tag:
                    check_dangling = False

                # if we used a reference and it parsed, we're not dealing with a
                # dangling commit
                if sha != commit.hex:
                    check_dangling = False

                if check_dangling:
                    # check for dangling commit
                    for branch in repo.branches.with_commit(commit.hex):
                        if branch:
                            break
                    else:
                        raise exceptions.LookupException(None)(missing_commit_err)

                commit_id = commit.hex
                type_id = commit.type

                return {
                    'id': commit_id,
                    'type': self._type_id_to_name(type_id),
                    'commit_id': commit_id,
                    'idx': 0
                }

        return _get_object(context_uid, repo_id, sha)
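The dangling-commit check above relies on Python's for/else: the else branch runs only when the loop finishes without hitting break, i.e. when no branch contains the commit. The idiom in isolation:

def is_dangling(branches):
    for branch in branches:
        if branch:
            break
    else:
        return True  # loop exhausted without break: no branch found
    return False

assert is_dangling([]) is True
assert is_dangling(['master']) is False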
735
735
736 @reraise_safe_exceptions
736 @reraise_safe_exceptions
737 def get_refs(self, wire):
737 def get_refs(self, wire):
738 cache_on, context_uid, repo_id = self._cache_on(wire)
738 cache_on, context_uid, repo_id = self._cache_on(wire)
739 @self.region.conditional_cache_on_arguments(condition=cache_on)
739 @self.region.conditional_cache_on_arguments(condition=cache_on)
740 def _get_refs(_context_uid, _repo_id):
740 def _get_refs(_context_uid, _repo_id):
741
741
742 repo_init = self._factory.repo_libgit2(wire)
742 repo_init = self._factory.repo_libgit2(wire)
743 with repo_init as repo:
743 with repo_init as repo:
744 regex = re.compile('^refs/(heads|tags)/')
744 regex = re.compile('^refs/(heads|tags)/')
745 return {x.name: x.target.hex for x in
745 return {x.name: x.target.hex for x in
746 filter(lambda ref: regex.match(ref.name) ,repo.listall_reference_objects())}
746 filter(lambda ref: regex.match(ref.name) ,repo.listall_reference_objects())}
747
747
748 return _get_refs(context_uid, repo_id)
748 return _get_refs(context_uid, repo_id)
749
749
750 @reraise_safe_exceptions
750 @reraise_safe_exceptions
751 def get_branch_pointers(self, wire):
751 def get_branch_pointers(self, wire):
752 cache_on, context_uid, repo_id = self._cache_on(wire)
752 cache_on, context_uid, repo_id = self._cache_on(wire)
753 @self.region.conditional_cache_on_arguments(condition=cache_on)
753 @self.region.conditional_cache_on_arguments(condition=cache_on)
754 def _get_branch_pointers(_context_uid, _repo_id):
754 def _get_branch_pointers(_context_uid, _repo_id):
755
755
756 repo_init = self._factory.repo_libgit2(wire)
756 repo_init = self._factory.repo_libgit2(wire)
757 regex = re.compile('^refs/heads')
757 regex = re.compile('^refs/heads')
758 with repo_init as repo:
758 with repo_init as repo:
759 branches = filter(lambda ref: regex.match(ref.name), repo.listall_reference_objects())
759 branches = filter(lambda ref: regex.match(ref.name), repo.listall_reference_objects())
760 return {x.target.hex: x.shorthand for x in branches}
760 return {x.target.hex: x.shorthand for x in branches}
761
761
762 return _get_branch_pointers(context_uid, repo_id)
762 return _get_branch_pointers(context_uid, repo_id)
763
763
764 @reraise_safe_exceptions
764 @reraise_safe_exceptions
765 def head(self, wire, show_exc=True):
765 def head(self, wire, show_exc=True):
766 cache_on, context_uid, repo_id = self._cache_on(wire)
766 cache_on, context_uid, repo_id = self._cache_on(wire)
767 @self.region.conditional_cache_on_arguments(condition=cache_on)
767 @self.region.conditional_cache_on_arguments(condition=cache_on)
768 def _head(_context_uid, _repo_id, _show_exc):
768 def _head(_context_uid, _repo_id, _show_exc):
769 repo_init = self._factory.repo_libgit2(wire)
769 repo_init = self._factory.repo_libgit2(wire)
770 with repo_init as repo:
770 with repo_init as repo:
771 try:
771 try:
772 return repo.head.peel().hex
772 return repo.head.peel().hex
773 except Exception:
773 except Exception:
774 if show_exc:
774 if show_exc:
775 raise
775 raise
776 return _head(context_uid, repo_id, show_exc)
776 return _head(context_uid, repo_id, show_exc)
777
777
778 @reraise_safe_exceptions
778 @reraise_safe_exceptions
779 def init(self, wire):
779 def init(self, wire):
780 repo_path = str_to_dulwich(wire['path'])
780 repo_path = str_to_dulwich(wire['path'])
781 self.repo = Repo.init(repo_path)
781 self.repo = Repo.init(repo_path)
782
782
783 @reraise_safe_exceptions
783 @reraise_safe_exceptions
784 def init_bare(self, wire):
784 def init_bare(self, wire):
785 repo_path = str_to_dulwich(wire['path'])
785 repo_path = str_to_dulwich(wire['path'])
786 self.repo = Repo.init_bare(repo_path)
786 self.repo = Repo.init_bare(repo_path)
787
787
788 @reraise_safe_exceptions
788 @reraise_safe_exceptions
789 def revision(self, wire, rev):
789 def revision(self, wire, rev):
790
790
791 cache_on, context_uid, repo_id = self._cache_on(wire)
791 cache_on, context_uid, repo_id = self._cache_on(wire)
792 @self.region.conditional_cache_on_arguments(condition=cache_on)
792 @self.region.conditional_cache_on_arguments(condition=cache_on)
793 def _revision(_context_uid, _repo_id, _rev):
793 def _revision(_context_uid, _repo_id, _rev):
794 repo_init = self._factory.repo_libgit2(wire)
794 repo_init = self._factory.repo_libgit2(wire)
795 with repo_init as repo:
795 with repo_init as repo:
796 commit = repo[rev]
796 commit = repo[rev]
797 obj_data = {
797 obj_data = {
798 'id': commit.id.hex,
798 'id': commit.id.hex,
799 }
799 }
800 # tree objects itself don't have tree_id attribute
800 # tree objects itself don't have tree_id attribute
801 if hasattr(commit, 'tree_id'):
801 if hasattr(commit, 'tree_id'):
802 obj_data['tree'] = commit.tree_id.hex
802 obj_data['tree'] = commit.tree_id.hex
803
803
804 return obj_data
804 return obj_data
805 return _revision(context_uid, repo_id, rev)
805 return _revision(context_uid, repo_id, rev)
806
806
807 @reraise_safe_exceptions
807 @reraise_safe_exceptions
808 def date(self, wire, commit_id):
808 def date(self, wire, commit_id):
809 cache_on, context_uid, repo_id = self._cache_on(wire)
809 cache_on, context_uid, repo_id = self._cache_on(wire)
810 @self.region.conditional_cache_on_arguments(condition=cache_on)
810 @self.region.conditional_cache_on_arguments(condition=cache_on)
811 def _date(_repo_id, _commit_id):
811 def _date(_repo_id, _commit_id):
812 repo_init = self._factory.repo_libgit2(wire)
812 repo_init = self._factory.repo_libgit2(wire)
813 with repo_init as repo:
813 with repo_init as repo:
814 commit = repo[commit_id]
814 commit = repo[commit_id]
815
815
816 if hasattr(commit, 'commit_time'):
816 if hasattr(commit, 'commit_time'):
817 commit_time, commit_time_offset = commit.commit_time, commit.commit_time_offset
817 commit_time, commit_time_offset = commit.commit_time, commit.commit_time_offset
818 else:
818 else:
819 commit = commit.get_object()
819 commit = commit.get_object()
820 commit_time, commit_time_offset = commit.commit_time, commit.commit_time_offset
820 commit_time, commit_time_offset = commit.commit_time, commit.commit_time_offset
821
821
822 # TODO(marcink): check dulwich difference of offset vs timezone
822 # TODO(marcink): check dulwich difference of offset vs timezone
823 return [commit_time, commit_time_offset]
823 return [commit_time, commit_time_offset]
824 return _date(repo_id, commit_id)
824 return _date(repo_id, commit_id)
825
825
826 @reraise_safe_exceptions
826 @reraise_safe_exceptions
827 def author(self, wire, commit_id):
827 def author(self, wire, commit_id):
828 cache_on, context_uid, repo_id = self._cache_on(wire)
828 cache_on, context_uid, repo_id = self._cache_on(wire)
829 @self.region.conditional_cache_on_arguments(condition=cache_on)
829 @self.region.conditional_cache_on_arguments(condition=cache_on)
830 def _author(_repo_id, _commit_id):
830 def _author(_repo_id, _commit_id):
831 repo_init = self._factory.repo_libgit2(wire)
831 repo_init = self._factory.repo_libgit2(wire)
832 with repo_init as repo:
832 with repo_init as repo:
833 commit = repo[commit_id]
833 commit = repo[commit_id]
834
834
835 if hasattr(commit, 'author'):
835 if hasattr(commit, 'author'):
836 author = commit.author
836 author = commit.author
837 else:
837 else:
838 author = commit.get_object().author
838 author = commit.get_object().author
839
839
840 if author.email:
840 if author.email:
841 return u"{} <{}>".format(author.name, author.email)
841 return u"{} <{}>".format(author.name, author.email)
842
842
843 return u"{}".format(author.raw_name)
843 return u"{}".format(author.raw_name)
844 return _author(repo_id, commit_id)
844 return _author(repo_id, commit_id)
845
845
846 @reraise_safe_exceptions
846 @reraise_safe_exceptions
847 def message(self, wire, commit_id):
847 def message(self, wire, commit_id):
848 cache_on, context_uid, repo_id = self._cache_on(wire)
848 cache_on, context_uid, repo_id = self._cache_on(wire)
849 @self.region.conditional_cache_on_arguments(condition=cache_on)
849 @self.region.conditional_cache_on_arguments(condition=cache_on)
850 def _message(_repo_id, _commit_id):
850 def _message(_repo_id, _commit_id):
851 repo_init = self._factory.repo_libgit2(wire)
851 repo_init = self._factory.repo_libgit2(wire)
852 with repo_init as repo:
852 with repo_init as repo:
853 commit = repo[commit_id]
853 commit = repo[commit_id]
854 return commit.message
854 return commit.message
855 return _message(repo_id, commit_id)
855 return _message(repo_id, commit_id)
856
856
857 @reraise_safe_exceptions
857 @reraise_safe_exceptions
858 def parents(self, wire, commit_id):
858 def parents(self, wire, commit_id):
859 cache_on, context_uid, repo_id = self._cache_on(wire)
859 cache_on, context_uid, repo_id = self._cache_on(wire)
860 @self.region.conditional_cache_on_arguments(condition=cache_on)
860 @self.region.conditional_cache_on_arguments(condition=cache_on)
861 def _parents(_repo_id, _commit_id):
861 def _parents(_repo_id, _commit_id):
862 repo_init = self._factory.repo_libgit2(wire)
862 repo_init = self._factory.repo_libgit2(wire)
863 with repo_init as repo:
863 with repo_init as repo:
864 commit = repo[commit_id]
864 commit = repo[commit_id]
865 if hasattr(commit, 'parent_ids'):
865 if hasattr(commit, 'parent_ids'):
866 parent_ids = commit.parent_ids
866 parent_ids = commit.parent_ids
867 else:
867 else:
868 parent_ids = commit.get_object().parent_ids
868 parent_ids = commit.get_object().parent_ids
869
869
870 return [x.hex for x in parent_ids]
870 return [x.hex for x in parent_ids]
871 return _parents(repo_id, commit_id)
871 return _parents(repo_id, commit_id)
872
872
873 @reraise_safe_exceptions
873 @reraise_safe_exceptions
874 def children(self, wire, commit_id):
874 def children(self, wire, commit_id):
875 cache_on, context_uid, repo_id = self._cache_on(wire)
875 cache_on, context_uid, repo_id = self._cache_on(wire)
876 @self.region.conditional_cache_on_arguments(condition=cache_on)
876 @self.region.conditional_cache_on_arguments(condition=cache_on)
877 def _children(_repo_id, _commit_id):
877 def _children(_repo_id, _commit_id):
878 output, __ = self.run_git_command(
878 output, __ = self.run_git_command(
879 wire, ['rev-list', '--all', '--children'])
879 wire, ['rev-list', '--all', '--children'])
880
880
881 child_ids = []
881 child_ids = []
882 pat = re.compile(r'^%s' % commit_id)
882 pat = re.compile(r'^%s' % commit_id)
883 for l in output.splitlines():
883 for l in output.splitlines():
884 if pat.match(l):
884 if pat.match(l):
885 found_ids = l.split(' ')[1:]
885 found_ids = l.split(' ')[1:]
886 child_ids.extend(found_ids)
886 child_ids.extend(found_ids)
887
887
888 return child_ids
888 return child_ids
889 return _children(repo_id, commit_id)
889 return _children(repo_id, commit_id)
890
890
891 @reraise_safe_exceptions
891 @reraise_safe_exceptions
892 def set_refs(self, wire, key, value):
892 def set_refs(self, wire, key, value):
893 repo_init = self._factory.repo_libgit2(wire)
893 repo_init = self._factory.repo_libgit2(wire)
894 with repo_init as repo:
894 with repo_init as repo:
895 repo.references.create(key, value, force=True)
895 repo.references.create(key, value, force=True)
896
896
897 @reraise_safe_exceptions
897 @reraise_safe_exceptions
898 def create_branch(self, wire, branch_name, commit_id, force=False):
898 def create_branch(self, wire, branch_name, commit_id, force=False):
899 repo_init = self._factory.repo_libgit2(wire)
899 repo_init = self._factory.repo_libgit2(wire)
900 with repo_init as repo:
900 with repo_init as repo:
901 commit = repo[commit_id]
901 commit = repo[commit_id]
902
902
903 if force:
903 if force:
904 repo.branches.local.create(branch_name, commit, force=force)
904 repo.branches.local.create(branch_name, commit, force=force)
905 elif not repo.branches.get(branch_name):
905 elif not repo.branches.get(branch_name):
906 # create the branch only if it doesn't exist yet
906 # create the branch only if it doesn't exist yet
907 repo.branches.local.create(branch_name, commit, force=force)
907 repo.branches.local.create(branch_name, commit, force=force)
908
908
909 @reraise_safe_exceptions
909 @reraise_safe_exceptions
910 def remove_ref(self, wire, key):
910 def remove_ref(self, wire, key):
911 repo_init = self._factory.repo_libgit2(wire)
911 repo_init = self._factory.repo_libgit2(wire)
912 with repo_init as repo:
912 with repo_init as repo:
913 repo.references.delete(key)
913 repo.references.delete(key)
914
914
915 @reraise_safe_exceptions
915 @reraise_safe_exceptions
916 def tag_remove(self, wire, tag_name):
916 def tag_remove(self, wire, tag_name):
917 repo_init = self._factory.repo_libgit2(wire)
917 repo_init = self._factory.repo_libgit2(wire)
918 with repo_init as repo:
918 with repo_init as repo:
919 key = 'refs/tags/{}'.format(tag_name)
919 key = 'refs/tags/{}'.format(tag_name)
920 repo.references.delete(key)
920 repo.references.delete(key)
921
921
922 @reraise_safe_exceptions
922 @reraise_safe_exceptions
923 def tree_changes(self, wire, source_id, target_id):
923 def tree_changes(self, wire, source_id, target_id):
924 # TODO(marcink): remove this seems it's only used by tests
924 # TODO(marcink): remove this seems it's only used by tests
925 repo = self._factory.repo(wire)
925 repo = self._factory.repo(wire)
926 source = repo[source_id].tree if source_id else None
926 source = repo[source_id].tree if source_id else None
927 target = repo[target_id].tree
927 target = repo[target_id].tree
928 result = repo.object_store.tree_changes(source, target)
928 result = repo.object_store.tree_changes(source, target)
929 return list(result)
929 return list(result)
930
930
931 @reraise_safe_exceptions
931 @reraise_safe_exceptions
932 def tree_and_type_for_path(self, wire, commit_id, path):
932 def tree_and_type_for_path(self, wire, commit_id, path):
933
933
934 cache_on, context_uid, repo_id = self._cache_on(wire)
934 cache_on, context_uid, repo_id = self._cache_on(wire)
935 @self.region.conditional_cache_on_arguments(condition=cache_on)
935 @self.region.conditional_cache_on_arguments(condition=cache_on)
936 def _tree_and_type_for_path(_context_uid, _repo_id, _commit_id, _path):
936 def _tree_and_type_for_path(_context_uid, _repo_id, _commit_id, _path):
937 repo_init = self._factory.repo_libgit2(wire)
937 repo_init = self._factory.repo_libgit2(wire)
938
938
939 with repo_init as repo:
939 with repo_init as repo:
940 commit = repo[commit_id]
940 commit = repo[commit_id]
941 try:
941 try:
942 tree = commit.tree[path]
942 tree = commit.tree[path]
943 except KeyError:
943 except KeyError:
944 return None, None, None
944 return None, None, None
945
945
946 return tree.id.hex, tree.type, tree.filemode
946 return tree.id.hex, tree.type, tree.filemode
947 return _tree_and_type_for_path(context_uid, repo_id, commit_id, path)
947 return _tree_and_type_for_path(context_uid, repo_id, commit_id, path)
948
948
949 @reraise_safe_exceptions
949 @reraise_safe_exceptions
950 def tree_items(self, wire, tree_id):
950 def tree_items(self, wire, tree_id):
951 cache_on, context_uid, repo_id = self._cache_on(wire)
951 cache_on, context_uid, repo_id = self._cache_on(wire)
952 @self.region.conditional_cache_on_arguments(condition=cache_on)
952 @self.region.conditional_cache_on_arguments(condition=cache_on)
953 def _tree_items(_repo_id, _tree_id):
953 def _tree_items(_repo_id, _tree_id):
954
954
955 repo_init = self._factory.repo_libgit2(wire)
955 repo_init = self._factory.repo_libgit2(wire)
956 with repo_init as repo:
956 with repo_init as repo:
957 try:
957 try:
958 tree = repo[tree_id]
958 tree = repo[tree_id]
959 except KeyError:
959 except KeyError:
960 raise ObjectMissing('No tree with id: {}'.format(tree_id))
960 raise ObjectMissing('No tree with id: {}'.format(tree_id))
961
961
962 result = []
962 result = []
963 for item in tree:
963 for item in tree:
964 item_sha = item.hex
964 item_sha = item.hex
965 item_mode = item.filemode
965 item_mode = item.filemode
966 item_type = item.type
966 item_type = item.type
967
967
968 if item_type == 'commit':
968 if item_type == 'commit':
969 # NOTE(marcink): we translate submodules to 'link' for backward compat
969 # NOTE(marcink): we translate submodules to 'link' for backward compat
970 item_type = 'link'
970 item_type = 'link'
971
971
972 result.append((item.name, item_mode, item_sha, item_type))
972 result.append((item.name, item_mode, item_sha, item_type))
973 return result
973 return result
974 return _tree_items(repo_id, tree_id)
974 return _tree_items(repo_id, tree_id)
975
975
976 @reraise_safe_exceptions
976 @reraise_safe_exceptions
977 def diff_2(self, wire, commit_id_1, commit_id_2, file_filter, opt_ignorews, context):
977 def diff_2(self, wire, commit_id_1, commit_id_2, file_filter, opt_ignorews, context):
978 """
978 """
979 Old version that uses subprocess to call diff
979 Old version that uses subprocess to call diff
980 """
980 """
981
981
982 flags = [
982 flags = [
983 '-U%s' % context, '--patch',
983 '-U%s' % context, '--patch',
984 '--binary',
984 '--binary',
985 '--find-renames',
985 '--find-renames',
986 '--no-indent-heuristic',
986 '--no-indent-heuristic',
987 # '--indent-heuristic',
987 # '--indent-heuristic',
988 #'--full-index',
988 #'--full-index',
989 #'--abbrev=40'
989 #'--abbrev=40'
990 ]
990 ]
991
991
992 if opt_ignorews:
992 if opt_ignorews:
993 flags.append('--ignore-all-space')
993 flags.append('--ignore-all-space')
994
994
995 if commit_id_1 == self.EMPTY_COMMIT:
995 if commit_id_1 == self.EMPTY_COMMIT:
996 cmd = ['show'] + flags + [commit_id_2]
996 cmd = ['show'] + flags + [commit_id_2]
997 else:
997 else:
998 cmd = ['diff'] + flags + [commit_id_1, commit_id_2]
998 cmd = ['diff'] + flags + [commit_id_1, commit_id_2]
999
999
1000 if file_filter:
1000 if file_filter:
1001 cmd.extend(['--', file_filter])
1001 cmd.extend(['--', file_filter])
1002
1002
1003 diff, __ = self.run_git_command(wire, cmd)
1003 diff, __ = self.run_git_command(wire, cmd)
1004 # If we used the 'show' command, strip the leading lines (until the
1004 # If we used the 'show' command, strip the leading lines (until the
1005 # actual diff starts)
1005 # actual diff starts)
1006 if commit_id_1 == self.EMPTY_COMMIT:
1006 if commit_id_1 == self.EMPTY_COMMIT:
1007 lines = diff.splitlines()
1007 lines = diff.splitlines()
1008 x = 0
1008 x = 0
1009 for line in lines:
1009 for line in lines:
1010 if line.startswith('diff'):
1010 if line.startswith('diff'):
1011 break
1011 break
1012 x += 1
1012 x += 1
1013 # Append a trailing newline, just like the 'diff' command does
1013 # Append a trailing newline, just like the 'diff' command does
1014 diff = '\n'.join(lines[x:]) + '\n'
1014 diff = '\n'.join(lines[x:]) + '\n'
1015 return diff
1015 return diff
1016
1016
1017 @reraise_safe_exceptions
1017 @reraise_safe_exceptions
1018 def diff(self, wire, commit_id_1, commit_id_2, file_filter, opt_ignorews, context):
1018 def diff(self, wire, commit_id_1, commit_id_2, file_filter, opt_ignorews, context):
1019 repo_init = self._factory.repo_libgit2(wire)
1019 repo_init = self._factory.repo_libgit2(wire)
1020 with repo_init as repo:
1020 with repo_init as repo:
1021 swap = True
1021 swap = True
1022 flags = 0
1022 flags = 0
1023 flags |= pygit2.GIT_DIFF_SHOW_BINARY
1023 flags |= pygit2.GIT_DIFF_SHOW_BINARY
1024
1024
1025 if opt_ignorews:
1025 if opt_ignorews:
1026 flags |= pygit2.GIT_DIFF_IGNORE_WHITESPACE
1026 flags |= pygit2.GIT_DIFF_IGNORE_WHITESPACE
1027
1027
1028 if commit_id_1 == self.EMPTY_COMMIT:
1028 if commit_id_1 == self.EMPTY_COMMIT:
1029 comm1 = repo[commit_id_2]
1029 comm1 = repo[commit_id_2]
1030 diff_obj = comm1.tree.diff_to_tree(
1030 diff_obj = comm1.tree.diff_to_tree(
1031 flags=flags, context_lines=context, swap=swap)
1031 flags=flags, context_lines=context, swap=swap)
1032
1032
1033 else:
1033 else:
1034 comm1 = repo[commit_id_2]
1034 comm1 = repo[commit_id_2]
1035 comm2 = repo[commit_id_1]
1035 comm2 = repo[commit_id_1]
1036 diff_obj = comm1.tree.diff_to_tree(
1036 diff_obj = comm1.tree.diff_to_tree(
1037 comm2.tree, flags=flags, context_lines=context, swap=swap)
1037 comm2.tree, flags=flags, context_lines=context, swap=swap)
1038 similar_flags = 0
1038 similar_flags = 0
1039 similar_flags |= pygit2.GIT_DIFF_FIND_RENAMES
1039 similar_flags |= pygit2.GIT_DIFF_FIND_RENAMES
1040 diff_obj.find_similar(flags=similar_flags)
1040 diff_obj.find_similar(flags=similar_flags)
1041
1041
1042 if file_filter:
1042 if file_filter:
1043 for p in diff_obj:
1043 for p in diff_obj:
1044 if p.delta.old_file.path == file_filter:
1044 if p.delta.old_file.path == file_filter:
1045 return p.patch or ''
1045 return p.patch or ''
1046 # no matching path == no diff
1046 # no matching path == no diff
1047 return ''
1047 return ''
1048 return diff_obj.patch or ''
1048 return diff_obj.patch or ''
1049
1049
1050 @reraise_safe_exceptions
1050 @reraise_safe_exceptions
1051 def node_history(self, wire, commit_id, path, limit):
1051 def node_history(self, wire, commit_id, path, limit):
1052 cache_on, context_uid, repo_id = self._cache_on(wire)
1052 cache_on, context_uid, repo_id = self._cache_on(wire)
1053 @self.region.conditional_cache_on_arguments(condition=cache_on)
1053 @self.region.conditional_cache_on_arguments(condition=cache_on)
1054 def _node_history(_context_uid, _repo_id, _commit_id, _path, _limit):
1054 def _node_history(_context_uid, _repo_id, _commit_id, _path, _limit):
1055 # optimize for n==1, rev-list is much faster for that use-case
1055 # optimize for n==1, rev-list is much faster for that use-case
1056 if limit == 1:
1056 if limit == 1:
1057 cmd = ['rev-list', '-1', commit_id, '--', path]
1057 cmd = ['rev-list', '-1', commit_id, '--', path]
1058 else:
1058 else:
1059 cmd = ['log']
1059 cmd = ['log']
1060 if limit:
1060 if limit:
1061 cmd.extend(['-n', str(safe_int(limit, 0))])
1061 cmd.extend(['-n', str(safe_int(limit, 0))])
1062 cmd.extend(['--pretty=format: %H', '-s', commit_id, '--', path])
1062 cmd.extend(['--pretty=format: %H', '-s', commit_id, '--', path])
1063
1063
1064 output, __ = self.run_git_command(wire, cmd)
1064 output, __ = self.run_git_command(wire, cmd)
1065 commit_ids = re.findall(r'[0-9a-fA-F]{40}', output)
1065 commit_ids = re.findall(r'[0-9a-fA-F]{40}', output)
1066
1066
1067 return commit_ids
1067 return commit_ids
1068 return _node_history(context_uid, repo_id, commit_id, path, limit)
1068 return _node_history(context_uid, repo_id, commit_id, path, limit)
1069
1069
1070 @reraise_safe_exceptions
1070 @reraise_safe_exceptions
1071 def node_annotate(self, wire, commit_id, path):
1071 def node_annotate(self, wire, commit_id, path):
1072
1072
1073 cmd = ['blame', '-l', '--root', '-r', commit_id, '--', path]
1073 cmd = ['blame', '-l', '--root', '-r', commit_id, '--', path]
1074 # -l ==> outputs long shas (and we need all 40 characters)
1074 # -l ==> outputs long shas (and we need all 40 characters)
1075 # --root ==> doesn't put '^' character for boundaries
1075 # --root ==> doesn't put '^' character for boundaries
1076 # -r commit_id ==> blames for the given commit
1076 # -r commit_id ==> blames for the given commit
1077 output, __ = self.run_git_command(wire, cmd)
1077 output, __ = self.run_git_command(wire, cmd)
1078
1078
1079 result = []
1079 result = []
1080 for i, blame_line in enumerate(output.split('\n')[:-1]):
1080 for i, blame_line in enumerate(output.split('\n')[:-1]):
1081 line_no = i + 1
1081 line_no = i + 1
1082 commit_id, line = re.split(r' ', blame_line, 1)
1082 commit_id, line = re.split(r' ', blame_line, 1)
1083 result.append((line_no, commit_id, line))
1083 result.append((line_no, commit_id, line))
1084 return result
1084 return result
1085
1085
1086 @reraise_safe_exceptions
1086 @reraise_safe_exceptions
1087 def update_server_info(self, wire):
1087 def update_server_info(self, wire):
1088 repo = self._factory.repo(wire)
1088 repo = self._factory.repo(wire)
1089 update_server_info(repo)
1089 update_server_info(repo)
1090
1090
1091 @reraise_safe_exceptions
1091 @reraise_safe_exceptions
1092 def get_all_commit_ids(self, wire):
1092 def get_all_commit_ids(self, wire):
1093
1093
1094 cache_on, context_uid, repo_id = self._cache_on(wire)
1094 cache_on, context_uid, repo_id = self._cache_on(wire)
1095 @self.region.conditional_cache_on_arguments(condition=cache_on)
1095 @self.region.conditional_cache_on_arguments(condition=cache_on)
1096 def _get_all_commit_ids(_context_uid, _repo_id):
1096 def _get_all_commit_ids(_context_uid, _repo_id):
1097
1097
1098 cmd = ['rev-list', '--reverse', '--date-order', '--branches', '--tags']
1098 cmd = ['rev-list', '--reverse', '--date-order', '--branches', '--tags']
1099 try:
1099 try:
1100 output, __ = self.run_git_command(wire, cmd)
1100 output, __ = self.run_git_command(wire, cmd)
1101 return output.splitlines()
1101 return output.splitlines()
1102 except Exception:
1102 except Exception:
1103 # Can be raised for empty repositories
1103 # Can be raised for empty repositories
1104 return []
1104 return []
1105 return _get_all_commit_ids(context_uid, repo_id)
1105 return _get_all_commit_ids(context_uid, repo_id)
1106
1106
1107 @reraise_safe_exceptions
1107 @reraise_safe_exceptions
1108 def run_git_command(self, wire, cmd, **opts):
1108 def run_git_command(self, wire, cmd, **opts):
1109 path = wire.get('path', None)
1109 path = wire.get('path', None)
1110
1110
1111 if path and os.path.isdir(path):
1111 if path and os.path.isdir(path):
1112 opts['cwd'] = path
1112 opts['cwd'] = path
1113
1113
1114 if '_bare' in opts:
1114 if '_bare' in opts:
1115 _copts = []
1115 _copts = []
1116 del opts['_bare']
1116 del opts['_bare']
1117 else:
1117 else:
1118 _copts = ['-c', 'core.quotepath=false', ]
1118 _copts = ['-c', 'core.quotepath=false', ]
1119 safe_call = False
1119 safe_call = False
1120 if '_safe' in opts:
1120 if '_safe' in opts:
1121 # no exc on failure
1121 # no exc on failure
1122 del opts['_safe']
1122 del opts['_safe']
1123 safe_call = True
1123 safe_call = True
1124
1124
1125 if '_copts' in opts:
1125 if '_copts' in opts:
1126 _copts.extend(opts['_copts'] or [])
1126 _copts.extend(opts['_copts'] or [])
1127 del opts['_copts']
1127 del opts['_copts']
1128
1128
1129 gitenv = os.environ.copy()
1129 gitenv = os.environ.copy()
1130 gitenv.update(opts.pop('extra_env', {}))
1130 gitenv.update(opts.pop('extra_env', {}))
1131 # GIT_DIR from the environment would override the target repo, so drop it
1131 # GIT_DIR from the environment would override the target repo, so drop it
1132 if 'GIT_DIR' in gitenv:
1132 if 'GIT_DIR' in gitenv:
1133 del gitenv['GIT_DIR']
1133 del gitenv['GIT_DIR']
1134 gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
1134 gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
1135 gitenv['GIT_DISCOVERY_ACROSS_FILESYSTEM'] = '1'
1135 gitenv['GIT_DISCOVERY_ACROSS_FILESYSTEM'] = '1'
1136
1136
1137 cmd = [settings.GIT_EXECUTABLE] + _copts + cmd
1137 cmd = [settings.GIT_EXECUTABLE] + _copts + cmd
1138 _opts = {'env': gitenv, 'shell': False}
1138 _opts = {'env': gitenv, 'shell': False}
1139
1139
1140 proc = None
1140 try:
1141 try:
1141 _opts.update(opts)
1142 _opts.update(opts)
1142 p = subprocessio.SubprocessIOChunker(cmd, **_opts)
1143 proc = subprocessio.SubprocessIOChunker(cmd, **_opts)
1143
1144
1144 return ''.join(p), ''.join(p.error)
1145 return ''.join(proc), ''.join(proc.error)
1145 except (EnvironmentError, OSError) as err:
1146 except (EnvironmentError, OSError) as err:
1146 cmd = ' '.join(cmd) # human friendly CMD
1147 cmd = ' '.join(cmd) # human friendly CMD
1147 tb_err = ("Couldn't run git command (%s).\n"
1148 tb_err = ("Couldn't run git command (%s).\n"
1148 "Original error was:%s\n"
1149 "Original error was:%s\n"
1149 "Call options:%s\n"
1150 "Call options:%s\n"
1150 % (cmd, err, _opts))
1151 % (cmd, err, _opts))
1151 log.exception(tb_err)
1152 log.exception(tb_err)
1152 if safe_call:
1153 if safe_call:
1153 return '', err
1154 return '', err
1154 else:
1155 else:
1155 raise exceptions.VcsException()(tb_err)
1156 raise exceptions.VcsException()(tb_err)
1157 finally:
1158 if proc:
1159 proc.close()
1156
1160
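The `try/finally` added above is the core of this change: the chunker is now closed deterministically instead of relying on garbage collection. Any caller of `SubprocessIOChunker` can follow the same shape; a minimal sketch (the wrapper name and command are illustrative):

from vcsserver import subprocessio

def run_and_collect(cmd, env=None):
    proc = None
    try:
        proc = subprocessio.SubprocessIOChunker(cmd, env=env, shell=False)
        # drain stdout fully before reading .error, as run_git_command does
        return ''.join(proc), ''.join(proc.error)
    finally:
        # close() terminates the subprocess and closes buffers and readers;
        # it is idempotent, so closing after full consumption is safe
        if proc:
            proc.close()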
1157 @reraise_safe_exceptions
1161 @reraise_safe_exceptions
1158 def install_hooks(self, wire, force=False):
1162 def install_hooks(self, wire, force=False):
1159 from vcsserver.hook_utils import install_git_hooks
1163 from vcsserver.hook_utils import install_git_hooks
1160 bare = self.bare(wire)
1164 bare = self.bare(wire)
1161 path = wire['path']
1165 path = wire['path']
1162 return install_git_hooks(path, bare, force_create=force)
1166 return install_git_hooks(path, bare, force_create=force)
1163
1167
1164 @reraise_safe_exceptions
1168 @reraise_safe_exceptions
1165 def get_hooks_info(self, wire):
1169 def get_hooks_info(self, wire):
1166 from vcsserver.hook_utils import (
1170 from vcsserver.hook_utils import (
1167 get_git_pre_hook_version, get_git_post_hook_version)
1171 get_git_pre_hook_version, get_git_post_hook_version)
1168 bare = self.bare(wire)
1172 bare = self.bare(wire)
1169 path = wire['path']
1173 path = wire['path']
1170 return {
1174 return {
1171 'pre_version': get_git_pre_hook_version(path, bare),
1175 'pre_version': get_git_pre_hook_version(path, bare),
1172 'post_version': get_git_post_hook_version(path, bare),
1176 'post_version': get_git_post_hook_version(path, bare),
1173 }
1177 }
@@ -1,523 +1,519 b''
1 """
1 """
2 This module provides a class that wraps communication over subprocess.Popen
2 This module provides a class that wraps communication over subprocess.Popen
3 input, output, error streams in a meaningful, non-blocking, concurrent
3 input, output, error streams in a meaningful, non-blocking, concurrent
4 stream processor, exposing the output data as an iterator fit to be the
4 stream processor, exposing the output data as an iterator fit to be the
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import logging
26 import logging
27 import subprocess32 as subprocess
27 import subprocess32 as subprocess
28 from collections import deque
28 from collections import deque
29 from threading import Event, Thread
29 from threading import Event, Thread
30
30
31 log = logging.getLogger(__name__)
31 log = logging.getLogger(__name__)
32
32
33
33
34 class StreamFeeder(Thread):
34 class StreamFeeder(Thread):
35 """
35 """
36 Normal writing into a pipe-like object blocks once the buffer is filled.
36 Normal writing into a pipe-like object blocks once the buffer is filled.
37 This thread feeds data from a file-like object into a pipe
37 This thread feeds data from a file-like object into a pipe
38 without blocking the main thread.
38 without blocking the main thread.
39 We close the input pipe once the end of the source stream is reached.
39 We close the input pipe once the end of the source stream is reached.
40 """
40 """
41
41
42 def __init__(self, source):
42 def __init__(self, source):
43 super(StreamFeeder, self).__init__()
43 super(StreamFeeder, self).__init__()
44 self.daemon = True
44 self.daemon = True
45 filelike = False
45 filelike = False
46 self.bytes = bytes()
46 self.bytes = bytes()
47 if type(source) in (type(''), bytes, bytearray): # string-like
47 if type(source) in (type(''), bytes, bytearray): # string-like
48 self.bytes = bytes(source)
48 self.bytes = bytes(source)
49 else: # can be either file pointer or file-like
49 else: # can be either file pointer or file-like
50 if type(source) in (int, long): # file descriptor it is
50 if type(source) in (int, long): # file descriptor it is
51 # converting file descriptor (int) stdin into file-like
51 # converting file descriptor (int) stdin into file-like
52 try:
52 try:
53 source = os.fdopen(source, 'rb', 16384)
53 source = os.fdopen(source, 'rb', 16384)
54 except Exception:
54 except Exception:
55 pass
55 pass
56 # let's see if source is file-like by now
56 # let's see if source is file-like by now
57 try:
57 try:
58 filelike = source.read
58 filelike = source.read
59 except Exception:
59 except Exception:
60 pass
60 pass
61 if not filelike and not self.bytes:
61 if not filelike and not self.bytes:
62 raise TypeError("StreamFeeder's source object must be a readable "
62 raise TypeError("StreamFeeder's source object must be a readable "
63 "file-like, a file descriptor, or a string-like.")
63 "file-like, a file descriptor, or a string-like.")
64 self.source = source
64 self.source = source
65 self.readiface, self.writeiface = os.pipe()
65 self.readiface, self.writeiface = os.pipe()
66
66
67 def run(self):
67 def run(self):
68 t = self.writeiface
68 t = self.writeiface
69 try:
69 try:
70 if self.bytes:
70 if self.bytes:
71 os.write(t, self.bytes)
71 os.write(t, self.bytes)
72 else:
72 else:
73 s = self.source
73 s = self.source
74 b = s.read(4096)
74 b = s.read(4096)
75 while b:
75 while b:
76 os.write(t, b)
76 os.write(t, b)
77 b = s.read(4096)
77 b = s.read(4096)
78 finally:
78 finally:
79 os.close(t)
79 os.close(t)
80
80
81 @property
81 @property
82 def output(self):
82 def output(self):
83 return self.readiface
83 return self.readiface
84
84
85
85
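For context, `StreamFeeder` exposes the read end of an `os.pipe()` through `.output`, so a `Popen` can consume arbitrarily large input while the daemon thread does the writing. A hedged sketch of the intended wiring (`cat` is only an example command):

import subprocess32 as subprocess

feeder = StreamFeeder(b'payload possibly larger than the pipe buffer')
feeder.start()  # the writer thread starts pushing into the pipe

# the read end of the pipe becomes the child's stdin
p = subprocess.Popen(['cat'], stdin=feeder.output, stdout=subprocess.PIPE)
out, _ = p.communicate()
assert out == b'payload possibly larger than the pipe buffer'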
86 class InputStreamChunker(Thread):
86 class InputStreamChunker(Thread):
87 def __init__(self, source, target, buffer_size, chunk_size):
87 def __init__(self, source, target, buffer_size, chunk_size):
88
88
89 super(InputStreamChunker, self).__init__()
89 super(InputStreamChunker, self).__init__()
90
90
91 self.daemon = True # daemon thread, dies with the parent process
91 self.daemon = True # daemon thread, dies with the parent process
92
92
93 self.source = source
93 self.source = source
94 self.target = target
94 self.target = target
95 self.chunk_count_max = int(buffer_size / chunk_size) + 1
95 self.chunk_count_max = int(buffer_size / chunk_size) + 1
96 self.chunk_size = chunk_size
96 self.chunk_size = chunk_size
97
97
98 self.data_added = Event()
98 self.data_added = Event()
99 self.data_added.clear()
99 self.data_added.clear()
100
100
101 self.keep_reading = Event()
101 self.keep_reading = Event()
102 self.keep_reading.set()
102 self.keep_reading.set()
103
103
104 self.EOF = Event()
104 self.EOF = Event()
105 self.EOF.clear()
105 self.EOF.clear()
106
106
107 self.go = Event()
107 self.go = Event()
108 self.go.set()
108 self.go.set()
109
109
110 def stop(self):
110 def stop(self):
111 self.go.clear()
111 self.go.clear()
112 self.EOF.set()
112 self.EOF.set()
113 try:
113 try:
114 # this is not proper, but is done to force the reader thread to let
114 # this is not proper, but is done to force the reader thread to let
115 # go of the input because, if successful, .close() will send EOF
115 # go of the input because, if successful, .close() will send EOF
116 # down the pipe.
116 # down the pipe.
117 self.source.close()
117 self.source.close()
118 except Exception:
118 except Exception:
119 pass
119 pass
120
120
121 def run(self):
121 def run(self):
122 s = self.source
122 s = self.source
123 t = self.target
123 t = self.target
124 cs = self.chunk_size
124 cs = self.chunk_size
125 chunk_count_max = self.chunk_count_max
125 chunk_count_max = self.chunk_count_max
126 keep_reading = self.keep_reading
126 keep_reading = self.keep_reading
127 da = self.data_added
127 da = self.data_added
128 go = self.go
128 go = self.go
129
129
130 try:
130 try:
131 b = s.read(cs)
131 b = s.read(cs)
132 except ValueError:
132 except ValueError:
133 b = ''
133 b = ''
134
134
135 timeout_input = 20
135 timeout_input = 20
136 while b and go.is_set():
136 while b and go.is_set():
137 if len(t) > chunk_count_max:
137 if len(t) > chunk_count_max:
138 keep_reading.clear()
138 keep_reading.clear()
139 keep_reading.wait(timeout_input)
139 keep_reading.wait(timeout_input)
140 if len(t) > chunk_count_max + timeout_input:
140 if len(t) > chunk_count_max + timeout_input:
141 log.error("Timed out while waiting for input from subprocess.")
141 log.error("Timed out while waiting for input from subprocess.")
142 os._exit(-1) # this will cause the worker to recycle itself
142 os._exit(-1) # this will cause the worker to recycle itself
143
143
144 t.append(b)
144 t.append(b)
145 da.set()
145 da.set()
146
146
147 try:
147 try:
148 b = s.read(cs)
148 b = s.read(cs)
149 except ValueError:
149 except ValueError:
150 b = ''
150 b = ''
151
151
152 self.EOF.set()
152 self.EOF.set()
153 da.set() # for cases when done but there was no input.
153 da.set() # for cases when done but there was no input.
154
154
155
155
156 class BufferedGenerator(object):
156 class BufferedGenerator(object):
157 """
157 """
158 This class behaves as a non-blocking, buffered pipe reader.
158 This class behaves as a non-blocking, buffered pipe reader.
159 It reads chunks of data (through a thread)
159 It reads chunks of data (through a thread)
160 from a blocking pipe and appends them to a deque of chunks.
160 from a blocking pipe and appends them to a deque of chunks.
161 Reading is halted in the thread when the maximum number of chunks is buffered.
161 Reading is halted in the thread when the maximum number of chunks is buffered.
162 The .next() method may operate in blocking or non-blocking fashion, by yielding
162 The .next() method may operate in blocking or non-blocking fashion, by yielding
163 '' if no data is ready
163 '' if no data is ready
164 to be sent, or by not returning until there is some data to send.
164 to be sent, or by not returning until there is some data to send.
165 When we get EOF from the underlying source pipe, we set a marker that raises
165 When we get EOF from the underlying source pipe, we set a marker that raises
166 StopIteration after the last chunk of data is yielded.
166 StopIteration after the last chunk of data is yielded.
167 """
167 """
168
168
169 def __init__(self, source, buffer_size=65536, chunk_size=4096,
169 def __init__(self, source, buffer_size=65536, chunk_size=4096,
170 starting_values=None, bottomless=False):
170 starting_values=None, bottomless=False):
171 starting_values = starting_values or []
171 starting_values = starting_values or []
172
172
173 if bottomless:
173 if bottomless:
174 maxlen = int(buffer_size / chunk_size)
174 maxlen = int(buffer_size / chunk_size)
175 else:
175 else:
176 maxlen = None
176 maxlen = None
177
177
178 self.data = deque(starting_values, maxlen)
178 self.data = deque(starting_values, maxlen)
179 self.worker = InputStreamChunker(source, self.data, buffer_size,
179 self.worker = InputStreamChunker(source, self.data, buffer_size,
180 chunk_size)
180 chunk_size)
181 if starting_values:
181 if starting_values:
182 self.worker.data_added.set()
182 self.worker.data_added.set()
183 self.worker.start()
183 self.worker.start()
184
184
185 ####################
185 ####################
186 # Generator's methods
186 # Generator's methods
187 ####################
187 ####################
188
188
189 def __iter__(self):
189 def __iter__(self):
190 return self
190 return self
191
191
192 def next(self):
192 def next(self):
193 while not len(self.data) and not self.worker.EOF.is_set():
193 while not len(self.data) and not self.worker.EOF.is_set():
194 self.worker.data_added.clear()
194 self.worker.data_added.clear()
195 self.worker.data_added.wait(0.2)
195 self.worker.data_added.wait(0.2)
196 if len(self.data):
196 if len(self.data):
197 self.worker.keep_reading.set()
197 self.worker.keep_reading.set()
198 return bytes(self.data.popleft())
198 return bytes(self.data.popleft())
199 elif self.worker.EOF.is_set():
199 elif self.worker.EOF.is_set():
200 raise StopIteration
200 raise StopIteration
201
201
202 def throw(self, exc_type, value=None, traceback=None):
202 def throw(self, exc_type, value=None, traceback=None):
203 if not self.worker.EOF.is_set():
203 if not self.worker.EOF.is_set():
204 raise exc_type(value)
204 raise exc_type(value)
205
205
206 def start(self):
206 def start(self):
207 self.worker.start()
207 self.worker.start()
208
208
209 def stop(self):
209 def stop(self):
210 self.worker.stop()
210 self.worker.stop()
211
211
212 def close(self):
212 def close(self):
213 try:
213 try:
214 self.worker.stop()
214 self.worker.stop()
215 self.throw(GeneratorExit)
215 self.throw(GeneratorExit)
216 except (GeneratorExit, StopIteration):
216 except (GeneratorExit, StopIteration):
217 pass
217 pass
218
218
219 def __del__(self):
220 self.close()
221
222 ####################
219 ####################
223 # Threaded reader's infrastructure.
220 # Threaded reader's infrastructure.
224 ####################
221 ####################
225 @property
222 @property
226 def input(self):
223 def input(self):
227 return self.worker.source
224 return self.worker.source
228
225
229 @property
226 @property
230 def data_added_event(self):
227 def data_added_event(self):
231 return self.worker.data_added
228 return self.worker.data_added
232
229
233 @property
230 @property
234 def data_added(self):
231 def data_added(self):
235 return self.worker.data_added.is_set()
232 return self.worker.data_added.is_set()
236
233
237 @property
234 @property
238 def reading_paused(self):
235 def reading_paused(self):
239 return not self.worker.keep_reading.is_set()
236 return not self.worker.keep_reading.is_set()
240
237
241 @property
238 @property
242 def done_reading_event(self):
239 def done_reading_event(self):
243 """
240 """
244 Done reading does not mean that the iterator's buffer is empty.
241 Done reading does not mean that the iterator's buffer is empty.
245 The iterator might be done reading from the underlying source, but the read
242 The iterator might be done reading from the underlying source, but the read
246 chunks might still be available for serving through the .next() method.
243 chunks might still be available for serving through the .next() method.
247
244
248 :returns: An Event class instance.
245 :returns: An Event class instance.
249 """
246 """
250 return self.worker.EOF
247 return self.worker.EOF
251
248
252 @property
249 @property
253 def done_reading(self):
250 def done_reading(self):
254 """
251 """
255 Done reading does not mean that the iterator's buffer is empty.
252 Done reading does not mean that the iterator's buffer is empty.
256 The iterator might be done reading from the underlying source, but the read
253 The iterator might be done reading from the underlying source, but the read
257 chunks might still be available for serving through the .next() method.
254 chunks might still be available for serving through the .next() method.
258
255
259 :returns: A bool value.
256 :returns: A bool value.
260 """
257 """
261 return self.worker.EOF.is_set()
258 return self.worker.EOF.is_set()
262
259
263 @property
260 @property
264 def length(self):
261 def length(self):
265 """
262 """
266 Returns an int.
263 Returns an int.
267 
264 
268 This is the length of the queue of chunks, not the length of
265 This is the length of the queue of chunks, not the length of
269 the combined contents of those chunks.
266 the combined contents of those chunks.
270 
267 
271 __len__() cannot be meaningfully implemented because this
268 __len__() cannot be meaningfully implemented because this
272 reader is just flying through bottomless content and
269 reader is just flying through bottomless content and
273 can only know the length of what it has already seen.
270 can only know the length of what it has already seen.
274 
271 
275 If __len__() returns a value, a WSGI server (per PEP 3333) will set
272 If __len__() returns a value, a WSGI server (per PEP 3333) will set
276 the response's length to it. In order not to
273 the response's length to it. In order not to
277 confuse PEP 3333 WSGI servers, we do not implement __len__
274 confuse PEP 3333 WSGI servers, we do not implement __len__
278 at all.
275 at all.
279 """
276 """
280 return len(self.data)
277 return len(self.data)
281
278
282 def prepend(self, x):
279 def prepend(self, x):
283 self.data.appendleft(x)
280 self.data.appendleft(x)
284
281
285 def append(self, x):
282 def append(self, x):
286 self.data.append(x)
283 self.data.append(x)
287
284
288 def extend(self, o):
285 def extend(self, o):
289 self.data.extend(o)
286 self.data.extend(o)
290
287
291 def __getitem__(self, i):
288 def __getitem__(self, i):
292 return self.data[i]
289 return self.data[i]
293
290
294
291
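`BufferedGenerator` is what makes the chunker non-blocking: a reader thread drains the pipe into a deque while the consumer iterates at its own pace. A minimal sketch of reading a process's stdout through it, closing explicitly as this commit now requires (`ls` is only an example command):

import subprocess32 as subprocess

p = subprocess.Popen(['ls', '/'], stdout=subprocess.PIPE)
gen = BufferedGenerator(p.stdout)  # the reader thread starts immediately
try:
    # iteration blocks only while the next chunk is still trickling in
    data = ''.join(chunk for chunk in gen)
finally:
    gen.close()  # stop the worker explicitly; __del__ no longer does this
print(data)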
295 class SubprocessIOChunker(object):
292 class SubprocessIOChunker(object):
296 """
293 """
297 Processor class wrapping handling of subprocess IO.
294 Processor class wrapping handling of subprocess IO.
298
295
299 .. important::
296 .. important::
300
297
301 Watch out for the method `__del__` on this class. If this object
298 Watch out for the method `__del__` on this class. If this object
302 is deleted, it will kill the subprocess, so avoid returning
299 is deleted, it will kill the subprocess, so avoid returning
303 the `output` attribute or using it as in the following
300 the `output` attribute or using it as in the following
304 example::
301 example::
305
302
306 # `args` expected to run a program that produces a lot of output
303 # `args` expected to run a program that produces a lot of output
307 output = ''.join(SubprocessIOChunker(
304 output = ''.join(SubprocessIOChunker(
308 args, shell=False, inputstream=inputstream, env=environ).output)
305 args, shell=False, inputstream=inputstream, env=environ).output)
309
306
310 # `output` will not contain all the data, because the __del__ method
307 # `output` will not contain all the data, because the __del__ method
311 # has already killed the subprocess in this case before all output
308 # has already killed the subprocess in this case before all output
312 # has been consumed.
309 # has been consumed.
313
310
314
311
315
312
316 In a way, this is a "communicate()" replacement with a twist.
313 In a way, this is a "communicate()" replacement with a twist.
317
314
318 - We are multithreaded. Writing in, and reading out and err, happen in separate threads.
315 - We are multithreaded. Writing in, and reading out and err, happen in separate threads.
319 - We support concurrent (in and out) stream processing.
316 - We support concurrent (in and out) stream processing.
320 - The output is not a stream. It's a queue of read string (bytes, not unicode)
317 - The output is not a stream. It's a queue of read string (bytes, not unicode)
321 chunks. The object behaves as an iterable: you can "for chunk in obj:" it.
318 chunks. The object behaves as an iterable: you can "for chunk in obj:" it.
322 - We are non-blocking in more respects than communicate()
319 - We are non-blocking in more respects than communicate()
323 (reading from subprocess out pauses when internal buffer is full, but
320 (reading from subprocess out pauses when internal buffer is full, but
324 does not block the parent calling code. On the flip side, reading from
321 does not block the parent calling code. On the flip side, reading from
325 slow-yielding subprocess may block the iteration until data shows up. This
322 slow-yielding subprocess may block the iteration until data shows up. This
326 does not block the inpipe reading occurring in a parallel thread.)
323 does not block the inpipe reading occurring in a parallel thread.)
327
324
328 The purpose of the object is to allow us to wrap subprocess interactions into
325 The purpose of the object is to allow us to wrap subprocess interactions into
329 an iterable that can be passed to a WSGI server as the application's return
326 an iterable that can be passed to a WSGI server as the application's return
330 value. Because of stream-processing-ability, WSGI does not have to read ALL
327 value. Because of stream-processing-ability, WSGI does not have to read ALL
331 of the subprocess's output and buffer it, before handing it to WSGI server for
328 of the subprocess's output and buffer it, before handing it to WSGI server for
332 HTTP response. Instead, the class initializer reads just a bit of the stream
329 HTTP response. Instead, the class initializer reads just a bit of the stream
333 to figure out if an error occurred or is likely to occur, and if not, hands the
330 to figure out if an error occurred or is likely to occur, and if not, hands the
334 further iteration over subprocess output to the server for completion of HTTP
331 further iteration over subprocess output to the server for completion of HTTP
335 response.
332 response.
336
333
337 The real or perceived subprocess error is trapped and raised as one of
334 The real or perceived subprocess error is trapped and raised as one of
338 the EnvironmentError family of exceptions.
335 the EnvironmentError family of exceptions.
339
336
340 Example usage:
337 Example usage:
341 # try:
338 # try:
342 # answer = SubprocessIOChunker(
339 # answer = SubprocessIOChunker(
343 # cmd,
340 # cmd,
344 # input,
341 # input,
345 # buffer_size = 65536,
342 # buffer_size = 65536,
346 # chunk_size = 4096
343 # chunk_size = 4096
347 # )
344 # )
348 # except (EnvironmentError) as e:
345 # except (EnvironmentError) as e:
349 # print str(e)
346 # print str(e)
350 # raise e
347 # raise e
351 #
348 #
352 # return answer
349 # return answer
353
350
354
351
355 """
352 """
356
353
357 # TODO: johbo: This is used to make sure that the open end of the PIPE
354 # TODO: johbo: This is used to make sure that the open end of the PIPE
358 # is closed in the end. It would be way better to wrap this into an
355 # is closed in the end. It would be way better to wrap this into an
359 # object, so that it is closed automatically once it is consumed or
356 # object, so that it is closed automatically once it is consumed or
360 # something similar.
357 # something similar.
361 _close_input_fd = None
358 _close_input_fd = None
362
359
363 _closed = False
360 _closed = False
364
361
365 def __init__(self, cmd, inputstream=None, buffer_size=65536,
362 def __init__(self, cmd, inputstream=None, buffer_size=65536,
366 chunk_size=4096, starting_values=None, fail_on_stderr=True,
363 chunk_size=4096, starting_values=None, fail_on_stderr=True,
367 fail_on_return_code=True, **kwargs):
364 fail_on_return_code=True, **kwargs):
368 """
365 """
369 Initializes SubprocessIOChunker
366 Initializes SubprocessIOChunker
370
367
371 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
368 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
372 :param inputstream: (Default: None) A file-like, string, or file descriptor.
369 :param inputstream: (Default: None) A file-like, string, or file descriptor.
373 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
370 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
374 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
371 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
375 :param starting_values: (Default: []) An array of strings to put in front of the output queue.
372 :param starting_values: (Default: []) An array of strings to put in front of the output queue.
376 :param fail_on_stderr: (Default: True) Whether to raise an exception in
373 :param fail_on_stderr: (Default: True) Whether to raise an exception in
377 case something is written to stderr.
374 case something is written to stderr.
378 :param fail_on_return_code: (Default: True) Whether to raise an
375 :param fail_on_return_code: (Default: True) Whether to raise an
379 exception if the return code is not 0.
376 exception if the return code is not 0.
380 """
377 """
381
378
382 starting_values = starting_values or []
379 starting_values = starting_values or []
383 if inputstream:
380 if inputstream:
384 input_streamer = StreamFeeder(inputstream)
381 input_streamer = StreamFeeder(inputstream)
385 input_streamer.start()
382 input_streamer.start()
386 inputstream = input_streamer.output
383 inputstream = input_streamer.output
387 self._close_input_fd = inputstream
384 self._close_input_fd = inputstream
388
385
389 self._fail_on_stderr = fail_on_stderr
386 self._fail_on_stderr = fail_on_stderr
390 self._fail_on_return_code = fail_on_return_code
387 self._fail_on_return_code = fail_on_return_code
391
388
392 _shell = kwargs.get('shell', True)
389 _shell = kwargs.get('shell', True)
393 kwargs['shell'] = _shell
390 kwargs['shell'] = _shell
394
391
395 _p = subprocess.Popen(cmd, bufsize=-1,
392 _p = subprocess.Popen(cmd, bufsize=-1,
396 stdin=inputstream,
393 stdin=inputstream,
397 stdout=subprocess.PIPE,
394 stdout=subprocess.PIPE,
398 stderr=subprocess.PIPE,
395 stderr=subprocess.PIPE,
399 **kwargs)
396 **kwargs)
400
397
401 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
398 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
402 starting_values)
399 starting_values)
403 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
400 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
404
401
405 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
402 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
406 # doing this until we reach either end of file, or end of buffer.
403 # doing this until we reach either end of file, or end of buffer.
407 bg_out.data_added_event.wait(1)
404 bg_out.data_added_event.wait(1)
408 bg_out.data_added_event.clear()
405 bg_out.data_added_event.clear()
409
406
410 # at this point it's still ambiguous if we are done reading or just full buffer.
407 # at this point it's still ambiguous if we are done reading or just full buffer.
411 # Either way, if error (returned by ended process, or implied based on
408 # Either way, if error (returned by ended process, or implied based on
412 # presence of stuff in stderr output) we error out.
409 # presence of stuff in stderr output) we error out.
413 # Else, we are happy.
410 # Else, we are happy.
414 _returncode = _p.poll()
411 _returncode = _p.poll()
415
412
416 if ((_returncode and fail_on_return_code) or
413 if ((_returncode and fail_on_return_code) or
417 (fail_on_stderr and _returncode is None and bg_err.length)):
414 (fail_on_stderr and _returncode is None and bg_err.length)):
418 try:
415 try:
419 _p.terminate()
416 _p.terminate()
420 except Exception:
417 except Exception:
421 pass
418 pass
422 bg_out.stop()
419 bg_out.stop()
423 bg_err.stop()
420 bg_err.stop()
424 if fail_on_stderr:
421 if fail_on_stderr:
425 err = ''.join(bg_err)
422 err = ''.join(bg_err)
426 raise EnvironmentError(
423 raise EnvironmentError(
427 "Subprocess exited due to an error:\n" + err)
424 "Subprocess exited due to an error:\n" + err)
428 if _returncode and fail_on_return_code:
425 if _returncode and fail_on_return_code:
429 err = ''.join(bg_err)
426 err = ''.join(bg_err)
430 if not err:
427 if not err:
431 # maybe get empty stderr, try stdout instead
428 # maybe get empty stderr, try stdout instead
432 # in many cases git reports the errors on stdout too
429 # in many cases git reports the errors on stdout too
433 err = ''.join(bg_out)
430 err = ''.join(bg_out)
434 raise EnvironmentError(
431 raise EnvironmentError(
435 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
432 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
436 _returncode, err))
433 _returncode, err))
437
434
438 self.process = _p
435 self.process = _p
439 self.output = bg_out
436 self.output = bg_out
440 self.error = bg_err
437 self.error = bg_err
441 self.inputstream = inputstream
438 self.inputstream = inputstream
442
439
443 def __iter__(self):
440 def __iter__(self):
444 return self
441 return self
445
442
446 def next(self):
443 def next(self):
447 # Note: mikhail: We need to be sure that we are checking the return
444 # Note: mikhail: We need to be sure that we are checking the return
448 # code after the stdout stream is closed. Some processes, e.g. git
445 # code after the stdout stream is closed. Some processes, e.g. git
449 # are doing some magic in between closing stdout and terminating the
446 # are doing some magic in between closing stdout and terminating the
450 # process and, as a result, we are not getting return code on "slow"
447 # process and, as a result, we are not getting return code on "slow"
451 # systems.
448 # systems.
452 result = None
449 result = None
453 stop_iteration = None
450 stop_iteration = None
454 try:
451 try:
455 result = self.output.next()
452 result = self.output.next()
456 except StopIteration as e:
453 except StopIteration as e:
457 stop_iteration = e
454 stop_iteration = e
458
455
459 if self.process.poll() and self._fail_on_return_code:
456 if self.process.poll() and self._fail_on_return_code:
460 err = '%s' % ''.join(self.error)
457 err = '%s' % ''.join(self.error)
461 raise EnvironmentError(
458 raise EnvironmentError(
462 "Subprocess exited due to an error:\n" + err)
459 "Subprocess exited due to an error:\n" + err)
463
460
464 if stop_iteration:
461 if stop_iteration:
465 raise stop_iteration
462 raise stop_iteration
466 return result
463 return result
467
464
468 def throw(self, exc_type, value=None, traceback=None):
465 def throw(self, exc_type, value=None, traceback=None):
469 if self.output.length or not self.output.done_reading:
466 if self.output.length or not self.output.done_reading:
470 raise exc_type(value)
467 raise exc_type(value)
471
468
472 def close(self):
469 def close(self):
473 if self._closed:
470 if self._closed:
474 return
471 return
475 self._closed = True
472 self._closed = True
476 try:
473 try:
477 self.process.terminate()
474 self.process.terminate()
478 except:
475 except Exception:
479 pass
476 pass
480 if self._close_input_fd:
477 if self._close_input_fd:
481 os.close(self._close_input_fd)
478 os.close(self._close_input_fd)
482 try:
479 try:
483 self.output.close()
480 self.output.close()
484 except:
481 except Exception:
485 pass
482 pass
486 try:
483 try:
487 self.error.close()
484 self.error.close()
488 except:
485 except Exception:
489 pass
486 pass
490 try:
487 try:
491 os.close(self.inputstream)
488 os.close(self.inputstream)
492 except:
489 except Exception:
493 pass
490 pass
494
491
495 def __del__(self):
496 self.close()
497
498
492
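With `__del__` gone, a chunker handed to a WSGI server is cleaned up through the PEP 3333 contract: if the returned iterable has a `close()` method, the server must call it once the response is finished. A sketch of that usage (the repo path and git command are illustrative):

def wsgi_app(environ, start_response):
    chunker = SubprocessIOChunker(
        ['git', 'upload-pack', '--stateless-rpc', '/path/to/repo.git'],
        inputstream=environ['wsgi.input'],
        shell=False)
    start_response('200 OK',
                   [('Content-Type', 'application/x-git-upload-pack-result')])
    # PEP 3333: the server calls .close() on this iterable when done,
    # which now performs the cleanup the removed __del__ used to do
    return chunker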
499 def run_command(arguments, env=None):
493 def run_command(arguments, env=None):
500 """
494 """
501 Run the specified command and return the stdout.
495 Run the specified command and return the stdout.
502
496
503 :param arguments: sequence of program arguments (including the program name)
497 :param arguments: sequence of program arguments (including the program name)
504 :type arguments: list[str]
498 :type arguments: list[str]
505 """
499 """
506
500
507 cmd = arguments
501 cmd = arguments
508 log.debug('Running subprocessio command %s', cmd)
502 log.debug('Running subprocessio command %s', cmd)
503 proc = None
509 try:
504 try:
510 _opts = {'shell': False, 'fail_on_stderr': False}
505 _opts = {'shell': False, 'fail_on_stderr': False}
511 if env:
506 if env:
512 _opts.update({'env': env})
507 _opts.update({'env': env})
513 p = SubprocessIOChunker(cmd, **_opts)
508 proc = SubprocessIOChunker(cmd, **_opts)
514 stdout = ''.join(p)
509 return ''.join(proc), ''.join(proc.error)
515 stderr = ''.join(''.join(p.error))
516 except (EnvironmentError, OSError) as err:
510 except (EnvironmentError, OSError) as err:
517 cmd = ' '.join(cmd) # human friendly CMD
511 cmd = ' '.join(cmd) # human friendly CMD
518 tb_err = ("Couldn't run subprocessio command (%s).\n"
512 tb_err = ("Couldn't run subprocessio command (%s).\n"
519 "Original error was:%s\n" % (cmd, err))
513 "Original error was:%s\n" % (cmd, err))
520 log.exception(tb_err)
514 log.exception(tb_err)
521 raise Exception(tb_err)
515 raise Exception(tb_err)
516 finally:
517 if proc:
518 proc.close()
522
519
523 return stdout, stderr
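`run_command` is the module's convenience wrapper: it builds the chunker, drains stdout and stderr, and, after this change, always closes the process in its `finally` block. Usage is a one-liner (assuming a `git` binary on `PATH`):

stdout, stderr = run_command(['git', '--version'])
print(stdout.strip())  # e.g. "git version 2.x.y"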