subprocessio: use safe b.read() and prevent potential ValueErrors
marcink
r371:7da6cab7 default
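Background on the fix: a Python file object raises ValueError ("I/O operation on closed file") when read() is called after the object has been closed, which can happen here when InputStreamChunker.stop() closes the source pipe while the reader thread is still looping. Before this commit only the initial read() was guarded; the read inside the loop was not. A minimal sketch of the failure mode (the pipe setup is illustrative, not part of the commit):

import os

r_fd, w_fd = os.pipe()
source = os.fdopen(r_fd, 'rb')
source.close()  # e.g. stop() closed the source under the reader thread

try:
    chunk = source.read(4096)  # raises ValueError: I/O operation on closed file
except ValueError:
    chunk = ''  # treat as EOF, as the patched run() loop now does
os.close(w_fd)  # tidy up the unused write end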
@@ -1,511 +1,516 @@
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningfull, non-blocking, concurrent
3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
import os
import logging
import subprocess32 as subprocess
from collections import deque
from threading import Event, Thread

log = logging.getLogger(__name__)


class StreamFeeder(Thread):
    """
    Normal writing into a pipe-like object blocks once the buffer is filled.
    This thread allows feeding data from a file-like object into a pipe
    without blocking the main thread.
    We close the input pipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        t = self.writeiface
        if self.bytes:
            os.write(t, self.bytes)
        else:
            s = self.source
            b = s.read(4096)
            while b:
                os.write(t, b)
                b = s.read(4096)
        os.close(t)

    @property
    def output(self):
        return self.readiface


class InputStreamChunker(Thread):
    def __init__(self, source, target, buffer_size, chunk_size):

        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = Event()
        self.data_added.clear()

        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread to
            # let go of the input because, if successful, .close() will send
            # EOF down the pipe.
            self.source.close()
        except:
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            b = ''

        timeout_input = 20
        while b and go.is_set():
            if len(t) > chunk_count_max:
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                if len(t) > chunk_count_max + timeout_input:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            t.append(b)
            da.set()
-            b = s.read(cs)
+
+            try:
+                b = s.read(cs)
+            except ValueError:
+                b = ''
+
        self.EOF.set()
        da.set()  # for cases when done but there was no input.


class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread) from a blocking pipe and
    appends them to an array (deque) of chunks.
    Reading is halted in the thread when the maximum number of chunks is
    buffered internally.
    The .next() method may operate in a blocking or non-blocking fashion,
    either by yielding '' if no data is ready to be sent, or by not
    returning until there is some data to send.
    When we get EOF from the underlying source pipe, we raise the marker
    to raise StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        starting_values = starting_values or []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        The iterator might be done reading from the underlying source, but
        the read chunks might still be available for serving through the
        .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        The iterator might be done reading from the underlying source, but
        the read chunks might still be available for serving through the
        .next() method.

        :returns: A bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through bottomless content and can only
        know the length of what it has already seen.

        Per PEP 3333, if __len__() on the application's return value
        yields a number, the response's length will be set to that. In
        order not to confuse WSGI servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]


class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid returning the
       `output` attribute or using it like in the following example::

           # `args` expected to run a program that produces a lot of output
           output = ''.join(SubprocessIOChunker(
               args, shell=False, inputstream=inputstream, env=environ).output)

           # `output` will not contain all the data, because the __del__
           # method has already killed the subprocess in this case before
           # all output has been consumed.


    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out and err are all
      separate threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not
      unicode) chunks. The object behaves as an iterable. You can
      "for chunk in obj:" over it.
    - We are non-blocking in more respects than communicate()
      (reading from the subprocess's out pauses when the internal buffer is
      full, but does not block the parent calling code. On the flip side,
      reading from a slow-yielding subprocess may block the iteration until
      data shows up. This does not block the inpipe reading occurring in a
      parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions
    into an iterable that can be passed to a WSGI server as the application's
    return value. Because of its stream-processing ability, WSGI does not
    have to read ALL of the subprocess's output and buffer it before handing
    it to the WSGI server for the HTTP response. Instead, the class
    initializer reads just a bit of the stream to figure out if an error
    occurred or is likely to occur, and if not, just hands further iteration
    over the subprocess output to the server for completion of the HTTP
    response.

    Any real or perceived subprocess error is trapped and raised as one of
    the EnvironmentError family of exceptions.

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size=65536,
    #         chunk_size=4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A subprocess.Popen style "cmd". Can be a string or an
            array of strings.
        :param inputstream: (Default: None) A file-like, string, or file
            pointer.
        :param buffer_size: (Default: 65536) The total buffer size per
            stream, in bytes.
        :param chunk_size: (Default: 4096) The maximum size of a chunk.
            The actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in
            front of the output queue.
        :param fail_on_stderr: (Default: True) Whether to raise an exception
            in case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        starting_values = starting_values or []
        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous whether we are done reading or
        # just have a full buffer. Either way, if there is an error (returned
        # by the ended process, or implied by the presence of stuff in the
        # stderr output), we error out. Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # we may get an empty stderr; try stdout instead, since
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git,
        # do some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting the return code on
        # "slow" systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except:
            pass
        try:
            self.error.close()
        except:
            pass

    def __del__(self):
        self.close()


def run_command(arguments, env=None):
    """
    Run the specified command and return the stdout.

    :param arguments: sequence of program arguments (including the program
        name)
    :type arguments: list[str]
    """

    cmd = arguments
    log.debug('Running subprocessio command %s', cmd)
    try:
        _opts = {'shell': False, 'fail_on_stderr': False}
        if env:
            _opts.update({'env': env})
        p = SubprocessIOChunker(cmd, **_opts)
        stdout = ''.join(p)
        stderr = ''.join(p.error)
    except (EnvironmentError, OSError) as err:
        cmd = ' '.join(cmd)  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was: %s\n" % (cmd, err))
        log.exception(tb_err)
        raise Exception(tb_err)

    return stdout, stderr
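
Usage is unchanged by this commit. A minimal sketch of driving the module, assuming the vcsserver.subprocessio import path; the commands and the process() consumer are illustrative only:

from vcsserver.subprocessio import SubprocessIOChunker, run_command

# One-shot: collect stdout and stderr of a short-lived command.
stdout, stderr = run_command(['git', '--version'])

# Streaming: iterate over chunks as the subprocess produces them, e.g. as
# a WSGI response body. Consume the iterator fully before dropping the
# object, since __del__ terminates the subprocess.
chunker = SubprocessIOChunker(
    ['git', 'cat-file', 'blob', 'HEAD:README'],
    shell=False,
    fail_on_stderr=False,
)
for chunk in chunker:
    process(chunk)  # hypothetical consumer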