##// END OF EJS Templates
fixed handling shell argument in subprocess calls, it always was hardcoded even when passed properly in arguments
marcink -
r3830:08d439bf beta
parent child Browse files
Show More
@@ -1,694 +1,695 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 vcs.backends.git.repository
3 vcs.backends.git.repository
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 Git repository implementation.
6 Git repository implementation.
7
7
8 :created_on: Apr 8, 2010
8 :created_on: Apr 8, 2010
9 :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
9 :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
10 """
10 """
11
11
12 import os
12 import os
13 import re
13 import re
14 import time
14 import time
15 import urllib
15 import urllib
16 import urllib2
16 import urllib2
17 import logging
17 import logging
18 import posixpath
18 import posixpath
19 import string
19 import string
20
20
21 from dulwich.objects import Tag
21 from dulwich.objects import Tag
22 from dulwich.repo import Repo, NotGitRepository
22 from dulwich.repo import Repo, NotGitRepository
23
23
24 from rhodecode.lib.vcs import subprocessio
24 from rhodecode.lib.vcs import subprocessio
25 from rhodecode.lib.vcs.backends.base import BaseRepository, CollectionGenerator
25 from rhodecode.lib.vcs.backends.base import BaseRepository, CollectionGenerator
26 from rhodecode.lib.vcs.conf import settings
26 from rhodecode.lib.vcs.conf import settings
27
27
28 from rhodecode.lib.vcs.exceptions import (
28 from rhodecode.lib.vcs.exceptions import (
29 BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError,
29 BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError,
30 RepositoryError, TagAlreadyExistError, TagDoesNotExistError
30 RepositoryError, TagAlreadyExistError, TagDoesNotExistError
31 )
31 )
32 from rhodecode.lib.vcs.utils import safe_unicode, makedate, date_fromtimestamp
32 from rhodecode.lib.vcs.utils import safe_unicode, makedate, date_fromtimestamp
33 from rhodecode.lib.vcs.utils.lazy import LazyProperty
33 from rhodecode.lib.vcs.utils.lazy import LazyProperty
34 from rhodecode.lib.vcs.utils.ordered_dict import OrderedDict
34 from rhodecode.lib.vcs.utils.ordered_dict import OrderedDict
35 from rhodecode.lib.vcs.utils.paths import abspath, get_user_home
35 from rhodecode.lib.vcs.utils.paths import abspath, get_user_home
36
36
37 from rhodecode.lib.vcs.utils.hgcompat import (
37 from rhodecode.lib.vcs.utils.hgcompat import (
38 hg_url, httpbasicauthhandler, httpdigestauthhandler
38 hg_url, httpbasicauthhandler, httpdigestauthhandler
39 )
39 )
40
40
41 from .changeset import GitChangeset
41 from .changeset import GitChangeset
42 from .config import ConfigFile
42 from .config import ConfigFile
43 from .inmemory import GitInMemoryChangeset
43 from .inmemory import GitInMemoryChangeset
44 from .workdir import GitWorkdir
44 from .workdir import GitWorkdir
45
45
46 SHA_PATTERN = re.compile(r'^[[0-9a-fA-F]{12}|[0-9a-fA-F]{40}]$')
46 SHA_PATTERN = re.compile(r'^[[0-9a-fA-F]{12}|[0-9a-fA-F]{40}]$')
47
47
48 log = logging.getLogger(__name__)
48 log = logging.getLogger(__name__)
49
49
50
50
51 class GitRepository(BaseRepository):
51 class GitRepository(BaseRepository):
52 """
52 """
53 Git repository backend.
53 Git repository backend.
54 """
54 """
55 DEFAULT_BRANCH_NAME = 'master'
55 DEFAULT_BRANCH_NAME = 'master'
56 scm = 'git'
56 scm = 'git'
57
57
58 def __init__(self, repo_path, create=False, src_url=None,
58 def __init__(self, repo_path, create=False, src_url=None,
59 update_after_clone=False, bare=False):
59 update_after_clone=False, bare=False):
60
60
61 self.path = abspath(repo_path)
61 self.path = abspath(repo_path)
62 repo = self._get_repo(create, src_url, update_after_clone, bare)
62 repo = self._get_repo(create, src_url, update_after_clone, bare)
63 self.bare = repo.bare
63 self.bare = repo.bare
64
64
65 @property
65 @property
66 def _config_files(self):
66 def _config_files(self):
67 return [
67 return [
68 self.bare and abspath(self.path, 'config')
68 self.bare and abspath(self.path, 'config')
69 or abspath(self.path, '.git', 'config'),
69 or abspath(self.path, '.git', 'config'),
70 abspath(get_user_home(), '.gitconfig'),
70 abspath(get_user_home(), '.gitconfig'),
71 ]
71 ]
72
72
73 @property
73 @property
74 def _repo(self):
74 def _repo(self):
75 return Repo(self.path)
75 return Repo(self.path)
76
76
77 @property
77 @property
78 def head(self):
78 def head(self):
79 try:
79 try:
80 return self._repo.head()
80 return self._repo.head()
81 except KeyError:
81 except KeyError:
82 return None
82 return None
83
83
84 @LazyProperty
84 @LazyProperty
85 def revisions(self):
85 def revisions(self):
86 """
86 """
87 Returns list of revisions' ids, in ascending order. Being lazy
87 Returns list of revisions' ids, in ascending order. Being lazy
88 attribute allows external tools to inject shas from cache.
88 attribute allows external tools to inject shas from cache.
89 """
89 """
90 return self._get_all_revisions()
90 return self._get_all_revisions()
91
91
92 @classmethod
92 @classmethod
93 def _run_git_command(cls, cmd, **opts):
93 def _run_git_command(cls, cmd, **opts):
94 """
94 """
95 Runs given ``cmd`` as git command and returns tuple
95 Runs given ``cmd`` as git command and returns tuple
96 (stdout, stderr).
96 (stdout, stderr).
97
97
98 :param cmd: git command to be executed
98 :param cmd: git command to be executed
99 :param opts: env options to pass into Subprocess command
99 :param opts: env options to pass into Subprocess command
100 """
100 """
101
101
102 if '_bare' in opts:
102 if '_bare' in opts:
103 _copts = []
103 _copts = []
104 del opts['_bare']
104 del opts['_bare']
105 else:
105 else:
106 _copts = ['-c', 'core.quotepath=false', ]
106 _copts = ['-c', 'core.quotepath=false', ]
107 safe_call = False
107 safe_call = False
108 if '_safe' in opts:
108 if '_safe' in opts:
109 #no exc on failure
109 #no exc on failure
110 del opts['_safe']
110 del opts['_safe']
111 safe_call = True
111 safe_call = True
112
112
113 _str_cmd = False
113 _str_cmd = False
114 if isinstance(cmd, basestring):
114 if isinstance(cmd, basestring):
115 cmd = [cmd]
115 cmd = [cmd]
116 _str_cmd = True
116 _str_cmd = True
117
117
118 gitenv = os.environ
118 gitenv = os.environ
119 # need to clean fix GIT_DIR !
119 # need to clean fix GIT_DIR !
120 if 'GIT_DIR' in gitenv:
120 if 'GIT_DIR' in gitenv:
121 del gitenv['GIT_DIR']
121 del gitenv['GIT_DIR']
122 gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
122 gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
123
123
124 _git_path = settings.GIT_EXECUTABLE_PATH
124 _git_path = settings.GIT_EXECUTABLE_PATH
125 cmd = [_git_path] + _copts + cmd
125 cmd = [_git_path] + _copts + cmd
126 if _str_cmd:
126 if _str_cmd:
127 cmd = ' '.join(cmd)
127 cmd = ' '.join(cmd)
128
128 try:
129 try:
129 _opts = dict(
130 _opts = dict(
130 env=gitenv,
131 env=gitenv,
131 shell=False,
132 shell=True,
132 )
133 )
133 _opts.update(opts)
134 _opts.update(opts)
134 p = subprocessio.SubprocessIOChunker(cmd, **_opts)
135 p = subprocessio.SubprocessIOChunker(cmd, **_opts)
135 except (EnvironmentError, OSError), err:
136 except (EnvironmentError, OSError), err:
136 tb_err = ("Couldn't run git command (%s).\n"
137 tb_err = ("Couldn't run git command (%s).\n"
137 "Original error was:%s\n" % (cmd, err))
138 "Original error was:%s\n" % (cmd, err))
138 log.error(tb_err)
139 log.error(tb_err)
139 if safe_call:
140 if safe_call:
140 return '', err
141 return '', err
141 else:
142 else:
142 raise RepositoryError(tb_err)
143 raise RepositoryError(tb_err)
143
144
144 return ''.join(p.output), ''.join(p.error)
145 return ''.join(p.output), ''.join(p.error)
145
146
146 def run_git_command(self, cmd):
147 def run_git_command(self, cmd):
147 opts = {}
148 opts = {}
148 if os.path.isdir(self.path):
149 if os.path.isdir(self.path):
149 opts['cwd'] = self.path
150 opts['cwd'] = self.path
150 return self._run_git_command(cmd, **opts)
151 return self._run_git_command(cmd, **opts)
151
152
152 @classmethod
153 @classmethod
153 def _check_url(cls, url):
154 def _check_url(cls, url):
154 """
155 """
155 Functon will check given url and try to verify if it's a valid
156 Functon will check given url and try to verify if it's a valid
156 link. Sometimes it may happened that mercurial will issue basic
157 link. Sometimes it may happened that mercurial will issue basic
157 auth request that can cause whole API to hang when used from python
158 auth request that can cause whole API to hang when used from python
158 or other external calls.
159 or other external calls.
159
160
160 On failures it'll raise urllib2.HTTPError
161 On failures it'll raise urllib2.HTTPError
161 """
162 """
162
163
163 # check first if it's not an local url
164 # check first if it's not an local url
164 if os.path.isdir(url) or url.startswith('file:'):
165 if os.path.isdir(url) or url.startswith('file:'):
165 return True
166 return True
166
167
167 if('+' in url[:url.find('://')]):
168 if('+' in url[:url.find('://')]):
168 url = url[url.find('+') + 1:]
169 url = url[url.find('+') + 1:]
169
170
170 handlers = []
171 handlers = []
171 test_uri, authinfo = hg_url(url).authinfo()
172 test_uri, authinfo = hg_url(url).authinfo()
172 if not test_uri.endswith('info/refs'):
173 if not test_uri.endswith('info/refs'):
173 test_uri = test_uri.rstrip('/') + '/info/refs'
174 test_uri = test_uri.rstrip('/') + '/info/refs'
174 if authinfo:
175 if authinfo:
175 #create a password manager
176 #create a password manager
176 passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
177 passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
177 passmgr.add_password(*authinfo)
178 passmgr.add_password(*authinfo)
178
179
179 handlers.extend((httpbasicauthhandler(passmgr),
180 handlers.extend((httpbasicauthhandler(passmgr),
180 httpdigestauthhandler(passmgr)))
181 httpdigestauthhandler(passmgr)))
181
182
182 o = urllib2.build_opener(*handlers)
183 o = urllib2.build_opener(*handlers)
183 o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
184 o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
184
185
185 q = {"service": 'git-upload-pack'}
186 q = {"service": 'git-upload-pack'}
186 qs = '?%s' % urllib.urlencode(q)
187 qs = '?%s' % urllib.urlencode(q)
187 cu = "%s%s" % (test_uri, qs)
188 cu = "%s%s" % (test_uri, qs)
188 req = urllib2.Request(cu, None, {})
189 req = urllib2.Request(cu, None, {})
189
190
190 try:
191 try:
191 resp = o.open(req)
192 resp = o.open(req)
192 return resp.code == 200
193 return resp.code == 200
193 except Exception, e:
194 except Exception, e:
194 # means it cannot be cloned
195 # means it cannot be cloned
195 raise urllib2.URLError("[%s] %s" % (url, e))
196 raise urllib2.URLError("[%s] %s" % (url, e))
196
197
197 def _get_repo(self, create, src_url=None, update_after_clone=False,
198 def _get_repo(self, create, src_url=None, update_after_clone=False,
198 bare=False):
199 bare=False):
199 if create and os.path.exists(self.path):
200 if create and os.path.exists(self.path):
200 raise RepositoryError("Location already exist")
201 raise RepositoryError("Location already exist")
201 if src_url and not create:
202 if src_url and not create:
202 raise RepositoryError("Create should be set to True if src_url is "
203 raise RepositoryError("Create should be set to True if src_url is "
203 "given (clone operation creates repository)")
204 "given (clone operation creates repository)")
204 try:
205 try:
205 if create and src_url:
206 if create and src_url:
206 GitRepository._check_url(src_url)
207 GitRepository._check_url(src_url)
207 self.clone(src_url, update_after_clone, bare)
208 self.clone(src_url, update_after_clone, bare)
208 return Repo(self.path)
209 return Repo(self.path)
209 elif create:
210 elif create:
210 os.mkdir(self.path)
211 os.mkdir(self.path)
211 if bare:
212 if bare:
212 return Repo.init_bare(self.path)
213 return Repo.init_bare(self.path)
213 else:
214 else:
214 return Repo.init(self.path)
215 return Repo.init(self.path)
215 else:
216 else:
216 return self._repo
217 return self._repo
217 except (NotGitRepository, OSError), err:
218 except (NotGitRepository, OSError), err:
218 raise RepositoryError(err)
219 raise RepositoryError(err)
219
220
220 def _get_all_revisions(self):
221 def _get_all_revisions(self):
221 # we must check if this repo is not empty, since later command
222 # we must check if this repo is not empty, since later command
222 # fails if it is. And it's cheaper to ask than throw the subprocess
223 # fails if it is. And it's cheaper to ask than throw the subprocess
223 # errors
224 # errors
224 try:
225 try:
225 self._repo.head()
226 self._repo.head()
226 except KeyError:
227 except KeyError:
227 return []
228 return []
228
229
229 rev_filter = _git_path = settings.GIT_REV_FILTER
230 rev_filter = _git_path = settings.GIT_REV_FILTER
230 cmd = 'rev-list %s --reverse --date-order' % (rev_filter)
231 cmd = 'rev-list %s --reverse --date-order' % (rev_filter)
231 try:
232 try:
232 so, se = self.run_git_command(cmd)
233 so, se = self.run_git_command(cmd)
233 except RepositoryError:
234 except RepositoryError:
234 # Can be raised for empty repositories
235 # Can be raised for empty repositories
235 return []
236 return []
236 return so.splitlines()
237 return so.splitlines()
237
238
238 def _get_all_revisions2(self):
239 def _get_all_revisions2(self):
239 #alternate implementation using dulwich
240 #alternate implementation using dulwich
240 includes = [x[1][0] for x in self._parsed_refs.iteritems()
241 includes = [x[1][0] for x in self._parsed_refs.iteritems()
241 if x[1][1] != 'T']
242 if x[1][1] != 'T']
242 return [c.commit.id for c in self._repo.get_walker(include=includes)]
243 return [c.commit.id for c in self._repo.get_walker(include=includes)]
243
244
244 def _get_revision(self, revision):
245 def _get_revision(self, revision):
245 """
246 """
246 For git backend we always return integer here. This way we ensure
247 For git backend we always return integer here. This way we ensure
247 that changset's revision attribute would become integer.
248 that changset's revision attribute would become integer.
248 """
249 """
249
250
250 is_null = lambda o: len(o) == revision.count('0')
251 is_null = lambda o: len(o) == revision.count('0')
251
252
252 try:
253 try:
253 self.revisions[0]
254 self.revisions[0]
254 except (KeyError, IndexError):
255 except (KeyError, IndexError):
255 raise EmptyRepositoryError("There are no changesets yet")
256 raise EmptyRepositoryError("There are no changesets yet")
256
257
257 if revision in (None, '', 'tip', 'HEAD', 'head', -1):
258 if revision in (None, '', 'tip', 'HEAD', 'head', -1):
258 return self.revisions[-1]
259 return self.revisions[-1]
259
260
260 is_bstr = isinstance(revision, (str, unicode))
261 is_bstr = isinstance(revision, (str, unicode))
261 if ((is_bstr and revision.isdigit() and len(revision) < 12)
262 if ((is_bstr and revision.isdigit() and len(revision) < 12)
262 or isinstance(revision, int) or is_null(revision)):
263 or isinstance(revision, int) or is_null(revision)):
263 try:
264 try:
264 revision = self.revisions[int(revision)]
265 revision = self.revisions[int(revision)]
265 except Exception:
266 except Exception:
266 raise ChangesetDoesNotExistError("Revision %s does not exist "
267 raise ChangesetDoesNotExistError("Revision %s does not exist "
267 "for this repository" % (revision))
268 "for this repository" % (revision))
268
269
269 elif is_bstr:
270 elif is_bstr:
270 # get by branch/tag name
271 # get by branch/tag name
271 _ref_revision = self._parsed_refs.get(revision)
272 _ref_revision = self._parsed_refs.get(revision)
272 if _ref_revision: # and _ref_revision[1] in ['H', 'RH', 'T']:
273 if _ref_revision: # and _ref_revision[1] in ['H', 'RH', 'T']:
273 return _ref_revision[0]
274 return _ref_revision[0]
274
275
275 _tags_shas = self.tags.values()
276 _tags_shas = self.tags.values()
276 # maybe it's a tag ? we don't have them in self.revisions
277 # maybe it's a tag ? we don't have them in self.revisions
277 if revision in _tags_shas:
278 if revision in _tags_shas:
278 return _tags_shas[_tags_shas.index(revision)]
279 return _tags_shas[_tags_shas.index(revision)]
279
280
280 elif not SHA_PATTERN.match(revision) or revision not in self.revisions:
281 elif not SHA_PATTERN.match(revision) or revision not in self.revisions:
281 raise ChangesetDoesNotExistError("Revision %s does not exist "
282 raise ChangesetDoesNotExistError("Revision %s does not exist "
282 "for this repository" % (revision))
283 "for this repository" % (revision))
283
284
284 # Ensure we return full id
285 # Ensure we return full id
285 if not SHA_PATTERN.match(str(revision)):
286 if not SHA_PATTERN.match(str(revision)):
286 raise ChangesetDoesNotExistError("Given revision %s not recognized"
287 raise ChangesetDoesNotExistError("Given revision %s not recognized"
287 % revision)
288 % revision)
288 return revision
289 return revision
289
290
290 def _get_archives(self, archive_name='tip'):
291 def _get_archives(self, archive_name='tip'):
291
292
292 for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
293 for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
293 yield {"type": i[0], "extension": i[1], "node": archive_name}
294 yield {"type": i[0], "extension": i[1], "node": archive_name}
294
295
295 def _get_url(self, url):
296 def _get_url(self, url):
296 """
297 """
297 Returns normalized url. If schema is not given, would fall to
298 Returns normalized url. If schema is not given, would fall to
298 filesystem (``file:///``) schema.
299 filesystem (``file:///``) schema.
299 """
300 """
300 url = str(url)
301 url = str(url)
301 if url != 'default' and not '://' in url:
302 if url != 'default' and not '://' in url:
302 url = ':///'.join(('file', url))
303 url = ':///'.join(('file', url))
303 return url
304 return url
304
305
305 def get_hook_location(self):
306 def get_hook_location(self):
306 """
307 """
307 returns absolute path to location where hooks are stored
308 returns absolute path to location where hooks are stored
308 """
309 """
309 loc = os.path.join(self.path, 'hooks')
310 loc = os.path.join(self.path, 'hooks')
310 if not self.bare:
311 if not self.bare:
311 loc = os.path.join(self.path, '.git', 'hooks')
312 loc = os.path.join(self.path, '.git', 'hooks')
312 return loc
313 return loc
313
314
314 @LazyProperty
315 @LazyProperty
315 def name(self):
316 def name(self):
316 return os.path.basename(self.path)
317 return os.path.basename(self.path)
317
318
318 @LazyProperty
319 @LazyProperty
319 def last_change(self):
320 def last_change(self):
320 """
321 """
321 Returns last change made on this repository as datetime object
322 Returns last change made on this repository as datetime object
322 """
323 """
323 return date_fromtimestamp(self._get_mtime(), makedate()[1])
324 return date_fromtimestamp(self._get_mtime(), makedate()[1])
324
325
325 def _get_mtime(self):
326 def _get_mtime(self):
326 try:
327 try:
327 return time.mktime(self.get_changeset().date.timetuple())
328 return time.mktime(self.get_changeset().date.timetuple())
328 except RepositoryError:
329 except RepositoryError:
329 idx_loc = '' if self.bare else '.git'
330 idx_loc = '' if self.bare else '.git'
330 # fallback to filesystem
331 # fallback to filesystem
331 in_path = os.path.join(self.path, idx_loc, "index")
332 in_path = os.path.join(self.path, idx_loc, "index")
332 he_path = os.path.join(self.path, idx_loc, "HEAD")
333 he_path = os.path.join(self.path, idx_loc, "HEAD")
333 if os.path.exists(in_path):
334 if os.path.exists(in_path):
334 return os.stat(in_path).st_mtime
335 return os.stat(in_path).st_mtime
335 else:
336 else:
336 return os.stat(he_path).st_mtime
337 return os.stat(he_path).st_mtime
337
338
338 @LazyProperty
339 @LazyProperty
339 def description(self):
340 def description(self):
340 idx_loc = '' if self.bare else '.git'
341 idx_loc = '' if self.bare else '.git'
341 undefined_description = u'unknown'
342 undefined_description = u'unknown'
342 description_path = os.path.join(self.path, idx_loc, 'description')
343 description_path = os.path.join(self.path, idx_loc, 'description')
343 if os.path.isfile(description_path):
344 if os.path.isfile(description_path):
344 return safe_unicode(open(description_path).read())
345 return safe_unicode(open(description_path).read())
345 else:
346 else:
346 return undefined_description
347 return undefined_description
347
348
348 @LazyProperty
349 @LazyProperty
349 def contact(self):
350 def contact(self):
350 undefined_contact = u'Unknown'
351 undefined_contact = u'Unknown'
351 return undefined_contact
352 return undefined_contact
352
353
353 @property
354 @property
354 def branches(self):
355 def branches(self):
355 if not self.revisions:
356 if not self.revisions:
356 return {}
357 return {}
357 sortkey = lambda ctx: ctx[0]
358 sortkey = lambda ctx: ctx[0]
358 _branches = [(x[0], x[1][0])
359 _branches = [(x[0], x[1][0])
359 for x in self._parsed_refs.iteritems() if x[1][1] == 'H']
360 for x in self._parsed_refs.iteritems() if x[1][1] == 'H']
360 return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
361 return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
361
362
362 @LazyProperty
363 @LazyProperty
363 def tags(self):
364 def tags(self):
364 return self._get_tags()
365 return self._get_tags()
365
366
366 def _get_tags(self):
367 def _get_tags(self):
367 if not self.revisions:
368 if not self.revisions:
368 return {}
369 return {}
369
370
370 sortkey = lambda ctx: ctx[0]
371 sortkey = lambda ctx: ctx[0]
371 _tags = [(x[0], x[1][0])
372 _tags = [(x[0], x[1][0])
372 for x in self._parsed_refs.iteritems() if x[1][1] == 'T']
373 for x in self._parsed_refs.iteritems() if x[1][1] == 'T']
373 return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
374 return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
374
375
375 def tag(self, name, user, revision=None, message=None, date=None,
376 def tag(self, name, user, revision=None, message=None, date=None,
376 **kwargs):
377 **kwargs):
377 """
378 """
378 Creates and returns a tag for the given ``revision``.
379 Creates and returns a tag for the given ``revision``.
379
380
380 :param name: name for new tag
381 :param name: name for new tag
381 :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
382 :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
382 :param revision: changeset id for which new tag would be created
383 :param revision: changeset id for which new tag would be created
383 :param message: message of the tag's commit
384 :param message: message of the tag's commit
384 :param date: date of tag's commit
385 :param date: date of tag's commit
385
386
386 :raises TagAlreadyExistError: if tag with same name already exists
387 :raises TagAlreadyExistError: if tag with same name already exists
387 """
388 """
388 if name in self.tags:
389 if name in self.tags:
389 raise TagAlreadyExistError("Tag %s already exists" % name)
390 raise TagAlreadyExistError("Tag %s already exists" % name)
390 changeset = self.get_changeset(revision)
391 changeset = self.get_changeset(revision)
391 message = message or "Added tag %s for commit %s" % (name,
392 message = message or "Added tag %s for commit %s" % (name,
392 changeset.raw_id)
393 changeset.raw_id)
393 self._repo.refs["refs/tags/%s" % name] = changeset._commit.id
394 self._repo.refs["refs/tags/%s" % name] = changeset._commit.id
394
395
395 self._parsed_refs = self._get_parsed_refs()
396 self._parsed_refs = self._get_parsed_refs()
396 self.tags = self._get_tags()
397 self.tags = self._get_tags()
397 return changeset
398 return changeset
398
399
399 def remove_tag(self, name, user, message=None, date=None):
400 def remove_tag(self, name, user, message=None, date=None):
400 """
401 """
401 Removes tag with the given ``name``.
402 Removes tag with the given ``name``.
402
403
403 :param name: name of the tag to be removed
404 :param name: name of the tag to be removed
404 :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
405 :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
405 :param message: message of the tag's removal commit
406 :param message: message of the tag's removal commit
406 :param date: date of tag's removal commit
407 :param date: date of tag's removal commit
407
408
408 :raises TagDoesNotExistError: if tag with given name does not exists
409 :raises TagDoesNotExistError: if tag with given name does not exists
409 """
410 """
410 if name not in self.tags:
411 if name not in self.tags:
411 raise TagDoesNotExistError("Tag %s does not exist" % name)
412 raise TagDoesNotExistError("Tag %s does not exist" % name)
412 tagpath = posixpath.join(self._repo.refs.path, 'refs', 'tags', name)
413 tagpath = posixpath.join(self._repo.refs.path, 'refs', 'tags', name)
413 try:
414 try:
414 os.remove(tagpath)
415 os.remove(tagpath)
415 self._parsed_refs = self._get_parsed_refs()
416 self._parsed_refs = self._get_parsed_refs()
416 self.tags = self._get_tags()
417 self.tags = self._get_tags()
417 except OSError, e:
418 except OSError, e:
418 raise RepositoryError(e.strerror)
419 raise RepositoryError(e.strerror)
419
420
420 @LazyProperty
421 @LazyProperty
421 def _parsed_refs(self):
422 def _parsed_refs(self):
422 return self._get_parsed_refs()
423 return self._get_parsed_refs()
423
424
424 def _get_parsed_refs(self):
425 def _get_parsed_refs(self):
425 # cache the property
426 # cache the property
426 _repo = self._repo
427 _repo = self._repo
427 refs = _repo.get_refs()
428 refs = _repo.get_refs()
428 keys = [('refs/heads/', 'H'),
429 keys = [('refs/heads/', 'H'),
429 ('refs/remotes/origin/', 'RH'),
430 ('refs/remotes/origin/', 'RH'),
430 ('refs/tags/', 'T')]
431 ('refs/tags/', 'T')]
431 _refs = {}
432 _refs = {}
432 for ref, sha in refs.iteritems():
433 for ref, sha in refs.iteritems():
433 for k, type_ in keys:
434 for k, type_ in keys:
434 if ref.startswith(k):
435 if ref.startswith(k):
435 _key = ref[len(k):]
436 _key = ref[len(k):]
436 if type_ == 'T':
437 if type_ == 'T':
437 obj = _repo.get_object(sha)
438 obj = _repo.get_object(sha)
438 if isinstance(obj, Tag):
439 if isinstance(obj, Tag):
439 sha = _repo.get_object(sha).object[1]
440 sha = _repo.get_object(sha).object[1]
440 _refs[_key] = [sha, type_]
441 _refs[_key] = [sha, type_]
441 break
442 break
442 return _refs
443 return _refs
443
444
444 def _heads(self, reverse=False):
445 def _heads(self, reverse=False):
445 refs = self._repo.get_refs()
446 refs = self._repo.get_refs()
446 heads = {}
447 heads = {}
447
448
448 for key, val in refs.items():
449 for key, val in refs.items():
449 for ref_key in ['refs/heads/', 'refs/remotes/origin/']:
450 for ref_key in ['refs/heads/', 'refs/remotes/origin/']:
450 if key.startswith(ref_key):
451 if key.startswith(ref_key):
451 n = key[len(ref_key):]
452 n = key[len(ref_key):]
452 if n not in ['HEAD']:
453 if n not in ['HEAD']:
453 heads[n] = val
454 heads[n] = val
454
455
455 return heads if reverse else dict((y, x) for x, y in heads.iteritems())
456 return heads if reverse else dict((y, x) for x, y in heads.iteritems())
456
457
457 def get_changeset(self, revision=None):
458 def get_changeset(self, revision=None):
458 """
459 """
459 Returns ``GitChangeset`` object representing commit from git repository
460 Returns ``GitChangeset`` object representing commit from git repository
460 at the given revision or head (most recent commit) if None given.
461 at the given revision or head (most recent commit) if None given.
461 """
462 """
462 if isinstance(revision, GitChangeset):
463 if isinstance(revision, GitChangeset):
463 return revision
464 return revision
464 revision = self._get_revision(revision)
465 revision = self._get_revision(revision)
465 changeset = GitChangeset(repository=self, revision=revision)
466 changeset = GitChangeset(repository=self, revision=revision)
466 return changeset
467 return changeset
467
468
468 def get_changesets(self, start=None, end=None, start_date=None,
469 def get_changesets(self, start=None, end=None, start_date=None,
469 end_date=None, branch_name=None, reverse=False):
470 end_date=None, branch_name=None, reverse=False):
470 """
471 """
471 Returns iterator of ``GitChangeset`` objects from start to end (both
472 Returns iterator of ``GitChangeset`` objects from start to end (both
472 are inclusive), in ascending date order (unless ``reverse`` is set).
473 are inclusive), in ascending date order (unless ``reverse`` is set).
473
474
474 :param start: changeset ID, as str; first returned changeset
475 :param start: changeset ID, as str; first returned changeset
475 :param end: changeset ID, as str; last returned changeset
476 :param end: changeset ID, as str; last returned changeset
476 :param start_date: if specified, changesets with commit date less than
477 :param start_date: if specified, changesets with commit date less than
477 ``start_date`` would be filtered out from returned set
478 ``start_date`` would be filtered out from returned set
478 :param end_date: if specified, changesets with commit date greater than
479 :param end_date: if specified, changesets with commit date greater than
479 ``end_date`` would be filtered out from returned set
480 ``end_date`` would be filtered out from returned set
480 :param branch_name: if specified, changesets not reachable from given
481 :param branch_name: if specified, changesets not reachable from given
481 branch would be filtered out from returned set
482 branch would be filtered out from returned set
482 :param reverse: if ``True``, returned generator would be reversed
483 :param reverse: if ``True``, returned generator would be reversed
483 (meaning that returned changesets would have descending date order)
484 (meaning that returned changesets would have descending date order)
484
485
485 :raise BranchDoesNotExistError: If given ``branch_name`` does not
486 :raise BranchDoesNotExistError: If given ``branch_name`` does not
486 exist.
487 exist.
487 :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
488 :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
488 ``end`` could not be found.
489 ``end`` could not be found.
489
490
490 """
491 """
491 if branch_name and branch_name not in self.branches:
492 if branch_name and branch_name not in self.branches:
492 raise BranchDoesNotExistError("Branch '%s' not found" \
493 raise BranchDoesNotExistError("Branch '%s' not found" \
493 % branch_name)
494 % branch_name)
494 # %H at format means (full) commit hash, initial hashes are retrieved
495 # %H at format means (full) commit hash, initial hashes are retrieved
495 # in ascending date order
496 # in ascending date order
496 cmd_template = 'log --date-order --reverse --pretty=format:"%H"'
497 cmd_template = 'log --date-order --reverse --pretty=format:"%H"'
497 cmd_params = {}
498 cmd_params = {}
498 if start_date:
499 if start_date:
499 cmd_template += ' --since "$since"'
500 cmd_template += ' --since "$since"'
500 cmd_params['since'] = start_date.strftime('%m/%d/%y %H:%M:%S')
501 cmd_params['since'] = start_date.strftime('%m/%d/%y %H:%M:%S')
501 if end_date:
502 if end_date:
502 cmd_template += ' --until "$until"'
503 cmd_template += ' --until "$until"'
503 cmd_params['until'] = end_date.strftime('%m/%d/%y %H:%M:%S')
504 cmd_params['until'] = end_date.strftime('%m/%d/%y %H:%M:%S')
504 if branch_name:
505 if branch_name:
505 cmd_template += ' $branch_name'
506 cmd_template += ' $branch_name'
506 cmd_params['branch_name'] = branch_name
507 cmd_params['branch_name'] = branch_name
507 else:
508 else:
508 rev_filter = _git_path = settings.GIT_REV_FILTER
509 rev_filter = _git_path = settings.GIT_REV_FILTER
509 cmd_template += ' %s' % (rev_filter)
510 cmd_template += ' %s' % (rev_filter)
510
511
511 cmd = string.Template(cmd_template).safe_substitute(**cmd_params)
512 cmd = string.Template(cmd_template).safe_substitute(**cmd_params)
512 revs = self.run_git_command(cmd)[0].splitlines()
513 revs = self.run_git_command(cmd)[0].splitlines()
513 start_pos = 0
514 start_pos = 0
514 end_pos = len(revs)
515 end_pos = len(revs)
515 if start:
516 if start:
516 _start = self._get_revision(start)
517 _start = self._get_revision(start)
517 try:
518 try:
518 start_pos = revs.index(_start)
519 start_pos = revs.index(_start)
519 except ValueError:
520 except ValueError:
520 pass
521 pass
521
522
522 if end is not None:
523 if end is not None:
523 _end = self._get_revision(end)
524 _end = self._get_revision(end)
524 try:
525 try:
525 end_pos = revs.index(_end)
526 end_pos = revs.index(_end)
526 except ValueError:
527 except ValueError:
527 pass
528 pass
528
529
529 if None not in [start, end] and start_pos > end_pos:
530 if None not in [start, end] and start_pos > end_pos:
530 raise RepositoryError('start cannot be after end')
531 raise RepositoryError('start cannot be after end')
531
532
532 if end_pos is not None:
533 if end_pos is not None:
533 end_pos += 1
534 end_pos += 1
534
535
535 revs = revs[start_pos:end_pos]
536 revs = revs[start_pos:end_pos]
536 if reverse:
537 if reverse:
537 revs = reversed(revs)
538 revs = reversed(revs)
538 return CollectionGenerator(self, revs)
539 return CollectionGenerator(self, revs)
539
540
540 def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
541 def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
541 context=3):
542 context=3):
542 """
543 """
543 Returns (git like) *diff*, as plain text. Shows changes introduced by
544 Returns (git like) *diff*, as plain text. Shows changes introduced by
544 ``rev2`` since ``rev1``.
545 ``rev2`` since ``rev1``.
545
546
546 :param rev1: Entry point from which diff is shown. Can be
547 :param rev1: Entry point from which diff is shown. Can be
547 ``self.EMPTY_CHANGESET`` - in this case, patch showing all
548 ``self.EMPTY_CHANGESET`` - in this case, patch showing all
548 the changes since empty state of the repository until ``rev2``
549 the changes since empty state of the repository until ``rev2``
549 :param rev2: Until which revision changes should be shown.
550 :param rev2: Until which revision changes should be shown.
550 :param ignore_whitespace: If set to ``True``, would not show whitespace
551 :param ignore_whitespace: If set to ``True``, would not show whitespace
551 changes. Defaults to ``False``.
552 changes. Defaults to ``False``.
552 :param context: How many lines before/after changed lines should be
553 :param context: How many lines before/after changed lines should be
553 shown. Defaults to ``3``.
554 shown. Defaults to ``3``.
554 """
555 """
555 flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40']
556 flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40']
556 if ignore_whitespace:
557 if ignore_whitespace:
557 flags.append('-w')
558 flags.append('-w')
558
559
559 if hasattr(rev1, 'raw_id'):
560 if hasattr(rev1, 'raw_id'):
560 rev1 = getattr(rev1, 'raw_id')
561 rev1 = getattr(rev1, 'raw_id')
561
562
562 if hasattr(rev2, 'raw_id'):
563 if hasattr(rev2, 'raw_id'):
563 rev2 = getattr(rev2, 'raw_id')
564 rev2 = getattr(rev2, 'raw_id')
564
565
565 if rev1 == self.EMPTY_CHANGESET:
566 if rev1 == self.EMPTY_CHANGESET:
566 rev2 = self.get_changeset(rev2).raw_id
567 rev2 = self.get_changeset(rev2).raw_id
567 cmd = ' '.join(['show'] + flags + [rev2])
568 cmd = ' '.join(['show'] + flags + [rev2])
568 else:
569 else:
569 rev1 = self.get_changeset(rev1).raw_id
570 rev1 = self.get_changeset(rev1).raw_id
570 rev2 = self.get_changeset(rev2).raw_id
571 rev2 = self.get_changeset(rev2).raw_id
571 cmd = ' '.join(['diff'] + flags + [rev1, rev2])
572 cmd = ' '.join(['diff'] + flags + [rev1, rev2])
572
573
573 if path:
574 if path:
574 cmd += ' -- "%s"' % path
575 cmd += ' -- "%s"' % path
575
576
576 stdout, stderr = self.run_git_command(cmd)
577 stdout, stderr = self.run_git_command(cmd)
577 # If we used 'show' command, strip first few lines (until actual diff
578 # If we used 'show' command, strip first few lines (until actual diff
578 # starts)
579 # starts)
579 if rev1 == self.EMPTY_CHANGESET:
580 if rev1 == self.EMPTY_CHANGESET:
580 lines = stdout.splitlines()
581 lines = stdout.splitlines()
581 x = 0
582 x = 0
582 for line in lines:
583 for line in lines:
583 if line.startswith('diff'):
584 if line.startswith('diff'):
584 break
585 break
585 x += 1
586 x += 1
586 # Append new line just like 'diff' command do
587 # Append new line just like 'diff' command do
587 stdout = '\n'.join(lines[x:]) + '\n'
588 stdout = '\n'.join(lines[x:]) + '\n'
588 return stdout
589 return stdout
589
590
590 @LazyProperty
591 @LazyProperty
591 def in_memory_changeset(self):
592 def in_memory_changeset(self):
592 """
593 """
593 Returns ``GitInMemoryChangeset`` object for this repository.
594 Returns ``GitInMemoryChangeset`` object for this repository.
594 """
595 """
595 return GitInMemoryChangeset(self)
596 return GitInMemoryChangeset(self)
596
597
597 def clone(self, url, update_after_clone=True, bare=False):
598 def clone(self, url, update_after_clone=True, bare=False):
598 """
599 """
599 Tries to clone changes from external location.
600 Tries to clone changes from external location.
600
601
601 :param update_after_clone: If set to ``False``, git won't checkout
602 :param update_after_clone: If set to ``False``, git won't checkout
602 working directory
603 working directory
603 :param bare: If set to ``True``, repository would be cloned into
604 :param bare: If set to ``True``, repository would be cloned into
604 *bare* git repository (no working directory at all).
605 *bare* git repository (no working directory at all).
605 """
606 """
606 url = self._get_url(url)
607 url = self._get_url(url)
607 cmd = ['clone']
608 cmd = ['clone']
608 if bare:
609 if bare:
609 cmd.append('--bare')
610 cmd.append('--bare')
610 elif not update_after_clone:
611 elif not update_after_clone:
611 cmd.append('--no-checkout')
612 cmd.append('--no-checkout')
612 cmd += ['--', '"%s"' % url, '"%s"' % self.path]
613 cmd += ['--', '"%s"' % url, '"%s"' % self.path]
613 cmd = ' '.join(cmd)
614 cmd = ' '.join(cmd)
614 # If error occurs run_git_command raises RepositoryError already
615 # If error occurs run_git_command raises RepositoryError already
615 self.run_git_command(cmd)
616 self.run_git_command(cmd)
616
617
617 def pull(self, url):
618 def pull(self, url):
618 """
619 """
619 Tries to pull changes from external location.
620 Tries to pull changes from external location.
620 """
621 """
621 url = self._get_url(url)
622 url = self._get_url(url)
622 cmd = ['pull']
623 cmd = ['pull']
623 cmd.append("--ff-only")
624 cmd.append("--ff-only")
624 cmd.append(url)
625 cmd.append(url)
625 cmd = ' '.join(cmd)
626 cmd = ' '.join(cmd)
626 # If error occurs run_git_command raises RepositoryError already
627 # If error occurs run_git_command raises RepositoryError already
627 self.run_git_command(cmd)
628 self.run_git_command(cmd)
628
629
629 def fetch(self, url):
630 def fetch(self, url):
630 """
631 """
631 Tries to pull changes from external location.
632 Tries to pull changes from external location.
632 """
633 """
633 url = self._get_url(url)
634 url = self._get_url(url)
634 so, se = self.run_git_command('ls-remote -h %s' % url)
635 so, se = self.run_git_command('ls-remote -h %s' % url)
635 refs = []
636 refs = []
636 for line in (x for x in so.splitlines()):
637 for line in (x for x in so.splitlines()):
637 sha, ref = line.split('\t')
638 sha, ref = line.split('\t')
638 refs.append(ref)
639 refs.append(ref)
639 refs = ' '.join(('+%s:%s' % (r, r) for r in refs))
640 refs = ' '.join(('+%s:%s' % (r, r) for r in refs))
640 cmd = '''fetch %s -- %s''' % (url, refs)
641 cmd = '''fetch %s -- %s''' % (url, refs)
641 self.run_git_command(cmd)
642 self.run_git_command(cmd)
642
643
643 @LazyProperty
644 @LazyProperty
644 def workdir(self):
645 def workdir(self):
645 """
646 """
646 Returns ``Workdir`` instance for this repository.
647 Returns ``Workdir`` instance for this repository.
647 """
648 """
648 return GitWorkdir(self)
649 return GitWorkdir(self)
649
650
650 def get_config_value(self, section, name, config_file=None):
651 def get_config_value(self, section, name, config_file=None):
651 """
652 """
652 Returns configuration value for a given [``section``] and ``name``.
653 Returns configuration value for a given [``section``] and ``name``.
653
654
654 :param section: Section we want to retrieve value from
655 :param section: Section we want to retrieve value from
655 :param name: Name of configuration we want to retrieve
656 :param name: Name of configuration we want to retrieve
656 :param config_file: A path to file which should be used to retrieve
657 :param config_file: A path to file which should be used to retrieve
657 configuration from (might also be a list of file paths)
658 configuration from (might also be a list of file paths)
658 """
659 """
659 if config_file is None:
660 if config_file is None:
660 config_file = []
661 config_file = []
661 elif isinstance(config_file, basestring):
662 elif isinstance(config_file, basestring):
662 config_file = [config_file]
663 config_file = [config_file]
663
664
664 def gen_configs():
665 def gen_configs():
665 for path in config_file + self._config_files:
666 for path in config_file + self._config_files:
666 try:
667 try:
667 yield ConfigFile.from_path(path)
668 yield ConfigFile.from_path(path)
668 except (IOError, OSError, ValueError):
669 except (IOError, OSError, ValueError):
669 continue
670 continue
670
671
671 for config in gen_configs():
672 for config in gen_configs():
672 try:
673 try:
673 return config.get(section, name)
674 return config.get(section, name)
674 except KeyError:
675 except KeyError:
675 continue
676 continue
676 return None
677 return None
677
678
678 def get_user_name(self, config_file=None):
679 def get_user_name(self, config_file=None):
679 """
680 """
680 Returns user's name from global configuration file.
681 Returns user's name from global configuration file.
681
682
682 :param config_file: A path to file which should be used to retrieve
683 :param config_file: A path to file which should be used to retrieve
683 configuration from (might also be a list of file paths)
684 configuration from (might also be a list of file paths)
684 """
685 """
685 return self.get_config_value('user', 'name', config_file)
686 return self.get_config_value('user', 'name', config_file)
686
687
687 def get_user_email(self, config_file=None):
688 def get_user_email(self, config_file=None):
688 """
689 """
689 Returns user's email from global configuration file.
690 Returns user's email from global configuration file.
690
691
691 :param config_file: A path to file which should be used to retrieve
692 :param config_file: A path to file which should be used to retrieve
692 configuration from (might also be a list of file paths)
693 configuration from (might also be a list of file paths)
693 """
694 """
694 return self.get_config_value('user', 'email', config_file)
695 return self.get_config_value('user', 'email', config_file)
@@ -1,415 +1,415 b''
1 '''
1 '''
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningfull, non-blocking, concurrent
3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 '''
24 '''
25 import os
25 import os
26 import subprocess
26 import subprocess
27 from rhodecode.lib.vcs.utils.compat import deque, Event, Thread, _bytes, _bytearray
27 from rhodecode.lib.vcs.utils.compat import deque, Event, Thread, _bytes, _bytearray
28
28
29
29
class StreamFeeder(Thread):
    """
    Background thread that pumps data from a string-like, file-like or
    raw file-descriptor source into the write end of an ``os.pipe()``,
    so filling the pipe's buffer never blocks the main thread.
    The write end is closed once the source is exhausted.
    """
    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        readable = False
        self.bytes = _bytes()
        if type(source) in (type(''), _bytes, _bytearray):  # string-like
            self.bytes = _bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # wrap the raw descriptor (int) into a buffered file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # duck-type check: is the source readable by now?
            try:
                readable = source.read
            except Exception:
                pass
            if not readable and not self.bytes:
                raise TypeError("StreamFeeder's source object must be a readable "
                                "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        wfd = self.writeiface
        if self.bytes:
            # payload already fully in memory - push it in one write
            os.write(wfd, self.bytes)
        else:
            src = self.source
            while True:
                chunk = src.read(4096)
                if not chunk:
                    break
                os.write(wfd, chunk)
        # closing the write end signals EOF to the pipe's reader
        os.close(wfd)

    @property
    def output(self):
        # read end of the pipe - hand this to the consumer (e.g. a
        # subprocess' stdin)
        return self.readiface
77
77
78
78
class InputStreamChunker(Thread):
    """
    Reader thread: pulls fixed-size chunks from a blocking ``source``
    stream and appends them to the shared ``target`` container.  It
    pauses itself (via the ``keep_reading`` event) once the consumer's
    buffer holds more than ``chunk_count_max`` chunks, and raises the
    ``EOF`` event when the source is exhausted.
    """
    def __init__(self, source, target, buffer_size, chunk_size):
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # signalled every time a chunk lands in ``target``
        self.data_added = Event()
        self.data_added.clear()

        # cleared by us when the buffer is full; the consumer sets it
        # back once it has drained some data
        self.keep_reading = Event()
        self.keep_reading.set()

        # raised once the source stream is exhausted
        self.EOF = Event()
        self.EOF.clear()

        # master run switch; cleared by stop()
        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except:
            pass

    def run(self):
        src = self.source
        sink = self.target
        chunk_size = self.chunk_size
        max_chunks = self.chunk_count_max
        keep_reading = self.keep_reading
        data_added = self.data_added
        running = self.go

        try:
            chunk = src.read(chunk_size)
        except ValueError:
            chunk = ''

        while chunk and running.is_set():
            if len(sink) > max_chunks:
                # buffer full - pause until the consumer drains it
                keep_reading.clear()
                keep_reading.wait(2)
                # # this only works on 2.7.x and up
                # if not kr.wait(10):
                #     raise Exception("Timed out while waiting for input to be read.")
                # instead we'll use this
                if len(sink) > max_chunks + 3:
                    raise IOError("Timed out while waiting for input from subprocess.")
            sink.append(chunk)
            data_added.set()
            chunk = src.read(chunk_size)
        self.EOF.set()
        data_added.set()  # for cases when done but there was no input.
143
143
144
144
class BufferedGenerator():
    '''
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    '''

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        '''
        :param source: blocking readable stream to consume from
        :param buffer_size: total buffer size in bytes kept in ``self.data``
        :param chunk_size: max size of a single read chunk
        :param starting_values: optional chunks pre-placed in the queue
        :param bottomless: when True the deque is bounded and silently
          drops oldest chunks (used for stderr collection)
        '''
        # FIX: default was a shared mutable ``[]`` - a mutable default
        # argument is evaluated once and shared between all calls. Using
        # a None sentinel is backward-compatible for every caller.
        if starting_values is None:
            starting_values = []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            # pretend data has already arrived so .next() returns at once
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # wait (in 0.2s slices) until either data shows up or EOF is hit
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # buffer was drained a bit - let the reader thread resume
            self.worker.keep_reading.set()
            return _bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, type, value=None, traceback=None):
        # generator-protocol compatibility; only raises while the source
        # is still being read
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no ``w`` attribute,
        # so this property looks dead/broken - confirm before relying
        # on it.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        '''
        Done_reding does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Event class instance.
        '''
        return self.worker.EOF

    @property
    def done_reading(self):
        '''
        Done_reding does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Bool value.
        '''
        return self.worker.EOF.is_set()

    @property
    def length(self):
        '''
        returns int.

        This is the lenght of the que of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying throuh a bottomless pit content and
        can only know the lenght of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the responce's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        '''
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
281 return self.data[i]
282
282
283
283
284 class SubprocessIOChunker(object):
284 class SubprocessIOChunker(object):
285 '''
285 '''
286 Processor class wrapping handling of subprocess IO.
286 Processor class wrapping handling of subprocess IO.
287
287
288 In a way, this is a "communicate()" replacement with a twist.
288 In a way, this is a "communicate()" replacement with a twist.
289
289
290 - We are multithreaded. Writing in and reading out, err are all sep threads.
290 - We are multithreaded. Writing in and reading out, err are all sep threads.
291 - We support concurrent (in and out) stream processing.
291 - We support concurrent (in and out) stream processing.
292 - The output is not a stream. It's a queue of read string (bytes, not unicode)
292 - The output is not a stream. It's a queue of read string (bytes, not unicode)
293 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
293 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
294 - We are non-blocking in more respects than communicate()
294 - We are non-blocking in more respects than communicate()
295 (reading from subprocess out pauses when internal buffer is full, but
295 (reading from subprocess out pauses when internal buffer is full, but
296 does not block the parent calling code. On the flip side, reading from
296 does not block the parent calling code. On the flip side, reading from
297 slow-yielding subprocess may block the iteration until data shows up. This
297 slow-yielding subprocess may block the iteration until data shows up. This
298 does not block the parallel inpipe reading occurring parallel thread.)
298 does not block the parallel inpipe reading occurring parallel thread.)
299
299
300 The purpose of the object is to allow us to wrap subprocess interactions into
300 The purpose of the object is to allow us to wrap subprocess interactions into
301 and interable that can be passed to a WSGI server as the application's return
301 and interable that can be passed to a WSGI server as the application's return
302 value. Because of stream-processing-ability, WSGI does not have to read ALL
302 value. Because of stream-processing-ability, WSGI does not have to read ALL
303 of the subprocess's output and buffer it, before handing it to WSGI server for
303 of the subprocess's output and buffer it, before handing it to WSGI server for
304 HTTP response. Instead, the class initializer reads just a bit of the stream
304 HTTP response. Instead, the class initializer reads just a bit of the stream
305 to figure out if error ocurred or likely to occur and if not, just hands the
305 to figure out if error ocurred or likely to occur and if not, just hands the
306 further iteration over subprocess output to the server for completion of HTTP
306 further iteration over subprocess output to the server for completion of HTTP
307 response.
307 response.
308
308
309 The real or perceived subprocess error is trapped and raised as one of
309 The real or perceived subprocess error is trapped and raised as one of
310 EnvironmentError family of exceptions
310 EnvironmentError family of exceptions
311
311
312 Example usage:
312 Example usage:
313 # try:
313 # try:
314 # answer = SubprocessIOChunker(
314 # answer = SubprocessIOChunker(
315 # cmd,
315 # cmd,
316 # input,
316 # input,
317 # buffer_size = 65536,
317 # buffer_size = 65536,
318 # chunk_size = 4096
318 # chunk_size = 4096
319 # )
319 # )
320 # except (EnvironmentError) as e:
320 # except (EnvironmentError) as e:
321 # print str(e)
321 # print str(e)
322 # raise e
322 # raise e
323 #
323 #
324 # return answer
324 # return answer
325
325
326
326
327 '''
327 '''
328 def __init__(self, cmd, inputstream=None, buffer_size=65536,
328 def __init__(self, cmd, inputstream=None, buffer_size=65536,
329 chunk_size=4096, starting_values=[], **kwargs):
329 chunk_size=4096, starting_values=[], **kwargs):
330 '''
330 '''
331 Initializes SubprocessIOChunker
331 Initializes SubprocessIOChunker
332
332
333 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
333 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
334 :param inputstream: (Default: None) A file-like, string, or file pointer.
334 :param inputstream: (Default: None) A file-like, string, or file pointer.
335 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
335 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
336 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
336 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
337 :param starting_values: (Default: []) An array of strings to put in front of output que.
337 :param starting_values: (Default: []) An array of strings to put in front of output que.
338 '''
338 '''
339
339
340 if inputstream:
340 if inputstream:
341 input_streamer = StreamFeeder(inputstream)
341 input_streamer = StreamFeeder(inputstream)
342 input_streamer.start()
342 input_streamer.start()
343 inputstream = input_streamer.output
343 inputstream = input_streamer.output
344
344
345 _shell = kwargs.get('shell', True)
345 if isinstance(cmd, (list, tuple)):
346 if isinstance(cmd, (list, tuple)):
346 cmd = ' '.join(cmd)
347 cmd = ' '.join(cmd)
347
348
348 _shell = kwargs.get('shell') or True
349 kwargs['shell'] = _shell
349 kwargs['shell'] = _shell
350 _p = subprocess.Popen(cmd,
350 _p = subprocess.Popen(cmd,
351 bufsize=-1,
351 bufsize=-1,
352 stdin=inputstream,
352 stdin=inputstream,
353 stdout=subprocess.PIPE,
353 stdout=subprocess.PIPE,
354 stderr=subprocess.PIPE,
354 stderr=subprocess.PIPE,
355 **kwargs
355 **kwargs
356 )
356 )
357
357
358 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
358 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
359 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
359 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
360
360
361 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
361 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
362 # doing this until we reach either end of file, or end of buffer.
362 # doing this until we reach either end of file, or end of buffer.
363 bg_out.data_added_event.wait(1)
363 bg_out.data_added_event.wait(1)
364 bg_out.data_added_event.clear()
364 bg_out.data_added_event.clear()
365
365
366 # at this point it's still ambiguous if we are done reading or just full buffer.
366 # at this point it's still ambiguous if we are done reading or just full buffer.
367 # Either way, if error (returned by ended process, or implied based on
367 # Either way, if error (returned by ended process, or implied based on
368 # presence of stuff in stderr output) we error out.
368 # presence of stuff in stderr output) we error out.
369 # Else, we are happy.
369 # Else, we are happy.
370 _returncode = _p.poll()
370 _returncode = _p.poll()
371 if _returncode or (_returncode == None and bg_err.length):
371 if _returncode or (_returncode == None and bg_err.length):
372 try:
372 try:
373 _p.terminate()
373 _p.terminate()
374 except:
374 except:
375 pass
375 pass
376 bg_out.stop()
376 bg_out.stop()
377 bg_err.stop()
377 bg_err.stop()
378 err = '%s' % ''.join(bg_err)
378 err = '%s' % ''.join(bg_err)
379 if err:
379 if err:
380 raise EnvironmentError("Subprocess exited due to an error:\n" + err)
380 raise EnvironmentError("Subprocess exited due to an error:\n" + err)
381 raise EnvironmentError("Subprocess exited with non 0 ret code:%s" % _returncode)
381 raise EnvironmentError("Subprocess exited with non 0 ret code:%s" % _returncode)
382
382
383 self.process = _p
383 self.process = _p
384 self.output = bg_out
384 self.output = bg_out
385 self.error = bg_err
385 self.error = bg_err
386
386
387 def __iter__(self):
387 def __iter__(self):
388 return self
388 return self
389
389
390 def next(self):
390 def next(self):
391 if self.process.poll():
391 if self.process.poll():
392 err = '%s' % ''.join(self.error)
392 err = '%s' % ''.join(self.error)
393 raise EnvironmentError("Subprocess exited due to an error:\n" + err)
393 raise EnvironmentError("Subprocess exited due to an error:\n" + err)
394 return self.output.next()
394 return self.output.next()
395
395
396 def throw(self, type, value=None, traceback=None):
396 def throw(self, type, value=None, traceback=None):
397 if self.output.length or not self.output.done_reading:
397 if self.output.length or not self.output.done_reading:
398 raise type(value)
398 raise type(value)
399
399
400 def close(self):
400 def close(self):
401 try:
401 try:
402 self.process.terminate()
402 self.process.terminate()
403 except:
403 except:
404 pass
404 pass
405 try:
405 try:
406 self.output.close()
406 self.output.close()
407 except:
407 except:
408 pass
408 pass
409 try:
409 try:
410 self.error.close()
410 self.error.close()
411 except:
411 except:
412 pass
412 pass
413
413
414 def __del__(self):
414 def __del__(self):
415 self.close()
415 self.close()
General Comments 0
You need to be logged in to leave comments. Login now