##// END OF EJS Templates
helpers: moved duplicated code to type_utils
helpers: moved duplicated code to type_utils

File last commit:

r1095:268c6aa4 python3
r1111:a5985c44 python3
Show More
subprocessio.py
563 lines | 18.7 KiB | text/x-python | PythonLexer
initial commit
r0 """
Module provides a class allowing to wrap communication over subprocess.Popen
input, output, error streams into a meaningful, non-blocking, concurrent
stream processor exposing the output data as an iterator fitting to be a
return value passed by a WSGI application to a WSGI server per PEP 3333.
Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
This file is part of git_http_backend.py Project.
git_http_backend.py Project is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 2.1 of the License,
or (at your option) any later version.
git_http_backend.py Project is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with git_http_backend.py Project.
If not, see <http://www.gnu.org/licenses/>.
"""
import os
python3: code change for py3 support...
r1048 import collections
subprocessio: instead of raising a Thread error we sent a signal to gunicorn....
r358 import logging
py3: drop subprocess32 backport.
r977 import subprocess
python3: code change for py3 support...
r1048 import threading
initial commit
r0
core: various fixes of bytes vs str usage based on rhodecode-ce tests outputs
r1070 from vcsserver.str_utils import safe_str
subprocessio: instead of raising a Thread error we sent a signal to gunicorn....
r358 log = logging.getLogger(__name__)
initial commit
r0
python3: code change for py3 support...
class StreamFeeder(threading.Thread):
    """
    Feed data from ``source`` into an OS pipe on a background thread.

    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.

    The read end of the pipe is exposed through :attr:`output`; closing it
    is the responsibility of whoever consumes it.
    """

    def __init__(self, source):
        """
        :param source: ``str`` (encoded as UTF-8), ``bytes``/``bytearray``,
            an integer file descriptor, or an object with a ``read`` method.
        :raises TypeError: if ``source`` is none of the above, or is an
            empty string-like value.
        """
        super().__init__()
        self.daemon = True
        filelike = False
        self.bytes = b''
        if isinstance(source, str):
            # bytes(str) without an encoding raises TypeError on Python 3,
            # so text input has to be encoded explicitly.
            self.bytes = source.encode('utf-8')
        elif isinstance(source, (bytes, bytearray)):
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if isinstance(source, int):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                source = os.fdopen(source, 'rb', 16384)
            # let's see if source is file-like by now
            filelike = hasattr(source, 'read')
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    @staticmethod
    def _write_all(fd, data):
        """Write all of ``data`` to ``fd``, retrying after short writes."""
        view = memoryview(data)
        while len(view):
            written = os.write(fd, view)
            view = view[written:]

    def run(self):
        """Copy the whole source into the pipe, then close the write end."""
        writer = self.writeiface
        try:
            if self.bytes:
                self._write_all(writer, self.bytes)
            else:
                s = self.source
                while True:
                    _bytes = s.read(4096)
                    if not _bytes:
                        break
                    self._write_all(writer, _bytes)
        finally:
            # closing the write end is what signals EOF to the reader
            os.close(writer)

    @property
    def output(self):
        """File descriptor (int) of the read end of the pipe."""
        return self.readiface
