##// END OF EJS Templates
subprocessio: limit fd leaks
marcink -
r624:560860c2 default
parent child Browse files
Show More
@@ -1,516 +1,523 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningfull, non-blocking, concurrent
3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import logging
26 import logging
27 import subprocess32 as subprocess
27 import subprocess32 as subprocess
28 from collections import deque
28 from collections import deque
29 from threading import Event, Thread
29 from threading import Event, Thread
30
30
31 log = logging.getLogger(__name__)
31 log = logging.getLogger(__name__)
32
32
33
33
34 class StreamFeeder(Thread):
34 class StreamFeeder(Thread):
35 """
35 """
36 Normal writing into pipe-like is blocking once the buffer is filled.
36 Normal writing into pipe-like is blocking once the buffer is filled.
37 This thread allows a thread to seep data from a file-like into a pipe
37 This thread allows a thread to seep data from a file-like into a pipe
38 without blocking the main thread.
38 without blocking the main thread.
39 We close inpipe once the end of the source stream is reached.
39 We close inpipe once the end of the source stream is reached.
40 """
40 """
41
41
42 def __init__(self, source):
42 def __init__(self, source):
43 super(StreamFeeder, self).__init__()
43 super(StreamFeeder, self).__init__()
44 self.daemon = True
44 self.daemon = True
45 filelike = False
45 filelike = False
46 self.bytes = bytes()
46 self.bytes = bytes()
47 if type(source) in (type(''), bytes, bytearray): # string-like
47 if type(source) in (type(''), bytes, bytearray): # string-like
48 self.bytes = bytes(source)
48 self.bytes = bytes(source)
49 else: # can be either file pointer or file-like
49 else: # can be either file pointer or file-like
50 if type(source) in (int, long): # file pointer it is
50 if type(source) in (int, long): # file pointer it is
51 # converting file descriptor (int) stdin into file-like
51 # converting file descriptor (int) stdin into file-like
52 try:
52 try:
53 source = os.fdopen(source, 'rb', 16384)
53 source = os.fdopen(source, 'rb', 16384)
54 except Exception:
54 except Exception:
55 pass
55 pass
56 # let's see if source is file-like by now
56 # let's see if source is file-like by now
57 try:
57 try:
58 filelike = source.read
58 filelike = source.read
59 except Exception:
59 except Exception:
60 pass
60 pass
61 if not filelike and not self.bytes:
61 if not filelike and not self.bytes:
62 raise TypeError("StreamFeeder's source object must be a readable "
62 raise TypeError("StreamFeeder's source object must be a readable "
63 "file-like, a file descriptor, or a string-like.")
63 "file-like, a file descriptor, or a string-like.")
64 self.source = source
64 self.source = source
65 self.readiface, self.writeiface = os.pipe()
65 self.readiface, self.writeiface = os.pipe()
66
66
67 def run(self):
67 def run(self):
68 t = self.writeiface
68 t = self.writeiface
69 try:
69 if self.bytes:
70 if self.bytes:
70 os.write(t, self.bytes)
71 os.write(t, self.bytes)
71 else:
72 else:
72 s = self.source
73 s = self.source
73 b = s.read(4096)
74 b = s.read(4096)
74 while b:
75 while b:
75 os.write(t, b)
76 os.write(t, b)
76 b = s.read(4096)
77 b = s.read(4096)
78 finally:
77 os.close(t)
79 os.close(t)
78
80
79 @property
81 @property
80 def output(self):
82 def output(self):
81 return self.readiface
83 return self.readiface
82
84
83
85
class InputStreamChunker(Thread):
    """
    Reader thread that pulls fixed-size chunks from a blocking stream
    `source` and appends them to the shared container `target` (a deque).

    Flow control is done through Events:
    - ``data_added``: set whenever a chunk lands in ``target``.
    - ``keep_reading``: cleared by this thread when ``target`` holds more
      than ``chunk_count_max`` chunks; the consumer sets it again after
      draining.
    - ``EOF``: set when the source is exhausted or reading was stopped.
    - ``go``: cleared by ``stop()`` to ask the thread to exit.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking file-like to read from
        :param target: deque shared with the consumer; chunks are appended
        :param buffer_size: total buffered bytes allowed before pausing
        :param chunk_size: max bytes per read/chunk
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # +1 so at least one chunk is always allowed
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = Event()
        self.data_added.clear()

        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        self.go = Event()
        self.go.set()

    def stop(self):
        """Ask the reader thread to terminate and unblock any pending read."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            # source already closed; treat as immediate EOF
            b = ''

        timeout_input = 20
        while b and go.is_set():
            if len(t) > chunk_count_max:
                # buffer full: pause until the consumer drains some chunks
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                if len(t) > chunk_count_max + timeout_input:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            t.append(b)
            da.set()

            try:
                b = s.read(cs)
            except ValueError:
                b = ''

        self.EOF.set()
        da.set()  # for cases when done but there was no input.
