##// END OF EJS Templates
subprocess: use subprocessio helper to run various subprocess commands.
marcink -
r374:58336b17 stable
parent child Browse files
Show More
@@ -1,484 +1,467 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # RhodeCode VCSServer provides access to different vcs backends via network.
4 4 # Copyright (C) 2014-2018 RhodeCode GmbH
5 5 #
6 6 # This program is free software; you can redistribute it and/or modify
7 7 # it under the terms of the GNU General Public License as published by
8 8 # the Free Software Foundation; either version 3 of the License, or
9 9 # (at your option) any later version.
10 10 #
11 11 # This program is distributed in the hope that it will be useful,
12 12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 14 # GNU General Public License for more details.
15 15 #
16 16 # You should have received a copy of the GNU General Public License
17 17 # along with this program; if not, write to the Free Software Foundation,
18 18 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 19
20 20 import io
21 21 import os
22 22 import sys
23 23 import json
24 24 import logging
25 25 import collections
26 26 import importlib
27 27
28 28 from httplib import HTTPConnection
29 29
30 30
31 31 import mercurial.scmutil
32 32 import mercurial.node
33 33 import simplejson as json
34 34
35 35 from vcsserver import exceptions, subprocessio, settings
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
class HooksHttpClient(object):
    """Hook client that POSTs JSON-serialized hook calls to an HTTP endpoint."""

    connection = None

    def __init__(self, hooks_uri):
        # host:port string of the hooks HTTP endpoint
        self.hooks_uri = hooks_uri

    def __call__(self, method, extras):
        """Invoke the remote hook `method` and return the decoded JSON reply."""
        conn = HTTPConnection(self.hooks_uri)
        conn.request('POST', '/', self._serialize(method, extras))
        reply = conn.getresponse()
        return json.loads(reply.read())

    def _serialize(self, hook_name, extras):
        """Encode one hook invocation as a JSON request body."""
        return json.dumps({'method': hook_name, 'extras': extras})
59 59
60 60
class HooksDummyClient(object):
    """In-process hook client dispatching to a ``Hooks`` class in a module."""

    def __init__(self, hooks_module):
        # import eagerly so a bad module path fails at construction time
        self._hooks_module = importlib.import_module(hooks_module)

    def __call__(self, hook_name, extras):
        hooks_factory = self._hooks_module.Hooks
        with hooks_factory() as active_hooks:
            hook_fn = getattr(active_hooks, hook_name)
            return hook_fn(extras)
68 68
69 69
class RemoteMessageWriter(object):
    """Abstract base for writers that relay hook output to a VCS client."""

    def write(self, message):
        # concrete transports (hg ui / git stdout) must override this
        raise NotImplementedError()
74 74
75 75
class HgMessageWriter(RemoteMessageWriter):
    """Writer that knows how to send messages to mercurial clients."""

    def __init__(self, ui):
        self.ui = ui

    def write(self, message):
        # TODO: Check why the quiet flag is set by default.
        saved_quiet = self.ui.quiet
        self.ui.quiet = False
        self.ui.status(message.encode('utf-8'))
        self.ui.quiet = saved_quiet
88 88
89 89
class GitMessageWriter(RemoteMessageWriter):
    """Writer that knows how to send messages to git clients."""

    def __init__(self, stdout=None):
        # default to the process stdout when no stream is supplied
        self.stdout = stdout or sys.stdout

    def write(self, message):
        encoded = message.encode('utf-8')
        self.stdout.write(encoded)
98 98
99 99
100 100 def _handle_exception(result):
101 101 exception_class = result.get('exception')
102 102 exception_traceback = result.get('exception_traceback')
103 103
104 104 if exception_traceback:
105 105 log.error('Got traceback from remote call:%s', exception_traceback)
106 106
107 107 if exception_class == 'HTTPLockedRC':
108 108 raise exceptions.RepositoryLockedException(*result['exception_args'])
109 109 elif exception_class == 'RepositoryError':
110 110 raise exceptions.VcsException(*result['exception_args'])
111 111 elif exception_class:
112 112 raise Exception('Got remote exception "%s" with args "%s"' %
113 113 (exception_class, result['exception_args']))
114 114
115 115
def _get_hooks_client(extras):
    """Return the hook client matching the configured transport.

    A ``hooks_uri`` key in `extras` selects the HTTP client; otherwise an
    in-process dummy client is built from ``hooks_module``.

    Fix: the previous version read ``extras.get('hooks_protocol')`` into an
    unused local, which is dead code and has been removed.
    """
    if 'hooks_uri' in extras:
        return HooksHttpClient(extras['hooks_uri'])
    return HooksDummyClient(extras['hooks_module'])
122 122
123 123
def _call_hook(hook_name, extras, writer):
    """Run `hook_name` through the configured hook client.

    Forwards the hook's textual output to `writer`, re-raises any remote
    exception, and returns the hook's integer status.
    """
    client = _get_hooks_client(extras)
    result = client(hook_name, extras)
    log.debug('Hooks got result: %s', result)
    writer.write(result['output'])
    _handle_exception(result)
    return result['status']
132 132
133 133
134 134 def _extras_from_ui(ui):
135 135 hook_data = ui.config('rhodecode', 'RC_SCM_DATA')
136 136 if not hook_data:
137 137 # maybe it's inside environ ?
138 138 env_hook_data = os.environ.get('RC_SCM_DATA')
139 139 if env_hook_data:
140 140 hook_data = env_hook_data
141 141
142 142 extras = {}
143 143 if hook_data:
144 144 extras = json.loads(hook_data)
145 145 return extras
146 146
147 147
def _rev_range_hash(repo, node):
    """Collect ``(commit_id, branch)`` for every revision from `node` to tip.

    ``repo[node]`` is used as the starting revision number for the range
    (assumes it resolves to an int-like rev — TODO confirm against callers),
    so this walks all changesets added by the current transaction.
    """

    commits = []
    for rev in xrange(repo[node], len(repo)):
        ctx = repo[rev]
        # hex SHA of the changeset plus the branch it lives on
        commit_id = mercurial.node.hex(ctx.node())
        branch = ctx.branch()
        commits.append((commit_id, branch))

    return commits
