##// END OF EJS Templates
linting
super-admin -
r1095:268c6aa4 python3
parent child Browse files
Show More
@@ -1,738 +1,738 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # RhodeCode VCSServer provides access to different vcs backends via network.
3 # RhodeCode VCSServer provides access to different vcs backends via network.
4 # Copyright (C) 2014-2020 RhodeCode GmbH
4 # Copyright (C) 2014-2020 RhodeCode GmbH
5 #
5 #
6 # This program is free software; you can redistribute it and/or modify
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
9 # (at your option) any later version.
10 #
10 #
11 # This program is distributed in the hope that it will be useful,
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
14 # GNU General Public License for more details.
15 #
15 #
16 # You should have received a copy of the GNU General Public License
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software Foundation,
17 # along with this program; if not, write to the Free Software Foundation,
18 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
19
20 import io
20 import io
21 import os
21 import os
22 import sys
22 import sys
23 import logging
23 import logging
24 import collections
24 import collections
25 import importlib
25 import importlib
26 import base64
26 import base64
27 import msgpack
27 import msgpack
28
28
29 from http.client import HTTPConnection
29 from http.client import HTTPConnection
30
30
31
31
32 import mercurial.scmutil
32 import mercurial.scmutil
33 import mercurial.node
33 import mercurial.node
34
34
35 from vcsserver.lib.rc_json import json
35 from vcsserver.lib.rc_json import json
36 from vcsserver import exceptions, subprocessio, settings
36 from vcsserver import exceptions, subprocessio, settings
37 from vcsserver.str_utils import safe_bytes
37 from vcsserver.str_utils import safe_bytes
38
38
39 log = logging.getLogger(__name__)
39 log = logging.getLogger(__name__)
40
40
41
41
class HooksHttpClient(object):
    """Calls RhodeCode hooks over HTTP using msgpack-encoded request bodies."""

    proto = 'msgpack.v1'
    connection = None

    def __init__(self, hooks_uri):
        self.hooks_uri = hooks_uri

    def __call__(self, method, extras):
        """POST the hook call to the hooks server and return the decoded reply."""
        conn = HTTPConnection(self.hooks_uri)
        # binary msgpack body
        headers, body = self._serialize(method, extras)
        try:
            conn.request('POST', '/', body, headers)
        except Exception as error:
            log.error('Hooks calling Connection failed on %s, org error: %s', conn.__dict__, error)
            raise
        response = conn.getresponse()
        try:
            return msgpack.load(response)
        except Exception:
            raw = response.read()
            log.exception('Failed to decode hook response json data. '
                          'response_code:%s, raw_data:%s',
                          response.status, raw)
            raise

    @classmethod
    def _serialize(cls, hook_name, extras):
        """Build the (headers, body) pair for a hook call; body is msgpack-packed."""
        payload = {'method': hook_name, 'extras': extras}
        return {'rc-hooks-protocol': cls.proto}, msgpack.packb(payload)
78
78
79
79
class HooksDummyClient(object):
    """Runs hooks in-process by importing a python hooks module by name."""

    def __init__(self, hooks_module):
        # Resolve the module eagerly so a bad name fails at construction time.
        self._hooks_module = importlib.import_module(hooks_module)

    def __call__(self, hook_name, extras):
        module = self._hooks_module
        with module.Hooks() as hooks:
            return getattr(hooks, hook_name)(extras)
87
87
88
88
class HooksShadowRepoClient(object):
    """No-op hooks client for shadow repositories (e.g. merge previews)."""

    def __call__(self, hook_name, extras):
        # Shadow repos never fire real hooks; always report success, no output.
        return {'output': '', 'status': 0}
93
93
94
94
class RemoteMessageWriter(object):
    """Writer base class."""

    def write(self, message):
        # Transport-specific delivery is left to subclasses.
        raise NotImplementedError()
99
99
100
100
101 class HgMessageWriter(RemoteMessageWriter):
101 class HgMessageWriter(RemoteMessageWriter):
102 """Writer that knows how to send messages to mercurial clients."""
102 """Writer that knows how to send messages to mercurial clients."""
103
103
104 def __init__(self, ui):
104 def __init__(self, ui):
105 self.ui = ui
105 self.ui = ui
106
106
107 def write(self, message):
107 def write(self, message):
108 # TODO: Check why the quiet flag is set by default.
108 # TODO: Check why the quiet flag is set by default.
109 old = self.ui.quiet
109 old = self.ui.quiet
110 self.ui.quiet = False
110 self.ui.quiet = False
111 self.ui.status(message.encode('utf-8'))
111 self.ui.status(message.encode('utf-8'))
112 self.ui.quiet = old
112 self.ui.quiet = old
113
113
114
114
115 class GitMessageWriter(RemoteMessageWriter):
115 class GitMessageWriter(RemoteMessageWriter):
116 """Writer that knows how to send messages to git clients."""
116 """Writer that knows how to send messages to git clients."""
117
117
118 def __init__(self, stdout=None):
118 def __init__(self, stdout=None):
119 self.stdout = stdout or sys.stdout
119 self.stdout = stdout or sys.stdout
120
120
121 def write(self, message):
121 def write(self, message):
122 self.stdout.write(safe_bytes(message))
122 self.stdout.write(safe_bytes(message))
123
123
124
124
125 class SvnMessageWriter(RemoteMessageWriter):
125 class SvnMessageWriter(RemoteMessageWriter):
126 """Writer that knows how to send messages to svn clients."""
126 """Writer that knows how to send messages to svn clients."""
127
127
128 def __init__(self, stderr=None):
128 def __init__(self, stderr=None):
129 # SVN needs data sent to stderr for back-to-client messaging
129 # SVN needs data sent to stderr for back-to-client messaging
130 self.stderr = stderr or sys.stderr
130 self.stderr = stderr or sys.stderr
131
131
132 def write(self, message):
132 def write(self, message):
133 self.stderr.write(message.encode('utf-8'))
133 self.stderr.write(message.encode('utf-8'))
134
134
135
135
136 def _handle_exception(result):
136 def _handle_exception(result):
137 exception_class = result.get('exception')
137 exception_class = result.get('exception')
138 exception_traceback = result.get('exception_traceback')
138 exception_traceback = result.get('exception_traceback')
139
139
140 if exception_traceback:
140 if exception_traceback:
141 log.error('Got traceback from remote call:%s', exception_traceback)
141 log.error('Got traceback from remote call:%s', exception_traceback)
142
142
143 if exception_class == 'HTTPLockedRC':
143 if exception_class == 'HTTPLockedRC':
144 raise exceptions.RepositoryLockedException()(*result['exception_args'])
144 raise exceptions.RepositoryLockedException()(*result['exception_args'])
145 elif exception_class == 'HTTPBranchProtected':
145 elif exception_class == 'HTTPBranchProtected':
146 raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
146 raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
147 elif exception_class == 'RepositoryError':
147 elif exception_class == 'RepositoryError':
148 raise exceptions.VcsException()(*result['exception_args'])
148 raise exceptions.VcsException()(*result['exception_args'])
149 elif exception_class:
149 elif exception_class:
150 raise Exception('Got remote exception "%s" with args "%s"' %
150 raise Exception('Got remote exception "%s" with args "%s"' %
151 (exception_class, result['exception_args']))
151 (exception_class, result['exception_args']))
152
152
153
153
def _get_hooks_client(extras):
    """Pick the hooks client implementation appropriate for ``extras``.

    Preference order: HTTP client when a hooks_uri is configured, the no-op
    shadow-repo client for shadow repositories, else the in-process client.
    """
    hooks_uri = extras.get('hooks_uri')
    if hooks_uri:
        return HooksHttpClient(hooks_uri)
    if extras.get('is_shadow_repo'):
        return HooksShadowRepoClient()
    return HooksDummyClient(extras['hooks_module'])