python3: code change for py3 support...
class InputStreamChunker(threading.Thread):
    """
    Background reader that moves fixed-size chunks from ``source`` into
    ``target``.

    Coordination with the consumer happens through four events:

    - ``data_added``: set every time a chunk is appended, and once more when
      reading ends.
    - ``keep_reading``: cleared by this thread when ``target`` holds more
      than ``chunk_count_max`` chunks; this thread then waits on it.
    - ``EOF``: set when reading has ended (or :meth:`stop` was called).
    - ``go``: cleared by :meth:`stop` to end the read loop.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: object with ``read(size)`` and ``close()``.
        :param target: container supporting ``append`` and ``len``.
        :param buffer_size: total amount to buffer; with ``chunk_size`` it
            determines how many chunks may queue up before reading pauses.
        :param chunk_size: size passed to every ``source.read`` call.
        """
        super(InputStreamChunker, self).__init__()
        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_size = chunk_size
        self.chunk_count_max = int(buffer_size / chunk_size) + 1

        # freshly created events start out cleared
        self.data_added = threading.Event()
        self.EOF = threading.Event()

        self.keep_reading = threading.Event()
        self.keep_reading.set()

        self.go = threading.Event()
        self.go.set()

    def stop(self):
        """Ask the reader loop to finish and flag EOF right away."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            pass

    def _read_chunk(self):
        """Read one chunk; a ValueError from ``read`` counts as end of data."""
        try:
            return self.source.read(self.chunk_size)
        except ValueError:  # probably "I/O operation on closed file"
            return ''

    def run(self):
        """Read chunks until the source is exhausted or ``go`` is cleared."""
        queue = self.target
        max_chunks = self.chunk_count_max
        may_read = self.keep_reading
        added = self.data_added
        running = self.go

        timeout_input = 20
        chunk = self._read_chunk()
        while chunk and running.is_set():
            if len(queue) > max_chunks:
                # buffer is full: pause until the consumer drains something
                # or the timeout elapses
                may_read.clear()
                may_read.wait(timeout_input)
                if len(queue) > max_chunks + timeout_input:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            queue.append(chunk)
            added.set()
            chunk = self._read_chunk()

        self.EOF.set()
        added.set()  # for cases when done but there was no input.
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    Iterating (``__next__``) blocks until a chunk is available or the
    worker has reached EOF.
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, name, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        """
        :param name: label used only in ``__str__``.
        :param source: object to read from; handed to the worker thread.
        :param buffer_size: total buffer size used to derive chunk limits.
        :param chunk_size: size of each read performed by the worker.
        :param starting_values: chunks placed at the front of the queue.
        :param bottomless: when true the queue gets a ``maxlen``, so the
            oldest chunks are dropped once it is full.
        """
        starting_values = starting_values or []
        self.name = name
        self.buffer_size = buffer_size
        self.chunk_size = chunk_size

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data_queue = collections.deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data_queue, buffer_size, chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################
    def __str__(self):
        return f'BufferedGenerator(name={self.name} chunk: {self.chunk_size} on buffer: {self.buffer_size})'

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next buffered chunk as bytes, or raise StopIteration."""
        while not self.length and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)

        if self.length:
            # we are consuming, so let a paused worker resume reading
            self.worker.keep_reading.set()
            return bytes(self.data_queue.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        """Raise ``exc_type(value)`` unless the worker already reached EOF."""
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        """
        Start the worker thread if it is not running yet.

        ``__init__`` already starts the worker and a thread can only be
        started once, so this is a no-op for an already started worker.
        """
        if self.worker.ident is None:
            self.worker.start()

    def stop(self):
        """Stop the worker thread (see the worker's ``stop``)."""
        self.worker.stop()

    def close(self):
        """Stop the worker and swallow the resulting GeneratorExit."""
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        """The source object the worker thread reads from."""
        return self.worker.source

    @property
    def data_added_event(self):
        """Event set by the worker whenever a chunk was queued."""
        return self.worker.data_added

    @property
    def data_added(self):
        """True if the ``data_added`` event is currently set."""
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        """True while the worker's ``keep_reading`` event is cleared."""
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through iteration.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through iteration.

        :returns: A bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data_queue)

    def prepend(self, x):
        """Put chunk ``x`` at the front of the queue."""
        self.data_queue.appendleft(x)

    def append(self, x):
        """Put chunk ``x`` at the end of the queue."""
        self.data_queue.append(x)

    def extend(self, o):
        """Append every chunk from iterable ``o`` to the queue."""
        self.data_queue.extend(o)

    def __getitem__(self, i):
        return self.data_queue[i]
