subprocessio.py
563 lines
| 18.7 KiB
| text/x-python
|
PythonLexer
/ vcsserver / subprocessio.py
"""
Module provides a class allowing to wrap communication over subprocess.Popen
input, output, error streams into a meaningful, non-blocking, concurrent
stream processor exposing the output data as an iterator fitting to be a
return value passed by a WSGI application to a WSGI server per PEP 3333.

Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>

This file is part of git_http_backend.py Project.

git_http_backend.py Project is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 2.1 of the License,
or (at your option) any later version.

git_http_backend.py Project is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with git_http_backend.py Project.
If not, see <http://www.gnu.org/licenses/>.
"""
import os | ||||
r1048 | import collections | |||
r358 | import logging | |||
r977 | import subprocess | |||
r1048 | import threading | |||
r0 | ||||
r1070 | from vcsserver.str_utils import safe_str | |||
r358 | log = logging.getLogger(__name__) | |||
r0 | ||||
class StreamFeeder(threading.Thread):
    """
    Daemon thread that pumps a data source into the write end of an OS pipe.

    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.

    :param source: a ``str``, ``bytes`` or ``bytearray`` payload, an integer
        file descriptor, or any object exposing a ``read()`` method.
    :raises TypeError: when ``source`` is none of the above, or is an empty
        string-like value.
    """

    def __init__(self, source):
        super().__init__()
        self.daemon = True
        filelike = False
        self.bytes = b''
        if isinstance(source, str):
            # bytes(str) without an encoding raises TypeError on Python 3,
            # so text has to be encoded explicitly.
            # NOTE(review): UTF-8 is assumed here - confirm against callers.
            self.bytes = source.encode('utf-8')
        elif isinstance(source, (bytes, bytearray)):
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if isinstance(source, int):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                source = os.fdopen(source, 'rb', 16384)
            # let's see if source is file-like by now
            filelike = hasattr(source, 'read')
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    @staticmethod
    def _write_all(fd, data):
        """Write every byte of ``data`` to ``fd``, looping over short writes."""
        view = memoryview(data)
        while view:
            written = os.write(fd, view)
            view = view[written:]

    def run(self):
        """Copy the payload (or the file-like source) into the pipe, then close it."""
        writer = self.writeiface
        try:
            if self.bytes:
                self._write_all(writer, self.bytes)
            else:
                s = self.source
                while True:
                    _bytes = s.read(4096)
                    if not _bytes:
                        break
                    self._write_all(writer, _bytes)
        finally:
            # closing the write end is what delivers EOF to the reader
            os.close(writer)

    @property
    def output(self):
        """File descriptor of the read end of the pipe."""
        return self.readiface