163
163
164
164
def _call_hook(hook_name, extras, writer):
    """Dispatch ``hook_name`` with ``extras`` to the configured hooks client.

    Remote exceptions are re-raised locally; hook output is pushed through
    ``writer``. Returns the hook's integer status code.
    """
    client = _get_hooks_client(extras)
    log.debug('Hooks, using client:%s', client)

    result = client(hook_name, extras)
    log.debug('Hooks got result: %s', result)
    _handle_exception(result)

    writer.write(result['output'])
    return result['status']
175
175
176
176
177 def _extras_from_ui(ui):
177 def _extras_from_ui(ui):
178 hook_data = ui.config(b'rhodecode', b'RC_SCM_DATA')
178 hook_data = ui.config(b'rhodecode', b'RC_SCM_DATA')
179 if not hook_data:
179 if not hook_data:
180 # maybe it's inside environ ?
180 # maybe it's inside environ ?
181 env_hook_data = os.environ.get('RC_SCM_DATA')
181 env_hook_data = os.environ.get('RC_SCM_DATA')
182 if env_hook_data:
182 if env_hook_data:
183 hook_data = env_hook_data
183 hook_data = env_hook_data
184
184
185 extras = {}
185 extras = {}
186 if hook_data:
186 if hook_data:
187 extras = json.loads(hook_data)
187 extras = json.loads(hook_data)
188 return extras
188 return extras
189
189
190
190
def _rev_range_hash(repo, node, check_heads=False):
    """Collect ``(commit_id, branch)`` pairs for revisions from ``node`` to tip.

    When ``check_heads`` is set, also computes the parent heads so callers
    can detect pushes that would create multiple heads on a branch.
    Returns ``(commits, parent_heads)``.
    """
    from vcsserver.hgcompat import get_ctx

    start = get_ctx(repo, node).rev()
    end = len(repo)

    commits = []
    revs = []
    for rev in range(start, end):
        revs.append(rev)
        ctx = get_ctx(repo, rev)
        hex_id = mercurial.node.hex(ctx.node())
        commits.append((hex_id, ctx.branch()))

    parent_heads = _check_heads(repo, start, end, revs) if check_heads else []
    return commits, parent_heads
209
209
210
210
def _check_heads(repo, start, end, commits):
    """Detect whether the pushed revisions create multiple heads on a branch.

    :param repo: mercurial repository object
    :param start: first new revision number
    :param end: one past the last revision number (``len(repo)``)
    :param commits: iterable of the new revision numbers being pushed
    :return: list of head revision numbers when more than one head would
        exist on some branch (forced-push / merge-needed situation),
        otherwise an empty list.
    """
    from vcsserver.hgcompat import get_ctx
    changelog = repo.changelog
    parents = set()

    # Collect the pre-existing parents (revs below ``start``) of the new commits.
    for new_rev in commits:
        for p in changelog.parentrevs(new_rev):
            if p == mercurial.node.nullrev:
                continue
            if p < start:
                parents.add(p)

    for p in parents:
        branch = get_ctx(repo, p).branch()
        # The heads descending from that parent, on the same branch
        parent_heads = set([p])
        reachable = set([p])
        # Walk forward over the new revs; a rev that reaches a tracked head
        # replaces it, so only unreached tips remain in ``parent_heads``.
        for x in range(p + 1, end):
            if get_ctx(repo, x).branch() != branch:
                continue
            for pp in changelog.parentrevs(x):
                if pp in reachable:
                    reachable.add(x)
                    parent_heads.discard(pp)
                    parent_heads.add(x)
        # More than one head? Suggest merging
        if len(parent_heads) > 1:
            return list(parent_heads)

    return []
241
241
242
242
243 def _get_git_env():
243 def _get_git_env():
244 env = {}
244 env = {}
245 for k, v in os.environ.items():
245 for k, v in os.environ.items():
246 if k.startswith('GIT'):
246 if k.startswith('GIT'):
247 env[k] = v
247 env[k] = v
248
248
249 # serialized version
249 # serialized version
250 return [(k, v) for k, v in env.items()]
250 return [(k, v) for k, v in env.items()]
251
251
252
252
253 def _get_hg_env(old_rev, new_rev, txnid, repo_path):
253 def _get_hg_env(old_rev, new_rev, txnid, repo_path):
254 env = {}
254 env = {}
255 for k, v in os.environ.items():
255 for k, v in os.environ.items():
256 if k.startswith('HG'):
256 if k.startswith('HG'):
257 env[k] = v
257 env[k] = v
258
258
259 env['HG_NODE'] = old_rev
259 env['HG_NODE'] = old_rev
260 env['HG_NODE_LAST'] = new_rev
260 env['HG_NODE_LAST'] = new_rev
261 env['HG_TXNID'] = txnid
261 env['HG_TXNID'] = txnid
262 env['HG_PENDING'] = repo_path
262 env['HG_PENDING'] = repo_path
263
263
264 return [(k, v) for k, v in env.items()]
264 return [(k, v) for k, v in env.items()]
265
265
266
266
def repo_size(ui, repo, **kwargs):
    """Mercurial hook: report repository size through the RhodeCode hooks service."""
    return _call_hook('repo_size', _extras_from_ui(ui), HgMessageWriter(ui))