158 158
159 159
def repo_size(ui, repo, **kwargs):
    """Hook entry point: report repository size through the hooks client."""
    return _call_hook('repo_size', _extras_from_ui(ui), HgMessageWriter(ui))
163 163
164 164
def pre_pull(ui, repo, **kwargs):
    """Mercurial pre-pull hook entry point."""
    return _call_hook('pre_pull', _extras_from_ui(ui), HgMessageWriter(ui))
168 168
169 169
def pre_pull_ssh(ui, repo, **kwargs):
    """SSH variant of pre_pull; a no-op unless extras flag an SSH session."""
    extras = _extras_from_ui(ui)
    if not (extras and extras.get('SSH')):
        return 0
    return pre_pull(ui, repo, **kwargs)
175 175
176 176
def post_pull(ui, repo, **kwargs):
    """Mercurial post-pull hook entry point."""
    return _call_hook('post_pull', _extras_from_ui(ui), HgMessageWriter(ui))
180 180
181 181
def post_pull_ssh(ui, repo, **kwargs):
    """SSH variant of post_pull; a no-op unless extras flag an SSH session."""
    extras = _extras_from_ui(ui)
    if not (extras and extras.get('SSH')):
        return 0
    return post_pull(ui, repo, **kwargs)
187 187
188 188
def pre_push(ui, repo, node=None, **kwargs):
    """Mercurial pre-push hook: report incoming branch heads to the server.

    Changesets are only inspected when invoked as ``pretxnchangegroup``
    (i.e. a real changegroup is being added); otherwise an empty
    ``commit_ids`` list is sent.
    """
    extras = _extras_from_ui(ui)

    rev_data = []
    if node and kwargs.get('hooktype') == 'pretxnchangegroup':
        # group all incoming commit ids by the branch they land on
        branches = collections.defaultdict(list)
        for commit_id, branch in _rev_range_hash(repo, node):
            branches[branch].append(commit_id)

        for branch, commits in branches.iteritems():
            # node_last (when mercurial supplies it) marks the pre-push tip
            old_rev = kwargs.get('node_last') or commits[0]
            rev_data.append({
                'old_rev': old_rev,
                'new_rev': commits[-1],
                'ref': '',
                'type': 'branch',
                'name': branch,
            })

    extras['commit_ids'] = rev_data
    return _call_hook('pre_push', extras, HgMessageWriter(ui))
210 210
211 211
def pre_push_ssh(ui, repo, node=None, **kwargs):
    """SSH variant of pre_push; a no-op for non-SSH sessions."""
    extras = _extras_from_ui(ui)
    if extras.get('SSH'):
        return pre_push(ui, repo, node, **kwargs)
    return 0
217 217
218 218
def pre_push_ssh_auth(ui, repo, node=None, **kwargs):
    """Authorize an SSH push: only write/admin permission may proceed.

    Returns 0 to allow the push, 1 to deny it. Non-SSH sessions are
    always allowed here (they are authorized elsewhere).
    """
    extras = _extras_from_ui(ui)
    if not extras.get('SSH'):
        return 0

    permission = extras['SSH_PERMISSIONS']
    if permission in ('repository.write', 'repository.admin'):
        return 0
    # non-zero ret code denies the push
    return 1
231 231
232 232
def post_push(ui, repo, node, **kwargs):
    """Mercurial post-push hook: report pushed commits and refs to the server."""
    extras = _extras_from_ui(ui)

    commit_ids = []
    branches = []
    bookmarks = []
    tags = []  # never filled here; kept for symmetry of new_refs payload

    for commit_id, branch in _rev_range_hash(repo, node):
        commit_ids.append(commit_id)
        if branch not in branches:
            branches.append(branch)

    # bookmarks pushed in this transaction are stashed on the ui object by
    # the key_push hook
    if hasattr(ui, '_rc_pushkey_branches'):
        bookmarks = ui._rc_pushkey_branches

    extras['commit_ids'] = commit_ids
    extras['new_refs'] = {
        'branches': branches,
        'bookmarks': bookmarks,
        'tags': tags
    }

    return _call_hook('post_push', extras, HgMessageWriter(ui))
257 257
258 258
def post_push_ssh(ui, repo, node, **kwargs):
    """SSH variant of post_push; a no-op for non-SSH sessions."""
    extras = _extras_from_ui(ui)
    if extras.get('SSH'):
        return post_push(ui, repo, node, **kwargs)
    return 0
263 263
264 264
def key_push(ui, repo, **kwargs):
    """pushkey hook: remember freshly pushed bookmarks on the ui object."""
    is_bookmark = kwargs['namespace'] == 'bookmarks'
    if is_bookmark and kwargs['new'] != '0':
        # store new bookmarks in our UI object propagated later to post_push
        ui._rc_pushkey_branches = repo[kwargs['key']].bookmarks()
    return
270 270
271 271
# backward compat: legacy hook name still referenced by old repo configs
log_pull_action = post_pull

# backward compat: legacy hook name still referenced by old repo configs
log_push_action = post_push
277 277
278 278
def handle_git_pre_receive(unused_repo_path, unused_revs, unused_env):
    """Legacy hook entry point, kept for backward compatibility.

    Only needed while repositories still carry old installed git hooks;
    it intentionally does nothing.
    """
    return None
286 286
287 287
def handle_git_post_receive(unused_repo_path, unused_revs, unused_env):
    """Legacy hook entry point, kept for backward compatibility.

    Only needed while repositories still carry old installed git hooks;
    it intentionally does nothing.
    """
    return None
295 295
296 296
# (status, output) pair returned by the git pull/push hook wrappers
HookResponse = collections.namedtuple('HookResponse', ('status', 'output'))
298 298
299 299
def git_pre_pull(extras):
    """Run the configured pre-pull hook for a git repository.

    :param extras: dictionary containing the keys defined in simplevcs
    :type extras: dict

    :return: HookResponse with the hook status (0 for success) and output.
    """
    if 'pull' not in extras['hooks']:
        return HookResponse(0, '')

    captured = io.BytesIO()
    try:
        status = _call_hook('pre_pull', extras, GitMessageWriter(captured))
    except Exception as error:
        # a failing hook must not crash the transport; report it instead
        status = 128
        captured.write('ERROR: %s\n' % str(error))
    return HookResponse(status, captured.getvalue())
