##// END OF EJS Templates
subprocessio: instead of raising a Thread error we sent a signal to gunicorn....
marcink -
r358:12f7a361 stable
parent child Browse files
Show More
@@ -1,480 +1,484 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import logging
26 import subprocess32 as subprocess
27 import subprocess32 as subprocess
27 from collections import deque
28 from collections import deque
28 from threading import Event, Thread
29 from threading import Event, Thread
29
30
31 log = logging.getLogger(__name__)
32
30
33
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        """
        :param source: a string-like object, a file descriptor (int), or a
            readable file-like object whose content is fed into the write
            end of an ``os.pipe()``.
        :raises TypeError: if ``source`` is none of the above.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        """Feed the source into the pipe, guaranteeing EOF delivery."""
        t = self.writeiface
        # Always close the write end, even if feeding fails part way
        # through; otherwise the reader on the other side of the pipe
        # never sees EOF and blocks forever.
        try:
            if self.bytes:
                os.write(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    os.write(t, b)
                    b = s.read(4096)
        finally:
            os.close(t)

    @property
    def output(self):
        # Read end of the pipe; hand this to the consumer (e.g. Popen stdin).
        return self.readiface
79
82
80
83
class InputStreamChunker(Thread):
    """
    Daemon thread that drains a blocking ``source`` stream into a shared
    ``target`` deque in ``chunk_size`` reads.

    Flow control happens through Events shared with the consumer:

    - ``data_added``: set whenever a chunk is appended to ``target``
    - ``keep_reading``: cleared here when the buffer is full; the consumer
      sets it again once it has drained some chunks
    - ``EOF``: set once the source is exhausted (or on ``stop()``)
    - ``go``: cleared by ``stop()`` to make ``run()`` terminate
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking file-like object to read from
        :param target: deque that receives the read chunks
        :param buffer_size: rough cap (in bytes) of buffered data before
            reading pauses
        :param chunk_size: maximum size of a single read
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = Event()
        self.data_added.clear()

        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        self.go = Event()
        self.go.set()

    def stop(self):
        """Ask the reader loop to quit and force-close the source."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except:  # NOTE(review): bare except kept — deliberate best-effort close
            pass

    def run(self):
        src = self.source
        buf = self.target
        read_size = self.chunk_size
        max_count = self.chunk_count_max
        keep_reading = self.keep_reading
        added = self.data_added
        keep_going = self.go

        try:
            chunk = src.read(read_size)
        except ValueError:
            chunk = ''

        timeout_input = 20
        while chunk and keep_going.is_set():
            if len(buf) > max_count:
                # Consumer fell behind: pause until it drains the buffer.
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                if len(buf) > max_count + timeout_input:
                    logging.getLogger(__name__).error(
                        "Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself
            buf.append(chunk)
            added.set()
            chunk = src.read(read_size)

        self.EOF.set()
        added.set()  # for cases when done but there was no input.
143
147
144
148
class BufferedGenerator(object):
    """
    Non-blocking, buffered pipe reader.

    A background :class:`InputStreamChunker` thread reads chunks from a
    blocking pipe and appends them to an internal deque. ``next()`` serves
    those chunks and raises ``StopIteration`` once the source hit EOF and
    the buffer is drained. With ``bottomless=True`` the deque is bounded
    and silently drops its oldest chunks.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        starting_values = starting_values or []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Block (in 0.2s slices) until either data shows up or EOF is hit.
        while not self.data and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if self.data:
            # Buffer drained a bit — let the reader thread resume.
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        if self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w``, so
        # this raises AttributeError when used — looks like dead code;
        # confirm against callers.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done reading does not mean that the iterator's buffer is empty.
        The worker may be done with the underlying source while read
        chunks are still available for serving through ``next()``.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done reading does not mean that the iterator's buffer is empty.
        The worker may be done with the underlying source while read
        chunks are still available for serving through ``next()``.

        :returns: A bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        Number of buffered chunks (NOT the combined byte length).

        ``__len__()`` is deliberately not implemented: per WSGI PEP 3333 a
        server would take it as the response length, which cannot be known
        while streaming a bottomless source.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
282
286
283
287
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__
          # method has already killed the subprocess in this case before
          # all output has been consumed.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep
      threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not
      unicode) chunks. The object behaves as an iterable. You can
      "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up.
      This does not block the parallel inpipe reading occurring parallel
      thread.)

    The purpose of the object is to allow us to wrap subprocess interactions
    into an iterable that can be passed to a WSGI server as the application's
    return value. Because of stream-processing-ability, WSGI does not have to
    read ALL of the subprocess's output and buffer it, before handing it to
    the WSGI server for HTTP response. Instead, the class initializer reads
    just a bit of the stream to figure out if an error occurred (or is likely
    to occur) and if not, just hands the further iteration over subprocess
    output to the server for completion of the HTTP response.

    The real or perceived subprocess error is trapped and raised as one of
    the EnvironmentError family of exceptions.
    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    # guards close() against running its cleanup twice (__del__ + explicit)
    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        :raises EnvironmentError: when the subprocess fails per the two
            ``fail_on_*`` flags.
        """

        starting_values = starting_values or []
        if inputstream:
            # Feed the input through a pipe on a background thread so we
            # never block on a full stdin buffer.
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        # NOTE(review): shell=True is the default here; callers passing a
        # string cmd built from untrusted input would be vulnerable to
        # shell injection — prefer shell=False with an argv list.
        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just
        # have a full buffer. Either way, if error (returned by ended
        # process, or implied based on presence of stuff in stderr output)
        # we error out. Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        """Terminate the subprocess and release all pipe resources (idempotent)."""
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except Exception:
            # best effort — process may already be gone
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now