270
270
271
271
def pre_pull(ui, repo, **kwargs):
    """Mercurial pre-pull hook dispatched to the RhodeCode hooks service."""
    return _call_hook('pre_pull', _extras_from_ui(ui), HgMessageWriter(ui))
275
275
276
276
def pre_pull_ssh(ui, repo, **kwargs):
    """SSH variant of pre_pull; only fires for SSH-backed operations."""
    extras = _extras_from_ui(ui)
    if not (extras and extras.get('SSH')):
        return 0
    return pre_pull(ui, repo, **kwargs)
282
282
283
283
def post_pull(ui, repo, **kwargs):
    """Mercurial post-pull hook dispatched to the RhodeCode hooks service."""
    return _call_hook('post_pull', _extras_from_ui(ui), HgMessageWriter(ui))
287
287
288
288
def post_pull_ssh(ui, repo, **kwargs):
    """SSH variant of post_pull; only fires for SSH-backed operations."""
    extras = _extras_from_ui(ui)
    if not (extras and extras.get('SSH')):
        return 0
    return post_pull(ui, repo, **kwargs)
294
294
295
295
def pre_push(ui, repo, node=None, **kwargs):
    """
    Mercurial pre_push hook

    Builds per-branch push metadata (commit counts, old/new revs, hg env
    snapshot) and dispatches the 'pre_push' hook to the RhodeCode service.
    ``node`` is the first new changeset; only the transaction-wide
    'pretxnchangegroup' invocation produces rev_data.
    """
    extras = _extras_from_ui(ui)
    detect_force_push = extras.get('detect_force_push')

    rev_data = []
    if node and kwargs.get('hooktype') == 'pretxnchangegroup':
        branches = collections.defaultdict(list)
        # _heads is non-empty only when force-push detection found multiple heads
        commits, _heads = _rev_range_hash(repo, node, check_heads=detect_force_push)
        for commit_id, branch in commits:
            branches[branch].append(commit_id)

        # One rev_data entry per touched branch.
        for branch, commits in branches.items():
            old_rev = kwargs.get('node_last') or commits[0]
            rev_data.append({
                'total_commits': len(commits),
                'old_rev': old_rev,
                'new_rev': commits[-1],
                'ref': '',
                'type': 'branch',
                'name': branch,
            })

        for push_ref in rev_data:
            push_ref['multiple_heads'] = _heads

            repo_path = os.path.join(
                extras.get('repo_store', ''), extras.get('repository', ''))
            # Attach the hg transaction environment so the server side can
            # inspect pending changes.
            push_ref['hg_env'] = _get_hg_env(
                old_rev=push_ref['old_rev'],
                new_rev=push_ref['new_rev'], txnid=kwargs.get('txnid'),
                repo_path=repo_path)

    extras['hook_type'] = kwargs.get('hooktype', 'pre_push')
    extras['commit_ids'] = rev_data

    return _call_hook('pre_push', extras, HgMessageWriter(ui))
335
335
336
336
def pre_push_ssh(ui, repo, node=None, **kwargs):
    """SSH variant of pre_push; no-op unless the operation came over SSH."""
    if _extras_from_ui(ui).get('SSH'):
        return pre_push(ui, repo, node, **kwargs)
    return 0
343
343
344
344
def pre_push_ssh_auth(ui, repo, node=None, **kwargs):
    """
    Mercurial pre_push hook for SSH

    Authorizes the push based on the SSH_PERMISSIONS carried in extras:
    write/admin permission returns 0, anything else returns 1 (deny).
    Non-SSH operations are always allowed through (0).
    """
    extras = _extras_from_ui(ui)
    if not extras.get('SSH'):
        return 0

    permission = extras['SSH_PERMISSIONS']
    if permission in ('repository.write', 'repository.admin'):
        return 0

    # non-zero ret code
    return 1
360
360
361
361
def post_push(ui, repo, node, **kwargs):
    """
    Mercurial post_push hook

    Gathers the pushed commit ids, touched branches, and any bookmarks
    recorded by key_push, then dispatches 'post_push' to the hooks service.
    """
    extras = _extras_from_ui(ui)

    commits, _heads = _rev_range_hash(repo, node)

    commit_ids = []
    branches = []
    for commit_id, branch in commits:
        commit_ids.append(commit_id)
        if branch not in branches:
            branches.append(branch)

    # Bookmarks are stashed on the ui object by the key_push hook, if any.
    bookmarks = getattr(ui, '_rc_pushkey_branches', [])
    tags = []

    extras['hook_type'] = kwargs.get('hooktype', 'post_push')
    extras['commit_ids'] = commit_ids
    extras['new_refs'] = {
        'branches': branches,
        'bookmarks': bookmarks,
        'tags': tags
    }

    return _call_hook('post_push', extras, HgMessageWriter(ui))
391
391
392
392
def post_push_ssh(ui, repo, node, **kwargs):
    """
    Mercurial post_push hook for SSH
    """
    extras = _extras_from_ui(ui)
    if not extras.get('SSH'):
        return 0
    return post_push(ui, repo, node, **kwargs)
400
400
401
401
def key_push(ui, repo, **kwargs):
    """Mercurial pushkey hook; records updated bookmarks for post_push."""
    from vcsserver.hgcompat import get_ctx

    if kwargs['new'] != '0' and kwargs['namespace'] == 'bookmarks':
        # store new bookmarks in our UI object propagated later to post_push
        ctx = get_ctx(repo, kwargs['key'])
        ui._rc_pushkey_branches = ctx.bookmarks()
    return
408
408
409
409
# backward compat: legacy hook name kept as an alias of post_pull
log_pull_action = post_pull

# backward compat: legacy hook name kept as an alias of post_push
log_push_action = post_push
415
415
416
416
def handle_git_pre_receive(unused_repo_path, unused_revs, unused_env):
    """
    Old hook name: keep here for backward compatibility.

    This is only required when the installed git hooks are not upgraded.
    """
    return None
424
424
425
425
def handle_git_post_receive(unused_repo_path, unused_revs, unused_env):
    """
    Old hook name: keep here for backward compatibility.

    This is only required when the installed git hooks are not upgraded.
    """
    return None
433
433
434
434
# (status, output) pair returned by the git pull/push wrapper hooks below.
HookResponse = collections.namedtuple('HookResponse', ('status', 'output'))
436
436
437
437
def git_pre_pull(extras):
    """
    Pre pull hook.

    :param extras: dictionary containing the keys defined in simplevcs
    :type extras: dict

    :return: status code of the hook. 0 for success.
    :rtype: int
    """
    if 'pull' not in extras['hooks']:
        return HookResponse(0, '')

    buffer_ = io.BytesIO()
    writer = GitMessageWriter(buffer_)
    try:
        status = _call_hook('pre_pull', extras, writer)
    except Exception as error:
        log.exception('Failed to call pre_pull hook')
        # 128 mirrors git's fatal-error exit code convention
        status = 128
        buffer_.write(safe_bytes(f'ERROR: {error}\n'))

    return HookResponse(status, buffer_.getvalue())