initial commit
r0
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       The subprocess is terminated by :meth:`close`. Consume all of the
       output *before* closing the chunker, otherwise the output is
       truncated::

           # `args` expected to run a program that produces a lot of output
           chunker = SubprocessIOChunker(
               args, shell=False, input_stream=input_stream, env=environ)
           try:
               output = b''.join(chunker)
           finally:
               chunker.close()

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not str)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    OSError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #     )
    # except (OSError) as e:
    #     print(str(e))
    #     raise e
    #
    # return answer
    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    # set once close() has run, so repeated calls are no-ops
    _closed = False
    # full stdout / stderr captured when __init__ detected an error
    _stdout = None
    _stderr = None

    def __init__(self, cmd, input_stream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param input_stream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        :param kwargs: passed on to ``subprocess.Popen``; ``shell`` defaults
            to True when not given.
        :raises OSError: if the process could not be started, wrote to
            stderr (with ``fail_on_stderr``) or already exited with a
            non-zero code (with ``fail_on_return_code``).
        """
        kwargs.setdefault('shell', True)

        starting_values = starting_values or []
        if input_stream:
            input_streamer = StreamFeeder(input_stream)
            input_streamer.start()
            input_stream = input_streamer.output
            self._close_input_fd = input_stream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code
        self.cmd = cmd

        try:
            _p = subprocess.Popen(cmd, bufsize=-1, stdin=input_stream,
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  **kwargs)
        except Exception:
            # nobody will ever call close() on a half-built object
            self._release_input_fd()
            raise
        self.process = _p

        bg_out = BufferedGenerator('stdout', _p.stdout, buffer_size, chunk_size, starting_values)
        bg_err = BufferedGenerator('stderr', _p.stderr, 10240, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(0.2)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        return_code = _p.poll()
        ret_code_ok = return_code in [None, 0]
        ret_code_fail = return_code is not None and return_code != 0
        if (
            (ret_code_fail and fail_on_return_code) or
            (ret_code_ok and fail_on_stderr and bg_err.length)
        ):

            try:
                _p.terminate()
            except Exception:
                pass

            bg_out.stop()
            out = b''.join(bg_out)
            self._stdout = out

            bg_err.stop()
            err = b''.join(bg_err)
            self._stderr = err

            # code from https://github.com/schacon/grack/pull/7
            if err.strip() == b'fatal: The remote end hung up unexpectedly' and out.startswith(b'0034shallow '):
                bg_out = iter([out])
                # NOTE(review): this only rebinds the local name;
                # self.process was assigned above and keeps pointing at the
                # process. Confirm whether self.process was meant to be
                # cleared here as well.
                _p = None
            elif err and fail_on_stderr:
                # lenient decoding: a stray non-UTF-8 byte must not replace
                # the real error with a UnicodeDecodeError
                text_err = err.decode('utf-8', errors='replace')
                self._release_input_fd()
                raise OSError(
                    "Subprocess exited due to an error:\n{}".format(text_err))

            if ret_code_fail and fail_on_return_code:
                text_err = err.decode('utf-8', errors='replace')
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    text_err = out.decode('utf-8', errors='replace')
                self._release_input_fd()
                raise OSError(
                    "Subprocess exited with non 0 ret code:{}: stderr:{}".format(return_code, text_err))

        self.stdout = bg_out
        self.stderr = bg_err
        self.inputstream = input_stream

    def __str__(self):
        proc = getattr(self, 'process', 'NO_PROCESS')
        return f'SubprocessIOChunker: {proc}'

    def __iter__(self):
        return self

    def __next__(self):
        """
        Return the next stdout chunk.

        :raises OSError: if the process has exited with a non-zero code and
            ``fail_on_return_code`` was requested.
        :raises StopIteration: when stdout is exhausted.
        """
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = next(self.stdout)
        except StopIteration as e:
            stop_iteration = e

        if self.process:
            return_code = self.process.poll()
            ret_code_fail = return_code is not None and return_code != 0
            if ret_code_fail and self._fail_on_return_code:
                self.stop_streams()
                err = self.get_stderr()
                if isinstance(err, (bytes, bytearray)):
                    # report readable text instead of a bytes repr
                    err = bytes(err).decode('utf-8', errors='replace')
                raise OSError(
                    "Subprocess exited (exit_code:{}) due to an error during iteration:\n{}".format(return_code, err))

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, exc_type, value=None, traceback=None):
        """Raise ``exc_type(value)`` while stdout still has or expects data."""
        if self.stdout.length or not self.stdout.done_reading:
            raise exc_type(value)

    def _release_input_fd(self):
        """Close the read end of the input pipe once; later calls do nothing."""
        fd, self._close_input_fd = self._close_input_fd, None
        if fd is None:
            return
        try:
            os.close(fd)
        except OSError:
            # already closed elsewhere; nothing left to release
            pass

    def close(self):
        """
        Terminate the process and release the pipe and reader threads.

        Every step is best-effort; the method is safe to call repeatedly.
        """
        if self._closed:
            return

        try:
            self.process.terminate()
        except Exception:
            pass

        # The descriptor is closed exactly once: closing the same number a
        # second time could hit an unrelated descriptor that reused it.
        self._release_input_fd()

        for stream_name in ('stdout', 'stderr'):
            try:
                getattr(self, stream_name).close()
            except Exception:
                pass

        self._closed = True

    def stop_streams(self):
        """Call ``stop()`` on stdout and stderr where they provide it."""
        getattr(self.stdout, 'stop', lambda: None)()
        getattr(self.stderr, 'stop', lambda: None)()

    def get_stdout(self):
        """Return captured stdout if any, else drain the stdout iterator."""
        if self._stdout:
            return self._stdout
        else:
            return b''.join(self.stdout)

    def get_stderr(self):
        """Return captured stderr if any, else drain the stderr iterator."""
        if self._stderr:
            return self._stderr
        else:
            return b''.join(self.stderr)
subprocess: use subprocessio helper to run various subprocess commands.
r370
def run_command(arguments, env=None):
    """
    Run the specified command and return its collected output.

    The command is executed through ``SubprocessIOChunker`` without a shell
    and without treating stderr output as a failure. The chunker is always
    closed before returning.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    :param env: optional environment passed on to the subprocess; omitted
        when falsy.
    :returns: tuple ``(stdout, stderr)``, both joined into bytes.
    :raises Exception: when the chunker raises ``OSError``; the message
        contains the command line and the original error.
    """
    cmd = arguments
    log.debug('Running subprocessio command %s', cmd)

    chunker_options = dict(shell=False, fail_on_stderr=False)
    if env:
        chunker_options['env'] = env

    proc = None
    try:
        proc = SubprocessIOChunker(cmd, **chunker_options)
        stdout_data = b''.join(proc)
        stderr_data = b''.join(proc.stderr)
        return stdout_data, stderr_data
    except OSError as err:
        cmd = ' '.join(safe_str(part) for part in cmd)  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was:%s\n" % (cmd, err))
        log.exception(tb_err)
        raise Exception(tb_err)
    finally:
        # release the process, pipes and reader threads in every case
        if proc:
            proc.close()
subprocess: use subprocessio helper to run various subprocess commands.
r370