##// END OF EJS Templates
pep8: fix potential code warnings.
marcink -
r340:d9c81d6a default
parent child Browse files
Show More
@@ -1,476 +1,479 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningfull, non-blocking, concurrent
3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import subprocess32 as subprocess
26 import subprocess32 as subprocess
27 from collections import deque
27 from collections import deque
28 from threading import Event, Thread
28 from threading import Event, Thread
29
29
30
30
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        """
        :param source: a string-like (str/bytes/bytearray), a file
            descriptor (int), or a file-like object with ``read``.
        :raises TypeError: if ``source`` is none of the above.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    # best-effort: fall through to the file-like check below
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        """Feed all source data into the pipe, then close the write end."""
        t = self.writeiface
        try:
            if self.bytes:
                os.write(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    os.write(t, b)
                    b = s.read(4096)
        finally:
            # Close even if reading the source raised, so the pipe's
            # consumer sees EOF instead of blocking forever.
            os.close(t)

    @property
    def output(self):
        # Read end of the pipe; suitable to hand to subprocess stdin.
        return self.readiface
79
79
80
80
class InputStreamChunker(Thread):
    """
    Daemon thread that reads fixed-size chunks from a blocking ``source``
    stream and appends them to ``target`` (a deque-like container),
    pausing when the consumer falls behind and signalling EOF when the
    source is exhausted.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: file-like with a blocking ``read(size)`` method
        :param target: deque-like container that receives read chunks
        :param buffer_size: total bytes to buffer before pausing reads
        :param chunk_size: maximum bytes per single read
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # Set whenever a chunk is appended (and on EOF), so consumers
        # can wait on it instead of busy-polling.
        self.data_added = Event()
        self.data_added.clear()

        # Cleared by this thread when the buffer is full; the consumer
        # sets it again once chunks have been drained.
        self.keep_reading = Event()
        self.keep_reading.set()

        # Set once the source stream is exhausted or we are stopped.
        self.EOF = Event()
        self.EOF.clear()

        # Cleared to ask the reader loop to terminate early.
        self.go = Event()
        self.go.set()

    def stop(self):
        """Ask the reader loop to stop and force-release the source."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # best-effort: the source may already be closed
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            # source was closed under us (see stop()); treat as EOF
            b = ''

        while b and go.is_set():
            if len(t) > ccm:
                kr.clear()
                kr.wait(2)
                # # this only works on 2.7.x and up
                # if not kr.wait(10):
                #     raise Exception("Timed out while waiting for input to be read.")
                # instead we'll use this
                if len(t) > ccm + 3:
                    raise IOError(
                        "Timed out while waiting for input from subprocess.")
            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
146
146
147
147
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        # None default avoids the shared-mutable-default pitfall; the
        # effective default is an empty list.
        starting_values = starting_values or []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Block (in 0.2s slices) until a chunk arrives or EOF is flagged.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # ``exc_type`` (not ``type``) to avoid shadowing the builtin.
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w``;
        # accessing this raises AttributeError — confirm intended target.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reding does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reding does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the lenght of the que of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying throuh a bottomless pit content and
        can only know the lenght of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the responce's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
284
285
285
286
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.



    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    and interable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error ocurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    #    try:
    #        answer = SubprocessIOChunker(
    #            cmd,
    #            input,
    #            buffer_size = 65536,
    #            chunk_size = 4096
    #            )
    #    except (EnvironmentError) as e:
    #        print str(e)
    #        raise e
    #
    #    return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None, treated as []) An array of strings
            to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """
        # None default avoids sharing one mutable list across instances.
        starting_values = starting_values or []
        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, exc_type, value=None, traceback=None):
        # ``exc_type`` (not ``type``) to avoid shadowing the builtin.
        if self.output.length or not self.output.done_reading:
            raise exc_type(value)

    def close(self):
        """Terminate the subprocess and release pipes; idempotent."""
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except Exception:
            # process may already be gone
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now