##// END OF EJS Templates
Fixed handling of the shell argument in subprocess calls; it was always hardcoded, even when passed properly in the arguments.
marcink -
r3830:08d439bf beta
parent child Browse files
Show More
@@ -1,694 +1,695 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 vcs.backends.git.repository
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Git repository implementation.
7 7
8 8 :created_on: Apr 8, 2010
9 9 :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
10 10 """
11 11
12 12 import os
13 13 import re
14 14 import time
15 15 import urllib
16 16 import urllib2
17 17 import logging
18 18 import posixpath
19 19 import string
20 20
21 21 from dulwich.objects import Tag
22 22 from dulwich.repo import Repo, NotGitRepository
23 23
24 24 from rhodecode.lib.vcs import subprocessio
25 25 from rhodecode.lib.vcs.backends.base import BaseRepository, CollectionGenerator
26 26 from rhodecode.lib.vcs.conf import settings
27 27
28 28 from rhodecode.lib.vcs.exceptions import (
29 29 BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError,
30 30 RepositoryError, TagAlreadyExistError, TagDoesNotExistError
31 31 )
32 32 from rhodecode.lib.vcs.utils import safe_unicode, makedate, date_fromtimestamp
33 33 from rhodecode.lib.vcs.utils.lazy import LazyProperty
34 34 from rhodecode.lib.vcs.utils.ordered_dict import OrderedDict
35 35 from rhodecode.lib.vcs.utils.paths import abspath, get_user_home
36 36
37 37 from rhodecode.lib.vcs.utils.hgcompat import (
38 38 hg_url, httpbasicauthhandler, httpdigestauthhandler
39 39 )
40 40
41 41 from .changeset import GitChangeset
42 42 from .config import ConfigFile
43 43 from .inmemory import GitInMemoryChangeset
44 44 from .workdir import GitWorkdir
45 45
46 46 SHA_PATTERN = re.compile(r'^[[0-9a-fA-F]{12}|[0-9a-fA-F]{40}]$')
47 47
48 48 log = logging.getLogger(__name__)
49 49
50 50
class GitRepository(BaseRepository):
    """
    Git repository backend.
    """
    DEFAULT_BRANCH_NAME = 'master'
    scm = 'git'

    def __init__(self, repo_path, create=False, src_url=None,
                 update_after_clone=False, bare=False):
        # normalize path and open (or create/clone) the underlying repo
        self.path = abspath(repo_path)
        repo = self._get_repo(create, src_url, update_after_clone, bare)
        # remember whether the underlying repository is bare
        self.bare = repo.bare
64 64
65 65 @property
66 66 def _config_files(self):
67 67 return [
68 68 self.bare and abspath(self.path, 'config')
69 69 or abspath(self.path, '.git', 'config'),
70 70 abspath(get_user_home(), '.gitconfig'),
71 71 ]
72 72
73 73 @property
74 74 def _repo(self):
75 75 return Repo(self.path)
76 76
77 77 @property
78 78 def head(self):
79 79 try:
80 80 return self._repo.head()
81 81 except KeyError:
82 82 return None
83 83
84 84 @LazyProperty
85 85 def revisions(self):
86 86 """
87 87 Returns list of revisions' ids, in ascending order. Being lazy
88 88 attribute allows external tools to inject shas from cache.
89 89 """
90 90 return self._get_all_revisions()
91 91
92 92 @classmethod
93 93 def _run_git_command(cls, cmd, **opts):
94 94 """
95 95 Runs given ``cmd`` as git command and returns tuple
96 96 (stdout, stderr).
97 97
98 98 :param cmd: git command to be executed
99 99 :param opts: env options to pass into Subprocess command
100 100 """
101 101
102 102 if '_bare' in opts:
103 103 _copts = []
104 104 del opts['_bare']
105 105 else:
106 106 _copts = ['-c', 'core.quotepath=false', ]
107 107 safe_call = False
108 108 if '_safe' in opts:
109 109 #no exc on failure
110 110 del opts['_safe']
111 111 safe_call = True
112 112
113 113 _str_cmd = False
114 114 if isinstance(cmd, basestring):
115 115 cmd = [cmd]
116 116 _str_cmd = True
117 117
118 118 gitenv = os.environ
119 119 # need to clean fix GIT_DIR !
120 120 if 'GIT_DIR' in gitenv:
121 121 del gitenv['GIT_DIR']
122 122 gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
123 123
124 124 _git_path = settings.GIT_EXECUTABLE_PATH
125 125 cmd = [_git_path] + _copts + cmd
126 126 if _str_cmd:
127 127 cmd = ' '.join(cmd)
128
128 129 try:
129 130 _opts = dict(
130 131 env=gitenv,
131 shell=False,
132 shell=True,
132 133 )
133 134 _opts.update(opts)
134 135 p = subprocessio.SubprocessIOChunker(cmd, **_opts)
135 136 except (EnvironmentError, OSError), err:
136 137 tb_err = ("Couldn't run git command (%s).\n"
137 138 "Original error was:%s\n" % (cmd, err))
138 139 log.error(tb_err)
139 140 if safe_call:
140 141 return '', err
141 142 else:
142 143 raise RepositoryError(tb_err)
143 144
144 145 return ''.join(p.output), ''.join(p.error)
145 146
146 147 def run_git_command(self, cmd):
147 148 opts = {}
148 149 if os.path.isdir(self.path):
149 150 opts['cwd'] = self.path
150 151 return self._run_git_command(cmd, **opts)
151 152
152 153 @classmethod
153 154 def _check_url(cls, url):
154 155 """
155 156 Functon will check given url and try to verify if it's a valid
156 157 link. Sometimes it may happened that mercurial will issue basic
157 158 auth request that can cause whole API to hang when used from python
158 159 or other external calls.
159 160
160 161 On failures it'll raise urllib2.HTTPError
161 162 """
162 163
163 164 # check first if it's not an local url
164 165 if os.path.isdir(url) or url.startswith('file:'):
165 166 return True
166 167
167 168 if('+' in url[:url.find('://')]):
168 169 url = url[url.find('+') + 1:]
169 170
170 171 handlers = []
171 172 test_uri, authinfo = hg_url(url).authinfo()
172 173 if not test_uri.endswith('info/refs'):
173 174 test_uri = test_uri.rstrip('/') + '/info/refs'
174 175 if authinfo:
175 176 #create a password manager
176 177 passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
177 178 passmgr.add_password(*authinfo)
178 179
179 180 handlers.extend((httpbasicauthhandler(passmgr),
180 181 httpdigestauthhandler(passmgr)))
181 182
182 183 o = urllib2.build_opener(*handlers)
183 184 o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
184 185
185 186 q = {"service": 'git-upload-pack'}
186 187 qs = '?%s' % urllib.urlencode(q)
187 188 cu = "%s%s" % (test_uri, qs)
188 189 req = urllib2.Request(cu, None, {})
189 190
190 191 try:
191 192 resp = o.open(req)
192 193 return resp.code == 200
193 194 except Exception, e:
194 195 # means it cannot be cloned
195 196 raise urllib2.URLError("[%s] %s" % (url, e))
196 197
197 198 def _get_repo(self, create, src_url=None, update_after_clone=False,
198 199 bare=False):
199 200 if create and os.path.exists(self.path):
200 201 raise RepositoryError("Location already exist")
201 202 if src_url and not create:
202 203 raise RepositoryError("Create should be set to True if src_url is "
203 204 "given (clone operation creates repository)")
204 205 try:
205 206 if create and src_url:
206 207 GitRepository._check_url(src_url)
207 208 self.clone(src_url, update_after_clone, bare)
208 209 return Repo(self.path)
209 210 elif create:
210 211 os.mkdir(self.path)
211 212 if bare:
212 213 return Repo.init_bare(self.path)
213 214 else:
214 215 return Repo.init(self.path)
215 216 else:
216 217 return self._repo
217 218 except (NotGitRepository, OSError), err:
218 219 raise RepositoryError(err)
219 220
220 221 def _get_all_revisions(self):
221 222 # we must check if this repo is not empty, since later command
222 223 # fails if it is. And it's cheaper to ask than throw the subprocess
223 224 # errors
224 225 try:
225 226 self._repo.head()
226 227 except KeyError:
227 228 return []
228 229
229 230 rev_filter = _git_path = settings.GIT_REV_FILTER
230 231 cmd = 'rev-list %s --reverse --date-order' % (rev_filter)
231 232 try:
232 233 so, se = self.run_git_command(cmd)
233 234 except RepositoryError:
234 235 # Can be raised for empty repositories
235 236 return []
236 237 return so.splitlines()
237 238
238 239 def _get_all_revisions2(self):
239 240 #alternate implementation using dulwich
240 241 includes = [x[1][0] for x in self._parsed_refs.iteritems()
241 242 if x[1][1] != 'T']
242 243 return [c.commit.id for c in self._repo.get_walker(include=includes)]
243 244
244 245 def _get_revision(self, revision):
245 246 """
246 247 For git backend we always return integer here. This way we ensure
247 248 that changset's revision attribute would become integer.
248 249 """
249 250
250 251 is_null = lambda o: len(o) == revision.count('0')
251 252
252 253 try:
253 254 self.revisions[0]
254 255 except (KeyError, IndexError):
255 256 raise EmptyRepositoryError("There are no changesets yet")
256 257
257 258 if revision in (None, '', 'tip', 'HEAD', 'head', -1):
258 259 return self.revisions[-1]
259 260
260 261 is_bstr = isinstance(revision, (str, unicode))
261 262 if ((is_bstr and revision.isdigit() and len(revision) < 12)
262 263 or isinstance(revision, int) or is_null(revision)):
263 264 try:
264 265 revision = self.revisions[int(revision)]
265 266 except Exception:
266 267 raise ChangesetDoesNotExistError("Revision %s does not exist "
267 268 "for this repository" % (revision))
268 269
269 270 elif is_bstr:
270 271 # get by branch/tag name
271 272 _ref_revision = self._parsed_refs.get(revision)
272 273 if _ref_revision: # and _ref_revision[1] in ['H', 'RH', 'T']:
273 274 return _ref_revision[0]
274 275
275 276 _tags_shas = self.tags.values()
276 277 # maybe it's a tag ? we don't have them in self.revisions
277 278 if revision in _tags_shas:
278 279 return _tags_shas[_tags_shas.index(revision)]
279 280
280 281 elif not SHA_PATTERN.match(revision) or revision not in self.revisions:
281 282 raise ChangesetDoesNotExistError("Revision %s does not exist "
282 283 "for this repository" % (revision))
283 284
284 285 # Ensure we return full id
285 286 if not SHA_PATTERN.match(str(revision)):
286 287 raise ChangesetDoesNotExistError("Given revision %s not recognized"
287 288 % revision)
288 289 return revision
289 290
290 291 def _get_archives(self, archive_name='tip'):
291 292
292 293 for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
293 294 yield {"type": i[0], "extension": i[1], "node": archive_name}
294 295
295 296 def _get_url(self, url):
296 297 """
297 298 Returns normalized url. If schema is not given, would fall to
298 299 filesystem (``file:///``) schema.
299 300 """
300 301 url = str(url)
301 302 if url != 'default' and not '://' in url:
302 303 url = ':///'.join(('file', url))
303 304 return url
304 305
305 306 def get_hook_location(self):
306 307 """
307 308 returns absolute path to location where hooks are stored
308 309 """
309 310 loc = os.path.join(self.path, 'hooks')
310 311 if not self.bare:
311 312 loc = os.path.join(self.path, '.git', 'hooks')
312 313 return loc
313 314
314 315 @LazyProperty
315 316 def name(self):
316 317 return os.path.basename(self.path)
317 318
318 319 @LazyProperty
319 320 def last_change(self):
320 321 """
321 322 Returns last change made on this repository as datetime object
322 323 """
323 324 return date_fromtimestamp(self._get_mtime(), makedate()[1])
324 325
325 326 def _get_mtime(self):
326 327 try:
327 328 return time.mktime(self.get_changeset().date.timetuple())
328 329 except RepositoryError:
329 330 idx_loc = '' if self.bare else '.git'
330 331 # fallback to filesystem
331 332 in_path = os.path.join(self.path, idx_loc, "index")
332 333 he_path = os.path.join(self.path, idx_loc, "HEAD")
333 334 if os.path.exists(in_path):
334 335 return os.stat(in_path).st_mtime
335 336 else:
336 337 return os.stat(he_path).st_mtime
337 338
338 339 @LazyProperty
339 340 def description(self):
340 341 idx_loc = '' if self.bare else '.git'
341 342 undefined_description = u'unknown'
342 343 description_path = os.path.join(self.path, idx_loc, 'description')
343 344 if os.path.isfile(description_path):
344 345 return safe_unicode(open(description_path).read())
345 346 else:
346 347 return undefined_description
347 348
348 349 @LazyProperty
349 350 def contact(self):
350 351 undefined_contact = u'Unknown'
351 352 return undefined_contact
352 353
353 354 @property
354 355 def branches(self):
355 356 if not self.revisions:
356 357 return {}
357 358 sortkey = lambda ctx: ctx[0]
358 359 _branches = [(x[0], x[1][0])
359 360 for x in self._parsed_refs.iteritems() if x[1][1] == 'H']
360 361 return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
361 362
362 363 @LazyProperty
363 364 def tags(self):
364 365 return self._get_tags()
365 366
366 367 def _get_tags(self):
367 368 if not self.revisions:
368 369 return {}
369 370
370 371 sortkey = lambda ctx: ctx[0]
371 372 _tags = [(x[0], x[1][0])
372 373 for x in self._parsed_refs.iteritems() if x[1][1] == 'T']
373 374 return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
374 375
375 376 def tag(self, name, user, revision=None, message=None, date=None,
376 377 **kwargs):
377 378 """
378 379 Creates and returns a tag for the given ``revision``.
379 380
380 381 :param name: name for new tag
381 382 :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
382 383 :param revision: changeset id for which new tag would be created
383 384 :param message: message of the tag's commit
384 385 :param date: date of tag's commit
385 386
386 387 :raises TagAlreadyExistError: if tag with same name already exists
387 388 """
388 389 if name in self.tags:
389 390 raise TagAlreadyExistError("Tag %s already exists" % name)
390 391 changeset = self.get_changeset(revision)
391 392 message = message or "Added tag %s for commit %s" % (name,
392 393 changeset.raw_id)
393 394 self._repo.refs["refs/tags/%s" % name] = changeset._commit.id
394 395
395 396 self._parsed_refs = self._get_parsed_refs()
396 397 self.tags = self._get_tags()
397 398 return changeset
398 399
399 400 def remove_tag(self, name, user, message=None, date=None):
400 401 """
401 402 Removes tag with the given ``name``.
402 403
403 404 :param name: name of the tag to be removed
404 405 :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
405 406 :param message: message of the tag's removal commit
406 407 :param date: date of tag's removal commit
407 408
408 409 :raises TagDoesNotExistError: if tag with given name does not exists
409 410 """
410 411 if name not in self.tags:
411 412 raise TagDoesNotExistError("Tag %s does not exist" % name)
412 413 tagpath = posixpath.join(self._repo.refs.path, 'refs', 'tags', name)
413 414 try:
414 415 os.remove(tagpath)
415 416 self._parsed_refs = self._get_parsed_refs()
416 417 self.tags = self._get_tags()
417 418 except OSError, e:
418 419 raise RepositoryError(e.strerror)
419 420
420 421 @LazyProperty
421 422 def _parsed_refs(self):
422 423 return self._get_parsed_refs()
423 424
424 425 def _get_parsed_refs(self):
425 426 # cache the property
426 427 _repo = self._repo
427 428 refs = _repo.get_refs()
428 429 keys = [('refs/heads/', 'H'),
429 430 ('refs/remotes/origin/', 'RH'),
430 431 ('refs/tags/', 'T')]
431 432 _refs = {}
432 433 for ref, sha in refs.iteritems():
433 434 for k, type_ in keys:
434 435 if ref.startswith(k):
435 436 _key = ref[len(k):]
436 437 if type_ == 'T':
437 438 obj = _repo.get_object(sha)
438 439 if isinstance(obj, Tag):
439 440 sha = _repo.get_object(sha).object[1]
440 441 _refs[_key] = [sha, type_]
441 442 break
442 443 return _refs
443 444
444 445 def _heads(self, reverse=False):
445 446 refs = self._repo.get_refs()
446 447 heads = {}
447 448
448 449 for key, val in refs.items():
449 450 for ref_key in ['refs/heads/', 'refs/remotes/origin/']:
450 451 if key.startswith(ref_key):
451 452 n = key[len(ref_key):]
452 453 if n not in ['HEAD']:
453 454 heads[n] = val
454 455
455 456 return heads if reverse else dict((y, x) for x, y in heads.iteritems())
456 457
457 458 def get_changeset(self, revision=None):
458 459 """
459 460 Returns ``GitChangeset`` object representing commit from git repository
460 461 at the given revision or head (most recent commit) if None given.
461 462 """
462 463 if isinstance(revision, GitChangeset):
463 464 return revision
464 465 revision = self._get_revision(revision)
465 466 changeset = GitChangeset(repository=self, revision=revision)
466 467 return changeset
467 468
468 469 def get_changesets(self, start=None, end=None, start_date=None,
469 470 end_date=None, branch_name=None, reverse=False):
470 471 """
471 472 Returns iterator of ``GitChangeset`` objects from start to end (both
472 473 are inclusive), in ascending date order (unless ``reverse`` is set).
473 474
474 475 :param start: changeset ID, as str; first returned changeset
475 476 :param end: changeset ID, as str; last returned changeset
476 477 :param start_date: if specified, changesets with commit date less than
477 478 ``start_date`` would be filtered out from returned set
478 479 :param end_date: if specified, changesets with commit date greater than
479 480 ``end_date`` would be filtered out from returned set
480 481 :param branch_name: if specified, changesets not reachable from given
481 482 branch would be filtered out from returned set
482 483 :param reverse: if ``True``, returned generator would be reversed
483 484 (meaning that returned changesets would have descending date order)
484 485
485 486 :raise BranchDoesNotExistError: If given ``branch_name`` does not
486 487 exist.
487 488 :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
488 489 ``end`` could not be found.
489 490
490 491 """
491 492 if branch_name and branch_name not in self.branches:
492 493 raise BranchDoesNotExistError("Branch '%s' not found" \
493 494 % branch_name)
494 495 # %H at format means (full) commit hash, initial hashes are retrieved
495 496 # in ascending date order
496 497 cmd_template = 'log --date-order --reverse --pretty=format:"%H"'
497 498 cmd_params = {}
498 499 if start_date:
499 500 cmd_template += ' --since "$since"'
500 501 cmd_params['since'] = start_date.strftime('%m/%d/%y %H:%M:%S')
501 502 if end_date:
502 503 cmd_template += ' --until "$until"'
503 504 cmd_params['until'] = end_date.strftime('%m/%d/%y %H:%M:%S')
504 505 if branch_name:
505 506 cmd_template += ' $branch_name'
506 507 cmd_params['branch_name'] = branch_name
507 508 else:
508 509 rev_filter = _git_path = settings.GIT_REV_FILTER
509 510 cmd_template += ' %s' % (rev_filter)
510 511
511 512 cmd = string.Template(cmd_template).safe_substitute(**cmd_params)
512 513 revs = self.run_git_command(cmd)[0].splitlines()
513 514 start_pos = 0
514 515 end_pos = len(revs)
515 516 if start:
516 517 _start = self._get_revision(start)
517 518 try:
518 519 start_pos = revs.index(_start)
519 520 except ValueError:
520 521 pass
521 522
522 523 if end is not None:
523 524 _end = self._get_revision(end)
524 525 try:
525 526 end_pos = revs.index(_end)
526 527 except ValueError:
527 528 pass
528 529
529 530 if None not in [start, end] and start_pos > end_pos:
530 531 raise RepositoryError('start cannot be after end')
531 532
532 533 if end_pos is not None:
533 534 end_pos += 1
534 535
535 536 revs = revs[start_pos:end_pos]
536 537 if reverse:
537 538 revs = reversed(revs)
538 539 return CollectionGenerator(self, revs)
539 540
540 541 def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
541 542 context=3):
542 543 """
543 544 Returns (git like) *diff*, as plain text. Shows changes introduced by
544 545 ``rev2`` since ``rev1``.
545 546
546 547 :param rev1: Entry point from which diff is shown. Can be
547 548 ``self.EMPTY_CHANGESET`` - in this case, patch showing all
548 549 the changes since empty state of the repository until ``rev2``
549 550 :param rev2: Until which revision changes should be shown.
550 551 :param ignore_whitespace: If set to ``True``, would not show whitespace
551 552 changes. Defaults to ``False``.
552 553 :param context: How many lines before/after changed lines should be
553 554 shown. Defaults to ``3``.
554 555 """
555 556 flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40']
556 557 if ignore_whitespace:
557 558 flags.append('-w')
558 559
559 560 if hasattr(rev1, 'raw_id'):
560 561 rev1 = getattr(rev1, 'raw_id')
561 562
562 563 if hasattr(rev2, 'raw_id'):
563 564 rev2 = getattr(rev2, 'raw_id')
564 565
565 566 if rev1 == self.EMPTY_CHANGESET:
566 567 rev2 = self.get_changeset(rev2).raw_id
567 568 cmd = ' '.join(['show'] + flags + [rev2])
568 569 else:
569 570 rev1 = self.get_changeset(rev1).raw_id
570 571 rev2 = self.get_changeset(rev2).raw_id
571 572 cmd = ' '.join(['diff'] + flags + [rev1, rev2])
572 573
573 574 if path:
574 575 cmd += ' -- "%s"' % path
575 576
576 577 stdout, stderr = self.run_git_command(cmd)
577 578 # If we used 'show' command, strip first few lines (until actual diff
578 579 # starts)
579 580 if rev1 == self.EMPTY_CHANGESET:
580 581 lines = stdout.splitlines()
581 582 x = 0
582 583 for line in lines:
583 584 if line.startswith('diff'):
584 585 break
585 586 x += 1
586 587 # Append new line just like 'diff' command do
587 588 stdout = '\n'.join(lines[x:]) + '\n'
588 589 return stdout
589 590
590 591 @LazyProperty
591 592 def in_memory_changeset(self):
592 593 """
593 594 Returns ``GitInMemoryChangeset`` object for this repository.
594 595 """
595 596 return GitInMemoryChangeset(self)
596 597
597 598 def clone(self, url, update_after_clone=True, bare=False):
598 599 """
599 600 Tries to clone changes from external location.
600 601
601 602 :param update_after_clone: If set to ``False``, git won't checkout
602 603 working directory
603 604 :param bare: If set to ``True``, repository would be cloned into
604 605 *bare* git repository (no working directory at all).
605 606 """
606 607 url = self._get_url(url)
607 608 cmd = ['clone']
608 609 if bare:
609 610 cmd.append('--bare')
610 611 elif not update_after_clone:
611 612 cmd.append('--no-checkout')
612 613 cmd += ['--', '"%s"' % url, '"%s"' % self.path]
613 614 cmd = ' '.join(cmd)
614 615 # If error occurs run_git_command raises RepositoryError already
615 616 self.run_git_command(cmd)
616 617
617 618 def pull(self, url):
618 619 """
619 620 Tries to pull changes from external location.
620 621 """
621 622 url = self._get_url(url)
622 623 cmd = ['pull']
623 624 cmd.append("--ff-only")
624 625 cmd.append(url)
625 626 cmd = ' '.join(cmd)
626 627 # If error occurs run_git_command raises RepositoryError already
627 628 self.run_git_command(cmd)
628 629
629 630 def fetch(self, url):
630 631 """
631 632 Tries to pull changes from external location.
632 633 """
633 634 url = self._get_url(url)
634 635 so, se = self.run_git_command('ls-remote -h %s' % url)
635 636 refs = []
636 637 for line in (x for x in so.splitlines()):
637 638 sha, ref = line.split('\t')
638 639 refs.append(ref)
639 640 refs = ' '.join(('+%s:%s' % (r, r) for r in refs))
640 641 cmd = '''fetch %s -- %s''' % (url, refs)
641 642 self.run_git_command(cmd)
642 643
643 644 @LazyProperty
644 645 def workdir(self):
645 646 """
646 647 Returns ``Workdir`` instance for this repository.
647 648 """
648 649 return GitWorkdir(self)
649 650
650 651 def get_config_value(self, section, name, config_file=None):
651 652 """
652 653 Returns configuration value for a given [``section``] and ``name``.
653 654
654 655 :param section: Section we want to retrieve value from
655 656 :param name: Name of configuration we want to retrieve
656 657 :param config_file: A path to file which should be used to retrieve
657 658 configuration from (might also be a list of file paths)
658 659 """
659 660 if config_file is None:
660 661 config_file = []
661 662 elif isinstance(config_file, basestring):
662 663 config_file = [config_file]
663 664
664 665 def gen_configs():
665 666 for path in config_file + self._config_files:
666 667 try:
667 668 yield ConfigFile.from_path(path)
668 669 except (IOError, OSError, ValueError):
669 670 continue
670 671
671 672 for config in gen_configs():
672 673 try:
673 674 return config.get(section, name)
674 675 except KeyError:
675 676 continue
676 677 return None
677 678
678 679 def get_user_name(self, config_file=None):
679 680 """
680 681 Returns user's name from global configuration file.
681 682
682 683 :param config_file: A path to file which should be used to retrieve
683 684 configuration from (might also be a list of file paths)
684 685 """
685 686 return self.get_config_value('user', 'name', config_file)
686 687
687 688 def get_user_email(self, config_file=None):
688 689 """
689 690 Returns user's email from global configuration file.
690 691
691 692 :param config_file: A path to file which should be used to retrieve
692 693 configuration from (might also be a list of file paths)
693 694 """
694 695 return self.get_config_value('user', 'email', config_file)
@@ -1,415 +1,415 b''
1 1 '''
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 '''
25 25 import os
26 26 import subprocess
27 27 from rhodecode.lib.vcs.utils.compat import deque, Event, Thread, _bytes, _bytearray
28 28
29 29
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = _bytes()
        if type(source) in (type(''), _bytes, _bytearray):  # string-like
            self.bytes = _bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # convert file descriptor (int) stdin into a file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        # the read end is handed to the consumer; we write into the other
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        wfd = self.writeiface
        if self.bytes:
            # in-memory payload: write it out in one go
            os.write(wfd, self.bytes)
        else:
            src = self.source
            chunk = src.read(4096)
            while chunk:
                os.write(wfd, chunk)
                chunk = src.read(4096)
        # closing sends EOF to the reader of the pipe
        os.close(wfd)

    @property
    def output(self):
        """Read end of the pipe the fed data can be consumed from."""
        return self.readiface
77 77
78 78
class InputStreamChunker(Thread):
    """
    Thread draining a blocking ``source`` into ``target`` (a deque of
    chunks), pausing when too many chunks are buffered.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # max buffered chunks before the reader pauses
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # set whenever a chunk lands in target (and on EOF)
        self.data_added = Event()
        self.data_added.clear()

        # cleared by us when the buffer fills; set by the consumer to resume
        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        # master "keep running" switch, cleared by stop()
        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except:
            pass

    def run(self):
        src = self.source
        buf = self.target
        chunk_size = self.chunk_size
        max_chunks = self.chunk_count_max
        keep_reading = self.keep_reading
        data_added = self.data_added
        go = self.go

        try:
            chunk = src.read(chunk_size)
        except ValueError:
            chunk = ''

        while chunk and go.is_set():
            if len(buf) > max_chunks:
                # buffer full: pause until consumer signals, max 2s
                keep_reading.clear()
                keep_reading.wait(2)
                # # this only works on 2.7.x and up
                # if not keep_reading.wait(10):
                #     raise Exception("Timed out while waiting for input to be read.")
                # instead we'll use this
                if len(buf) > max_chunks + 3:
                    raise IOError("Timed out while waiting for input from subprocess.")
            buf.append(chunk)
            data_added.set()
            chunk = src.read(chunk_size)
        self.EOF.set()
        data_added.set()  # for cases when done but there was no input.
143 143
144 144
class BufferedGenerator():
    '''
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    '''

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=[], bottomless=False):
        # bottomless mode caps the deque so old chunks fall off the left
        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        # background thread that fills self.data from the source pipe
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # block (in 0.2s slices) until a chunk is available or EOF
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # consumer made room - let the reader continue
            self.worker.keep_reading.set()
            return _bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w``;
        # accessing this property would raise AttributeError - confirm
        # the intended attribute before relying on it
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        '''
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Event class instance.
        '''
        return self.worker.EOF

    @property
    def done_reading(self):
        '''
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Bool value.
        '''
        return self.worker.EOF.is_set()

    @property
    def length(self):
        '''
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit of content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        '''
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
283 283
class SubprocessIOChunker(object):
    '''
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer
    '''
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        '''
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None) An array of strings to put in front of output que.
        :raises EnvironmentError: if the subprocess exits with an error before
            yielding any (further) output
        '''
        # ``None`` sentinel instead of a mutable ``[]`` default, which would
        # be shared between calls
        starting_values = starting_values or []

        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # honor an explicitly passed ``shell`` argument; default to True only
        # when the caller did not specify one. The previous form
        # ``kwargs.get('shell') or True`` evaluated to True even for an
        # explicit shell=False, i.e. shell was effectively hardcoded.
        _shell = kwargs.get('shell', True)
        if isinstance(cmd, (list, tuple)):
            # NOTE(review): joining into one string is only appropriate for
            # shell=True; with shell=False Popen expects the list form -
            # confirm against callers before changing
            cmd = ' '.join(cmd)

        kwargs['shell'] = _shell
        _p = subprocess.Popen(cmd,
                              bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs
                              )

        # stdout is buffered per caller's sizes; stderr is bottomless so a
        # chatty process cannot stall the error reader
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused \
                and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just
        # have a full buffer. Either way, if error (returned by ended process,
        # or implied based on presence of stuff in stderr output) we error
        # out. Else, we are happy.
        _returncode = _p.poll()
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                # process may already be gone; nothing more we can do
                pass
            bg_out.stop()
            bg_err.stop()
            err = '%s' % ''.join(bg_err)
            if err:
                raise EnvironmentError("Subprocess exited due to an error:\n" + err)
            raise EnvironmentError("Subprocess exited with non 0 ret code:%s" % _returncode)

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # surface a late subprocess failure to the consumer mid-iteration
        if self.process.poll():
            err = '%s' % ''.join(self.error)
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)
        return self.output.next()

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        """Best-effort teardown: each resource is closed independently."""
        try:
            self.process.terminate()
        except Exception:
            pass
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now