diff --git a/docs/changelog.rst b/docs/changelog.rst --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -23,6 +23,8 @@ news - #469 added --update-only option to whoosh to re-index only given list of repos in index - rhodecode-api CLI client +- new git http protocol replaced buggy dulwich implementation. + Now based on pygrack & gitweb fixes +++++ diff --git a/rhodecode/lib/middleware/pygrack.py b/rhodecode/lib/middleware/pygrack.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/middleware/pygrack.py @@ -0,0 +1,181 @@ +import os +import socket +import logging +import subprocess + +from webob import Request, Response, exc + +from rhodecode.lib import subprocessio + +log = logging.getLogger(__name__) + + +class FileWrapper(object): + + def __init__(self, fd, content_length): + self.fd = fd + self.content_length = content_length + self.remain = content_length + + def read(self, size): + if size <= self.remain: + try: + data = self.fd.read(size) + except socket.error: + raise IOError(self) + self.remain -= size + elif self.remain: + data = self.fd.read(self.remain) + self.remain = 0 + else: + data = None + return data + + def __repr__(self): + return '' % ( + self.fd, self.content_length, self.content_length - self.remain + ) + + +class GitRepository(object): + git_folder_signature = set(['config', 'head', 'info', 'objects', 'refs']) + commands = ['git-upload-pack', 'git-receive-pack'] + + def __init__(self, repo_name, content_path): + files = set([f.lower() for f in os.listdir(content_path)]) + if not (self.git_folder_signature.intersection(files) + == self.git_folder_signature): + raise OSError('%s missing git signature' % content_path) + self.content_path = content_path + self.valid_accepts = ['application/x-%s-result' % + c for c in self.commands] + self.repo_name = repo_name + + def _get_fixedpath(self, path): + """ + Small fix for repo_path + + :param path: + :type path: + """ + return path.split(self.repo_name, 1)[-1].strip('/') + + def inforefs(self, request, environ): + """ + WSGI Response producer for HTTP GET Git Smart + HTTP /info/refs request. + """ + + git_command = request.GET['service'] + if git_command not in self.commands: + log.debug('command %s not allowed' % git_command) + return exc.HTTPMethodNotAllowed() + + # note to self: + # please, resist the urge to add '\n' to git capture and increment + # line count by 1. + # The code in Git client not only does NOT need '\n', but actually + # blows up if you sprinkle "flush" (0000) as "0001\n". + # It reads binary, per number of bytes specified. + # if you do add '\n' as part of data, count it. + smart_server_advert = '# service=%s' % git_command + try: + out = subprocessio.SubprocessIOChunker( + r'git %s --stateless-rpc --advertise-refs "%s"' % ( + git_command[4:], self.content_path), + starting_values=[ + str(hex(len(smart_server_advert) + 4)[2:] + .rjust(4, '0') + smart_server_advert + '0000') + ] + ) + except EnvironmentError, e: + log.exception(e) + raise exc.HTTPExpectationFailed() + resp = Response() + resp.content_type = 'application/x-%s-advertisement' % str(git_command) + resp.app_iter = out + return resp + + def backend(self, request, environ): + """ + WSGI Response producer for HTTP POST Git Smart HTTP requests. + Reads commands and data from HTTP POST's body. + returns an iterator obj with contents of git command's + response to stdout + """ + git_command = self._get_fixedpath(request.path_info) + if git_command not in self.commands: + log.debug('command %s not allowed' % git_command) + return exc.HTTPMethodNotAllowed() + + if 'CONTENT_LENGTH' in environ: + inputstream = FileWrapper(environ['wsgi.input'], + request.content_length) + else: + inputstream = environ['wsgi.input'] + + try: + out = subprocessio.SubprocessIOChunker( + r'git %s --stateless-rpc "%s"' % (git_command[4:], + self.content_path), + inputstream=inputstream + ) + except EnvironmentError, e: + log.exception(e) + raise exc.HTTPExpectationFailed() + + if git_command in [u'git-receive-pack']: + # updating refs manually after each push. + # Needed for pre-1.7.0.4 git clients using regular HTTP mode. + subprocess.call(u'git --git-dir "%s" ' + 'update-server-info' % self.content_path, + shell=True) + + resp = Response() + resp.content_type = 'application/x-%s-result' % git_command.encode('utf8') + resp.app_iter = out + return resp + + def __call__(self, environ, start_response): + request = Request(environ) + _path = self._get_fixedpath(request.path_info) + if _path.startswith('info/refs'): + app = self.inforefs + elif [a for a in self.valid_accepts if a in request.accept]: + app = self.backend + try: + resp = app(request, environ) + except exc.HTTPException, e: + resp = e + log.exception(e) + except Exception, e: + log.exception(e) + resp = exc.HTTPInternalServerError() + return resp(environ, start_response) + + +class GitDirectory(object): + + def __init__(self, repo_root, repo_name): + repo_location = os.path.join(repo_root, repo_name) + if not os.path.isdir(repo_location): + raise OSError(repo_location) + + self.content_path = repo_location + self.repo_name = repo_name + self.repo_location = repo_location + + def __call__(self, environ, start_response): + content_path = self.content_path + try: + app = GitRepository(self.repo_name, content_path) + except (AssertionError, OSError): + if os.path.isdir(os.path.join(content_path, '.git')): + app = GitRepository(os.path.join(content_path, '.git')) + else: + return exc.HTTPNotFound()(environ, start_response) + return app(environ, start_response) + + +def make_wsgi_app(repo_name, repo_root): + return GitDirectory(repo_root, repo_name) diff --git a/rhodecode/lib/middleware/simplegit.py b/rhodecode/lib/middleware/simplegit.py --- a/rhodecode/lib/middleware/simplegit.py +++ b/rhodecode/lib/middleware/simplegit.py @@ -218,11 +218,13 @@ class SimpleGit(BaseVCSController): :param repo_name: name of the repository :param repo_path: full path to the repository """ - _d = {'/' + repo_name: Repo(repo_path)} - backend = dulserver.DictBackend(_d) - gitserve = make_wsgi_chain(backend) - return gitserve + from rhodecode.lib.middleware.pygrack import make_wsgi_app + app = make_wsgi_app( + repo_root=os.path.dirname(repo_path), + repo_name=repo_name, + ) + return app def __get_repository(self, environ): """ diff --git a/rhodecode/lib/subprocessio.py b/rhodecode/lib/subprocessio.py new file mode 100644 --- /dev/null +++ b/rhodecode/lib/subprocessio.py @@ -0,0 +1,401 @@ +''' +Module provides a class allowing to wrap communication over subprocess.Popen +input, output, error streams into a meaningfull, non-blocking, concurrent +stream processor exposing the output data as an iterator fitting to be a +return value passed by a WSGI applicaiton to a WSGI server per PEP 3333. + +Copyright (c) 2011 Daniel Dotsenko + +This file is part of git_http_backend.py Project. + +git_http_backend.py Project is free software: you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public License as +published by the Free Software Foundation, either version 2.1 of the License, +or (at your option) any later version. + +git_http_backend.py Project is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with git_http_backend.py Project. +If not, see . +''' +import os +import subprocess +import threading +from collections import deque + + +class StreamFeeder(threading.Thread): + """ + Normal writing into pipe-like is blocking once the buffer is filled. + This thread allows a thread to seep data from a file-like into a pipe + without blocking the main thread. + We close inpipe once the end of the source stream is reached. + """ + def __init__(self, source): + super(StreamFeeder, self).__init__() + self.daemon = True + filelike = False + self.bytes = b'' + if type(source) in (type(''), bytes, bytearray): # string-like + self.bytes = bytes(source) + else: # can be either file pointer or file-like + if type(source) in (int, long): # file pointer it is + ## converting file descriptor (int) stdin into file-like + try: + source = os.fdopen(source, 'rb', 16384) + except: + pass + # let's see if source is file-like by now + try: + filelike = source.read + except: + pass + if not filelike and not self.bytes: + raise TypeError("StreamFeeder's source object must be a readable file-like, a file descriptor, or a string-like.") + self.source = source + self.readiface, self.writeiface = os.pipe() + + def run(self): + t = self.writeiface + if self.bytes: + os.write(t, self.bytes) + else: + s = self.source + b = s.read(4096) + while b: + os.write(t, b) + b = s.read(4096) + os.close(t) + + @property + def output(self): + return self.readiface + + +class InputStreamChunker(threading.Thread): + def __init__(self, source, target, buffer_size, chunk_size): + + super(InputStreamChunker, self).__init__() + + self.daemon = True # die die die. + + self.source = source + self.target = target + self.chunk_count_max = int(buffer_size / chunk_size) + 1 + self.chunk_size = chunk_size + + self.data_added = threading.Event() + self.data_added.clear() + + self.keep_reading = threading.Event() + self.keep_reading.set() + + self.EOF = threading.Event() + self.EOF.clear() + + self.go = threading.Event() + self.go.set() + + def stop(self): + self.go.clear() + self.EOF.set() + try: + # this is not proper, but is done to force the reader thread let + # go of the input because, if successful, .close() will send EOF + # down the pipe. + self.source.close() + except: + pass + + def run(self): + s = self.source + t = self.target + cs = self.chunk_size + ccm = self.chunk_count_max + kr = self.keep_reading + da = self.data_added + go = self.go + b = s.read(cs) + while b and go.is_set(): + if len(t) > ccm: + kr.clear() + kr.wait(2) +# # this only works on 2.7.x and up +# if not kr.wait(10): +# raise Exception("Timed out while waiting for input to be read.") + # instead we'll use this + if len(t) > ccm + 3: + raise IOError("Timed out while waiting for input from subprocess.") + t.append(b) + da.set() + b = s.read(cs) + self.EOF.set() + da.set() # for cases when done but there was no input. + + +class BufferedGenerator(): + ''' + Class behaves as a non-blocking, buffered pipe reader. + Reads chunks of data (through a thread) + from a blocking pipe, and attaches these to an array (Deque) of chunks. + Reading is halted in the thread when max chunks is internally buffered. + The .next() may operate in blocking or non-blocking fashion by yielding + '' if no data is ready + to be sent or by not returning until there is some data to send + When we get EOF from underlying source pipe we raise the marker to raise + StopIteration after the last chunk of data is yielded. + ''' + + def __init__(self, source, buffer_size=65536, chunk_size=4096, + starting_values=[], bottomless=False): + + if bottomless: + maxlen = int(buffer_size / chunk_size) + else: + maxlen = None + + self.data = deque(starting_values, maxlen) + + self.worker = InputStreamChunker(source, self.data, buffer_size, + chunk_size) + if starting_values: + self.worker.data_added.set() + self.worker.start() + + #################### + # Generator's methods + #################### + + def __iter__(self): + return self + + def next(self): + while not len(self.data) and not self.worker.EOF.is_set(): + self.worker.data_added.clear() + self.worker.data_added.wait(0.2) + if len(self.data): + self.worker.keep_reading.set() + return bytes(self.data.popleft()) + elif self.worker.EOF.is_set(): + raise StopIteration + + def throw(self, type, value=None, traceback=None): + if not self.worker.EOF.is_set(): + raise type(value) + + def start(self): + self.worker.start() + + def stop(self): + self.worker.stop() + + def close(self): + try: + self.worker.stop() + self.throw(GeneratorExit) + except (GeneratorExit, StopIteration): + pass + + def __del__(self): + self.close() + + #################### + # Threaded reader's infrastructure. + #################### + @property + def input(self): + return self.worker.w + + @property + def data_added_event(self): + return self.worker.data_added + + @property + def data_added(self): + return self.worker.data_added.is_set() + + @property + def reading_paused(self): + return not self.worker.keep_reading.is_set() + + @property + def done_reading_event(self): + ''' + Done_reding does not mean that the iterator's buffer is empty. + Iterator might have done reading from underlying source, but the read + chunks might still be available for serving through .next() method. + + @return An Event class instance. + ''' + return self.worker.EOF + + @property + def done_reading(self): + ''' + Done_reding does not mean that the iterator's buffer is empty. + Iterator might have done reading from underlying source, but the read + chunks might still be available for serving through .next() method. + + @return An Bool value. + ''' + return self.worker.EOF.is_set() + + @property + def length(self): + ''' + returns int. + + This is the lenght of the que of chunks, not the length of + the combined contents in those chunks. + + __len__() cannot be meaningfully implemented because this + reader is just flying throuh a bottomless pit content and + can only know the lenght of what it already saw. + + If __len__() on WSGI server per PEP 3333 returns a value, + the responce's length will be set to that. In order not to + confuse WSGI PEP3333 servers, we will not implement __len__ + at all. + ''' + return len(self.data) + + def prepend(self, x): + self.data.appendleft(x) + + def append(self, x): + self.data.append(x) + + def extend(self, o): + self.data.extend(o) + + def __getitem__(self, i): + return self.data[i] + + +class SubprocessIOChunker(): + ''' + Processor class wrapping handling of subprocess IO. + + In a way, this is a "communicate()" replacement with a twist. + + - We are multithreaded. Writing in and reading out, err are all sep threads. + - We support concurrent (in and out) stream processing. + - The output is not a stream. It's a queue of read string (bytes, not unicode) + chunks. The object behaves as an iterable. You can "for chunk in obj:" us. + - We are non-blocking in more respects than communicate() + (reading from subprocess out pauses when internal buffer is full, but + does not block the parent calling code. On the flip side, reading from + slow-yielding subprocess may block the iteration until data shows up. This + does not block the parallel inpipe reading occurring parallel thread.) + + The purpose of the object is to allow us to wrap subprocess interactions into + and interable that can be passed to a WSGI server as the application's return + value. Because of stream-processing-ability, WSGI does not have to read ALL + of the subprocess's output and buffer it, before handing it to WSGI server for + HTTP response. Instead, the class initializer reads just a bit of the stream + to figure out if error ocurred or likely to occur and if not, just hands the + further iteration over subprocess output to the server for completion of HTTP + response. + + The real or perceived subprocess error is trapped and raised as one of + EnvironmentError family of exceptions + + Example usage: + # try: + # answer = SubprocessIOChunker( + # cmd, + # input, + # buffer_size = 65536, + # chunk_size = 4096 + # ) + # except (EnvironmentError) as e: + # print str(e) + # raise e + # + # return answer + + + ''' + def __init__(self, cmd, inputstream=None, buffer_size=65536, + chunk_size=4096, starting_values=[]): + ''' + Initializes SubprocessIOChunker + + @param cmd A Subprocess.Popen style "cmd". Can be string or array of strings + @param inputstream (Default: None) A file-like, string, or file pointer. + @param buffer_size (Default: 65536) A size of total buffer per stream in bytes. + @param chunk_size (Default: 4096) A max size of a chunk. Actual chunk may be smaller. + @param starting_values (Default: []) An array of strings to put in front of output que. + ''' + + if inputstream: + input_streamer = StreamFeeder(inputstream) + input_streamer.start() + inputstream = input_streamer.output + + _p = subprocess.Popen(cmd, + bufsize=-1, + shell=True, + stdin=inputstream, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values) + bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True) + + while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length: + # doing this until we reach either end of file, or end of buffer. + bg_out.data_added_event.wait(1) + bg_out.data_added_event.clear() + + # at this point it's still ambiguous if we are done reading or just full buffer. + # Either way, if error (returned by ended process, or implied based on + # presence of stuff in stderr output) we error out. + # Else, we are happy. + _returncode = _p.poll() + if _returncode or (_returncode == None and bg_err.length): + try: + _p.terminate() + except: + pass + bg_out.stop() + bg_err.stop() + raise EnvironmentError("Subprocess exited due to an error.\n" + "".join(bg_err)) + + self.process = _p + self.output = bg_out + self.error = bg_err + + def __iter__(self): + return self + + def next(self): + if self.process.poll(): + raise EnvironmentError("Subprocess exited due to an error:\n" + ''.join(self.error)) + return self.output.next() + + def throw(self, type, value=None, traceback=None): + if self.output.length or not self.output.done_reading: + raise type(value) + + def close(self): + try: + self.process.terminate() + except: + pass + try: + self.output.close() + except: + pass + try: + self.error.close() + except: + pass + + def __del__(self): + self.close()