class InputStreamChunker(threading.Thread):
    """
    Daemon thread that reads ``chunk_size`` pieces from ``source`` and appends
    them to ``target``.

    Coordination with the consumer happens through four events:

    - ``data_added``: set after every appended chunk and once more on exit.
    - ``keep_reading``: cleared by this thread when ``target`` holds more than
      ``chunk_count_max`` chunks; the consumer sets it to resume reading.
    - ``EOF``: set when the thread stops reading, for whatever reason.
    - ``go``: cleared by :meth:`stop` to ask the read loop to finish.

    :param source: object with a ``read(size)`` method (and ideally ``close()``).
    :param target: container supporting ``append()`` and ``len()``.
    :param buffer_size: total buffer budget, used with ``chunk_size`` to
        derive ``chunk_count_max``.
    :param chunk_size: size passed to every ``source.read()`` call.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        super().__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # threading.Event objects start out cleared
        self.data_added = threading.Event()

        self.keep_reading = threading.Event()
        self.keep_reading.set()

        self.EOF = threading.Event()

        self.go = threading.Event()
        self.go.set()

    def stop(self):
        """Ask the reader to finish and mark the stream as done."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            pass

    def _read_chunk(self):
        """Read one chunk; a closed source is reported as end of data."""
        try:
            return self.source.read(self.chunk_size)
        except ValueError:  # probably "I/O operation on closed file"
            return b''

    def run(self):
        """Read chunks into ``target`` until EOF, stop() or a read failure."""
        t = self.target
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go
        timeout_input = 20

        try:
            b = self._read_chunk()
            while b and go.is_set():
                if len(t) > chunk_count_max:
                    # buffer is full: pause until the consumer drains it
                    keep_reading.clear()
                    keep_reading.wait(timeout_input)
                    if len(t) > chunk_count_max + timeout_input:
                        log.error("Timed out while waiting for input from subprocess.")
                        os._exit(-1)  # this will cause the worker to recycle itself

                t.append(b)
                da.set()
                b = self._read_chunk()
        finally:
            # Always signal completion, even when read() failed with an
            # unexpected error; otherwise consumers polling EOF never return.
            self.EOF.set()
            da.set()  # for cases when done but there was no input.
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    ``next()`` on this object does not return until there is some data to
    send or the underlying source is exhausted.
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.

    :param name: label used only by ``__str__``.
    :param source: readable object handed to the reader thread.
    :param buffer_size: total buffer budget in bytes.
    :param chunk_size: size of a single read.
    :param starting_values: chunks placed in the queue before any read.
    :param bottomless: when true the queue is bounded to
        ``buffer_size / chunk_size`` entries, so the oldest chunks are dropped
        once it is full.
    """

    def __init__(self, name, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        starting_values = starting_values or []
        self.name = name
        self.buffer_size = buffer_size
        self.chunk_size = chunk_size

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data_queue = collections.deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data_queue, buffer_size, chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################
    def __str__(self):
        return f'BufferedGenerator(name={self.name} chunk: {self.chunk_size} on buffer: {self.buffer_size})'

    def __iter__(self):
        return self

    def __next__(self):
        # wait (in 0.2s slices) until a chunk shows up or the reader is done
        while not self.length and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)

        if self.length:
            # a slot is being freed: let a paused reader continue
            self.worker.keep_reading.set()
            return bytes(self.data_queue.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        """Raise ``exc_type(value)`` unless the reader already reached EOF."""
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        """Start the reader thread if it has not been started yet.

        ``__init__`` already starts the worker and a thread can only be
        started once, so this is a no-op in the usual case.
        """
        if self.worker.ident is None:
            self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        """The source object the reader thread consumes."""
        # The worker exposes its input as ``source``; it has no ``w``
        # attribute, so the previous ``self.worker.w`` always raised.
        return self.worker.source

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: A Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data_queue)

    def prepend(self, x):
        self.data_queue.appendleft(x)

    def append(self, x):
        self.data_queue.append(x)

    def extend(self, o):
        self.data_queue.extend(o)

    def __getitem__(self, i):
        return self.data_queue[i]
r0 | ||||
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

           # `args` expected to run a program that produces a lot of output
           output = ''.join(SubprocessIOChunker(
               args, shell=False, inputstream=inputstream, env=environ).output)

           # `output` will not contain all the data, because the __del__ method
           # has already killed the subprocess in this case before all output
           # has been consumed.

       NOTE(review): no `__del__` is visible in this part of the file -
       confirm whether this warning still applies.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not str)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    OSError family of exceptions

    Example usage:
    #    try:
    #        answer = SubprocessIOChunker(
    #            cmd,
    #            input,
    #            buffer_size = 65536,
    #            chunk_size = 4096
    #            )
    #    except (OSError) as e:
    #        print(str(e))
    #        raise e
    #
    #    return answer
    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False
    _stdout = None
    _stderr = None

    def __init__(self, cmd, input_stream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param input_stream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        :param kwargs: forwarded to ``subprocess.Popen``; ``shell`` defaults
            to True when not given.
        :raises OSError: when the subprocess fails early (non-zero return code
            or stderr output, depending on the ``fail_on_*`` flags).
        """
        kwargs['shell'] = kwargs.get('shell', True)

        starting_values = starting_values or []
        if input_stream:
            input_streamer = StreamFeeder(input_stream)
            input_streamer.start()
            input_stream = input_streamer.output
            self._close_input_fd = input_stream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code
        self.cmd = cmd

        try:
            _p = subprocess.Popen(cmd, bufsize=-1, stdin=input_stream, stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE, **kwargs)
            self.process = _p

            bg_out = BufferedGenerator('stdout', _p.stdout, buffer_size, chunk_size, starting_values)
            bg_err = BufferedGenerator('stderr', _p.stderr, 10240, 1, bottomless=True)

            while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
                # doing this until we reach either end of file, or end of buffer.
                bg_out.data_added_event.wait(0.2)
                bg_out.data_added_event.clear()

            # at this point it's still ambiguous if we are done reading or just full buffer.
            # Either way, if error (returned by ended process, or implied based on
            # presence of stuff in stderr output) we error out.
            # Else, we are happy.
            return_code = _p.poll()
            ret_code_ok = return_code in [None, 0]
            ret_code_fail = return_code is not None and return_code != 0
            if (
                (ret_code_fail and fail_on_return_code) or
                (ret_code_ok and fail_on_stderr and bg_err.length)
            ):

                try:
                    _p.terminate()
                except Exception:
                    pass

                bg_out.stop()
                out = b''.join(bg_out)
                self._stdout = out

                bg_err.stop()
                err = b''.join(bg_err)
                self._stderr = err

                # code from https://github.com/schacon/grack/pull/7
                if err.strip() == b'fatal: The remote end hung up unexpectedly' and out.startswith(b'0034shallow '):
                    bg_out = iter([out])
                    # Drop the process reference so that __next__ does not
                    # report the process we just terminated as a failure.
                    self.process = None
                elif err and fail_on_stderr:
                    # errors='replace': undecodable bytes must not hide the real error
                    text_err = err.decode('utf-8', errors='replace')
                    raise OSError(
                        "Subprocess exited due to an error:\n{}".format(text_err))

                if ret_code_fail and fail_on_return_code:
                    text_err = err.decode('utf-8', errors='replace')
                    if not err:
                        # maybe get empty stderr, try stdout instead
                        # in many cases git reports the errors on stdout too
                        text_err = out.decode('utf-8', errors='replace')
                    raise OSError(
                        "Subprocess exited with non 0 ret code:{}: stderr:{}".format(return_code, text_err))
        except Exception:
            # The constructor failed, so no caller can ever invoke close();
            # release the feeder pipe here to avoid leaking the descriptor.
            self._close_input()
            raise

        self.stdout = bg_out
        self.stderr = bg_err
        self.inputstream = input_stream

    def __str__(self):
        proc = getattr(self, 'process', 'NO_PROCESS')
        return f'SubprocessIOChunker: {proc}'

    def __iter__(self):
        return self

    def __next__(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = next(self.stdout)
        except StopIteration as e:
            stop_iteration = e

        if self.process:
            return_code = self.process.poll()
            ret_code_fail = return_code is not None and return_code != 0
            if ret_code_fail and self._fail_on_return_code:
                self.stop_streams()
                err = self.get_stderr()
                raise OSError(
                    "Subprocess exited (exit_code:{}) due to an error during iteration:\n{}".format(return_code, err))

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, exc_type, value=None, traceback=None):
        """Raise ``exc_type(value)`` while stdout still has, or may get, data."""
        if self.stdout.length or not self.stdout.done_reading:
            raise exc_type(value)

    def _close_input(self):
        """Close the feeder pipe descriptor owned by this object, exactly once."""
        fd = self._close_input_fd
        if fd is None:
            return
        self._close_input_fd = None
        try:
            os.close(fd)
        except OSError:
            pass

    def close(self):
        """Terminate the subprocess and release streams; safe to call twice."""
        if self._closed:
            return

        try:
            self.process.terminate()
        except Exception:
            pass

        # ``inputstream`` refers to the same descriptor as ``_close_input_fd``.
        # It must be closed only once: a second os.close() on the same number
        # could hit a descriptor that was re-opened by other code meanwhile.
        self._close_input()

        try:
            self.stdout.close()
        except Exception:
            pass
        try:
            self.stderr.close()
        except Exception:
            pass

        self._closed = True

    def stop_streams(self):
        """Stop both reader threads; streams without ``stop`` are skipped."""
        getattr(self.stdout, 'stop', lambda: None)()
        getattr(self.stderr, 'stop', lambda: None)()

    def get_stdout(self):
        """Return stdout captured during init, else drain the stdout stream."""
        if self._stdout:
            return self._stdout
        else:
            return b''.join(self.stdout)

    def get_stderr(self):
        """Return stderr captured during init, else drain the stderr stream."""
        if self._stderr:
            return self._stderr
        else:
            return b''.join(self.stderr)
r370 | ||||
def run_command(arguments, env=None):
    """
    Execute a command through SubprocessIOChunker and collect its output.

    The command runs with ``shell=False`` and ``fail_on_stderr=False``.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    :param env: optional environment passed on to the subprocess when truthy
    :returns: tuple of (stdout, stderr), both joined into ``bytes``
    :raises Exception: when the chunker raises ``OSError``; the message holds
        the command line and the original error.
    """
    log.debug('Running subprocessio command %s', arguments)

    options = {'shell': False, 'fail_on_stderr': False}
    if env:
        options['env'] = env

    chunker = None
    try:
        chunker = SubprocessIOChunker(arguments, **options)
        stdout = b''.join(chunker)
        stderr = b''.join(chunker.stderr)
        return stdout, stderr
    except OSError as err:
        readable_cmd = ' '.join(map(safe_str, arguments))  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was:%s\n" % (readable_cmd, err))
        log.exception(tb_err)
        raise Exception(tb_err)
    finally:
        # release the process and its pipes whether or not the run succeeded
        if chunker:
            chunker.close()
r370 | ||||