321 321
322 322
def git_post_pull(extras):
    """Run the configured post-pull hook for a git repository.

    :param extras: dictionary containing the keys defined in simplevcs
    :type extras: dict

    :return: HookResponse with the hook status (0 for success) and output.
    """
    if 'pull' not in extras['hooks']:
        return HookResponse(0, '')

    captured = io.BytesIO()
    try:
        status = _call_hook('post_pull', extras, GitMessageWriter(captured))
    except Exception as error:
        # a failing hook must not crash the transport; report it instead
        status = 128
        captured.write('ERROR: %s\n' % str(error))
    return HookResponse(status, captured.getvalue())
344 344
345 345
346 346 def _parse_git_ref_lines(revision_lines):
347 347 rev_data = []
348 348 for revision_line in revision_lines or []:
349 349 old_rev, new_rev, ref = revision_line.strip().split(' ')
350 350 ref_data = ref.split('/', 2)
351 351 if ref_data[1] in ('tags', 'heads'):
352 352 rev_data.append({
353 353 'old_rev': old_rev,
354 354 'new_rev': new_rev,
355 355 'ref': ref,
356 356 'type': ref_data[1],
357 357 'name': ref_data[2],
358 358 })
359 359 return rev_data
360 360
361 361
def git_pre_receive(unused_repo_path, revision_lines, env):
    """Pre-push hook (git pre-receive side).

    :param revision_lines: raw ref-update lines read from git's stdin
    :param env: process environment carrying the RC_SCM_DATA extras

    :return: status code of the hook. 0 for success.
    :rtype: int
    """
    extras = json.loads(env['RC_SCM_DATA'])
    rev_data = _parse_git_ref_lines(revision_lines)
    if 'push' not in extras['hooks']:
        return 0
    extras['commit_ids'] = rev_data
    return _call_hook('pre_push', extras, GitMessageWriter())
378 378
379 379
def _run_command(arguments):
    """
    Run the specified command and return the stdout.

    NOTE(review): duplicates ``subprocessio.run_command``; this diff removes
    it in favour of that helper — confirm no remaining callers.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    """

    cmd = arguments
    try:
        gitenv = os.environ.copy()
        # shell=False: cmd is a list, no shell interpolation wanted
        _opts = {'env': gitenv, 'shell': False, 'fail_on_stderr': False}
        p = subprocessio.SubprocessIOChunker(cmd, **_opts)
        stdout = ''.join(p)
    except (EnvironmentError, OSError) as err:
        cmd = ' '.join(cmd)  # human friendly CMD
        tb_err = ("Couldn't run git command (%s).\n"
                  "Original error was:%s\n" % (cmd, err))
        log.exception(tb_err)
        raise Exception(tb_err)

    return stdout
402
403
def git_post_receive(unused_repo_path, revision_lines, env):
    """
    Post push hook (git post-receive side).

    Parses the ref updates from git's stdin, computes the list of pushed
    commit ids per branch (shelling out to git via subprocessio.run_command),
    and reports them through the hooks client.

    Fixes: the previous span interleaved the old ``_run_command`` calls with
    the new ``subprocessio.run_command`` calls (diff residue) — this is the
    consolidated new-side implementation; the bare ``except:`` around the
    optional repo_size hook is narrowed to ``except Exception`` so that
    SystemExit/KeyboardInterrupt are not swallowed.

    :param revision_lines: raw ref-update lines read from git's stdin
    :param env: process environment carrying the RC_SCM_DATA extras

    :return: status code of the hook. 0 for success.
    :rtype: int
    """
    extras = json.loads(env['RC_SCM_DATA'])
    if 'push' not in extras['hooks']:
        return 0

    rev_data = _parse_git_ref_lines(revision_lines)

    git_revs = []

    # N.B.(skreft): it is ok to just call git, as git before calling a
    # subcommand sets the PATH environment variable so that it point to the
    # correct version of the git executable.
    empty_commit_id = '0' * 40
    branches = []
    tags = []
    for push_ref in rev_data:
        type_ = push_ref['type']

        if type_ == 'heads':
            if push_ref['old_rev'] == empty_commit_id:
                # starting new branch case
                if push_ref['name'] not in branches:
                    branches.append(push_ref['name'])

                # Fix up head revision if needed: a brand new repository has
                # no HEAD yet, so point it at the pushed branch.
                cmd = [settings.GIT_EXECUTABLE, 'show', 'HEAD']
                try:
                    subprocessio.run_command(cmd, env=os.environ.copy())
                except Exception:
                    cmd = [settings.GIT_EXECUTABLE, 'symbolic-ref', 'HEAD',
                           'refs/heads/%s' % push_ref['name']]
                    print("Setting default branch to %s" % push_ref['name'])
                    subprocessio.run_command(cmd, env=os.environ.copy())

                # list all other heads so we can exclude their history below
                cmd = [settings.GIT_EXECUTABLE, 'for-each-ref',
                       '--format=%(refname)', 'refs/heads/*']
                stdout, stderr = subprocessio.run_command(
                    cmd, env=os.environ.copy())
                heads = stdout
                heads = heads.replace(push_ref['ref'], '')
                heads = ' '.join(head for head in heads.splitlines() if head)
                cmd = [settings.GIT_EXECUTABLE, 'log', '--reverse',
                       '--pretty=format:%H', '--', push_ref['new_rev'],
                       '--not', heads]
                stdout, stderr = subprocessio.run_command(
                    cmd, env=os.environ.copy())
                git_revs.extend(stdout.splitlines())
            elif push_ref['new_rev'] == empty_commit_id:
                # delete branch case
                git_revs.append('delete_branch=>%s' % push_ref['name'])
            else:
                # regular update of an existing branch
                if push_ref['name'] not in branches:
                    branches.append(push_ref['name'])

                cmd = [settings.GIT_EXECUTABLE, 'log',
                       '{old_rev}..{new_rev}'.format(**push_ref),
                       '--reverse', '--pretty=format:%H']
                stdout, stderr = subprocessio.run_command(
                    cmd, env=os.environ.copy())
                git_revs.extend(stdout.splitlines())
        elif type_ == 'tags':
            if push_ref['name'] not in tags:
                tags.append(push_ref['name'])
            git_revs.append('tag=>%s' % push_ref['name'])

    extras['commit_ids'] = git_revs
    extras['new_refs'] = {
        'branches': branches,
        'bookmarks': [],
        'tags': tags,
    }

    if 'repo_size' in extras['hooks']:
        # best-effort: a failing repo_size hook must not fail the push
        try:
            _call_hook('repo_size', extras, GitMessageWriter())
        except Exception:
            pass

    return _call_hook('post_push', extras, GitMessageWriter())
