subprocessio: don't use __del__ to close the buffers and readers. Instead use a finally block....
dan | r799:825a2f59 default
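This commit replaces `__del__`-based cleanup with explicit `finally` blocks around the `SubprocessIOChunker` call sites. The motivation: on the Python 2 runtime this code targets (note `urllib2`, `subprocess32`), an object whose class defines `__del__` is never collected if it ends up in a reference cycle; it is parked in `gc.garbage` and its finalizer never runs, so the pipe file descriptors it holds can leak. A minimal, self-contained illustration of the failure mode (the names are illustrative, not from this codebase):

    import gc

    class Holder(object):
        def __del__(self):
            print('closing buffers')  # the cleanup we hoped would run

    a, b = Holder(), Holder()
    a.peer, b.peer = b, a             # reference cycle
    del a, b
    gc.collect()
    print(gc.garbage)                 # on Python 2: both Holder objects are stuck
                                      # here, __del__ is never called, fds leak

A `try`/`finally` at the call site, as the diffs below introduce, runs the cleanup deterministically regardless of garbage collection.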
@@ -1,1173 +1,1177 @@
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2019 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import collections
19 19 import logging
20 20 import os
21 21 import posixpath as vcspath
22 22 import re
23 23 import stat
24 24 import traceback
25 25 import urllib
26 26 import urllib2
27 27 from functools import wraps
28 28
29 29 import more_itertools
30 30 import pygit2
31 31 from pygit2 import Repository as LibGit2Repo
32 32 from dulwich import index, objects
33 33 from dulwich.client import HttpGitClient, LocalGitClient
34 34 from dulwich.errors import (
35 35 NotGitRepository, ChecksumMismatch, WrongObjectException,
36 36 MissingCommitError, ObjectMissing, HangupException,
37 37 UnexpectedCommandError)
38 38 from dulwich.repo import Repo as DulwichRepo
39 39 from dulwich.server import update_server_info
40 40
41 41 from vcsserver import exceptions, settings, subprocessio
42 42 from vcsserver.utils import safe_str, safe_int
43 43 from vcsserver.base import RepoFactory, obfuscate_qs
44 44 from vcsserver.hgcompat import (
45 45 hg_url as url_parser, httpbasicauthhandler, httpdigestauthhandler)
46 46 from vcsserver.git_lfs.lib import LFSOidStore
47 47 from vcsserver.vcs_base import RemoteBase
48 48
49 49 DIR_STAT = stat.S_IFDIR
50 50 FILE_MODE = stat.S_IFMT
51 51 GIT_LINK = objects.S_IFGITLINK
52 52 PEELED_REF_MARKER = '^{}'
53 53
54 54
55 55 log = logging.getLogger(__name__)
56 56
57 57
58 58 def str_to_dulwich(value):
59 59 """
60 60 Dulwich 0.10.1a requires `unicode` objects to be passed in.
61 61 """
62 62 return value.decode(settings.WIRE_ENCODING)
63 63
64 64
65 65 def reraise_safe_exceptions(func):
66 66 """Converts Dulwich exceptions to something neutral."""
67 67
68 68 @wraps(func)
69 69 def wrapper(*args, **kwargs):
70 70 try:
71 71 return func(*args, **kwargs)
72 72 except (ChecksumMismatch, WrongObjectException, MissingCommitError, ObjectMissing,) as e:
73 73 exc = exceptions.LookupException(org_exc=e)
74 74 raise exc(safe_str(e))
75 75 except (HangupException, UnexpectedCommandError) as e:
76 76 exc = exceptions.VcsException(org_exc=e)
77 77 raise exc(safe_str(e))
78 78 except Exception as e:
79 79 # NOTE(marcink): because of how dulwich handles some exceptions
80 80 # (KeyError on empty repos), we cannot track this and catch all
81 81 # exceptions; they may be exceptions from other handlers
82 82 #if not hasattr(e, '_vcs_kind'):
83 83 #log.exception("Unhandled exception in git remote call")
84 84 #raise_from_original(exceptions.UnhandledException)
85 85 raise
86 86 return wrapper
87 87
88 88
89 89 class Repo(DulwichRepo):
90 90 """
91 91 A wrapper for dulwich Repo class.
92 92
93 93 Since dulwich sometimes keeps .idx file descriptors open, it leads to
94 94 "Too many open files" errors. We need to close all opened file descriptors
95 95 once the repo object is destroyed.
96 96 """
97 97 def __del__(self):
98 98 if hasattr(self, 'object_store'):
99 99 self.close()
100 100
101 101
102 102 class Repository(LibGit2Repo):
103 103
104 104 def __enter__(self):
105 105 return self
106 106
107 107 def __exit__(self, exc_type, exc_val, exc_tb):
108 108 self.free()
109 109
110 110
111 111 class GitFactory(RepoFactory):
112 112 repo_type = 'git'
113 113
114 114 def _create_repo(self, wire, create, use_libgit2=False):
115 115 if use_libgit2:
116 116 return Repository(wire['path'])
117 117 else:
118 118 repo_path = str_to_dulwich(wire['path'])
119 119 return Repo(repo_path)
120 120
121 121 def repo(self, wire, create=False, use_libgit2=False):
122 122 """
123 123 Get a repository instance for the given path.
124 124 """
125 125 return self._create_repo(wire, create, use_libgit2)
126 126
127 127 def repo_libgit2(self, wire):
128 128 return self.repo(wire, use_libgit2=True)
129 129
130 130
131 131 class GitRemote(RemoteBase):
132 132
133 133 def __init__(self, factory):
134 134 self._factory = factory
135 135 self._bulk_methods = {
136 136 "date": self.date,
137 137 "author": self.author,
138 138 "branch": self.branch,
139 139 "message": self.message,
140 140 "parents": self.parents,
141 141 "_commit": self.revision,
142 142 }
143 143
144 144 def _wire_to_config(self, wire):
145 145 if 'config' in wire:
146 146 return dict([(x[0] + '_' + x[1], x[2]) for x in wire['config']])
147 147 return {}
148 148
149 149 def _remote_conf(self, config):
150 150 params = [
151 151 '-c', 'core.askpass=""',
152 152 ]
153 153 ssl_cert_dir = config.get('vcs_ssl_dir')
154 154 if ssl_cert_dir:
155 155 params.extend(['-c', 'http.sslCAinfo={}'.format(ssl_cert_dir)])
156 156 return params
157 157
158 158 @reraise_safe_exceptions
159 159 def discover_git_version(self):
160 160 stdout, _ = self.run_git_command(
161 161 {}, ['--version'], _bare=True, _safe=True)
162 162 prefix = 'git version'
163 163 if stdout.startswith(prefix):
164 164 stdout = stdout[len(prefix):]
165 165 return stdout.strip()
166 166
167 167 @reraise_safe_exceptions
168 168 def is_empty(self, wire):
169 169 repo_init = self._factory.repo_libgit2(wire)
170 170 with repo_init as repo:
171 171
172 172 try:
173 173 has_head = repo.head.name
174 174 if has_head:
175 175 return False
176 176
177 177 # NOTE(marcink): check again using more expensive method
178 178 return repo.is_empty
179 179 except Exception:
180 180 pass
181 181
182 182 return True
183 183
184 184 @reraise_safe_exceptions
185 185 def assert_correct_path(self, wire):
186 186 cache_on, context_uid, repo_id = self._cache_on(wire)
187 187 @self.region.conditional_cache_on_arguments(condition=cache_on)
188 188 def _assert_correct_path(_context_uid, _repo_id):
189 189 try:
190 190 repo_init = self._factory.repo_libgit2(wire)
191 191 with repo_init as repo:
192 192 pass
193 193 except pygit2.GitError:
194 194 path = wire.get('path')
195 195 tb = traceback.format_exc()
196 196 log.debug("Invalid Git path `%s`, tb: %s", path, tb)
197 197 return False
198 198
199 199 return True
200 200 return _assert_correct_path(context_uid, repo_id)
201 201
202 202 @reraise_safe_exceptions
203 203 def bare(self, wire):
204 204 repo_init = self._factory.repo_libgit2(wire)
205 205 with repo_init as repo:
206 206 return repo.is_bare
207 207
208 208 @reraise_safe_exceptions
209 209 def blob_as_pretty_string(self, wire, sha):
210 210 repo_init = self._factory.repo_libgit2(wire)
211 211 with repo_init as repo:
212 212 blob_obj = repo[sha]
213 213 blob = blob_obj.data
214 214 return blob
215 215
216 216 @reraise_safe_exceptions
217 217 def blob_raw_length(self, wire, sha):
218 218 cache_on, context_uid, repo_id = self._cache_on(wire)
219 219 @self.region.conditional_cache_on_arguments(condition=cache_on)
220 220 def _blob_raw_length(_repo_id, _sha):
221 221
222 222 repo_init = self._factory.repo_libgit2(wire)
223 223 with repo_init as repo:
224 224 blob = repo[sha]
225 225 return blob.size
226 226
227 227 return _blob_raw_length(repo_id, sha)
228 228
229 229 def _parse_lfs_pointer(self, raw_content):
230 230
231 231 spec_string = 'version https://git-lfs.github.com/spec'
232 232 if raw_content and raw_content.startswith(spec_string):
233 233 pattern = re.compile(r"""
234 234 (?:\n)?
235 235 ^version[ ]https://git-lfs\.github\.com/spec/(?P<spec_ver>v\d+)\n
236 236 ^oid[ ] sha256:(?P<oid_hash>[0-9a-f]{64})\n
237 237 ^size[ ](?P<oid_size>[0-9]+)\n
238 238 (?:\n)?
239 239 """, re.VERBOSE | re.MULTILINE)
240 240 match = pattern.match(raw_content)
241 241 if match:
242 242 return match.groupdict()
243 243
244 244 return {}
245 245
246 246 @reraise_safe_exceptions
247 247 def is_large_file(self, wire, commit_id):
248 248 cache_on, context_uid, repo_id = self._cache_on(wire)
249 249
250 250 @self.region.conditional_cache_on_arguments(condition=cache_on)
251 251 def _is_large_file(_repo_id, _sha):
252 252 repo_init = self._factory.repo_libgit2(wire)
253 253 with repo_init as repo:
254 254 blob = repo[commit_id]
255 255 if blob.is_binary:
256 256 return {}
257 257
258 258 return self._parse_lfs_pointer(blob.data)
259 259
260 260 return _is_large_file(repo_id, commit_id)
261 261
262 262 @reraise_safe_exceptions
263 263 def is_binary(self, wire, tree_id):
264 264 cache_on, context_uid, repo_id = self._cache_on(wire)
265 265
266 266 @self.region.conditional_cache_on_arguments(condition=cache_on)
267 267 def _is_binary(_repo_id, _tree_id):
268 268 repo_init = self._factory.repo_libgit2(wire)
269 269 with repo_init as repo:
270 270 blob_obj = repo[tree_id]
271 271 return blob_obj.is_binary
272 272
273 273 return _is_binary(repo_id, tree_id)
274 274
275 275 @reraise_safe_exceptions
276 276 def in_largefiles_store(self, wire, oid):
277 277 conf = self._wire_to_config(wire)
278 278 repo_init = self._factory.repo_libgit2(wire)
279 279 with repo_init as repo:
280 280 repo_name = repo.path
281 281
282 282 store_location = conf.get('vcs_git_lfs_store_location')
283 283 if store_location:
284 284
285 285 store = LFSOidStore(
286 286 oid=oid, repo=repo_name, store_location=store_location)
287 287 return store.has_oid()
288 288
289 289 return False
290 290
291 291 @reraise_safe_exceptions
292 292 def store_path(self, wire, oid):
293 293 conf = self._wire_to_config(wire)
294 294 repo_init = self._factory.repo_libgit2(wire)
295 295 with repo_init as repo:
296 296 repo_name = repo.path
297 297
298 298 store_location = conf.get('vcs_git_lfs_store_location')
299 299 if store_location:
300 300 store = LFSOidStore(
301 301 oid=oid, repo=repo_name, store_location=store_location)
302 302 return store.oid_path
303 303 raise ValueError('Unable to fetch oid with path {}'.format(oid))
304 304
305 305 @reraise_safe_exceptions
306 306 def bulk_request(self, wire, rev, pre_load):
307 307 cache_on, context_uid, repo_id = self._cache_on(wire)
308 308 @self.region.conditional_cache_on_arguments(condition=cache_on)
309 309 def _bulk_request(_repo_id, _rev, _pre_load):
310 310 result = {}
311 311 for attr in pre_load:
312 312 try:
313 313 method = self._bulk_methods[attr]
314 314 args = [wire, rev]
315 315 result[attr] = method(*args)
316 316 except KeyError as e:
317 317 raise exceptions.VcsException(e)(
318 318 "Unknown bulk attribute: %s" % attr)
319 319 return result
320 320
321 321 return _bulk_request(repo_id, rev, sorted(pre_load))
322 322
323 323 def _build_opener(self, url):
324 324 handlers = []
325 325 url_obj = url_parser(url)
326 326 _, authinfo = url_obj.authinfo()
327 327
328 328 if authinfo:
329 329 # create a password manager
330 330 passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
331 331 passmgr.add_password(*authinfo)
332 332
333 333 handlers.extend((httpbasicauthhandler(passmgr),
334 334 httpdigestauthhandler(passmgr)))
335 335
336 336 return urllib2.build_opener(*handlers)
337 337
338 338 def _type_id_to_name(self, type_id):
339 339 return {
340 340 1: b'commit',
341 341 2: b'tree',
342 342 3: b'blob',
343 343 4: b'tag'
344 344 }[type_id]
345 345
346 346 @reraise_safe_exceptions
347 347 def check_url(self, url, config):
348 348 url_obj = url_parser(url)
349 349 test_uri, _ = url_obj.authinfo()
350 350 url_obj.passwd = '*****' if url_obj.passwd else url_obj.passwd
351 351 url_obj.query = obfuscate_qs(url_obj.query)
352 352 cleaned_uri = str(url_obj)
353 353 log.info("Checking URL for remote cloning/import: %s", cleaned_uri)
354 354
355 355 if not test_uri.endswith('info/refs'):
356 356 test_uri = test_uri.rstrip('/') + '/info/refs'
357 357
358 358 o = self._build_opener(url)
359 359 o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
360 360
361 361 q = {"service": 'git-upload-pack'}
362 362 qs = '?%s' % urllib.urlencode(q)
363 363 cu = "%s%s" % (test_uri, qs)
364 364 req = urllib2.Request(cu, None, {})
365 365
366 366 try:
367 367 log.debug("Trying to open URL %s", cleaned_uri)
368 368 resp = o.open(req)
369 369 if resp.code != 200:
370 370 raise exceptions.URLError()('Return Code is not 200')
371 371 except Exception as e:
372 372 log.warning("URL cannot be opened: %s", cleaned_uri, exc_info=True)
373 373 # means it cannot be cloned
374 374 raise exceptions.URLError(e)("[%s] org_exc: %s" % (cleaned_uri, e))
375 375
376 376 # now detect if it's a proper git repo
377 377 gitdata = resp.read()
378 378 if 'service=git-upload-pack' in gitdata:
379 379 pass
380 380 elif re.findall(r'[0-9a-fA-F]{40}\s+refs', gitdata):
381 381 # old-style git can return some other format!
382 382 pass
383 383 else:
384 384 raise exceptions.URLError()(
385 385 "url [%s] does not look like an git" % (cleaned_uri,))
386 386
387 387 return True
388 388
389 389 @reraise_safe_exceptions
390 390 def clone(self, wire, url, deferred, valid_refs, update_after_clone):
391 391 # TODO(marcink): deprecate this method. Last I checked we don't use it anymore
392 392 remote_refs = self.pull(wire, url, apply_refs=False)
393 393 repo = self._factory.repo(wire)
394 394 if isinstance(valid_refs, list):
395 395 valid_refs = tuple(valid_refs)
396 396
397 397 for k in remote_refs:
398 398 # only parse heads/tags and skip so-called deferred tags
399 399 if k.startswith(valid_refs) and not k.endswith(deferred):
400 400 repo[k] = remote_refs[k]
401 401
402 402 if update_after_clone:
403 403 # we want to checkout HEAD
404 404 repo["HEAD"] = remote_refs["HEAD"]
405 405 index.build_index_from_tree(repo.path, repo.index_path(),
406 406 repo.object_store, repo["HEAD"].tree)
407 407
408 408 @reraise_safe_exceptions
409 409 def branch(self, wire, commit_id):
410 410 cache_on, context_uid, repo_id = self._cache_on(wire)
411 411 @self.region.conditional_cache_on_arguments(condition=cache_on)
412 412 def _branch(_context_uid, _repo_id, _commit_id):
413 413 regex = re.compile('^refs/heads')
414 414
415 415 def filter_with(ref):
416 416 return regex.match(ref[0]) and ref[1] == _commit_id
417 417
418 418 branches = filter(filter_with, self.get_refs(wire).items())
419 419 return [x[0].split('refs/heads/')[-1] for x in branches]
420 420
421 421 return _branch(context_uid, repo_id, commit_id)
422 422
423 423 @reraise_safe_exceptions
424 424 def commit_branches(self, wire, commit_id):
425 425 cache_on, context_uid, repo_id = self._cache_on(wire)
426 426 @self.region.conditional_cache_on_arguments(condition=cache_on)
427 427 def _commit_branches(_context_uid, _repo_id, _commit_id):
428 428 repo_init = self._factory.repo_libgit2(wire)
429 429 with repo_init as repo:
430 430 branches = [x for x in repo.branches.with_commit(_commit_id)]
431 431 return branches
432 432
433 433 return _commit_branches(context_uid, repo_id, commit_id)
434 434
435 435 @reraise_safe_exceptions
436 436 def add_object(self, wire, content):
437 437 repo_init = self._factory.repo_libgit2(wire)
438 438 with repo_init as repo:
439 439 blob = objects.Blob()
440 440 blob.set_raw_string(content)
441 441 repo.object_store.add_object(blob)
442 442 return blob.id
443 443
444 444 # TODO: this is quite complex, check if that can be simplified
445 445 @reraise_safe_exceptions
446 446 def commit(self, wire, commit_data, branch, commit_tree, updated, removed):
447 447 repo = self._factory.repo(wire)
448 448 object_store = repo.object_store
449 449
450 450 # Create tree and populate it with blobs
451 451 commit_tree = commit_tree and repo[commit_tree] or objects.Tree()
452 452
453 453 for node in updated:
454 454 # Compute subdirs if needed
455 455 dirpath, nodename = vcspath.split(node['path'])
456 456 dirnames = map(safe_str, dirpath and dirpath.split('/') or [])
457 457 parent = commit_tree
458 458 ancestors = [('', parent)]
459 459
460 460 # Tries to dig for the deepest existing tree
461 461 while dirnames:
462 462 curdir = dirnames.pop(0)
463 463 try:
464 464 dir_id = parent[curdir][1]
465 465 except KeyError:
466 466 # put curdir back into dirnames and stop
467 467 dirnames.insert(0, curdir)
468 468 break
469 469 else:
470 470 # If found, updates parent
471 471 parent = repo[dir_id]
472 472 ancestors.append((curdir, parent))
473 473 # Now parent is the deepest existing tree and we need to create
474 474 # subtrees for dirnames (in reverse order)
475 475 # [this only applies for nodes from added]
476 476 new_trees = []
477 477
478 478 blob = objects.Blob.from_string(node['content'])
479 479
480 480 if dirnames:
481 481 # If there are trees which should be created we need to build
482 482 # them now (in reverse order)
483 483 reversed_dirnames = list(reversed(dirnames))
484 484 curtree = objects.Tree()
485 485 curtree[node['node_path']] = node['mode'], blob.id
486 486 new_trees.append(curtree)
487 487 for dirname in reversed_dirnames[:-1]:
488 488 newtree = objects.Tree()
489 489 newtree[dirname] = (DIR_STAT, curtree.id)
490 490 new_trees.append(newtree)
491 491 curtree = newtree
492 492 parent[reversed_dirnames[-1]] = (DIR_STAT, curtree.id)
493 493 else:
494 494 parent.add(name=node['node_path'], mode=node['mode'], hexsha=blob.id)
495 495
496 496 new_trees.append(parent)
497 497 # Update ancestors
498 498 reversed_ancestors = reversed(
499 499 [(a[1], b[1], b[0]) for a, b in zip(ancestors, ancestors[1:])])
500 500 for parent, tree, path in reversed_ancestors:
501 501 parent[path] = (DIR_STAT, tree.id)
502 502 object_store.add_object(tree)
503 503
504 504 object_store.add_object(blob)
505 505 for tree in new_trees:
506 506 object_store.add_object(tree)
507 507
508 508 for node_path in removed:
509 509 paths = node_path.split('/')
510 510 tree = commit_tree
511 511 trees = [tree]
512 512 # Traverse deep into the forest...
513 513 for path in paths:
514 514 try:
515 515 obj = repo[tree[path][1]]
516 516 if isinstance(obj, objects.Tree):
517 517 trees.append(obj)
518 518 tree = obj
519 519 except KeyError:
520 520 break
521 521 # Cut down the blob and all rotten trees on the way back...
522 522 for path, tree in reversed(zip(paths, trees)):
523 523 del tree[path]
524 524 if tree:
525 525 # This tree still has elements - don't remove it or any
526 526 # of its parents
527 527 break
528 528
529 529 object_store.add_object(commit_tree)
530 530
531 531 # Create commit
532 532 commit = objects.Commit()
533 533 commit.tree = commit_tree.id
534 534 for k, v in commit_data.iteritems():
535 535 setattr(commit, k, v)
536 536 object_store.add_object(commit)
537 537
538 538 self.create_branch(wire, branch, commit.id)
539 539
540 540 # dulwich set-ref
541 541 ref = 'refs/heads/%s' % branch
542 542 repo.refs[ref] = commit.id
543 543
544 544 return commit.id
545 545
546 546 @reraise_safe_exceptions
547 547 def pull(self, wire, url, apply_refs=True, refs=None, update_after=False):
548 548 if url != 'default' and '://' not in url:
549 549 client = LocalGitClient(url)
550 550 else:
551 551 url_obj = url_parser(url)
552 552 o = self._build_opener(url)
553 553 url, _ = url_obj.authinfo()
554 554 client = HttpGitClient(base_url=url, opener=o)
555 555 repo = self._factory.repo(wire)
556 556
557 557 determine_wants = repo.object_store.determine_wants_all
558 558 if refs:
559 559 def determine_wants_requested(references):
560 560 return [references[r] for r in references if r in refs]
561 561 determine_wants = determine_wants_requested
562 562
563 563 try:
564 564 remote_refs = client.fetch(
565 565 path=url, target=repo, determine_wants=determine_wants)
566 566 except NotGitRepository as e:
567 567 log.warning(
568 568 'Trying to fetch from "%s" failed, not a Git repository.', url)
569 569 # Exception can contain unicode which we convert
570 570 raise exceptions.AbortException(e)(repr(e))
571 571
572 572 # mikhail: client.fetch() returns all the remote refs, but fetches only
573 573 # refs filtered by `determine_wants` function. We need to filter result
574 574 # as well
575 575 if refs:
576 576 remote_refs = {k: remote_refs[k] for k in remote_refs if k in refs}
577 577
578 578 if apply_refs:
579 579 # TODO: johbo: Needs proper test coverage with a git repository
580 580 # that contains a tag object, so that we would end up with
581 581 # a peeled ref at this point.
582 582 for k in remote_refs:
583 583 if k.endswith(PEELED_REF_MARKER):
584 584 log.debug("Skipping peeled reference %s", k)
585 585 continue
586 586 repo[k] = remote_refs[k]
587 587
588 588 if refs and not update_after:
589 589 # mikhail: explicitly set the head to the last ref.
590 590 repo['HEAD'] = remote_refs[refs[-1]]
591 591
592 592 if update_after:
593 593 # we want to checkout HEAD
594 594 repo["HEAD"] = remote_refs["HEAD"]
595 595 index.build_index_from_tree(repo.path, repo.index_path(),
596 596 repo.object_store, repo["HEAD"].tree)
597 597 return remote_refs
598 598
599 599 @reraise_safe_exceptions
600 600 def sync_fetch(self, wire, url, refs=None, all_refs=False):
601 601 repo = self._factory.repo(wire)
602 602 if refs and not isinstance(refs, (list, tuple)):
603 603 refs = [refs]
604 604
605 605 config = self._wire_to_config(wire)
606 606 # get all remote refs we'll use to fetch later
607 607 cmd = ['ls-remote']
608 608 if not all_refs:
609 609 cmd += ['--heads', '--tags']
610 610 cmd += [url]
611 611 output, __ = self.run_git_command(
612 612 wire, cmd, fail_on_stderr=False,
613 613 _copts=self._remote_conf(config),
614 614 extra_env={'GIT_TERMINAL_PROMPT': '0'})
615 615
616 616 remote_refs = collections.OrderedDict()
617 617 fetch_refs = []
618 618
619 619 for ref_line in output.splitlines():
620 620 sha, ref = ref_line.split('\t')
621 621 sha = sha.strip()
622 622 if ref in remote_refs:
623 623 # duplicate, skip
624 624 continue
625 625 if ref.endswith(PEELED_REF_MARKER):
626 626 log.debug("Skipping peeled reference %s", ref)
627 627 continue
628 628 # don't sync HEAD
629 629 if ref in ['HEAD']:
630 630 continue
631 631
632 632 remote_refs[ref] = sha
633 633
634 634 if refs and sha in refs:
635 635 # we filter fetch using our specified refs
636 636 fetch_refs.append('{}:{}'.format(ref, ref))
637 637 elif not refs:
638 638 fetch_refs.append('{}:{}'.format(ref, ref))
639 639 log.debug('Finished obtaining fetch refs, total: %s', len(fetch_refs))
640 640
641 641 if fetch_refs:
642 642 for chunk in more_itertools.chunked(fetch_refs, 1024 * 4):
643 643 fetch_refs_chunks = list(chunk)
644 644 log.debug('Fetching %s refs from import url', len(fetch_refs_chunks))
645 645 _out, _err = self.run_git_command(
646 646 wire, ['fetch', url, '--force', '--prune', '--'] + fetch_refs_chunks,
647 647 fail_on_stderr=False,
648 648 _copts=self._remote_conf(config),
649 649 extra_env={'GIT_TERMINAL_PROMPT': '0'})
650 650
651 651 return remote_refs
652 652
653 653 @reraise_safe_exceptions
654 654 def sync_push(self, wire, url, refs=None):
655 655 if not self.check_url(url, wire):
656 656 return
657 657 config = self._wire_to_config(wire)
658 658 self._factory.repo(wire)
659 659 self.run_git_command(
660 660 wire, ['push', url, '--mirror'], fail_on_stderr=False,
661 661 _copts=self._remote_conf(config),
662 662 extra_env={'GIT_TERMINAL_PROMPT': '0'})
663 663
664 664 @reraise_safe_exceptions
665 665 def get_remote_refs(self, wire, url):
666 666 repo = Repo(url)
667 667 return repo.get_refs()
668 668
669 669 @reraise_safe_exceptions
670 670 def get_description(self, wire):
671 671 repo = self._factory.repo(wire)
672 672 return repo.get_description()
673 673
674 674 @reraise_safe_exceptions
675 675 def get_missing_revs(self, wire, rev1, rev2, path2):
676 676 repo = self._factory.repo(wire)
677 677 LocalGitClient(thin_packs=False).fetch(path2, repo)
678 678
679 679 wire_remote = wire.copy()
680 680 wire_remote['path'] = path2
681 681 repo_remote = self._factory.repo(wire_remote)
682 682 LocalGitClient(thin_packs=False).fetch(wire["path"], repo_remote)
683 683
684 684 revs = [
685 685 x.commit.id
686 686 for x in repo_remote.get_walker(include=[rev2], exclude=[rev1])]
687 687 return revs
688 688
689 689 @reraise_safe_exceptions
690 690 def get_object(self, wire, sha):
691 691 cache_on, context_uid, repo_id = self._cache_on(wire)
692 692 @self.region.conditional_cache_on_arguments(condition=cache_on)
693 693 def _get_object(_context_uid, _repo_id, _sha):
694 694 repo_init = self._factory.repo_libgit2(wire)
695 695 with repo_init as repo:
696 696
697 697 missing_commit_err = 'Commit {} does not exist for `{}`'.format(sha, wire['path'])
698 698 try:
699 699 commit = repo.revparse_single(sha)
700 700 except (KeyError, ValueError) as e:
701 701 raise exceptions.LookupException(e)(missing_commit_err)
702 702
703 703 is_tag = False
704 704 if isinstance(commit, pygit2.Tag):
705 705 commit = repo.get(commit.target)
706 706 is_tag = True
707 707
708 708 check_dangling = True
709 709 if is_tag:
710 710 check_dangling = False
711 711
712 712 # if we used a reference and it parsed, we don't have a dangling commit
713 713 if sha != commit.hex:
714 714 check_dangling = False
715 715
716 716 if check_dangling:
717 717 # check for dangling commit
718 718 for branch in repo.branches.with_commit(commit.hex):
719 719 if branch:
720 720 break
721 721 else:
722 722 raise exceptions.LookupException(None)(missing_commit_err)
723 723
724 724 commit_id = commit.hex
725 725 type_id = commit.type
726 726
727 727 return {
728 728 'id': commit_id,
729 729 'type': self._type_id_to_name(type_id),
730 730 'commit_id': commit_id,
731 731 'idx': 0
732 732 }
733 733
734 734 return _get_object(context_uid, repo_id, sha)
735 735
736 736 @reraise_safe_exceptions
737 737 def get_refs(self, wire):
738 738 cache_on, context_uid, repo_id = self._cache_on(wire)
739 739 @self.region.conditional_cache_on_arguments(condition=cache_on)
740 740 def _get_refs(_context_uid, _repo_id):
741 741
742 742 repo_init = self._factory.repo_libgit2(wire)
743 743 with repo_init as repo:
744 744 regex = re.compile('^refs/(heads|tags)/')
745 745 return {x.name: x.target.hex for x in
746 746 filter(lambda ref: regex.match(ref.name) ,repo.listall_reference_objects())}
747 747
748 748 return _get_refs(context_uid, repo_id)
749 749
750 750 @reraise_safe_exceptions
751 751 def get_branch_pointers(self, wire):
752 752 cache_on, context_uid, repo_id = self._cache_on(wire)
753 753 @self.region.conditional_cache_on_arguments(condition=cache_on)
754 754 def _get_branch_pointers(_context_uid, _repo_id):
755 755
756 756 repo_init = self._factory.repo_libgit2(wire)
757 757 regex = re.compile('^refs/heads')
758 758 with repo_init as repo:
759 759 branches = filter(lambda ref: regex.match(ref.name), repo.listall_reference_objects())
760 760 return {x.target.hex: x.shorthand for x in branches}
761 761
762 762 return _get_branch_pointers(context_uid, repo_id)
763 763
764 764 @reraise_safe_exceptions
765 765 def head(self, wire, show_exc=True):
766 766 cache_on, context_uid, repo_id = self._cache_on(wire)
767 767 @self.region.conditional_cache_on_arguments(condition=cache_on)
768 768 def _head(_context_uid, _repo_id, _show_exc):
769 769 repo_init = self._factory.repo_libgit2(wire)
770 770 with repo_init as repo:
771 771 try:
772 772 return repo.head.peel().hex
773 773 except Exception:
774 774 if show_exc:
775 775 raise
776 776 return _head(context_uid, repo_id, show_exc)
777 777
778 778 @reraise_safe_exceptions
779 779 def init(self, wire):
780 780 repo_path = str_to_dulwich(wire['path'])
781 781 self.repo = Repo.init(repo_path)
782 782
783 783 @reraise_safe_exceptions
784 784 def init_bare(self, wire):
785 785 repo_path = str_to_dulwich(wire['path'])
786 786 self.repo = Repo.init_bare(repo_path)
787 787
788 788 @reraise_safe_exceptions
789 789 def revision(self, wire, rev):
790 790
791 791 cache_on, context_uid, repo_id = self._cache_on(wire)
792 792 @self.region.conditional_cache_on_arguments(condition=cache_on)
793 793 def _revision(_context_uid, _repo_id, _rev):
794 794 repo_init = self._factory.repo_libgit2(wire)
795 795 with repo_init as repo:
796 796 commit = repo[rev]
797 797 obj_data = {
798 798 'id': commit.id.hex,
799 799 }
800 800 # tree objects themselves don't have a tree_id attribute
801 801 if hasattr(commit, 'tree_id'):
802 802 obj_data['tree'] = commit.tree_id.hex
803 803
804 804 return obj_data
805 805 return _revision(context_uid, repo_id, rev)
806 806
807 807 @reraise_safe_exceptions
808 808 def date(self, wire, commit_id):
809 809 cache_on, context_uid, repo_id = self._cache_on(wire)
810 810 @self.region.conditional_cache_on_arguments(condition=cache_on)
811 811 def _date(_repo_id, _commit_id):
812 812 repo_init = self._factory.repo_libgit2(wire)
813 813 with repo_init as repo:
814 814 commit = repo[commit_id]
815 815
816 816 if hasattr(commit, 'commit_time'):
817 817 commit_time, commit_time_offset = commit.commit_time, commit.commit_time_offset
818 818 else:
819 819 commit = commit.get_object()
820 820 commit_time, commit_time_offset = commit.commit_time, commit.commit_time_offset
821 821
822 822 # TODO(marcink): check dulwich difference of offset vs timezone
823 823 return [commit_time, commit_time_offset]
824 824 return _date(repo_id, commit_id)
825 825
826 826 @reraise_safe_exceptions
827 827 def author(self, wire, commit_id):
828 828 cache_on, context_uid, repo_id = self._cache_on(wire)
829 829 @self.region.conditional_cache_on_arguments(condition=cache_on)
830 830 def _author(_repo_id, _commit_id):
831 831 repo_init = self._factory.repo_libgit2(wire)
832 832 with repo_init as repo:
833 833 commit = repo[commit_id]
834 834
835 835 if hasattr(commit, 'author'):
836 836 author = commit.author
837 837 else:
838 838 author = commit.get_object().author
839 839
840 840 if author.email:
841 841 return u"{} <{}>".format(author.name, author.email)
842 842
843 843 return u"{}".format(author.raw_name)
844 844 return _author(repo_id, commit_id)
845 845
846 846 @reraise_safe_exceptions
847 847 def message(self, wire, commit_id):
848 848 cache_on, context_uid, repo_id = self._cache_on(wire)
849 849 @self.region.conditional_cache_on_arguments(condition=cache_on)
850 850 def _message(_repo_id, _commit_id):
851 851 repo_init = self._factory.repo_libgit2(wire)
852 852 with repo_init as repo:
853 853 commit = repo[commit_id]
854 854 return commit.message
855 855 return _message(repo_id, commit_id)
856 856
857 857 @reraise_safe_exceptions
858 858 def parents(self, wire, commit_id):
859 859 cache_on, context_uid, repo_id = self._cache_on(wire)
860 860 @self.region.conditional_cache_on_arguments(condition=cache_on)
861 861 def _parents(_repo_id, _commit_id):
862 862 repo_init = self._factory.repo_libgit2(wire)
863 863 with repo_init as repo:
864 864 commit = repo[commit_id]
865 865 if hasattr(commit, 'parent_ids'):
866 866 parent_ids = commit.parent_ids
867 867 else:
868 868 parent_ids = commit.get_object().parent_ids
869 869
870 870 return [x.hex for x in parent_ids]
871 871 return _parents(repo_id, commit_id)
872 872
873 873 @reraise_safe_exceptions
874 874 def children(self, wire, commit_id):
875 875 cache_on, context_uid, repo_id = self._cache_on(wire)
876 876 @self.region.conditional_cache_on_arguments(condition=cache_on)
877 877 def _children(_repo_id, _commit_id):
878 878 output, __ = self.run_git_command(
879 879 wire, ['rev-list', '--all', '--children'])
880 880
881 881 child_ids = []
882 882 pat = re.compile(r'^%s' % commit_id)
883 883 for l in output.splitlines():
884 884 if pat.match(l):
885 885 found_ids = l.split(' ')[1:]
886 886 child_ids.extend(found_ids)
887 887
888 888 return child_ids
889 889 return _children(repo_id, commit_id)
890 890
891 891 @reraise_safe_exceptions
892 892 def set_refs(self, wire, key, value):
893 893 repo_init = self._factory.repo_libgit2(wire)
894 894 with repo_init as repo:
895 895 repo.references.create(key, value, force=True)
896 896
897 897 @reraise_safe_exceptions
898 898 def create_branch(self, wire, branch_name, commit_id, force=False):
899 899 repo_init = self._factory.repo_libgit2(wire)
900 900 with repo_init as repo:
901 901 commit = repo[commit_id]
902 902
903 903 if force:
904 904 repo.branches.local.create(branch_name, commit, force=force)
905 905 elif not repo.branches.get(branch_name):
906 906 # create only if that branch doesn't already exist
907 907 repo.branches.local.create(branch_name, commit, force=force)
908 908
909 909 @reraise_safe_exceptions
910 910 def remove_ref(self, wire, key):
911 911 repo_init = self._factory.repo_libgit2(wire)
912 912 with repo_init as repo:
913 913 repo.references.delete(key)
914 914
915 915 @reraise_safe_exceptions
916 916 def tag_remove(self, wire, tag_name):
917 917 repo_init = self._factory.repo_libgit2(wire)
918 918 with repo_init as repo:
919 919 key = 'refs/tags/{}'.format(tag_name)
920 920 repo.references.delete(key)
921 921
922 922 @reraise_safe_exceptions
923 923 def tree_changes(self, wire, source_id, target_id):
924 924 # TODO(marcink): remove this, it seems it's only used by tests
925 925 repo = self._factory.repo(wire)
926 926 source = repo[source_id].tree if source_id else None
927 927 target = repo[target_id].tree
928 928 result = repo.object_store.tree_changes(source, target)
929 929 return list(result)
930 930
931 931 @reraise_safe_exceptions
932 932 def tree_and_type_for_path(self, wire, commit_id, path):
933 933
934 934 cache_on, context_uid, repo_id = self._cache_on(wire)
935 935 @self.region.conditional_cache_on_arguments(condition=cache_on)
936 936 def _tree_and_type_for_path(_context_uid, _repo_id, _commit_id, _path):
937 937 repo_init = self._factory.repo_libgit2(wire)
938 938
939 939 with repo_init as repo:
940 940 commit = repo[commit_id]
941 941 try:
942 942 tree = commit.tree[path]
943 943 except KeyError:
944 944 return None, None, None
945 945
946 946 return tree.id.hex, tree.type, tree.filemode
947 947 return _tree_and_type_for_path(context_uid, repo_id, commit_id, path)
948 948
949 949 @reraise_safe_exceptions
950 950 def tree_items(self, wire, tree_id):
951 951 cache_on, context_uid, repo_id = self._cache_on(wire)
952 952 @self.region.conditional_cache_on_arguments(condition=cache_on)
953 953 def _tree_items(_repo_id, _tree_id):
954 954
955 955 repo_init = self._factory.repo_libgit2(wire)
956 956 with repo_init as repo:
957 957 try:
958 958 tree = repo[tree_id]
959 959 except KeyError:
960 960 raise ObjectMissing('No tree with id: {}'.format(tree_id))
961 961
962 962 result = []
963 963 for item in tree:
964 964 item_sha = item.hex
965 965 item_mode = item.filemode
966 966 item_type = item.type
967 967
968 968 if item_type == 'commit':
969 969 # NOTE(marcink): we translate submodules to 'link' for backward compat
970 970 item_type = 'link'
971 971
972 972 result.append((item.name, item_mode, item_sha, item_type))
973 973 return result
974 974 return _tree_items(repo_id, tree_id)
975 975
976 976 @reraise_safe_exceptions
977 977 def diff_2(self, wire, commit_id_1, commit_id_2, file_filter, opt_ignorews, context):
978 978 """
979 979 Old version that uses subprocess to call diff
980 980 """
981 981
982 982 flags = [
983 983 '-U%s' % context, '--patch',
984 984 '--binary',
985 985 '--find-renames',
986 986 '--no-indent-heuristic',
987 987 # '--indent-heuristic',
988 988 #'--full-index',
989 989 #'--abbrev=40'
990 990 ]
991 991
992 992 if opt_ignorews:
993 993 flags.append('--ignore-all-space')
994 994
995 995 if commit_id_1 == self.EMPTY_COMMIT:
996 996 cmd = ['show'] + flags + [commit_id_2]
997 997 else:
998 998 cmd = ['diff'] + flags + [commit_id_1, commit_id_2]
999 999
1000 1000 if file_filter:
1001 1001 cmd.extend(['--', file_filter])
1002 1002
1003 1003 diff, __ = self.run_git_command(wire, cmd)
1004 1004 # If we used 'show' command, strip first few lines (until actual diff
1005 1005 # starts)
1006 1006 if commit_id_1 == self.EMPTY_COMMIT:
1007 1007 lines = diff.splitlines()
1008 1008 x = 0
1009 1009 for line in lines:
1010 1010 if line.startswith('diff'):
1011 1011 break
1012 1012 x += 1
1013 1013 # Append new line just like the 'diff' command does
1014 1014 diff = '\n'.join(lines[x:]) + '\n'
1015 1015 return diff
1016 1016
1017 1017 @reraise_safe_exceptions
1018 1018 def diff(self, wire, commit_id_1, commit_id_2, file_filter, opt_ignorews, context):
1019 1019 repo_init = self._factory.repo_libgit2(wire)
1020 1020 with repo_init as repo:
1021 1021 swap = True
1022 1022 flags = 0
1023 1023 flags |= pygit2.GIT_DIFF_SHOW_BINARY
1024 1024
1025 1025 if opt_ignorews:
1026 1026 flags |= pygit2.GIT_DIFF_IGNORE_WHITESPACE
1027 1027
1028 1028 if commit_id_1 == self.EMPTY_COMMIT:
1029 1029 comm1 = repo[commit_id_2]
1030 1030 diff_obj = comm1.tree.diff_to_tree(
1031 1031 flags=flags, context_lines=context, swap=swap)
1032 1032
1033 1033 else:
1034 1034 comm1 = repo[commit_id_2]
1035 1035 comm2 = repo[commit_id_1]
1036 1036 diff_obj = comm1.tree.diff_to_tree(
1037 1037 comm2.tree, flags=flags, context_lines=context, swap=swap)
1038 1038 similar_flags = 0
1039 1039 similar_flags |= pygit2.GIT_DIFF_FIND_RENAMES
1040 1040 diff_obj.find_similar(flags=similar_flags)
1041 1041
1042 1042 if file_filter:
1043 1043 for p in diff_obj:
1044 1044 if p.delta.old_file.path == file_filter:
1045 1045 return p.patch or ''
1046 1046 # no matching path == no diff
1047 1047 return ''
1048 1048 return diff_obj.patch or ''
1049 1049
1050 1050 @reraise_safe_exceptions
1051 1051 def node_history(self, wire, commit_id, path, limit):
1052 1052 cache_on, context_uid, repo_id = self._cache_on(wire)
1053 1053 @self.region.conditional_cache_on_arguments(condition=cache_on)
1054 1054 def _node_history(_context_uid, _repo_id, _commit_id, _path, _limit):
1055 1055 # optimize for n==1, rev-list is much faster for that use-case
1056 1056 if limit == 1:
1057 1057 cmd = ['rev-list', '-1', commit_id, '--', path]
1058 1058 else:
1059 1059 cmd = ['log']
1060 1060 if limit:
1061 1061 cmd.extend(['-n', str(safe_int(limit, 0))])
1062 1062 cmd.extend(['--pretty=format: %H', '-s', commit_id, '--', path])
1063 1063
1064 1064 output, __ = self.run_git_command(wire, cmd)
1065 1065 commit_ids = re.findall(r'[0-9a-fA-F]{40}', output)
1066 1066
1067 1067 return [x for x in commit_ids]
1068 1068 return _node_history(context_uid, repo_id, commit_id, path, limit)
1069 1069
1070 1070 @reraise_safe_exceptions
1071 1071 def node_annotate(self, wire, commit_id, path):
1072 1072
1073 1073 cmd = ['blame', '-l', '--root', '-r', commit_id, '--', path]
1074 1074 # -l ==> outputs long shas (and we need all 40 characters)
1075 1075 # --root ==> doesn't put '^' character for boundaries
1076 1076 # -r commit_id ==> blames for the given commit
1077 1077 output, __ = self.run_git_command(wire, cmd)
1078 1078
1079 1079 result = []
1080 1080 for i, blame_line in enumerate(output.split('\n')[:-1]):
1081 1081 line_no = i + 1
1082 1082 commit_id, line = re.split(r' ', blame_line, 1)
1083 1083 result.append((line_no, commit_id, line))
1084 1084 return result
1085 1085
1086 1086 @reraise_safe_exceptions
1087 1087 def update_server_info(self, wire):
1088 1088 repo = self._factory.repo(wire)
1089 1089 update_server_info(repo)
1090 1090
1091 1091 @reraise_safe_exceptions
1092 1092 def get_all_commit_ids(self, wire):
1093 1093
1094 1094 cache_on, context_uid, repo_id = self._cache_on(wire)
1095 1095 @self.region.conditional_cache_on_arguments(condition=cache_on)
1096 1096 def _get_all_commit_ids(_context_uid, _repo_id):
1097 1097
1098 1098 cmd = ['rev-list', '--reverse', '--date-order', '--branches', '--tags']
1099 1099 try:
1100 1100 output, __ = self.run_git_command(wire, cmd)
1101 1101 return output.splitlines()
1102 1102 except Exception:
1103 1103 # Can be raised for empty repositories
1104 1104 return []
1105 1105 return _get_all_commit_ids(context_uid, repo_id)
1106 1106
1107 1107 @reraise_safe_exceptions
1108 1108 def run_git_command(self, wire, cmd, **opts):
1109 1109 path = wire.get('path', None)
1110 1110
1111 1111 if path and os.path.isdir(path):
1112 1112 opts['cwd'] = path
1113 1113
1114 1114 if '_bare' in opts:
1115 1115 _copts = []
1116 1116 del opts['_bare']
1117 1117 else:
1118 1118 _copts = ['-c', 'core.quotepath=false', ]
1119 1119 safe_call = False
1120 1120 if '_safe' in opts:
1121 1121 # no exc on failure
1122 1122 del opts['_safe']
1123 1123 safe_call = True
1124 1124
1125 1125 if '_copts' in opts:
1126 1126 _copts.extend(opts['_copts'] or [])
1127 1127 del opts['_copts']
1128 1128
1129 1129 gitenv = os.environ.copy()
1130 1130 gitenv.update(opts.pop('extra_env', {}))
1131 1131 # need to clean/fix GIT_DIR!
1132 1132 if 'GIT_DIR' in gitenv:
1133 1133 del gitenv['GIT_DIR']
1134 1134 gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
1135 1135 gitenv['GIT_DISCOVERY_ACROSS_FILESYSTEM'] = '1'
1136 1136
1137 1137 cmd = [settings.GIT_EXECUTABLE] + _copts + cmd
1138 1138 _opts = {'env': gitenv, 'shell': False}
1139 1139
1140 proc = None
1140 1141 try:
1141 1142 _opts.update(opts)
1142 p = subprocessio.SubprocessIOChunker(cmd, **_opts)
1143 proc = subprocessio.SubprocessIOChunker(cmd, **_opts)
1143 1144
1144 return ''.join(p), ''.join(p.error)
1145 return ''.join(proc), ''.join(proc.error)
1145 1146 except (EnvironmentError, OSError) as err:
1146 1147 cmd = ' '.join(cmd) # human friendly CMD
1147 1148 tb_err = ("Couldn't run git command (%s).\n"
1148 1149 "Original error was:%s\n"
1149 1150 "Call options:%s\n"
1150 1151 % (cmd, err, _opts))
1151 1152 log.exception(tb_err)
1152 1153 if safe_call:
1153 1154 return '', err
1154 1155 else:
1155 1156 raise exceptions.VcsException()(tb_err)
1157 finally:
1158 if proc:
1159 proc.close()
1156 1160
1157 1161 @reraise_safe_exceptions
1158 1162 def install_hooks(self, wire, force=False):
1159 1163 from vcsserver.hook_utils import install_git_hooks
1160 1164 bare = self.bare(wire)
1161 1165 path = wire['path']
1162 1166 return install_git_hooks(path, bare, force_create=force)
1163 1167
1164 1168 @reraise_safe_exceptions
1165 1169 def get_hooks_info(self, wire):
1166 1170 from vcsserver.hook_utils import (
1167 1171 get_git_pre_hook_version, get_git_post_hook_version)
1168 1172 bare = self.bare(wire)
1169 1173 path = wire['path']
1170 1174 return {
1171 1175 'pre_version': get_git_pre_hook_version(path, bare),
1172 1176 'post_version': get_git_post_hook_version(path, bare),
1173 1177 }
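The only behavioral change in this first file is in `run_git_command`: the chunker is created inside a `try` and closed in a `finally`, so its pipes are released even if joining the output raises. The calling convention is unchanged; a hedged usage sketch (the `wire` layout and the `remote` instance here are assumptions inferred from the calls above, not a documented contract):

    # hypothetical caller of GitRemote.run_git_command
    wire = {'path': '/srv/repos/example.git'}   # assumed minimal wire dict
    stdout, stderr = remote.run_git_command(
        wire, ['rev-parse', 'HEAD'], fail_on_stderr=False)

`fail_on_stderr` and any other unrecognized options are forwarded to the underlying SubprocessIOChunker via `**opts`.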
@@ -1,523 +1,519 @@
1 1 """
2 2 Module provides a class that wraps communication over subprocess.Popen
3 3 input, output, and error streams into a meaningful, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fit to be a
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import logging
27 27 import subprocess32 as subprocess
28 28 from collections import deque
29 29 from threading import Event, Thread
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 class StreamFeeder(Thread):
35 35 """
36 36 Normal writing into a pipe-like object blocks once the buffer is filled.
37 37 This thread feeds data from a file-like object into a pipe
38 38 without blocking the main thread.
39 39 We close the write end of the pipe once the end of the source stream is reached.
40 40 """
41 41
42 42 def __init__(self, source):
43 43 super(StreamFeeder, self).__init__()
44 44 self.daemon = True
45 45 filelike = False
46 46 self.bytes = bytes()
47 47 if type(source) in (type(''), bytes, bytearray): # string-like
48 48 self.bytes = bytes(source)
49 49 else: # can be either file pointer or file-like
50 50 if type(source) in (int, long): # file pointer it is
51 51 # converting file descriptor (int) stdin into file-like
52 52 try:
53 53 source = os.fdopen(source, 'rb', 16384)
54 54 except Exception:
55 55 pass
56 56 # let's see if source is file-like by now
57 57 try:
58 58 filelike = source.read
59 59 except Exception:
60 60 pass
61 61 if not filelike and not self.bytes:
62 62 raise TypeError("StreamFeeder's source object must be a readable "
63 63 "file-like, a file descriptor, or a string-like.")
64 64 self.source = source
65 65 self.readiface, self.writeiface = os.pipe()
66 66
67 67 def run(self):
68 68 t = self.writeiface
69 69 try:
70 70 if self.bytes:
71 71 os.write(t, self.bytes)
72 72 else:
73 73 s = self.source
74 74 b = s.read(4096)
75 75 while b:
76 76 os.write(t, b)
77 77 b = s.read(4096)
78 78 finally:
79 79 os.close(t)
80 80
81 81 @property
82 82 def output(self):
83 83 return self.readiface
84 84
85 85
86 86 class InputStreamChunker(Thread):
87 87 def __init__(self, source, target, buffer_size, chunk_size):
88 88
89 89 super(InputStreamChunker, self).__init__()
90 90
91 91 self.daemon = True # die die die.
92 92
93 93 self.source = source
94 94 self.target = target
95 95 self.chunk_count_max = int(buffer_size / chunk_size) + 1
96 96 self.chunk_size = chunk_size
97 97
98 98 self.data_added = Event()
99 99 self.data_added.clear()
100 100
101 101 self.keep_reading = Event()
102 102 self.keep_reading.set()
103 103
104 104 self.EOF = Event()
105 105 self.EOF.clear()
106 106
107 107 self.go = Event()
108 108 self.go.set()
109 109
110 110 def stop(self):
111 111 self.go.clear()
112 112 self.EOF.set()
113 113 try:
114 114 # this is not proper, but is done to force the reader thread to let
115 115 # go of the input because, if successful, .close() will send EOF
116 116 # down the pipe.
117 117 self.source.close()
118 118 except:
119 119 pass
120 120
121 121 def run(self):
122 122 s = self.source
123 123 t = self.target
124 124 cs = self.chunk_size
125 125 chunk_count_max = self.chunk_count_max
126 126 keep_reading = self.keep_reading
127 127 da = self.data_added
128 128 go = self.go
129 129
130 130 try:
131 131 b = s.read(cs)
132 132 except ValueError:
133 133 b = ''
134 134
135 135 timeout_input = 20
136 136 while b and go.is_set():
137 137 if len(t) > chunk_count_max:
138 138 keep_reading.clear()
139 139 keep_reading.wait(timeout_input)
140 140 if len(t) > chunk_count_max + timeout_input:
141 141 log.error("Timed out while waiting for input from subprocess.")
142 142 os._exit(-1) # this will cause the worker to recycle itself
143 143
144 144 t.append(b)
145 145 da.set()
146 146
147 147 try:
148 148 b = s.read(cs)
149 149 except ValueError:
150 150 b = ''
151 151
152 152 self.EOF.set()
153 153 da.set() # for cases when done but there was no input.
154 154
155 155
156 156 class BufferedGenerator(object):
157 157 """
158 158 Class behaves as a non-blocking, buffered pipe reader.
159 159 Reads chunks of data (through a thread)
160 160 from a blocking pipe, and appends these to an array (a deque) of chunks.
161 161 Reading is halted in the thread when the max number of chunks is buffered.
162 162 The .next() method may operate in blocking or non-blocking fashion by yielding
163 163 '' if no data is ready
164 164 to be sent, or by not returning until there is some data to send.
165 165 When we get EOF from the underlying source pipe, we raise the marker to raise
166 166 StopIteration after the last chunk of data is yielded.
167 167 """
168 168
169 169 def __init__(self, source, buffer_size=65536, chunk_size=4096,
170 170 starting_values=None, bottomless=False):
171 171 starting_values = starting_values or []
172 172
173 173 if bottomless:
174 174 maxlen = int(buffer_size / chunk_size)
175 175 else:
176 176 maxlen = None
177 177
178 178 self.data = deque(starting_values, maxlen)
179 179 self.worker = InputStreamChunker(source, self.data, buffer_size,
180 180 chunk_size)
181 181 if starting_values:
182 182 self.worker.data_added.set()
183 183 self.worker.start()
184 184
185 185 ####################
186 186 # Generator's methods
187 187 ####################
188 188
189 189 def __iter__(self):
190 190 return self
191 191
192 192 def next(self):
193 193 while not len(self.data) and not self.worker.EOF.is_set():
194 194 self.worker.data_added.clear()
195 195 self.worker.data_added.wait(0.2)
196 196 if len(self.data):
197 197 self.worker.keep_reading.set()
198 198 return bytes(self.data.popleft())
199 199 elif self.worker.EOF.is_set():
200 200 raise StopIteration
201 201
202 202 def throw(self, exc_type, value=None, traceback=None):
203 203 if not self.worker.EOF.is_set():
204 204 raise exc_type(value)
205 205
206 206 def start(self):
207 207 self.worker.start()
208 208
209 209 def stop(self):
210 210 self.worker.stop()
211 211
212 212 def close(self):
213 213 try:
214 214 self.worker.stop()
215 215 self.throw(GeneratorExit)
216 216 except (GeneratorExit, StopIteration):
217 217 pass
218 218
219 def __del__(self):
220 self.close()
221
222 219 ####################
223 220 # Threaded reader's infrastructure.
224 221 ####################
225 222 @property
226 223 def input(self):
227 224 return self.worker.w
228 225
229 226 @property
230 227 def data_added_event(self):
231 228 return self.worker.data_added
232 229
233 230 @property
234 231 def data_added(self):
235 232 return self.worker.data_added.is_set()
236 233
237 234 @property
238 235 def reading_paused(self):
239 236 return not self.worker.keep_reading.is_set()
240 237
241 238 @property
242 239 def done_reading_event(self):
243 240 """
244 241 Done reading does not mean that the iterator's buffer is empty.
245 242 The iterator might be done reading from the underlying source, but the read
246 243 chunks might still be available for serving through the .next() method.
247 244
248 245 :returns: An Event class instance.
249 246 """
250 247 return self.worker.EOF
251 248
252 249 @property
253 250 def done_reading(self):
254 251 """
255 252 Done reading does not mean that the iterator's buffer is empty.
256 253 The iterator might be done reading from the underlying source, but the read
257 254 chunks might still be available for serving through the .next() method.
258 255 
259 256 :returns: A bool value.
260 257 """
261 258 return self.worker.EOF.is_set()
262 259
263 260 @property
264 261 def length(self):
265 262 """
266 263 returns int.
267 264
268 265 This is the length of the queue of chunks, not the length of
269 266 the combined contents in those chunks.
270 267 
271 268 __len__() cannot be meaningfully implemented because this
272 269 reader is just flying through bottomless content and
273 270 can only know the length of what it has already seen.
274 271 
275 272 If __len__() on a WSGI server per PEP 3333 returns a value,
276 273 the response's length will be set to that. In order not to
277 274 confuse WSGI PEP 3333 servers, we will not implement __len__
278 275 at all.
279 276 """
280 277 return len(self.data)
281 278
282 279 def prepend(self, x):
283 280 self.data.appendleft(x)
284 281
285 282 def append(self, x):
286 283 self.data.append(x)
287 284
288 285 def extend(self, o):
289 286 self.data.extend(o)
290 287
291 288 def __getitem__(self, i):
292 289 return self.data[i]
293 290
294 291
295 292 class SubprocessIOChunker(object):
296 293 """
297 294 Processor class wrapping handling of subprocess IO.
298 295
299 296 .. important::
300 297
301 298 Watch out for the method `__del__` on this class. If this object
302 299 is deleted, it will kill the subprocess, so avoid
303 300 returning the `output` attribute or using it like in the following
304 301 example::
305 302
306 303 # `args` expected to run a program that produces a lot of output
307 304 output = ''.join(SubprocessIOChunker(
308 305 args, shell=False, inputstream=inputstream, env=environ).output)
309 306
310 307 # `output` will not contain all the data, because the __del__ method
311 308 # has already killed the subprocess in this case before all output
312 309 # has been consumed.
313 310
314 311
315 312
316 313 In a way, this is a "communicate()" replacement with a twist.
317 314
318 315 - We are multithreaded. Writing in and reading out/err are all separate threads.
319 316 - We support concurrent (in and out) stream processing.
320 317 - The output is not a stream. It's a queue of read string (bytes, not unicode)
321 318 chunks. The object behaves as an iterable. You can iterate it with "for chunk in obj:".
322 319 - We are non-blocking in more respects than communicate()
323 320 (reading from subprocess out pauses when internal buffer is full, but
324 321 does not block the parent calling code. On the flip side, reading from
325 322 slow-yielding subprocess may block the iteration until data shows up. This
326 323 does not block the inpipe reading occurring in a parallel thread.)
327 324
328 325 The purpose of the object is to allow us to wrap subprocess interactions into
329 326 an iterable that can be passed to a WSGI server as the application's return
330 327 value. Because of this stream-processing ability, WSGI does not have to read ALL
331 328 of the subprocess's output and buffer it before handing it to the WSGI server for
332 329 the HTTP response. Instead, the class initializer reads just a bit of the stream
333 330 to figure out if an error occurred or is likely to occur and, if not, just hands the
334 331 further iteration over subprocess output to the server for completion of the HTTP
335 332 response.
336 333
337 334 The real or perceived subprocess error is trapped and raised as one of
338 335 the EnvironmentError family of exceptions.
339 336
340 337 Example usage:
341 338 # try:
342 339 # answer = SubprocessIOChunker(
343 340 # cmd,
344 341 # input,
345 342 # buffer_size = 65536,
346 343 # chunk_size = 4096
347 344 # )
348 345 # except (EnvironmentError) as e:
349 346 # print str(e)
350 347 # raise e
351 348 #
352 349 # return answer
353 350
354 351
355 352 """
356 353
357 354 # TODO: johbo: This is used to make sure that the open end of the PIPE
358 355 # is closed in the end. It would be way better to wrap this into an
359 356 # object, so that it is closed automatically once it is consumed or
360 357 # something similar.
361 358 _close_input_fd = None
362 359
363 360 _closed = False
364 361
365 362 def __init__(self, cmd, inputstream=None, buffer_size=65536,
366 363 chunk_size=4096, starting_values=None, fail_on_stderr=True,
367 364 fail_on_return_code=True, **kwargs):
368 365 """
369 366 Initializes SubprocessIOChunker
370 367
371 368 :param cmd: A subprocess.Popen style "cmd". Can be a string or an array of strings.
372 369 :param inputstream: (Default: None) A file-like, string, or file pointer.
373 370 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
374 371 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
375 372 :param starting_values: (Default: []) An array of strings to put in front of the output queue.
376 373 :param fail_on_stderr: (Default: True) Whether to raise an exception in
377 374 case something is written to stderr.
378 375 :param fail_on_return_code: (Default: True) Whether to raise an
379 376 exception if the return code is not 0.
380 377 """
381 378
382 379 starting_values = starting_values or []
383 380 if inputstream:
384 381 input_streamer = StreamFeeder(inputstream)
385 382 input_streamer.start()
386 383 inputstream = input_streamer.output
387 384 self._close_input_fd = inputstream
388 385
389 386 self._fail_on_stderr = fail_on_stderr
390 387 self._fail_on_return_code = fail_on_return_code
391 388
392 389 _shell = kwargs.get('shell', True)
393 390 kwargs['shell'] = _shell
394 391
395 392 _p = subprocess.Popen(cmd, bufsize=-1,
396 393 stdin=inputstream,
397 394 stdout=subprocess.PIPE,
398 395 stderr=subprocess.PIPE,
399 396 **kwargs)
400 397
401 398 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
402 399 starting_values)
403 400 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
404 401
405 402 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
406 403 # doing this until we reach either end of file, or end of buffer.
407 404 bg_out.data_added_event.wait(1)
408 405 bg_out.data_added_event.clear()
409 406
410 407 # at this point it's still ambiguous if we are done reading or the buffer is just full.
411 408 # Either way, if there is an error (returned by the ended process, or implied
412 409 # based on the presence of output in stderr) we error out.
413 410 # Else, we are happy.
414 411 _returncode = _p.poll()
415 412
416 413 if ((_returncode and fail_on_return_code) or
417 414 (fail_on_stderr and _returncode is None and bg_err.length)):
418 415 try:
419 416 _p.terminate()
420 417 except Exception:
421 418 pass
422 419 bg_out.stop()
423 420 bg_err.stop()
424 421 if fail_on_stderr:
425 422 err = ''.join(bg_err)
426 423 raise EnvironmentError(
427 424 "Subprocess exited due to an error:\n" + err)
428 425 if _returncode and fail_on_return_code:
429 426 err = ''.join(bg_err)
430 427 if not err:
431 428 # we may get empty stderr; try stdout instead,
432 429 # as in many cases git reports the errors on stdout too
433 430 err = ''.join(bg_out)
434 431 raise EnvironmentError(
435 432 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
436 433 _returncode, err))
437 434
438 435 self.process = _p
439 436 self.output = bg_out
440 437 self.error = bg_err
441 438 self.inputstream = inputstream
442 439
443 440 def __iter__(self):
444 441 return self
445 442
446 443 def next(self):
447 444 # Note: mikhail: We need to be sure that we are checking the return
448 445 # code after the stdout stream is closed. Some processes, e.g. git
449 446 # are doing some magic in between closing stdout and terminating the
450 447 # process and, as a result, we are not getting return code on "slow"
451 448 # systems.
452 449 result = None
453 450 stop_iteration = None
454 451 try:
455 452 result = self.output.next()
456 453 except StopIteration as e:
457 454 stop_iteration = e
458 455
459 456 if self.process.poll() and self._fail_on_return_code:
460 457 err = '%s' % ''.join(self.error)
461 458 raise EnvironmentError(
462 459 "Subprocess exited due to an error:\n" + err)
463 460
464 461 if stop_iteration:
465 462 raise stop_iteration
466 463 return result
467 464
468 465 def throw(self, type, value=None, traceback=None):
469 466 if self.output.length or not self.output.done_reading:
470 467 raise type(value)
471 468
472 469 def close(self):
473 470 if self._closed:
474 471 return
475 472 self._closed = True
476 473 try:
477 474 self.process.terminate()
478 except:
475 except Exception:
479 476 pass
480 477 if self._close_input_fd:
481 478 os.close(self._close_input_fd)
482 479 try:
483 480 self.output.close()
484 except:
481 except Exception:
485 482 pass
486 483 try:
487 484 self.error.close()
488 except:
485 except Exception:
489 486 pass
490 487 try:
491 488 os.close(self.inputstream)
492 except:
489 except Exception:
493 490 pass
494 491
495 def __del__(self):
496 self.close()
497
498 492
499 493 def run_command(arguments, env=None):
500 494 """
501 495 Run the specified command and return the stdout.
502 496
503 497 :param arguments: sequence of program arguments (including the program name)
504 498 :type arguments: list[str]
505 499 """
506 500
507 501 cmd = arguments
508 502 log.debug('Running subprocessio command %s', cmd)
503 proc = None
509 504 try:
510 505 _opts = {'shell': False, 'fail_on_stderr': False}
511 506 if env:
512 507 _opts.update({'env': env})
513 p = SubprocessIOChunker(cmd, **_opts)
514 stdout = ''.join(p)
515 stderr = ''.join(''.join(p.error))
508 proc = SubprocessIOChunker(cmd, **_opts)
509 return ''.join(proc), ''.join(proc.error)
516 510 except (EnvironmentError, OSError) as err:
517 511 cmd = ' '.join(cmd) # human friendly CMD
518 512 tb_err = ("Couldn't run subprocessio command (%s).\n"
519 513 "Original error was:%s\n" % (cmd, err))
520 514 log.exception(tb_err)
521 515 raise Exception(tb_err)
516 finally:
517 if proc:
518 proc.close()
522 519
523 return stdout, stderr
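With the `finally` block in place, `run_command` now always releases the chunker's pipes and keeps the `(stdout, stderr)` return contract. For direct use of `SubprocessIOChunker`, the same discipline applies: drain the output before closing, because `close()` terminates the subprocess (and is safe to call twice thanks to the `_closed` guard). A minimal sketch:

    proc = None
    try:
        proc = SubprocessIOChunker(['git', '--version'],
                                   shell=False, fail_on_stderr=False)
        stdout = ''.join(proc)        # consume stdout chunks fully first
        stderr = ''.join(proc.error)
    finally:
        if proc:
            proc.close()              # terminate process, close buffers and fds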