462
462
463
463
def git_post_pull(extras):
    """
    Post pull hook.

    :param extras: dictionary containing the keys defined in simplevcs
    :type extras: dict

    :return: status code of the hook. 0 for success.
    :rtype: int
    """
    if 'pull' not in extras['hooks']:
        return HookResponse(0, '')

    stdout = io.BytesIO()
    try:
        status = _call_hook('post_pull', extras, GitMessageWriter(stdout))
    except Exception as error:
        # Log the failure for operators (consistent with git_pre_pull),
        # instead of silently swallowing the traceback.
        log.exception('Failed to call post_pull hook')
        status = 128
        stdout.write(safe_bytes(f'ERROR: {error}\n'))

    return HookResponse(status, stdout.getvalue())
485
485
486
486
487 def _parse_git_ref_lines(revision_lines):
487 def _parse_git_ref_lines(revision_lines):
488 rev_data = []
488 rev_data = []
489 for revision_line in revision_lines or []:
489 for revision_line in revision_lines or []:
490 old_rev, new_rev, ref = revision_line.strip().split(' ')
490 old_rev, new_rev, ref = revision_line.strip().split(' ')
491 ref_data = ref.split('/', 2)
491 ref_data = ref.split('/', 2)
492 if ref_data[1] in ('tags', 'heads'):
492 if ref_data[1] in ('tags', 'heads'):
493 rev_data.append({
493 rev_data.append({
494 # NOTE(marcink):
494 # NOTE(marcink):
495 # we're unable to tell total_commits for git at this point
495 # we're unable to tell total_commits for git at this point
496 # but we set the variable for consistency with GIT
496 # but we set the variable for consistency with GIT
497 'total_commits': -1,
497 'total_commits': -1,
498 'old_rev': old_rev,
498 'old_rev': old_rev,
499 'new_rev': new_rev,
499 'new_rev': new_rev,
500 'ref': ref,
500 'ref': ref,
501 'type': ref_data[1],
501 'type': ref_data[1],
502 'name': ref_data[2],
502 'name': ref_data[2],
503 })
503 })
504 return rev_data
504 return rev_data
505
505
506
506
def git_pre_receive(unused_repo_path, revision_lines, env):
    """
    Pre push hook.

    Parses the refs being pushed, optionally detects forced pushes by
    checking for commits made unreachable by the update, and dispatches
    the 'pre_push' hook to the RhodeCode service.

    :param unused_repo_path: repository path (unused; kept for hook signature)
    :param revision_lines: raw ``old new ref`` lines from git's stdin
    :param env: process environment carrying RC_SCM_DATA json extras

    :return: status code of the hook. 0 for success.
    :rtype: int
    """
    extras = json.loads(env['RC_SCM_DATA'])
    rev_data = _parse_git_ref_lines(revision_lines)
    if 'push' not in extras['hooks']:
        return 0
    # all-zero sha marks branch creation (old_rev) or deletion (new_rev)
    empty_commit_id = '0' * 40

    detect_force_push = extras.get('detect_force_push')

    for push_ref in rev_data:
        # store our git-env which holds the temp store
        push_ref['git_env'] = _get_git_env()
        push_ref['pruned_sha'] = ''
        if not detect_force_push:
            # don't check for forced-push when we don't need to
            continue

        type_ = push_ref['type']
        new_branch = push_ref['old_rev'] == empty_commit_id
        delete_branch = push_ref['new_rev'] == empty_commit_id
        if type_ == 'heads' and not (new_branch or delete_branch):
            old_rev = push_ref['old_rev']
            new_rev = push_ref['new_rev']
            # list commits reachable from old_rev but not from new_rev
            cmd = [settings.GIT_EXECUTABLE, 'rev-list', old_rev, '^{}'.format(new_rev)]
            stdout, stderr = subprocessio.run_command(
                cmd, env=os.environ.copy())
            # means we're having some non-reachable objects, this forced push was used
            if stdout:
                push_ref['pruned_sha'] = stdout.splitlines()

    extras['hook_type'] = 'pre_receive'
    extras['commit_ids'] = rev_data
    return _call_hook('pre_push', extras, GitMessageWriter())
549
549
550
550
def git_post_receive(unused_repo_path, revision_lines, env):
    """
    Post push hook.

    :param unused_repo_path: repository path (unused; kept for hook signature)
    :param revision_lines: iterable of "<old_rev> <new_rev> <ref>" lines as
        handed to git's post-receive hook
    :param env: process environment; ``RC_SCM_DATA`` carries the extras
        defined in simplevcs as a JSON string

    :return: status code of the hook. 0 for success.
    :rtype: int
    """
    extras = json.loads(env['RC_SCM_DATA'])
    if 'push' not in extras['hooks']:
        return 0

    rev_data = _parse_git_ref_lines(revision_lines)

    git_revs = []

    # N.B.(skreft): it is ok to just call git, as git before calling a
    # subcommand sets the PATH environment variable so that it point to the
    # correct version of the git executable.
    empty_commit_id = '0' * 40
    branches = []
    tags = []
    for push_ref in rev_data:
        type_ = push_ref['type']

        if type_ == 'heads':
            if push_ref['old_rev'] == empty_commit_id:
                # starting new branch case
                if push_ref['name'] not in branches:
                    branches.append(push_ref['name'])

                # Fix up head revision if needed
                cmd = [settings.GIT_EXECUTABLE, 'show', 'HEAD']
                try:
                    subprocessio.run_command(cmd, env=os.environ.copy())
                except Exception:
                    # HEAD cannot be resolved (e.g. first push into an empty
                    # repository); point it at the branch just pushed.
                    # NOTE: args are passed as a list (no shell), so they must
                    # NOT carry literal quote characters - the previous
                    # '"HEAD"' / '"refs/heads/..."' values made git operate on
                    # refs literally containing double quotes and fail.
                    push_ref_name = push_ref['name']
                    cmd = [settings.GIT_EXECUTABLE, 'symbolic-ref', 'HEAD',
                           f'refs/heads/{push_ref_name}']
                    print(f"Setting default branch to {push_ref_name}")
                    subprocessio.run_command(cmd, env=os.environ.copy())

                # collect commits reachable from the pushed ref but not from
                # any other existing branch head
                cmd = [settings.GIT_EXECUTABLE, 'for-each-ref',
                       '--format=%(refname)', 'refs/heads/*']
                stdout, stderr = subprocessio.run_command(
                    cmd, env=os.environ.copy())
                heads = stdout
                heads = heads.replace(push_ref['ref'], '')
                heads = ' '.join(head for head
                                 in heads.splitlines() if head) or '.'
                cmd = [settings.GIT_EXECUTABLE, 'log', '--reverse',
                       '--pretty=format:%H', '--', push_ref['new_rev'],
                       '--not', heads]
                stdout, stderr = subprocessio.run_command(
                    cmd, env=os.environ.copy())
                git_revs.extend(stdout.splitlines())
            elif push_ref['new_rev'] == empty_commit_id:
                # delete branch case
                git_revs.append('delete_branch=>%s' % push_ref['name'])
            else:
                # update of an existing branch
                if push_ref['name'] not in branches:
                    branches.append(push_ref['name'])

                cmd = [settings.GIT_EXECUTABLE, 'log',
                       '{old_rev}..{new_rev}'.format(**push_ref),
                       '--reverse', '--pretty=format:%H']
                stdout, stderr = subprocessio.run_command(
                    cmd, env=os.environ.copy())
                git_revs.extend(stdout.splitlines())
        elif type_ == 'tags':
            if push_ref['name'] not in tags:
                tags.append(push_ref['name'])
            git_revs.append('tag=>%s' % push_ref['name'])

    extras['hook_type'] = 'post_receive'
    extras['commit_ids'] = git_revs
    extras['new_refs'] = {
        'branches': branches,
        'bookmarks': [],
        'tags': tags,
    }

    if 'repo_size' in extras['hooks']:
        try:
            _call_hook('repo_size', extras, GitMessageWriter())
        except Exception:
            # repo_size is best-effort; never fail the push because of it
            pass

    return _call_hook('post_push', extras, GitMessageWriter())
