##// END OF EJS Templates
subprocessio: py27+ compat
marcink -
r341:a40eb400 default
parent child Browse files
Show More
@@ -1,479 +1,477 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningfull, non-blocking, concurrent
3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import subprocess32 as subprocess
26 import subprocess32 as subprocess
27 from collections import deque
27 from collections import deque
28 from threading import Event, Thread
28 from threading import Event, Thread
29
29
30
30
31 class StreamFeeder(Thread):
31 class StreamFeeder(Thread):
32 """
32 """
33 Normal writing into pipe-like is blocking once the buffer is filled.
33 Normal writing into pipe-like is blocking once the buffer is filled.
34 This thread allows a thread to seep data from a file-like into a pipe
34 This thread allows a thread to seep data from a file-like into a pipe
35 without blocking the main thread.
35 without blocking the main thread.
36 We close inpipe once the end of the source stream is reached.
36 We close inpipe once the end of the source stream is reached.
37 """
37 """
38
38
39 def __init__(self, source):
39 def __init__(self, source):
40 super(StreamFeeder, self).__init__()
40 super(StreamFeeder, self).__init__()
41 self.daemon = True
41 self.daemon = True
42 filelike = False
42 filelike = False
43 self.bytes = bytes()
43 self.bytes = bytes()
44 if type(source) in (type(''), bytes, bytearray): # string-like
44 if type(source) in (type(''), bytes, bytearray): # string-like
45 self.bytes = bytes(source)
45 self.bytes = bytes(source)
46 else: # can be either file pointer or file-like
46 else: # can be either file pointer or file-like
47 if type(source) in (int, long): # file pointer it is
47 if type(source) in (int, long): # file pointer it is
48 ## converting file descriptor (int) stdin into file-like
48 # converting file descriptor (int) stdin into file-like
49 try:
49 try:
50 source = os.fdopen(source, 'rb', 16384)
50 source = os.fdopen(source, 'rb', 16384)
51 except Exception:
51 except Exception:
52 pass
52 pass
53 # let's see if source is file-like by now
53 # let's see if source is file-like by now
54 try:
54 try:
55 filelike = source.read
55 filelike = source.read
56 except Exception:
56 except Exception:
57 pass
57 pass
58 if not filelike and not self.bytes:
58 if not filelike and not self.bytes:
59 raise TypeError("StreamFeeder's source object must be a readable "
59 raise TypeError("StreamFeeder's source object must be a readable "
60 "file-like, a file descriptor, or a string-like.")
60 "file-like, a file descriptor, or a string-like.")
61 self.source = source
61 self.source = source
62 self.readiface, self.writeiface = os.pipe()
62 self.readiface, self.writeiface = os.pipe()
63
63
64 def run(self):
64 def run(self):
65 t = self.writeiface
65 t = self.writeiface
66 if self.bytes:
66 if self.bytes:
67 os.write(t, self.bytes)
67 os.write(t, self.bytes)
68 else:
68 else:
69 s = self.source
69 s = self.source
70 b = s.read(4096)
70 b = s.read(4096)
71 while b:
71 while b:
72 os.write(t, b)
72 os.write(t, b)
73 b = s.read(4096)
73 b = s.read(4096)
74 os.close(t)
74 os.close(t)
75
75
76 @property
76 @property
77 def output(self):
77 def output(self):
78 return self.readiface
78 return self.readiface
79
79
80
80
81 class InputStreamChunker(Thread):
81 class InputStreamChunker(Thread):
82 def __init__(self, source, target, buffer_size, chunk_size):
82 def __init__(self, source, target, buffer_size, chunk_size):
83
83
84 super(InputStreamChunker, self).__init__()
84 super(InputStreamChunker, self).__init__()
85
85
86 self.daemon = True # die die die.
86 self.daemon = True # die die die.
87
87
88 self.source = source
88 self.source = source
89 self.target = target
89 self.target = target
90 self.chunk_count_max = int(buffer_size / chunk_size) + 1
90 self.chunk_count_max = int(buffer_size / chunk_size) + 1
91 self.chunk_size = chunk_size
91 self.chunk_size = chunk_size
92
92
93 self.data_added = Event()
93 self.data_added = Event()
94 self.data_added.clear()
94 self.data_added.clear()
95
95
96 self.keep_reading = Event()
96 self.keep_reading = Event()
97 self.keep_reading.set()
97 self.keep_reading.set()
98
98
99 self.EOF = Event()
99 self.EOF = Event()
100 self.EOF.clear()
100 self.EOF.clear()
101
101
102 self.go = Event()
102 self.go = Event()
103 self.go.set()
103 self.go.set()
104
104
105 def stop(self):
105 def stop(self):
106 self.go.clear()
106 self.go.clear()
107 self.EOF.set()
107 self.EOF.set()
108 try:
108 try:
109 # this is not proper, but is done to force the reader thread let
109 # this is not proper, but is done to force the reader thread let
110 # go of the input because, if successful, .close() will send EOF
110 # go of the input because, if successful, .close() will send EOF
111 # down the pipe.
111 # down the pipe.
112 self.source.close()
112 self.source.close()
113 except:
113 except:
114 pass
114 pass
115
115
116 def run(self):
116 def run(self):
117 s = self.source
117 s = self.source
118 t = self.target
118 t = self.target
119 cs = self.chunk_size
119 cs = self.chunk_size
120 ccm = self.chunk_count_max
120 ccm = self.chunk_count_max
121 kr = self.keep_reading
121 keep_reading = self.keep_reading
122 da = self.data_added
122 da = self.data_added
123 go = self.go
123 go = self.go
124
124
125 try:
125 try:
126 b = s.read(cs)
126 b = s.read(cs)
127 except ValueError:
127 except ValueError:
128 b = ''
128 b = ''
129
129
130 while b and go.is_set():
130 while b and go.is_set():
131 if len(t) > ccm:
131 if len(t) > ccm:
132 kr.clear()
132 keep_reading.clear()
133 kr.wait(2)
133 keep_reading.wait(2)
134 # # this only works on 2.7.x and up
134
135 # if not kr.wait(10):
135 if not keep_reading.wait(10):
136 # raise Exception("Timed out while waiting for input to be read.")
136 raise Exception(
137 # instead we'll use this
137 "Timed out while waiting for input to be read.")
138 if len(t) > ccm + 3:
138
139 raise IOError(
140 "Timed out while waiting for input from subprocess.")
141 t.append(b)
139 t.append(b)
142 da.set()
140 da.set()
143 b = s.read(cs)
141 b = s.read(cs)
144 self.EOF.set()
142 self.EOF.set()
145 da.set() # for cases when done but there was no input.
143 da.set() # for cases when done but there was no input.
146
144
147
145
148 class BufferedGenerator(object):
146 class BufferedGenerator(object):
149 """
147 """
150 Class behaves as a non-blocking, buffered pipe reader.
148 Class behaves as a non-blocking, buffered pipe reader.
151 Reads chunks of data (through a thread)
149 Reads chunks of data (through a thread)
152 from a blocking pipe, and attaches these to an array (Deque) of chunks.
150 from a blocking pipe, and attaches these to an array (Deque) of chunks.
153 Reading is halted in the thread when max chunks is internally buffered.
151 Reading is halted in the thread when max chunks is internally buffered.
154 The .next() may operate in blocking or non-blocking fashion by yielding
152 The .next() may operate in blocking or non-blocking fashion by yielding
155 '' if no data is ready
153 '' if no data is ready
156 to be sent or by not returning until there is some data to send
154 to be sent or by not returning until there is some data to send
157 When we get EOF from underlying source pipe we raise the marker to raise
155 When we get EOF from underlying source pipe we raise the marker to raise
158 StopIteration after the last chunk of data is yielded.
156 StopIteration after the last chunk of data is yielded.
159 """
157 """
160
158
161 def __init__(self, source, buffer_size=65536, chunk_size=4096,
159 def __init__(self, source, buffer_size=65536, chunk_size=4096,
162 starting_values=None, bottomless=False):
160 starting_values=None, bottomless=False):
163 starting_values = starting_values or []
161 starting_values = starting_values or []
164
162
165 if bottomless:
163 if bottomless:
166 maxlen = int(buffer_size / chunk_size)
164 maxlen = int(buffer_size / chunk_size)
167 else:
165 else:
168 maxlen = None
166 maxlen = None
169
167
170 self.data = deque(starting_values, maxlen)
168 self.data = deque(starting_values, maxlen)
171 self.worker = InputStreamChunker(source, self.data, buffer_size,
169 self.worker = InputStreamChunker(source, self.data, buffer_size,
172 chunk_size)
170 chunk_size)
173 if starting_values:
171 if starting_values:
174 self.worker.data_added.set()
172 self.worker.data_added.set()
175 self.worker.start()
173 self.worker.start()
176
174
177 ####################
175 ####################
178 # Generator's methods
176 # Generator's methods
179 ####################
177 ####################
180
178
181 def __iter__(self):
179 def __iter__(self):
182 return self
180 return self
183
181
184 def next(self):
182 def next(self):
185 while not len(self.data) and not self.worker.EOF.is_set():
183 while not len(self.data) and not self.worker.EOF.is_set():
186 self.worker.data_added.clear()
184 self.worker.data_added.clear()
187 self.worker.data_added.wait(0.2)
185 self.worker.data_added.wait(0.2)
188 if len(self.data):
186 if len(self.data):
189 self.worker.keep_reading.set()
187 self.worker.keep_reading.set()
190 return bytes(self.data.popleft())
188 return bytes(self.data.popleft())
191 elif self.worker.EOF.is_set():
189 elif self.worker.EOF.is_set():
192 raise StopIteration
190 raise StopIteration
193
191
194 def throw(self, exc_type, value=None, traceback=None):
192 def throw(self, exc_type, value=None, traceback=None):
195 if not self.worker.EOF.is_set():
193 if not self.worker.EOF.is_set():
196 raise exc_type(value)
194 raise exc_type(value)
197
195
198 def start(self):
196 def start(self):
199 self.worker.start()
197 self.worker.start()
200
198
201 def stop(self):
199 def stop(self):
202 self.worker.stop()
200 self.worker.stop()
203
201
204 def close(self):
202 def close(self):
205 try:
203 try:
206 self.worker.stop()
204 self.worker.stop()
207 self.throw(GeneratorExit)
205 self.throw(GeneratorExit)
208 except (GeneratorExit, StopIteration):
206 except (GeneratorExit, StopIteration):
209 pass
207 pass
210
208
211 def __del__(self):
209 def __del__(self):
212 self.close()
210 self.close()
213
211
214 ####################
212 ####################
215 # Threaded reader's infrastructure.
213 # Threaded reader's infrastructure.
216 ####################
214 ####################
217 @property
215 @property
218 def input(self):
216 def input(self):
219 return self.worker.w
217 return self.worker.w
220
218
221 @property
219 @property
222 def data_added_event(self):
220 def data_added_event(self):
223 return self.worker.data_added
221 return self.worker.data_added
224
222
225 @property
223 @property
226 def data_added(self):
224 def data_added(self):
227 return self.worker.data_added.is_set()
225 return self.worker.data_added.is_set()
228
226
229 @property
227 @property
230 def reading_paused(self):
228 def reading_paused(self):
231 return not self.worker.keep_reading.is_set()
229 return not self.worker.keep_reading.is_set()
232
230
233 @property
231 @property
234 def done_reading_event(self):
232 def done_reading_event(self):
235 """
233 """
236 Done_reding does not mean that the iterator's buffer is empty.
234 Done_reding does not mean that the iterator's buffer is empty.
237 Iterator might have done reading from underlying source, but the read
235 Iterator might have done reading from underlying source, but the read
238 chunks might still be available for serving through .next() method.
236 chunks might still be available for serving through .next() method.
239
237
240 :returns: An Event class instance.
238 :returns: An Event class instance.
241 """
239 """
242 return self.worker.EOF
240 return self.worker.EOF
243
241
244 @property
242 @property
245 def done_reading(self):
243 def done_reading(self):
246 """
244 """
247 Done_reding does not mean that the iterator's buffer is empty.
245 Done_reding does not mean that the iterator's buffer is empty.
248 Iterator might have done reading from underlying source, but the read
246 Iterator might have done reading from underlying source, but the read
249 chunks might still be available for serving through .next() method.
247 chunks might still be available for serving through .next() method.
250
248
251 :returns: An Bool value.
249 :returns: An Bool value.
252 """
250 """
253 return self.worker.EOF.is_set()
251 return self.worker.EOF.is_set()
254
252
255 @property
253 @property
256 def length(self):
254 def length(self):
257 """
255 """
258 returns int.
256 returns int.
259
257
260 This is the lenght of the que of chunks, not the length of
258 This is the lenght of the que of chunks, not the length of
261 the combined contents in those chunks.
259 the combined contents in those chunks.
262
260
263 __len__() cannot be meaningfully implemented because this
261 __len__() cannot be meaningfully implemented because this
264 reader is just flying throuh a bottomless pit content and
262 reader is just flying throuh a bottomless pit content and
265 can only know the lenght of what it already saw.
263 can only know the lenght of what it already saw.
266
264
267 If __len__() on WSGI server per PEP 3333 returns a value,
265 If __len__() on WSGI server per PEP 3333 returns a value,
268 the responce's length will be set to that. In order not to
266 the responce's length will be set to that. In order not to
269 confuse WSGI PEP3333 servers, we will not implement __len__
267 confuse WSGI PEP3333 servers, we will not implement __len__
270 at all.
268 at all.
271 """
269 """
272 return len(self.data)
270 return len(self.data)
273
271
274 def prepend(self, x):
272 def prepend(self, x):
275 self.data.appendleft(x)
273 self.data.appendleft(x)
276
274
277 def append(self, x):
275 def append(self, x):
278 self.data.append(x)
276 self.data.append(x)
279
277
280 def extend(self, o):
278 def extend(self, o):
281 self.data.extend(o)
279 self.data.extend(o)
282
280
283 def __getitem__(self, i):
281 def __getitem__(self, i):
284 return self.data[i]
282 return self.data[i]
285
283
286
284
287 class SubprocessIOChunker(object):
285 class SubprocessIOChunker(object):
288 """
286 """
289 Processor class wrapping handling of subprocess IO.
287 Processor class wrapping handling of subprocess IO.
290
288
291 .. important::
289 .. important::
292
290
293 Watch out for the method `__del__` on this class. If this object
291 Watch out for the method `__del__` on this class. If this object
294 is deleted, it will kill the subprocess, so avoid to
292 is deleted, it will kill the subprocess, so avoid to
295 return the `output` attribute or usage of it like in the following
293 return the `output` attribute or usage of it like in the following
296 example::
294 example::
297
295
298 # `args` expected to run a program that produces a lot of output
296 # `args` expected to run a program that produces a lot of output
299 output = ''.join(SubprocessIOChunker(
297 output = ''.join(SubprocessIOChunker(
300 args, shell=False, inputstream=inputstream, env=environ).output)
298 args, shell=False, inputstream=inputstream, env=environ).output)
301
299
302 # `output` will not contain all the data, because the __del__ method
300 # `output` will not contain all the data, because the __del__ method
303 # has already killed the subprocess in this case before all output
301 # has already killed the subprocess in this case before all output
304 # has been consumed.
302 # has been consumed.
305
303
306
304
307
305
308 In a way, this is a "communicate()" replacement with a twist.
306 In a way, this is a "communicate()" replacement with a twist.
309
307
310 - We are multithreaded. Writing in and reading out, err are all sep threads.
308 - We are multithreaded. Writing in and reading out, err are all sep threads.
311 - We support concurrent (in and out) stream processing.
309 - We support concurrent (in and out) stream processing.
312 - The output is not a stream. It's a queue of read string (bytes, not unicode)
310 - The output is not a stream. It's a queue of read string (bytes, not unicode)
313 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
311 chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
314 - We are non-blocking in more respects than communicate()
312 - We are non-blocking in more respects than communicate()
315 (reading from subprocess out pauses when internal buffer is full, but
313 (reading from subprocess out pauses when internal buffer is full, but
316 does not block the parent calling code. On the flip side, reading from
314 does not block the parent calling code. On the flip side, reading from
317 slow-yielding subprocess may block the iteration until data shows up. This
315 slow-yielding subprocess may block the iteration until data shows up. This
318 does not block the parallel inpipe reading occurring parallel thread.)
316 does not block the parallel inpipe reading occurring parallel thread.)
319
317
320 The purpose of the object is to allow us to wrap subprocess interactions into
318 The purpose of the object is to allow us to wrap subprocess interactions into
321 and interable that can be passed to a WSGI server as the application's return
319 and interable that can be passed to a WSGI server as the application's return
322 value. Because of stream-processing-ability, WSGI does not have to read ALL
320 value. Because of stream-processing-ability, WSGI does not have to read ALL
323 of the subprocess's output and buffer it, before handing it to WSGI server for
321 of the subprocess's output and buffer it, before handing it to WSGI server for
324 HTTP response. Instead, the class initializer reads just a bit of the stream
322 HTTP response. Instead, the class initializer reads just a bit of the stream
325 to figure out if error ocurred or likely to occur and if not, just hands the
323 to figure out if error ocurred or likely to occur and if not, just hands the
326 further iteration over subprocess output to the server for completion of HTTP
324 further iteration over subprocess output to the server for completion of HTTP
327 response.
325 response.
328
326
329 The real or perceived subprocess error is trapped and raised as one of
327 The real or perceived subprocess error is trapped and raised as one of
330 EnvironmentError family of exceptions
328 EnvironmentError family of exceptions
331
329
332 Example usage:
330 Example usage:
333 # try:
331 # try:
334 # answer = SubprocessIOChunker(
332 # answer = SubprocessIOChunker(
335 # cmd,
333 # cmd,
336 # input,
334 # input,
337 # buffer_size = 65536,
335 # buffer_size = 65536,
338 # chunk_size = 4096
336 # chunk_size = 4096
339 # )
337 # )
340 # except (EnvironmentError) as e:
338 # except (EnvironmentError) as e:
341 # print str(e)
339 # print str(e)
342 # raise e
340 # raise e
343 #
341 #
344 # return answer
342 # return answer
345
343
346
344
347 """
345 """
348
346
349 # TODO: johbo: This is used to make sure that the open end of the PIPE
347 # TODO: johbo: This is used to make sure that the open end of the PIPE
350 # is closed in the end. It would be way better to wrap this into an
348 # is closed in the end. It would be way better to wrap this into an
351 # object, so that it is closed automatically once it is consumed or
349 # object, so that it is closed automatically once it is consumed or
352 # something similar.
350 # something similar.
353 _close_input_fd = None
351 _close_input_fd = None
354
352
355 _closed = False
353 _closed = False
356
354
357 def __init__(self, cmd, inputstream=None, buffer_size=65536,
355 def __init__(self, cmd, inputstream=None, buffer_size=65536,
358 chunk_size=4096, starting_values=None, fail_on_stderr=True,
356 chunk_size=4096, starting_values=None, fail_on_stderr=True,
359 fail_on_return_code=True, **kwargs):
357 fail_on_return_code=True, **kwargs):
360 """
358 """
361 Initializes SubprocessIOChunker
359 Initializes SubprocessIOChunker
362
360
363 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
361 :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
364 :param inputstream: (Default: None) A file-like, string, or file pointer.
362 :param inputstream: (Default: None) A file-like, string, or file pointer.
365 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
363 :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
366 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
364 :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
367 :param starting_values: (Default: []) An array of strings to put in front of output que.
365 :param starting_values: (Default: []) An array of strings to put in front of output que.
368 :param fail_on_stderr: (Default: True) Whether to raise an exception in
366 :param fail_on_stderr: (Default: True) Whether to raise an exception in
369 case something is written to stderr.
367 case something is written to stderr.
370 :param fail_on_return_code: (Default: True) Whether to raise an
368 :param fail_on_return_code: (Default: True) Whether to raise an
371 exception if the return code is not 0.
369 exception if the return code is not 0.
372 """
370 """
373
371
374 starting_values = starting_values or []
372 starting_values = starting_values or []
375 if inputstream:
373 if inputstream:
376 input_streamer = StreamFeeder(inputstream)
374 input_streamer = StreamFeeder(inputstream)
377 input_streamer.start()
375 input_streamer.start()
378 inputstream = input_streamer.output
376 inputstream = input_streamer.output
379 self._close_input_fd = inputstream
377 self._close_input_fd = inputstream
380
378
381 self._fail_on_stderr = fail_on_stderr
379 self._fail_on_stderr = fail_on_stderr
382 self._fail_on_return_code = fail_on_return_code
380 self._fail_on_return_code = fail_on_return_code
383
381
384 _shell = kwargs.get('shell', True)
382 _shell = kwargs.get('shell', True)
385 kwargs['shell'] = _shell
383 kwargs['shell'] = _shell
386
384
387 _p = subprocess.Popen(cmd, bufsize=-1,
385 _p = subprocess.Popen(cmd, bufsize=-1,
388 stdin=inputstream,
386 stdin=inputstream,
389 stdout=subprocess.PIPE,
387 stdout=subprocess.PIPE,
390 stderr=subprocess.PIPE,
388 stderr=subprocess.PIPE,
391 **kwargs)
389 **kwargs)
392
390
393 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
391 bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
394 starting_values)
392 starting_values)
395 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
393 bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
396
394
397 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
395 while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
398 # doing this until we reach either end of file, or end of buffer.
396 # doing this until we reach either end of file, or end of buffer.
399 bg_out.data_added_event.wait(1)
397 bg_out.data_added_event.wait(1)
400 bg_out.data_added_event.clear()
398 bg_out.data_added_event.clear()
401
399
402 # at this point it's still ambiguous if we are done reading or just full buffer.
400 # at this point it's still ambiguous if we are done reading or just full buffer.
403 # Either way, if error (returned by ended process, or implied based on
401 # Either way, if error (returned by ended process, or implied based on
404 # presence of stuff in stderr output) we error out.
402 # presence of stuff in stderr output) we error out.
405 # Else, we are happy.
403 # Else, we are happy.
406 _returncode = _p.poll()
404 _returncode = _p.poll()
407
405
408 if ((_returncode and fail_on_return_code) or
406 if ((_returncode and fail_on_return_code) or
409 (fail_on_stderr and _returncode is None and bg_err.length)):
407 (fail_on_stderr and _returncode is None and bg_err.length)):
410 try:
408 try:
411 _p.terminate()
409 _p.terminate()
412 except Exception:
410 except Exception:
413 pass
411 pass
414 bg_out.stop()
412 bg_out.stop()
415 bg_err.stop()
413 bg_err.stop()
416 if fail_on_stderr:
414 if fail_on_stderr:
417 err = ''.join(bg_err)
415 err = ''.join(bg_err)
418 raise EnvironmentError(
416 raise EnvironmentError(
419 "Subprocess exited due to an error:\n" + err)
417 "Subprocess exited due to an error:\n" + err)
420 if _returncode and fail_on_return_code:
418 if _returncode and fail_on_return_code:
421 err = ''.join(bg_err)
419 err = ''.join(bg_err)
422 raise EnvironmentError(
420 raise EnvironmentError(
423 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
421 "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
424 _returncode, err))
422 _returncode, err))
425
423
426 self.process = _p
424 self.process = _p
427 self.output = bg_out
425 self.output = bg_out
428 self.error = bg_err
426 self.error = bg_err
429
427
430 def __iter__(self):
428 def __iter__(self):
431 return self
429 return self
432
430
433 def next(self):
431 def next(self):
434 # Note: mikhail: We need to be sure that we are checking the return
432 # Note: mikhail: We need to be sure that we are checking the return
435 # code after the stdout stream is closed. Some processes, e.g. git
433 # code after the stdout stream is closed. Some processes, e.g. git
436 # are doing some magic in between closing stdout and terminating the
434 # are doing some magic in between closing stdout and terminating the
437 # process and, as a result, we are not getting return code on "slow"
435 # process and, as a result, we are not getting return code on "slow"
438 # systems.
436 # systems.
439 result = None
437 result = None
440 stop_iteration = None
438 stop_iteration = None
441 try:
439 try:
442 result = self.output.next()
440 result = self.output.next()
443 except StopIteration as e:
441 except StopIteration as e:
444 stop_iteration = e
442 stop_iteration = e
445
443
446 if self.process.poll() and self._fail_on_return_code:
444 if self.process.poll() and self._fail_on_return_code:
447 err = '%s' % ''.join(self.error)
445 err = '%s' % ''.join(self.error)
448 raise EnvironmentError(
446 raise EnvironmentError(
449 "Subprocess exited due to an error:\n" + err)
447 "Subprocess exited due to an error:\n" + err)
450
448
451 if stop_iteration:
449 if stop_iteration:
452 raise stop_iteration
450 raise stop_iteration
453 return result
451 return result
454
452
455 def throw(self, type, value=None, traceback=None):
453 def throw(self, type, value=None, traceback=None):
456 if self.output.length or not self.output.done_reading:
454 if self.output.length or not self.output.done_reading:
457 raise type(value)
455 raise type(value)
458
456
459 def close(self):
457 def close(self):
460 if self._closed:
458 if self._closed:
461 return
459 return
462 self._closed = True
460 self._closed = True
463 try:
461 try:
464 self.process.terminate()
462 self.process.terminate()
465 except:
463 except:
466 pass
464 pass
467 if self._close_input_fd:
465 if self._close_input_fd:
468 os.close(self._close_input_fd)
466 os.close(self._close_input_fd)
469 try:
467 try:
470 self.output.close()
468 self.output.close()
471 except:
469 except:
472 pass
470 pass
473 try:
471 try:
474 self.error.close()
472 self.error.close()
475 except:
473 except:
476 pass
474 pass
477
475
478 def __del__(self):
476 def __del__(self):
479 self.close()
477 self.close()
General Comments 0
You need to be logged in to leave comments. Login now