@@ -1,484 +1,511 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import logging
27 27 import subprocess32 as subprocess
28 28 from collections import deque
29 29 from threading import Event, Thread
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            # NOTE: `long` — this module is Python 2 only
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        # os.pipe(): readiface is handed to the consumer (see `output`),
        # writeiface is fed by run() and closed at EOF
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        t = self.writeiface
        if self.bytes:
            # whole payload already in memory: single write
            os.write(t, self.bytes)
        else:
            # stream from the file-like source in 4K chunks
            s = self.source
            b = s.read(4096)
            while b:
                os.write(t, b)
                b = s.read(4096)
        # closing the write end signals EOF to whoever reads `output`
        os.close(t)

    @property
    def output(self):
        # read end of the pipe, suitable as a subprocess stdin fd
        return self.readiface
82 82
83 83
class InputStreamChunker(Thread):
    """Daemon thread that pumps chunks from a blocking `source` stream into
    the shared `target` deque, pausing once `buffer_size` worth of chunks is
    queued and resuming when the consumer drains it (via `keep_reading`).
    """

    def __init__(self, source, target, buffer_size, chunk_size):

        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # maximum number of chunks to hold before pausing the reader
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # signalled every time a chunk lands in `target` (and at EOF)
        self.data_added = Event()
        self.data_added.clear()

        # cleared by this thread when the buffer is full; set by the
        # consumer to resume reading
        self.keep_reading = Event()
        self.keep_reading.set()

        # set once the source is exhausted or the thread is stopped
        self.EOF = Event()
        self.EOF.clear()

        # master "keep running" switch; cleared by stop()
        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except:
            pass

    def run(self):
        # local aliases for speed inside the read loop
        s = self.source
        t = self.target
        cs = self.chunk_size
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            # source already closed; treat as empty input
            b = ''

        timeout_input = 20
        while b and go.is_set():
            if len(t) > chunk_count_max:
                # buffer full: wait (bounded) for the consumer to drain it
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                if len(t) > chunk_count_max + timeout_input:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
147 147
148 148
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        starting_values = starting_values or []

        if bottomless:
            # bounded deque: old chunks fall off the left when full
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        # worker thread fills self.data from `source` (see InputStreamChunker)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # poll (0.2s granularity) until a chunk is buffered or EOF is flagged
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # consuming a chunk frees buffer space: let the worker resume
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # only raise while data may still be pending; a fully-drained
        # generator ignores the throw (mirrors generator protocol loosely)
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute `w`; this
        # property looks stale/broken — confirm before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the worker is parked waiting for buffer space
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
286 286
287 287
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.



    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    and interable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error ocurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        starting_values = starting_values or []
        if inputstream:
            # feed the child's stdin from a background thread so a full
            # pipe buffer cannot deadlock us
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        # stdout is buffered up to buffer_size; stderr is "bottomless"
        # (1-byte chunks, old data discarded when its window fills)
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, type, value=None, traceback=None):
        # only propagate while output may still be pending
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        # idempotent teardown: kill the child, close fds, stop reader threads
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except:
            pass
        try:
            self.error.close()
        except:
            pass

    def __del__(self):
        self.close()
485
486
def run_command(arguments, env=None):
    """
    Run the specified command via SubprocessIOChunker and collect its output.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    :param env: optional environment mapping for the child process
    :return: ``(stdout, stderr)`` tuple of the finished command
    :raises Exception: when the command could not be started at all

    Fix: ``stderr`` was built with a redundant nested join
    (``''.join(''.join(p.error))`` — joining a string with ``''`` returns the
    same string); a single join over the error chunks suffices.
    """

    cmd = arguments
    log.debug('Running subprocessio command %s', cmd)
    try:
        # shell=False: cmd is a list, no shell interpolation wanted;
        # stderr is captured, not treated as failure
        _opts = {'shell': False, 'fail_on_stderr': False}
        if env:
            _opts.update({'env': env})
        p = SubprocessIOChunker(cmd, **_opts)
        stdout = ''.join(p)
        stderr = ''.join(p.error)
    except (EnvironmentError, OSError) as err:
        cmd = ' '.join(cmd)  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was:%s\n" % (cmd, err))
        log.exception(tb_err)
        raise Exception(tb_err)

    return stdout, stderr
@@ -1,679 +1,677 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2018 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 from __future__ import absolute_import
19 19
20 20 import os
21 21 from urllib2 import URLError
22 22 import logging
23 23 import posixpath as vcspath
24 24 import StringIO
25 import subprocess
26 25 import urllib
27 26 import traceback
28 27
29 28 import svn.client
30 29 import svn.core
31 30 import svn.delta
32 31 import svn.diff
33 32 import svn.fs
34 33 import svn.repos
35 34
36 from vcsserver import svn_diff
37 from vcsserver import exceptions
35 from vcsserver import svn_diff, exceptions, subprocessio
38 36 from vcsserver.base import RepoFactory, raise_from_original
39 37
40
41 38 log = logging.getLogger(__name__)
42 39
43 40
# Set of svn compatible version flags.
# Compare with subversion/svnadmin/svnadmin.c
svn_compatible_versions = set([
    'pre-1.4-compatible',
    'pre-1.5-compatible',
    'pre-1.6-compatible',
    'pre-1.8-compatible',
    'pre-1.9-compatible',
])