641
641
642
642
def _get_extras_from_txn_id(path, txn_id):
    """
    Fetch the 'rc-scm-extras' revision property stored for an SVN
    transaction and decode it (base64-encoded JSON).

    Returns an empty dict when the property cannot be read or decoded.
    """
    extras = {}
    try:
        svnlook_args = [
            settings.SVNLOOK_EXECUTABLE, 'pget',
            '-t', txn_id,
            '--revprop', path, 'rc-scm-extras',
        ]
        raw_prop, _ = subprocessio.run_command(
            svnlook_args, env=os.environ.copy())
        extras = json.loads(base64.urlsafe_b64decode(raw_prop))
    except Exception:
        log.exception('Failed to extract extras info from txn_id')

    return extras
656
656
657
657
def _get_extras_from_commit_id(commit_id, path):
    """
    Fetch the 'rc-scm-extras' revision property stored on an SVN revision
    and decode it (base64-encoded JSON).

    Returns an empty dict when the property cannot be read or decoded.
    """
    extras = {}
    try:
        svnlook_args = [
            settings.SVNLOOK_EXECUTABLE, 'pget',
            '-r', commit_id,
            '--revprop', path, 'rc-scm-extras',
        ]
        raw_prop, _ = subprocessio.run_command(
            svnlook_args, env=os.environ.copy())
        extras = json.loads(base64.urlsafe_b64decode(raw_prop))
    except Exception:
        log.exception('Failed to extract extras info from commit_id')

    return extras
671
671
672
672
def svn_pre_commit(repo_path, commit_data, env):
    """
    Pre-commit hook fired by SVN before a transaction is committed.

    :param repo_path: repository path (unused; path comes from commit_data)
    :param commit_data: (path, txn_id) pair identifying the transaction
    :param env: process environment; RC_SCM_DATA may carry hook extras
    :return: status code of the hook. 0 for success.
    :rtype: int
    """
    path, txn_id = commit_data

    if env.get('RC_SCM_DATA'):
        extras = json.loads(env['RC_SCM_DATA'])
    else:
        # fallback: read extras persisted on the transaction itself
        extras = _get_extras_from_txn_id(path, txn_id)
        if not extras:
            return 0

    extras['hook_type'] = 'pre_commit'
    extras['commit_ids'] = [txn_id]
    extras['txn_id'] = txn_id
    # SVN pre-commit carries no branch/tag information at this point
    extras['new_refs'] = {
        'total_commits': 1,
        'branches': [],
        'bookmarks': [],
        'tags': [],
    }

    return _call_hook('pre_push', extras, SvnMessageWriter())
697
697
698
698
def svn_post_commit(repo_path, commit_data, env):
    """
    Post-commit hook fired by SVN after a commit is finalized.

    :param repo_path: repository path (unused; path comes from commit_data)
    :param commit_data: (path, commit_id, txn_id) tuple, or the legacy
        (path, commit_id) pair without a transaction id
    :param env: process environment; RC_SCM_DATA may carry hook extras
    :return: status code of the hook. 0 for success.
    :rtype: int
    :raises ValueError: if commit_data does not have 2 or 3 elements
    """
    if len(commit_data) == 3:
        path, commit_id, txn_id = commit_data
    elif len(commit_data) == 2:
        log.error('Failed to extract txn_id from commit_data using legacy method. '
                  'Some functionality might be limited')
        path, commit_id = commit_data
        txn_id = None
    else:
        # Previously an unexpected shape fell through and crashed later with
        # a NameError on unbound `path`; fail fast with a clear error instead.
        raise ValueError(
            'commit_data must have 2 or 3 elements, got: %r' % (commit_data,))

    branches = []
    tags = []

    if env.get('RC_SCM_DATA'):
        extras = json.loads(env['RC_SCM_DATA'])
    else:
        # fallback method to read from TXN-ID stored data
        extras = _get_extras_from_commit_id(commit_id, path)
        if not extras:
            return 0

    extras['hook_type'] = 'post_commit'
    extras['commit_ids'] = [commit_id]
    extras['txn_id'] = txn_id
    extras['new_refs'] = {
        'branches': branches,
        'bookmarks': [],
        'tags': tags,
        'total_commits': 1,
    }

    if 'repo_size' in extras['hooks']:
        try:
            _call_hook('repo_size', extras, SvnMessageWriter())
        except Exception:
            # repo_size is best-effort; never fail the push because of it
            pass

    return _call_hook('post_push', extras, SvnMessageWriter())
