##// END OF EJS Templates
git: report errors from stdout if stderr is empty
marcink -
r350:5473f1d4 default
parent child Browse files
Show More
@@ -1,477 +1,481 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningfull, non-blocking, concurrent
3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import subprocess32 as subprocess
26 import subprocess32 as subprocess
27 from collections import deque
27 from collections import deque
28 from threading import Event, Thread
28 from threading import Event, Thread
29
29
30
30
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        """
        :param source: a string-like (str/bytes/bytearray), a file
            descriptor (int), or a readable file-like object whose content
            will be fed into the pipe.
        :raises TypeError: if ``source`` is none of the above.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    @staticmethod
    def _write_all(fd, data):
        # os.write() is allowed to perform a partial write on a pipe;
        # loop until every byte of ``data`` has been flushed so the
        # consumer never sees a silently truncated stream.
        written = 0
        while written < len(data):
            written += os.write(fd, data[written:])

    def run(self):
        t = self.writeiface
        try:
            if self.bytes:
                self._write_all(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    self._write_all(t, b)
                    b = s.read(4096)
        finally:
            # always close the write end, even if a read/write raised,
            # so the reader of the pipe gets EOF instead of hanging
            os.close(t)

    @property
    def output(self):
        # read end of the pipe; hand this to the subprocess as stdin
        return self.readiface
79
79
80
80
class InputStreamChunker(Thread):
    """
    Background thread that reads chunks from a blocking ``source`` stream
    and appends them to ``target`` (a deque shared with the consumer).

    Reading pauses when more than ``chunk_count_max`` chunks are buffered
    and resumes once the consumer sets ``keep_reading``. ``EOF`` is set
    when the source is exhausted (or ``stop()`` was called).
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: file-like object to read from (blocking reads ok).
        :param target: deque-like container read chunks are appended to.
        :param buffer_size: total buffer size in bytes; used to derive the
            maximum number of buffered chunks.
        :param chunk_size: maximum size of a single read.
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # signalled every time a chunk is appended (and once at EOF)
        self.data_added = Event()
        self.data_added.clear()

        # cleared by this thread when the buffer is full; set again by the
        # consumer when it has made room
        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        # cleared by stop() to ask the reader loop to exit
        self.go = Event()
        self.go.set()

    def stop(self):
        """Ask the reader thread to terminate and close the source."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # best-effort close; stop() itself must never raise.
            # (narrowed from a bare ``except:`` which would also swallow
            # SystemExit/KeyboardInterrupt)
            pass

    def run(self):
        # bind everything to locals once; this loop is the hot path
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            # source was closed under our feet (e.g. via stop())
            b = ''

        while b and go.is_set():
            if len(t) > ccm:
                # buffer full: pause and wait for the consumer to drain it
                keep_reading.clear()
                keep_reading.wait(2)

            if not keep_reading.wait(10):
                raise Exception(
                    "Timed out while waiting for input to be read.")

            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
144
144
145
145
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        # :param source: blocking file-like (pipe end) to read from
        # :param buffer_size: total bytes buffered before the reader pauses
        # :param chunk_size: max bytes per single read
        # :param starting_values: chunks to pre-seed the output queue with
        # :param bottomless: if True, the deque is bounded and silently drops
        #     the oldest chunks when full (used for the stderr stream)
        starting_values = starting_values or []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            # pre-seeded data counts as "data available" right away
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Python 2 iterator protocol. Poll (in 0.2s slices) until either a
        # chunk shows up or the reader thread signals EOF.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # consumer made room in the buffer -> let a paused reader resume
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # generator-protocol emulation: only raise while there is still
        # something being read; a finished stream absorbs the throw
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w``;
        # accessing this property would raise AttributeError. Looks dead or
        # broken -- confirm against callers before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        # Event set by the reader thread whenever a chunk was appended
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the reader thread is waiting for the buffer to drain
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reding does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reding does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the lenght of the que of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying throuh a bottomless pit content and
        can only know the lenght of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the responce's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
283
283
284
284
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.



    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    and interable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error ocurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    #   try:
    #    answer = SubprocessIOChunker(
    #        cmd,
    #        input,
    #        buffer_size = 65536,
    #        chunk_size = 4096
    #        )
    #    except (EnvironmentError) as e:
    #        print str(e)
    #        raise e
    #
    #    return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        starting_values = starting_values or []
        if inputstream:
            # feed the input through a pipe from a background thread, and
            # remember the read end so close() can release it
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        # NOTE(review): shell defaults to True here; with a string ``cmd``
        # that means shell interpretation of the command -- callers must not
        # pass untrusted strings.
        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        # stdout buffered normally; stderr uses a small bottomless (dropping)
        # buffer since it is only consulted for error reporting
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                # NOTE(review): when both flags are True and the return code
                # is non-zero, this branch raises first, so the stdout
                # fallback below only applies when fail_on_stderr=False --
                # confirm that is the intended precedence.
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            # defer StopIteration until after the return-code check below
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, type, value=None, traceback=None):
        # only propagate while output is still pending; a drained stream
        # absorbs the throw
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        # Idempotent teardown: terminate the child and release all pipe ends.
        # NOTE(review): the bare ``except:`` clauses below also swallow
        # SystemExit/KeyboardInterrupt -- kept as-is to preserve behavior.
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except:
            pass
        try:
            self.error.close()
        except:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now