152
154
153
155
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        # :param source: blocking file-like to read from (fed to the worker)
        # :param buffer_size: total buffered bytes before the worker pauses
        # :param chunk_size: max bytes per chunk
        # :param starting_values: chunks pre-seeded at the front of the queue
        # :param bottomless: if True, cap the deque so old chunks are
        #   silently discarded instead of pausing the reader
        starting_values = starting_values or []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            # pre-seeded data counts as "data available" for consumers
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Python 2 iterator protocol. Block (in 0.2s slices) until either
        # data shows up or the worker signals EOF.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # consumer made room: let the worker resume reading
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # generator-protocol throw(); only raises while still producing
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute `w`;
        # accessing this property raises AttributeError — confirm whether
        # it should expose the worker's source/write side instead.
        return self.worker.w

    @property
    def data_added_event(self):
        # Event set by the worker each time a chunk is appended.
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the worker is waiting for the consumer to drain.
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
291
293
292
294
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

         # `args` expected to run a program that produces a lot of output
         output = ''.join(SubprocessIOChunker(
             args, shell=False, inputstream=inputstream, env=environ).output)

         # `output` will not contain all the data, because the __del__ method
         # has already killed the subprocess in this case before all output
         # has been consumed.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions.
    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """
        starting_values = starting_values or []
        if inputstream:
            # feed the caller-supplied input through a pipe so Popen gets
            # a real fd as stdin; remember the fd so close() can release it
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err
        # same fd as _close_input_fd (or None); kept so close() can make
        # sure the input pipe is released even on error paths
        self.inputstream = inputstream

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        """
        Terminate the subprocess and release all pipe fds. Idempotent.
        """
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except Exception:
            pass
        if self._close_input_fd:
            # guard the close: the fd may already be gone, and a raised
            # OSError here would leak the remaining resources below
            try:
                os.close(self._close_input_fd)
            except OSError:
                pass
            # _close_input_fd and inputstream are the SAME fd; clear both
            # so we never os.close() it twice (a second close could hit an
            # unrelated fd reused by another thread in the meantime)
            self._close_input_fd = None
            self.inputstream = None
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass
        if self.inputstream is not None:
            try:
                os.close(self.inputstream)
            except OSError:
                pass
            self.inputstream = None

    def __del__(self):
        self.close()
490
497
491
498
def run_command(arguments, env=None):
    """
    Run the specified command and return its stdout and stderr.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    :param env: optional environment mapping passed to the subprocess
    :returns: tuple of (stdout, stderr) strings
    :raises Exception: wrapping the original error if the command cannot run
    """
    cmd = arguments
    log.debug('Running subprocessio command %s', cmd)
    try:
        _opts = {'shell': False, 'fail_on_stderr': False}
        if env:
            _opts.update({'env': env})
        p = SubprocessIOChunker(cmd, **_opts)
        stdout = ''.join(p)
        # joining the error chunks once is sufficient; the previous
        # ''.join(''.join(p.error)) re-joined an already-built string
        stderr = ''.join(p.error)
    except (EnvironmentError, OSError) as err:
        cmd = ' '.join(cmd)  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was:%s\n" % (cmd, err))
        log.exception(tb_err)
        raise Exception(tb_err)

    return stdout, stderr
General Comments 0
You need to be logged in to leave comments. Login now