@@ -1,563 +1,563 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import collections
26 import collections
27 import logging
27 import logging
28 import subprocess
28 import subprocess
29 import threading
29 import threading
30
30
31 from vcsserver.str_utils import safe_str
31 from vcsserver.str_utils import safe_str
32
32
33 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
34
34
35
35
class StreamFeeder(threading.Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        """
        :param source: bytes/bytearray/str payload, a readable file-like
            object, or a file descriptor (int) to stream from
        :raises TypeError: when source is neither string-like nor readable
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            if isinstance(source, str):
                # bytes(str) raises "string argument without an encoding" on
                # Python 3 - encode text payloads explicitly instead.
                self.bytes = source.encode('utf-8')
            else:
                self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if isinstance(source, int):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                source = os.fdopen(source, 'rb', 16384)
            # let's see if source is file-like by now
            filelike = hasattr(source, 'read')
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        writer = self.writeiface
        try:
            if self.bytes:
                os.write(writer, self.bytes)
            else:
                s = self.source

                while 1:
                    _bytes = s.read(4096)
                    if not _bytes:
                        break
                    os.write(writer, _bytes)

        finally:
            # always close the write end so the reader observes EOF
            os.close(writer)

    @property
    def output(self):
        # read end of the pipe; data fed from `source` appears here
        return self.readiface
83
83
84
84
class InputStreamChunker(threading.Thread):
    """
    Background thread that reads fixed-size chunks from a blocking,
    file-like ``source`` and appends them to the shared ``target``
    collection, pausing itself when the consumer falls behind.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        # source: blocking file-like object to read from (e.g. subprocess pipe)
        # target: shared collection (deque) the read chunks are appended to
        # buffer_size / chunk_size: together bound how many chunks may sit in
        # `target` before this thread pauses reading (chunk_count_max)

        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # signalled every time a chunk lands in `target` (and once at EOF)
        self.data_added = threading.Event()
        self.data_added.clear()

        # cleared by this thread to pause itself when the buffer is full;
        # set again by the consumer once chunks have been drained
        self.keep_reading = threading.Event()
        self.keep_reading.set()

        # set once the source is exhausted or reading has been stopped
        self.EOF = threading.Event()
        self.EOF.clear()

        # master switch: cleared by stop() to make the read loop exit
        self.go = threading.Event()
        self.go.set()

    def stop(self):
        """Stop reading and mark the stream as finished (EOF)."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            pass

    def run(self):
        # Bind attributes to locals once; this read loop is hot.
        s = self.source
        t = self.target
        cs = self.chunk_size
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            # source already closed before the first read
            b = ''

        timeout_input = 20
        while b and go.is_set():
            if len(t) > chunk_count_max:
                # buffer full: pause until the consumer drains it; if it
                # stays full past the timeout, give up hard
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                if len(t) > chunk_count_max + timeout_input:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            t.append(b)
            da.set()

            try:
                b = s.read(cs)
            except ValueError:  # probably "I/O operation on closed file"
                b = ''

        self.EOF.set()
        da.set()  # for cases when done but there was no input.
153
153
154
154
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, name, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        # name: label used only for __str__/debugging
        # source: blocking file-like to read from (subprocess stdout/stderr)
        # starting_values: chunks to pre-seed the queue with
        # bottomless: bound the deque so the oldest chunks are silently
        # dropped once it is full (deque maxlen semantics)
        starting_values = starting_values or []
        self.name = name
        self.buffer_size = buffer_size
        self.chunk_size = chunk_size

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data_queue = collections.deque(starting_values, maxlen)
        # worker thread fills data_queue in the background
        self.worker = InputStreamChunker(source, self.data_queue, buffer_size, chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################
    def __str__(self):
        return f'BufferedGenerator(name={self.name} chunk: {self.chunk_size} on buffer: {self.buffer_size})'

    def __iter__(self):
        return self

    def __next__(self):
        # Block (in 0.2s waits) until a chunk is buffered or the worker
        # reports EOF; then serve one chunk, or end the iteration.

        while not self.length and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)

        if self.length:
            # consumer made room - let the worker resume reading
            self.worker.keep_reading.set()
            return bytes(self.data_queue.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # Generator-protocol throw(); only raises while the worker has not
        # yet reached EOF (after EOF it is silently ignored).
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker as defined in this file has no
        # attribute `w`, so accessing this property raises AttributeError.
        # Looks like a leftover from an older worker implementation -
        # confirm before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        # Event set whenever the worker appends a chunk (or hits EOF).
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the worker has paused itself because the buffer is full.
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data_queue)

    def prepend(self, x):
        self.data_queue.appendleft(x)

    def append(self, x):
        self.data_queue.append(x)

    def extend(self, o):
        self.data_queue.extend(o)

    def __getitem__(self, i):
        return self.data_queue[i]
295
295
296
296
297 class SubprocessIOChunker(object):
297 class SubprocessIOChunker(object):
298 """
298 """
299 Processor class wrapping handling of subprocess IO.
299 Processor class wrapping handling of subprocess IO.
300
300
301 .. important::
301 .. important::
302
302
303 Watch out for the method `__del__` on this class. If this object
303 Watch out for the method `__del__` on this class. If this object
304 is deleted, it will kill the subprocess, so avoid to
304 is deleted, it will kill the subprocess, so avoid to
305 return the `output` attribute or usage of it like in the following
305 return the `output` attribute or usage of it like in the following
306 example::
306 example::
307
307
308 # `args` expected to run a program that produces a lot of output
308 # `args` expected to run a program that produces a lot of output
309 output = ''.join(SubprocessIOChunker(
309 output = ''.join(SubprocessIOChunker(
310 args, shell=False, inputstream=inputstream, env=environ).output)
310 args, shell=False, inputstream=inputstream, env=environ).output)
311
311
312 # `output` will not contain all the data, because the __del__ method
312 # `output` will not contain all the data, because the __del__ method
313 # has already killed the subprocess in this case before all output
313 # has already killed the subprocess in this case before all output
314 # has been consumed.
314 # has been consumed.
315
315
316
316
317
317
318 In a way, this is a "communicate()" replacement with a twist.
318 In a way, this is a "communicate()" replacement with a twist.
319
319
320 - We are multithreaded. Writing in and reading out, err are all sep threads.
320 - We are multithreaded. Writing in and reading out, err are all sep threads.
321 - We support concurrent (in and out) stream processing.
321 - We support concurrent (in and out) stream processing.
322 - The output is not a stream. It's a queue of read string (bytes, not str)
322 - The output is not a stream. It's a queue of read string (bytes, not str)
323 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
323 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
324 - We are non-blocking in more respects than communicate()
324 - We are non-blocking in more respects than communicate()
325 (reading from subprocess out pauses when internal buffer is full, but
325 (reading from subprocess out pauses when internal buffer is full, but
326 does not block the parent calling code. On the flip side, reading from
326 does not block the parent calling code. On the flip side, reading from
327 slow-yielding subprocess may block the iteration until data shows up. This
327 slow-yielding subprocess may block the iteration until data shows up. This
328 does not block the parallel inpipe reading occurring parallel thread.)
328 does not block the parallel inpipe reading occurring parallel thread.)
329
329
330 The purpose of the object is to allow us to wrap subprocess interactions into
330 The purpose of the object is to allow us to wrap subprocess interactions into
331 an iterable that can be passed to a WSGI server as the application's return
331 an iterable that can be passed to a WSGI server as the application's return
332 value. Because of stream-processing-ability, WSGI does not have to read ALL
332 value. Because of stream-processing-ability, WSGI does not have to read ALL
333 of the subprocess's output and buffer it, before handing it to WSGI server for
333 of the subprocess's output and buffer it, before handing it to WSGI server for
334 HTTP response. Instead, the class initializer reads just a bit of the stream
334 HTTP response. Instead, the class initializer reads just a bit of the stream
335 to figure out if error occurred or likely to occur and if not, just hands the
335 to figure out if error occurred or likely to occur and if not, just hands the
336 further iteration over subprocess output to the server for completion of HTTP
336 further iteration over subprocess output to the server for completion of HTTP
337 response.
337 response.
338
338
339 The real or perceived subprocess error is trapped and raised as one of
339 The real or perceived subprocess error is trapped and raised as one of
340 OSError family of exceptions
340 OSError family of exceptions
341
341
342 Example usage:
342 Example usage:
343 # try:
343 # try:
344 # answer = SubprocessIOChunker(
344 # answer = SubprocessIOChunker(
345 # cmd,
345 # cmd,
346 # input,
346 # input,
347 # buffer_size = 65536,
347 # buffer_size = 65536,
348 # chunk_size = 4096
348 # chunk_size = 4096
349 # )
349 # )
350 # except (OSError) as e:
350 # except (OSError) as e:
351 # print str(e)
351 # print str(e)
352 # raise e
352 # raise e
353 #
353 #
354 # return answer
354 # return answer
355
355
356
356
357 """
357 """
358
358
    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None  # feeder pipe end passed to os.close() in close()

    _closed = False  # set by close(); guards against double cleanup
    _stdout = None   # cached stdout bytes; populated only on the early-error path in __init__
    _stderr = None   # cached stderr bytes; populated only on the early-error path in __init__
368
368
    def __init__(self, cmd, input_stream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param input_stream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        # shell=True unless the caller explicitly opted out (run_command passes shell=False)
        kwargs['shell'] = kwargs.get('shell', True)

        starting_values = starting_values or []
        if input_stream:
            # Feed stdin from a background thread so writing cannot deadlock
            # against our reads of stdout/stderr below.
            input_streamer = StreamFeeder(input_stream)
            input_streamer.start()
            input_stream = input_streamer.output
            # remember the pipe end so close() can release it
            self._close_input_fd = input_stream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code
        self.cmd = cmd

        _p = subprocess.Popen(cmd, bufsize=-1, stdin=input_stream, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              **kwargs)
        self.process = _p

        # stdout uses the caller-supplied sizing; stderr gets a small,
        # bottomless buffer so it never blocks the process on error spew.
        bg_out = BufferedGenerator('stdout', _p.stdout, buffer_size, chunk_size, starting_values)
        bg_err = BufferedGenerator('stderr', _p.stderr, 10240, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(0.2)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        return_code = _p.poll()
        ret_code_ok = return_code in [None, 0]
        ret_code_fail = return_code is not None and return_code != 0
        if (
                (ret_code_fail and fail_on_return_code) or
                (ret_code_ok and fail_on_stderr and bg_err.length)
        ):

            try:
                _p.terminate()
            except Exception:
                pass

            # drain both streams so we can build complete error diagnostics
            bg_out.stop()
            out = b''.join(bg_out)
            self._stdout = out

            bg_err.stop()
            err = b''.join(bg_err)
            self._stderr = err

            # code from https://github.com/schacon/grack/pull/7
            if err.strip() == b'fatal: The remote end hung up unexpectedly' and out.startswith(b'0034shallow '):
                # special-cased git shallow-clone handshake: treat the
                # collected output as the full answer instead of an error
                bg_out = iter([out])
                _p = None
            elif err and fail_on_stderr:
                text_err = err.decode()
                raise OSError(
                    "Subprocess exited due to an error:\n{}".format(text_err))

            if ret_code_fail and fail_on_return_code:
                text_err = err.decode()
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    text_err = out.decode()
                raise OSError(
                    "Subprocess exited with non 0 ret code:{}: stderr:{}".format(return_code, text_err))

        self.stdout = bg_out
        self.stderr = bg_err
        self.inputstream = input_stream
457
457
458 def __str__(self):
458 def __str__(self):
459 proc = getattr(self, 'process', 'NO_PROCESS')
459 proc = getattr(self, 'process', 'NO_PROCESS')
460 return f'SubprocessIOChunker: {proc}'
460 return f'SubprocessIOChunker: {proc}'
461
461
462 def __iter__(self):
462 def __iter__(self):
463 return self
463 return self
464
464
    def __next__(self):
        """
        Return the next chunk of subprocess stdout.

        :raises OSError: if the subprocess has exited with a non-zero
            return code and ``fail_on_return_code`` was requested.
        :raises StopIteration: once stdout is exhausted (re-raised after
            the return-code check below).
        """
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = next(self.stdout)
        except StopIteration as e:
            # defer StopIteration until after we have inspected the process
            stop_iteration = e

        if self.process:
            return_code = self.process.poll()
            ret_code_fail = return_code is not None and return_code != 0
            if ret_code_fail and self._fail_on_return_code:
                self.stop_streams()
                err = self.get_stderr()
                raise OSError(
                    "Subprocess exited (exit_code:{}) due to an error during iteration:\n{}".format(return_code, err))

        if stop_iteration:
            raise stop_iteration
        return result
490
490
491 def throw(self, exc_type, value=None, traceback=None):
491 def throw(self, exc_type, value=None, traceback=None):
492 if self.stdout.length or not self.stdout.done_reading:
492 if self.stdout.length or not self.stdout.done_reading:
493 raise exc_type(value)
493 raise exc_type(value)
494
494
495 def close(self):
495 def close(self):
496 if self._closed:
496 if self._closed:
497 return
497 return
498
498
499 try:
499 try:
500 self.process.terminate()
500 self.process.terminate()
501 except Exception:
501 except Exception:
502 pass
502 pass
503 if self._close_input_fd:
503 if self._close_input_fd:
504 os.close(self._close_input_fd)
504 os.close(self._close_input_fd)
505 try:
505 try:
506 self.stdout.close()
506 self.stdout.close()
507 except Exception:
507 except Exception:
508 pass
508 pass
509 try:
509 try:
510 self.stderr.close()
510 self.stderr.close()
511 except Exception:
511 except Exception:
512 pass
512 pass
513 try:
513 try:
514 os.close(self.inputstream)
514 os.close(self.inputstream)
515 except Exception:
515 except Exception:
516 pass
516 pass
517
517
518 self._closed = True
518 self._closed = True
519
519
520 def stop_streams(self):
520 def stop_streams(self):
521 getattr(self.stdout, 'stop', lambda: None)()
521 getattr(self.stdout, 'stop', lambda: None)()
522 getattr(self.stderr, 'stop', lambda: None)()
522 getattr(self.stderr, 'stop', lambda: None)()
523
523
524 def get_stdout(self):
524 def get_stdout(self):
525 if self._stdout:
525 if self._stdout:
526 return self._stdout
526 return self._stdout
527 else:
527 else:
528 return b''.join(self.stdout)
528 return b''.join(self.stdout)
529
529
530 def get_stderr(self):
530 def get_stderr(self):
531 if self._stderr:
531 if self._stderr:
532 return self._stderr
532 return self._stderr
533 else:
533 else:
534 return b''.join(self.stderr)
534 return b''.join(self.stderr)
535
535
536
536
def run_command(arguments, env=None):
    """
    Run the specified command and return the stdout.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    :param env: optional mapping of environment variables for the subprocess
    :return: tuple of (stdout_bytes, stderr_bytes)
    :raises Exception: when the command cannot be run or exits with an error;
        the original OSError is chained as the cause.
    """

    cmd = arguments
    log.debug('Running subprocessio command %s', cmd)
    proc = None
    try:
        # shell=False: cmd is a list of argv entries, no shell interpolation
        _opts = {'shell': False, 'fail_on_stderr': False}
        if env:
            _opts.update({'env': env})
        proc = SubprocessIOChunker(cmd, **_opts)
        return b''.join(proc), b''.join(proc.stderr)
    except OSError as err:
        cmd = ' '.join(map(safe_str, cmd))  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was:%s\n" % (cmd, err))
        log.exception(tb_err)
        # FIX: chain the original OSError so the root cause stays visible
        # in tracebacks instead of being swallowed by the new Exception.
        raise Exception(tb_err) from err
    finally:
        # ensure the subprocess and its pipes are released in every path
        if proc:
            proc.close()
563
563
@@ -1,114 +1,112 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2020 RhodeCode GmbH
2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import time
18 import time
19 import logging
19 import logging
20
20
21 import msgpack
22
23 import vcsserver
21 import vcsserver
24 from vcsserver.str_utils import safe_str, ascii_str
22 from vcsserver.str_utils import safe_str, ascii_str
25
23
26 log = logging.getLogger(__name__)
24 log = logging.getLogger(__name__)
27
25
28
26
def get_access_path(environ):
    """Return the request's WSGI ``PATH_INFO`` value, or ``None`` if unset."""
    return environ.get('PATH_INFO')
32
30
33
31
def get_user_agent(environ):
    """Return the client's ``User-Agent`` header, or ``None`` when absent."""
    return environ.get('HTTP_USER_AGENT')
36
34
37
35
def get_call_context(registry) -> dict:
    """
    Build the VCS call-context headers from *registry*.

    Returns a dict with ``X-RC-Method`` and ``X-RC-Repo-Name`` keys when
    the registry carries a ``vcs_call_context``, otherwise an empty dict.
    """
    context = {}
    if hasattr(registry, 'vcs_call_context'):
        call_ctx = registry.vcs_call_context
        context['X-RC-Method'] = call_ctx.get('method')
        context['X-RC-Repo-Name'] = call_ctx.get('repo_name')
    return context
47
45
48
46
class RequestWrapperTween(object):
    """
    Pyramid tween that measures the wall-clock duration of each request.

    Every request is logged with its timing and request metadata; when a
    statsd client is configured on the registry, a timing histogram and a
    request counter are emitted as well.
    """

    def __init__(self, handler, registry):
        self.handler = handler
        self.registry = registry

        # one-time configuration code goes here

    def __call__(self, request):
        started_at = time.time()
        log.debug('Starting request time measurement')
        response = None

        try:
            response = self.handler(request)
        finally:
            # reporting runs in ``finally`` so failed requests are measured too
            user_agent = get_user_agent(request.environ)
            call_context = get_call_context(request.registry)
            vcs_method = call_context.get('X-RC-Method', '_NO_VCS_METHOD')
            repo_name = call_context.get('X-RC-Repo-Name', '')

            req_count = request.request_count()
            version = ascii_str(vcsserver.__version__)
            access_path = safe_str(get_access_path(request.environ))

            ip = '127.0.0.1'
            route_name = request.matched_route.name if request.matched_route else "NOT_FOUND"
            status_code = getattr(response, 'status_code', 'UNDEFINED')

            view_path = f"{repo_name}@{access_path}/{vcs_method}"

            duration = time.time() - started_at

            log.info(
                'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
                req_count, ip, request.environ.get('REQUEST_METHOD'),
                view_path, duration, user_agent, version,
                extra={"time": duration, "ver": version, "code": status_code,
                       "path": access_path, "view_name": route_name, "user_agent": user_agent,
                       "vcs_method": vcs_method, "repo_name": repo_name}
            )

            statsd = request.registry.statsd
            if statsd:
                # for metrics, fall back to the raw path when no route matched
                statsd_route = request.matched_route.name if request.matched_route else access_path
                elapsed_time_ms = round(1000.0 * duration)  # use ms only
                statsd.timing(
                    "vcsserver_req_timing.histogram", elapsed_time_ms,
                    tags=[
                        f"view_name:{statsd_route}",
                        f"code:{status_code}"
                    ],
                    use_decimals=False
                )
                statsd.incr(
                    "vcsserver_req_total", tags=[
                        f"view_name:{statsd_route}",
                        f"code:{status_code}"
                    ])

        return response
109
107
110
108
def includeme(config):
    """Register the request-wrapper tween with the Pyramid *config*."""
    tween_path = 'vcsserver.tweens.request_wrapper.RequestWrapperTween'
    config.add_tween(tween_path)
General Comments 0
You need to be logged in to leave comments. Login now