# Translate each compatibility flag into the repository format version
# understood by the Subversion `compatible-version` fs-config option.
svn_compatible_versions_map = {
    'pre-1.4-compatible': '1.3',
    'pre-1.5-compatible': '1.4',
    'pre-1.6-compatible': '1.5',
    'pre-1.8-compatible': '1.7',
    'pre-1.9-compatible': '1.8',
}
61 58
62 59
def reraise_safe_exceptions(func):
    """Decorator for converting svn exceptions to something neutral.

    Exceptions carrying a ``_vcs_kind`` marker are considered already safe
    and re-raised unchanged; anything else is logged and converted to
    ``exceptions.UnhandledException``.
    """
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if not hasattr(e, '_vcs_kind'):
                # FIX: the message previously said "hg remote call" — this is
                # the svn module (copy/paste from the hg counterpart).
                log.exception("Unhandled exception in svn remote call")
                raise_from_original(exceptions.UnhandledException)
            raise
    return wrapper
74 71
75 72
class SubversionFactory(RepoFactory):
    """Opens (or creates) Subversion repository objects for a ``wire``."""

    def _create_repo(self, wire, create, compatible_version):
        path = svn.core.svn_path_canonicalize(wire['path'])
        if not create:
            repo = svn.repos.open(path)
        else:
            # Default to the newest supported on-disk format unless an
            # explicit compatibility flag requests an older one.
            fs_config = {'compatible-version': '1.9'}
            if compatible_version:
                if compatible_version not in svn_compatible_versions:
                    raise Exception('Unknown SVN compatible version "{}"'
                                    .format(compatible_version))
                fs_config['compatible-version'] = \
                    svn_compatible_versions_map[compatible_version]

            log.debug('Create SVN repo with config "%s"', fs_config)
            repo = svn.repos.create(path, "", "", None, fs_config)

        log.debug('Got SVN object: %s', repo)
        return repo

    def repo(self, wire, create=False, compatible_version=None):
        def create_new_repo():
            return self._create_repo(wire, create, compatible_version)

        return self._repo(wire, create_new_repo)
