##// END OF EJS Templates
git: handle flaky and slow connection issues with git....
marcink -
r357:305b33c6 stable
parent child Browse files
Show More
@@ -1,481 +1,480 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import subprocess32 as subprocess
26 import subprocess32 as subprocess
27 from collections import deque
27 from collections import deque
28 from threading import Event, Thread
28 from threading import Event, Thread
29
29
30
30
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        """
        :param source: a string-like (str/bytes/bytearray), a file
            descriptor (int), or a readable file-like object whose content
            will be fed into the write end of ``os.pipe()``.
        :raises TypeError: if ``source`` is none of the above.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    @staticmethod
    def _write_all(fd, data):
        # os.write may write fewer bytes than requested (e.g. when it is
        # interrupted by a signal), so keep writing until the whole chunk
        # has been pushed down the pipe instead of silently dropping the
        # remainder.
        while data:
            written = os.write(fd, data)
            data = data[written:]

    def run(self):
        """Feed the buffered payload or the source stream into the pipe.

        The write end is closed in all cases (``finally``) so the reader
        of ``self.readiface`` always observes EOF, even if reading from
        the source or writing to the pipe failed part-way through.
        """
        t = self.writeiface
        try:
            if self.bytes:
                self._write_all(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    self._write_all(t, b)
                    b = s.read(4096)
        finally:
            # Closing the write end delivers EOF to the pipe reader.
            os.close(t)

    @property
    def output(self):
        # Read end of the pipe; hand this to the consumer (e.g. as a
        # subprocess stdin).
        return self.readiface
79
79
80
80
class InputStreamChunker(Thread):
    """
    Daemon thread that reads chunks from a blocking ``source`` stream and
    appends them to the shared ``target`` container (a deque consumed by
    :class:`BufferedGenerator`).

    Flow-control protocol (all ``threading.Event`` instances):

    - ``data_added``: signalled each time a chunk lands in ``target`` (and
      once more at EOF so waiting consumers wake up).
    - ``keep_reading``: cleared by this thread when the buffer is full;
      set again by the consumer once it drains data.
    - ``EOF``: set when the source is exhausted or ``stop()`` was called.
    - ``go``: cleared by ``stop()`` to ask the read loop to terminate.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: file-like to read from (e.g. a subprocess pipe)
        :param target: container (deque) read chunks are appended to
        :param buffer_size: total buffered payload size in bytes
        :param chunk_size: max size of a single read, in bytes
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = Event()
        self.data_added.clear()

        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # Best-effort close; narrowed from a bare ``except:`` which
            # would also have swallowed SystemExit/KeyboardInterrupt.
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            b = ''

        timeout_input = 20
        while b and go.is_set():
            if len(t) > chunk_count_max:
                # Buffer is full: pause and give the consumer up to
                # ``timeout_input`` seconds to drain some chunks.
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                # If the buffer has crept well past its limit the consumer
                # has effectively stalled; bail out instead of buffering
                # (and blocking) forever on a flaky/slow connection.
                if len(t) > chunk_count_max + timeout_input:
                    raise IOError(
                        "Timed out while waiting for input from subprocess.")
            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
144
143
145
144
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        """
        :param source: blocking file-like to read from (subprocess pipe)
        :param buffer_size: total buffered payload size in bytes
        :param chunk_size: max size of a single buffered chunk
        :param starting_values: chunks to pre-seed the queue with
        :param bottomless: if True, the deque is bounded and silently drops
            oldest chunks when full (used for stderr capture)
        """
        starting_values = starting_values or []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Block (in 0.2s waits) until either a chunk is available or the
        # reader thread has signalled EOF.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # We consumed a chunk, so the reader may resume filling.
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    # Python 3 iteration protocol alias; harmless on Python 2.
    __next__ = next

    def throw(self, exc_type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # BUG FIX: this used to return ``self.worker.w``, an attribute
        # that does not exist on InputStreamChunker, so any access raised
        # AttributeError. Expose the worker's underlying source stream.
        return self.worker.source

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: A Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
283
282
284
283
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.


    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        starting_values = starting_values or []
        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    # Python 3 iteration protocol alias; harmless on Python 2.
    __next__ = next

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        # Idempotent cleanup: kill the subprocess, close the input fd and
        # both output generators. Safe to call multiple times.
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except Exception:
            # Process may already be gone; narrowed from a bare ``except:``
            # which would also have swallowed SystemExit/KeyboardInterrupt.
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now