102 99
103 100
# Map low-level svn node kinds to the generic node-type strings used by the
# vcsserver API; kinds not listed here (none/unknown) map to None via .get().
NODE_TYPE_MAPPING = {
    svn.core.svn_node_file: 'file',
    svn.core.svn_node_dir: 'dir',
}
108 105
109 106
class SvnRemote(object):
    """Remote facade exposing Subversion repository operations to clients."""

    def __init__(self, factory, hg_factory=None):
        self._factory = factory
        # TODO: Remove once we do not use internal Mercurial objects anymore
        # for subversion
        self._hg_factory = hg_factory

    @reraise_safe_exceptions
    def discover_svn_version(self):
        # Report the version string of the installed svn bindings, or None
        # when the bindings are missing.
        try:
            import svn.core
            svn_ver = svn.core.SVN_VERSION
        except ImportError:
            svn_ver = None
        return svn_ver

    def check_url(self, url, config_items):
        """Validate that *url* is a reachable Subversion source.

        :raises URLError: when the url does not expose a valid svn repo UUID.
        """
        # this can throw exception if not installed, but we detect this
        from hgsubversion import svnrepo

        baseui = self._hg_factory._create_config(config_items)
        # uuid function get's only valid UUID from proper repo, else
        # throws exception
        try:
            svnrepo.svnremoterepo(baseui, url).svn.uuid
        except Exception:
            tb = traceback.format_exc()
            log.debug("Invalid Subversion url: `%s`, tb: %s", url, tb)
            raise URLError(
                '"%s" is not a valid Subversion source url.' % (url, ))
        return True

    def is_path_valid_repository(self, wire, path):
        """Return True when *path* points at an openable svn repository."""

        # NOTE(marcink): short circuit the check for SVN repo
        # the repos.open might be expensive to check, but we have one cheap
        # pre condition that we can use, to check for 'format' file
        if not os.path.isfile(os.path.join(path, 'format')):
            return False

        try:
            svn.repos.open(path)
        except svn.core.SubversionException:
            tb = traceback.format_exc()
            log.debug("Invalid Subversion path `%s`, tb: %s", path, tb)
            return False
        return True

    @reraise_safe_exceptions
    def verify(self, wire,):
        """Run ``svnadmin info`` on the repository and return its stdout."""
        repo_path = wire['path']
        if not self.is_path_valid_repository(wire, repo_path):
            raise Exception(
                "Path %s is not a valid Subversion repository." % repo_path)

        cmd = ['svnadmin', 'info', repo_path]
        stdout, stderr = subprocessio.run_command(cmd)
        return stdout

    def lookup(self, wire, revision):
        # Only resolution of the HEAD/latest revision is supported.
        if revision not in [-1, None, 'HEAD']:
            raise NotImplementedError
        repo = self._factory.repo(wire)
        fs_ptr = svn.repos.fs(repo)
        head = svn.fs.youngest_rev(fs_ptr)
        return head

    def lookup_interval(self, wire, start_ts, end_ts):
        """Return the (start_rev, end_rev) pair bounded by two timestamps.

        Missing bounds default to revision 1 and the youngest revision.
        """
        repo = self._factory.repo(wire)
        fsobj = svn.repos.fs(repo)
        start_rev = None
        end_rev = None
        if start_ts:
            start_ts_svn = apr_time_t(start_ts)
            # dated_revision returns the last rev at/before the timestamp;
            # +1 makes the interval start after it.
            start_rev = svn.repos.dated_revision(repo, start_ts_svn) + 1
        else:
            start_rev = 1
        if end_ts:
            end_ts_svn = apr_time_t(end_ts)
            end_rev = svn.repos.dated_revision(repo, end_ts_svn)
        else:
            end_rev = svn.fs.youngest_rev(fsobj)
        return start_rev, end_rev

    def revision_properties(self, wire, revision):
        # All revision properties (svn:author, svn:date, svn:log, ...).
        repo = self._factory.repo(wire)
        fs_ptr = svn.repos.fs(repo)
        return svn.fs.revision_proplist(fs_ptr, revision)

    def revision_changes(self, wire, revision):
        """Return paths added/changed/removed in *revision* (files only)."""

        repo = self._factory.repo(wire)
        fsobj = svn.repos.fs(repo)
        rev_root = svn.fs.revision_root(fsobj, revision)

        # Replay the revision through a ChangeCollector editor to gather
        # the per-path change records.
        editor = svn.repos.ChangeCollector(fsobj, rev_root)
        editor_ptr, editor_baton = svn.delta.make_editor(editor)
        base_dir = ""
        send_deltas = False
        svn.repos.replay2(
            rev_root, base_dir, svn.core.SVN_INVALID_REVNUM, send_deltas,
            editor_ptr, editor_baton, None)

        added = []
        changed = []
        removed = []

        # TODO: CHANGE_ACTION_REPLACE: Figure out where it belongs
        for path, change in editor.changes.iteritems():
            # TODO: Decide what to do with directory nodes. Subversion can add
            # empty directories.

            if change.item_kind == svn.core.svn_node_dir:
                continue
            if change.action in [svn.repos.CHANGE_ACTION_ADD]:
                added.append(path)
            elif change.action in [svn.repos.CHANGE_ACTION_MODIFY,
                                   svn.repos.CHANGE_ACTION_REPLACE]:
                changed.append(path)
            elif change.action in [svn.repos.CHANGE_ACTION_DELETE]:
                removed.append(path)
            else:
                raise NotImplementedError(
                    "Action %s not supported on path %s" % (
                        change.action, path))

        changes = {
            'added': added,
            'changed': changed,
            'removed': removed,
        }
        return changes

    def node_history(self, wire, path, revision, limit):
        """Return up to *limit* revisions in which *path* was touched."""
        cross_copies = False
        repo = self._factory.repo(wire)
        fsobj = svn.repos.fs(repo)
        rev_root = svn.fs.revision_root(fsobj, revision)

        history_revisions = []
        history = svn.fs.node_history(rev_root, path)
        # history_prev walks backwards in time, one change per step.
        history = svn.fs.history_prev(history, cross_copies)
        while history:
            __, node_revision = svn.fs.history_location(history)
            history_revisions.append(node_revision)
            if limit and len(history_revisions) >= limit:
                break
            history = svn.fs.history_prev(history, cross_copies)
        return history_revisions

    def node_properties(self, wire, path, revision):
        repo = self._factory.repo(wire)
        fsobj = svn.repos.fs(repo)
        rev_root = svn.fs.revision_root(fsobj, revision)
        return svn.fs.node_proplist(rev_root, path)

    def file_annotate(self, wire, path, revision):
        """Blame *path* at *revision*; returns (line_no, revision, line)."""
        abs_path = 'file://' + urllib.pathname2url(
            vcspath.join(wire['path'], path))
        file_uri = svn.core.svn_path_canonicalize(abs_path)

        start_rev = svn_opt_revision_value_t(0)
        peg_rev = svn_opt_revision_value_t(revision)
        end_rev = peg_rev

        annotations = []

        def receiver(line_no, revision, author, date, line, pool):
            annotations.append((line_no, revision, line))

        # TODO: Cannot use blame5, missing typemap function in the swig code
        try:
            svn.client.blame2(
                file_uri, peg_rev, start_rev, end_rev,
                receiver, svn.client.create_context())
        except svn.core.SubversionException as exc:
            log.exception("Error during blame operation.")
            raise Exception(
                "Blame not supported or file does not exist at path %s. "
                "Error %s." % (path, exc))

        return annotations

    def get_node_type(self, wire, path, rev=None):
        # Returns 'file', 'dir' or None (see NODE_TYPE_MAPPING).
        repo = self._factory.repo(wire)
        fs_ptr = svn.repos.fs(repo)
        if rev is None:
            rev = svn.fs.youngest_rev(fs_ptr)
        root = svn.fs.revision_root(fs_ptr, rev)
        node = svn.fs.check_path(root, path)
        return NODE_TYPE_MAPPING.get(node, None)

    def get_nodes(self, wire, path, revision=None):
        """List directory entries of *path* as (name, node_type) pairs."""
        repo = self._factory.repo(wire)
        fsobj = svn.repos.fs(repo)
        if revision is None:
            revision = svn.fs.youngest_rev(fsobj)
        root = svn.fs.revision_root(fsobj, revision)
        entries = svn.fs.dir_entries(root, path)
        result = []
        for entry_path, entry_info in entries.iteritems():
            result.append(
                (entry_path, NODE_TYPE_MAPPING.get(entry_info.kind, None)))
        return result

    def get_file_content(self, wire, path, rev=None):
        repo = self._factory.repo(wire)
        fsobj = svn.repos.fs(repo)
        if rev is None:
            rev = svn.fs.youngest_revision(fsobj)
        root = svn.fs.revision_root(fsobj, rev)
        content = svn.core.Stream(svn.fs.file_contents(root, path))
        return content.read()

    def get_file_size(self, wire, path, revision=None):
        repo = self._factory.repo(wire)
        fsobj = svn.repos.fs(repo)
        if revision is None:
            revision = svn.fs.youngest_revision(fsobj)
        root = svn.fs.revision_root(fsobj, revision)
        size = svn.fs.file_length(root, path)
        return size

    def create_repository(self, wire, compatible_version=None):
        log.info('Creating Subversion repository in path "%s"', wire['path'])
        self._factory.repo(wire, create=True,
                           compatible_version=compatible_version)

    def import_remote_repository(self, wire, src_url):
        """Import a remote repository by piping ``svnrdump`` into
        ``svnadmin load``."""
        repo_path = wire['path']
        if not self.is_path_valid_repository(wire, repo_path):
            raise Exception(
                "Path %s is not a valid Subversion repository." % repo_path)

        # TODO: johbo: URL checks ?
        import subprocess
        rdump = subprocess.Popen(
            ['svnrdump', 'dump', '--non-interactive', src_url],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        load = subprocess.Popen(
            ['svnadmin', 'load', repo_path], stdin=rdump.stdout)

        # TODO: johbo: This can be a very long operation, might be better
        # to track some kind of status and provide an api to check if the
        # import is done.
        rdump.wait()
        load.wait()

        if rdump.returncode != 0:
            errors = rdump.stderr.read()
            log.error('svnrdump dump failed: statuscode %s: message: %s',
                      rdump.returncode, errors)
            reason = 'UNKNOWN'
            if 'svnrdump: E230001:' in errors:
                # svn error code for an untrusted/invalid TLS certificate
                reason = 'INVALID_CERTIFICATE'
            raise Exception(
                'Failed to dump the remote repository from %s.' % src_url,
                reason)
        if load.returncode != 0:
            raise Exception(
                'Failed to load the dump of remote repository from %s.' %
                (src_url, ))

    def commit(self, wire, message, author, timestamp, updated, removed):
        """Commit *updated*/*removed* node dicts; returns the new rev id."""
        assert isinstance(message, str)
        assert isinstance(author, str)

        repo = self._factory.repo(wire)
        fsobj = svn.repos.fs(repo)

        rev = svn.fs.youngest_rev(fsobj)
        txn = svn.repos.fs_begin_txn_for_commit(repo, rev, author, message)
        txn_root = svn.fs.txn_root(txn)

        for node in updated:
            TxnNodeProcessor(node, txn_root).update()
        for node in removed:
            TxnNodeProcessor(node, txn_root).remove()

        commit_id = svn.repos.fs_commit_txn(repo, txn)

        if timestamp:
            # Backdate the committed revision to the requested timestamp.
            apr_time = apr_time_t(timestamp)
            ts_formatted = svn.core.svn_time_to_cstring(apr_time)
            svn.fs.change_rev_prop(fsobj, commit_id, 'svn:date', ts_formatted)

        log.debug('Committed revision "%s" to "%s".', commit_id, wire['path'])
        return commit_id

    def diff(self, wire, rev1, rev2, path1=None, path2=None,
             ignore_whitespace=False, context=3):
        """Return a unified diff between two revisions (empty on error)."""

        wire.update(cache=False)
        repo = self._factory.repo(wire)
        diff_creator = SvnDiffer(
            repo, rev1, path1, rev2, path2, ignore_whitespace, context)
        try:
            return diff_creator.generate_diff()
        except svn.core.SubversionException as e:
            log.exception(
                "Error during diff operation operation. "
                "Path might not exist %s, %s" % (path1, path2))
            return ""

    @reraise_safe_exceptions
    def is_large_file(self, wire, path):
        # Largefiles are a Mercurial/Git concept; Subversion has none.
        return False
418 416
419 417
class SvnDiffer(object):
    """
    Utility to create diffs based on difflib and the Subversion api
    """

    # Set to True while diffing a node whose mime-type is non-text; textual
    # diff output is suppressed for such nodes.
    binary_content = False

    def __init__(
            self, repo, src_rev, src_path, tgt_rev, tgt_path,
            ignore_whitespace, context):
        self.repo = repo
        self.ignore_whitespace = ignore_whitespace
        self.context = context

        fsobj = svn.repos.fs(repo)

        self.tgt_rev = tgt_rev
        self.tgt_path = tgt_path or ''
        self.tgt_root = svn.fs.revision_root(fsobj, tgt_rev)
        self.tgt_kind = svn.fs.check_path(self.tgt_root, self.tgt_path)

        self.src_rev = src_rev
        # Default the source path to the target path when not given.
        self.src_path = src_path or self.tgt_path
        self.src_root = svn.fs.revision_root(fsobj, src_rev)
        self.src_kind = svn.fs.check_path(self.src_root, self.src_path)

        self._validate()

    def _validate(self):
        # Diffing a file against a directory is not meaningful: when both
        # nodes exist they must have the same kind.
        if (self.tgt_kind != svn.core.svn_node_none and
                self.src_kind != svn.core.svn_node_none and
                self.src_kind != self.tgt_kind):
            # TODO: johbo: proper error handling
            raise Exception(
                "Source and target are not compatible for diff generation. "
                "Source type: %s, target type: %s" %
                (self.src_kind, self.tgt_kind))

    def generate_diff(self):
        """Return the complete diff text for the configured node pair."""
        buf = StringIO.StringIO()
        if self.tgt_kind == svn.core.svn_node_dir:
            self._generate_dir_diff(buf)
        else:
            self._generate_file_diff(buf)
        return buf.getvalue()

    def _generate_dir_diff(self, buf):
        # Collect per-path changes via a delta editor, then emit one node
        # diff per changed path in sorted order.
        editor = DiffChangeEditor()
        editor_ptr, editor_baton = svn.delta.make_editor(editor)
        svn.repos.dir_delta2(
            self.src_root,
            self.src_path,
            '',  # src_entry
            self.tgt_root,
            self.tgt_path,
            editor_ptr, editor_baton,
            authorization_callback_allow_all,
            False,  # text_deltas
            svn.core.svn_depth_infinity,  # depth
            False,  # entry_props
            False,  # ignore_ancestry
        )

        for path, __, change in sorted(editor.changes):
            self._generate_node_diff(
                buf, change, path, self.tgt_path, path, self.src_path)

    def _generate_file_diff(self, buf):
        # Derive the change kind from which side of the pair is missing.
        change = None
        if self.src_kind == svn.core.svn_node_none:
            change = "add"
        elif self.tgt_kind == svn.core.svn_node_none:
            change = "delete"
        tgt_base, tgt_path = vcspath.split(self.tgt_path)
        src_base, src_path = vcspath.split(self.src_path)
        self._generate_node_diff(
            buf, change, tgt_path, tgt_base, src_path, src_base)

    def _generate_node_diff(
            self, buf, change, tgt_path, tgt_base, src_path, src_base):
        """Write a git-style unified diff for one node into *buf*."""

        if self.src_rev == self.tgt_rev and tgt_base == src_base:
            # makes consistent behaviour with git/hg to return empty diff if
            # we compare same revisions
            return

        tgt_full_path = vcspath.join(tgt_base, tgt_path)
        src_full_path = vcspath.join(src_base, src_path)

        self.binary_content = False
        mime_type = self._get_mime_type(tgt_full_path)

        if mime_type and not mime_type.startswith('text'):
            self.binary_content = True
            buf.write("=" * 67 + '\n')
            buf.write("Cannot display: file marked as a binary type.\n")
            buf.write("svn:mime-type = %s\n" % mime_type)
            buf.write("Index: %s\n" % (tgt_path, ))
            buf.write("=" * 67 + '\n')
        buf.write("diff --git a/%(tgt_path)s b/%(tgt_path)s\n" % {
            'tgt_path': tgt_path})

        if change == 'add':
            # TODO: johbo: SVN is missing a zero here compared to git
            buf.write("new file mode 10644\n")

            #TODO(marcink): intro to binary detection of svn patches
            # if self.binary_content:
            #     buf.write('GIT binary patch\n')

            buf.write("--- /dev/null\t(revision 0)\n")
            src_lines = []
        else:
            if change == 'delete':
                buf.write("deleted file mode 10644\n")

            #TODO(marcink): intro to binary detection of svn patches
            # if self.binary_content:
            #     buf.write('GIT binary patch\n')

            buf.write("--- a/%s\t(revision %s)\n" % (
                src_path, self.src_rev))
            src_lines = self._svn_readlines(self.src_root, src_full_path)

        if change == 'delete':
            buf.write("+++ /dev/null\t(revision %s)\n" % (self.tgt_rev, ))
            tgt_lines = []
        else:
            buf.write("+++ b/%s\t(revision %s)\n" % (
                tgt_path, self.tgt_rev))
            tgt_lines = self._svn_readlines(self.tgt_root, tgt_full_path)

        if not self.binary_content:
            udiff = svn_diff.unified_diff(
                src_lines, tgt_lines, context=self.context,
                ignore_blank_lines=self.ignore_whitespace,
                ignore_case=False,
                ignore_space_changes=self.ignore_whitespace)
            buf.writelines(udiff)

    def _get_mime_type(self, path):
        # Prefer the target revision's mime-type property; fall back to the
        # source revision when the node no longer exists in the target.
        try:
            mime_type = svn.fs.node_prop(
                self.tgt_root, path, svn.core.SVN_PROP_MIME_TYPE)
        except svn.core.SubversionException:
            mime_type = svn.fs.node_prop(
                self.src_root, path, svn.core.SVN_PROP_MIME_TYPE)
        return mime_type

    def _svn_readlines(self, fs_root, node_path):
        """Read *node_path* as lines; [] for binary or non-file nodes."""
        if self.binary_content:
            return []
        node_kind = svn.fs.check_path(fs_root, node_path)
        if node_kind not in (
                svn.core.svn_node_file, svn.core.svn_node_symlink):
            return []
        content = svn.core.Stream(
            svn.fs.file_contents(fs_root, node_path)).read()
        return content.splitlines(True)
579 577
580 578
class DiffChangeEditor(svn.delta.Editor):
    """
    Records changes between two given revisions
    """

    def __init__(self):
        # Each entry is a (path, kind, action) tuple; kind is None for
        # deletions because the delta editor does not report the node kind.
        self.changes = []

    def delete_entry(self, path, revision, parent_baton, pool=None):
        self.changes.append((path, None, 'delete'))

    def add_file(
            self, path, parent_baton, copyfrom_path, copyfrom_revision,
            file_pool=None):
        self.changes.append((path, 'file', 'add'))

    def open_file(self, path, parent_baton, base_revision, file_pool=None):
        self.changes.append((path, 'file', 'change'))
599 597
600 598
def authorization_callback_allow_all(root, path, pool):
    """Repos-layer authz callback that grants read access to every path."""
    allow_everything = True
    return allow_everything
603 601
604 602
class TxnNodeProcessor(object):
    """
    Utility to process the change of one node within a transaction root.

    It encapsulates the knowledge of how to add, update or remove
    a node for a given transaction root. The purpose is to support the method
    `SvnRemote.commit`.
    """

    def __init__(self, node, txn_root):
        # `node` is a dict with at least a 'path' key; update() also reads
        # 'content' and the optional 'properties' mapping.
        assert isinstance(node['path'], str)

        self.node = node
        self.txn_root = txn_root

    def update(self):
        """Create or modify the node: parents, existence, content, props."""
        self._ensure_parent_dirs()
        self._add_file_if_node_does_not_exist()
        self._update_file_content()
        self._update_file_properties()

    def remove(self):
        svn.fs.delete(self.txn_root, self.node['path'])
        # TODO: Clean up directory if empty

    def _ensure_parent_dirs(self):
        # Walk upwards collecting missing ancestor directories, then create
        # them top-down so each mkdir has an existing parent.
        curdir = vcspath.dirname(self.node['path'])
        dirs_to_create = []
        while not self._svn_path_exists(curdir):
            dirs_to_create.append(curdir)
            curdir = vcspath.dirname(curdir)

        for curdir in reversed(dirs_to_create):
            log.debug('Creating missing directory "%s"', curdir)
            svn.fs.make_dir(self.txn_root, curdir)

    def _svn_path_exists(self, path):
        path_status = svn.fs.check_path(self.txn_root, path)
        return path_status != svn.core.svn_node_none

    def _add_file_if_node_does_not_exist(self):
        kind = svn.fs.check_path(self.txn_root, self.node['path'])
        if kind == svn.core.svn_node_none:
            svn.fs.make_file(self.txn_root, self.node['path'])

    def _update_file_content(self):
        # Replace the whole file content via a text delta against nothing.
        assert isinstance(self.node['content'], str)
        handler, baton = svn.fs.apply_textdelta(
            self.txn_root, self.node['path'], None, None)
        svn.delta.svn_txdelta_send_string(self.node['content'], handler, baton)

    def _update_file_properties(self):
        properties = self.node.get('properties', {})
        for key, value in properties.iteritems():
            svn.fs.change_node_prop(
                self.txn_root, self.node['path'], key, value)
661 659
662 660
def apr_time_t(timestamp):
    """
    Convert a Python timestamp into APR timestamp type apr_time_t
    """
    # APR measures time in microseconds since the epoch.
    microseconds_per_second = 1E6
    return timestamp * microseconds_per_second
668 666
669 667
def svn_opt_revision_value_t(num):
    """
    Put `num` into a `svn_opt_revision_value_t` structure.
    """
    # Wrap the raw number first, then attach it to a revision marker of
    # kind ``svn_opt_revision_number``.
    rev_value = svn.core.svn_opt_revision_value_t()
    rev_value.number = num
    rev = svn.core.svn_opt_revision_t()
    rev.kind = svn.core.svn_opt_revision_number
    rev.value = rev_value
    return rev
General Comments 0
You need to be logged in to leave comments. Login now