##// END OF EJS Templates
py3: drop subprocess32 backport.
marcink -
r977:f12b9f4d python3
parent child Browse files
Show More
@@ -1,46 +1,45 b''
1 ## dependencies
1 ## dependencies
2
2
3 # our custom configobj
3 # our custom configobj
4 https://code.rhodecode.com/upstream/configobj/artifacts/download/0-012de99a-b1e1-4f64-a5c0-07a98a41b324.tar.gz?md5=6a513f51fe04b2c18cf84c1395a7c626#egg=configobj==5.0.6
4 https://code.rhodecode.com/upstream/configobj/artifacts/download/0-012de99a-b1e1-4f64-a5c0-07a98a41b324.tar.gz?md5=6a513f51fe04b2c18cf84c1395a7c626#egg=configobj==5.0.6
5
5
6 dogpile.cache==0.9.0
6 dogpile.cache==0.9.0
7 dogpile.core==0.4.1
7 dogpile.core==0.4.1
8 decorator==4.1.2
8 decorator==4.1.2
9 dulwich==0.13.0
9 dulwich==0.13.0
10 hgsubversion==1.9.3
10 hgsubversion==1.9.3
11 hg-evolve==9.1.0
11 hg-evolve==9.1.0
12
12
13 mercurial==5.1.1
13 mercurial==5.1.1
14 msgpack-python==0.5.6
14 msgpack-python==0.5.6
15
15
16 pastedeploy==2.1.0
16 pastedeploy==2.1.0
17 pyramid==1.10.4
17 pyramid==1.10.4
18 pygit2==0.28.2
18 pygit2==0.28.2
19
19
20 repoze.lru==0.7
20 repoze.lru==0.7
21 redis==3.4.1
21 redis==3.4.1
22 simplejson==3.16.0
22 simplejson==3.16.0
23 subprocess32==3.5.4
24 subvertpy==0.10.1
23 subvertpy==0.10.1
25
24
26 six==1.11.0
25 six==1.11.0
27 translationstring==1.3
26 translationstring==1.3
28 webob==1.8.5
27 webob==1.8.5
29 zope.deprecation==4.4.0
28 zope.deprecation==4.4.0
30 zope.interface==4.6.0
29 zope.interface==4.6.0
31
30
32 ## http servers
31 ## http servers
33 gevent==1.5.0
32 gevent==1.5.0
34 greenlet==0.4.15
33 greenlet==0.4.15
35 gunicorn==19.9.0
34 gunicorn==19.9.0
36 waitress==1.3.1
35 waitress==1.3.1
37
36
38 ## debug
37 ## debug
39 ipdb==0.13.2
38 ipdb==0.13.2
40 ipython==5.10.0
39 ipython==5.10.0
41
40
42 ## test related requirements
41 ## test related requirements
43 -r requirements_test.txt
42 -r requirements_test.txt
44
43
45 ## uncomment to add the debug libraries
44 ## uncomment to add the debug libraries
46 #-r requirements_debug.txt
45 #-r requirements_debug.txt
@@ -1,519 +1,519 b''
1 """
1 """
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 """
24 """
25 import os
25 import os
26 import logging
26 import logging
27 import subprocess32 as subprocess
27 import subprocess
28 from collections import deque
28 from collections import deque
29 from threading import Event, Thread
29 from threading import Event, Thread
30
30
31 log = logging.getLogger(__name__)
31 log = logging.getLogger(__name__)
32
32
33
33
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        """
        :param source: a str/bytes/bytearray, a file descriptor (int),
            or a readable file-like object whose contents will be fed
            through an ``os.pipe()``.
        :raises TypeError: if ``source`` is none of the above.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if isinstance(source, (str, bytes, bytearray)):  # string-like
            # BUGFIX: on Python 3, bytes(str) raises TypeError (it needs
            # an encoding); encode text sources explicitly instead.
            if isinstance(source, str):
                self.bytes = source.encode('utf-8')
            else:
                self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            # BUGFIX: `long` no longer exists on Python 3 and referencing
            # it raised NameError; `int` covers all file descriptors.
            if isinstance(source, int):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        """Pump all source data into the write end of the pipe, then close
        it so the reading side observes EOF."""
        t = self.writeiface
        try:
            if self.bytes:
                os.write(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    os.write(t, b)
                    b = s.read(4096)
        finally:
            # Always close the write end, even on error, so readers are
            # not left blocked forever.
            os.close(t)

    @property
    def output(self):
        # Read end of the pipe; hand this to the subprocess as its stdin.
        return self.readiface
84
84
85
85
class InputStreamChunker(Thread):
    """
    Daemon thread that reads ``chunk_size`` pieces from a blocking
    ``source`` stream and appends them to ``target`` (a deque or list),
    pausing once roughly ``buffer_size`` worth of chunks is buffered.

    Coordination with the consumer happens through four events:
    ``data_added`` (a chunk arrived), ``keep_reading`` (consumer allows
    more reads), ``EOF`` (source exhausted or stopped), ``go`` (cleared
    by :meth:`stop` to terminate the loop).
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking readable stream (e.g. a subprocess pipe).
        :param target: mutable sequence that receives the read chunks.
        :param buffer_size: approximate total bytes to buffer before pausing.
        :param chunk_size: maximum size of a single read.
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # +1 so at least one chunk is always allowed through.
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = Event()
        self.data_added.clear()

        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        self.go = Event()
        self.go.set()

    def stop(self):
        """Ask the reader loop to stop and force-release the source."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # BUGFIX: narrowed from a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit. Close failures stay best-effort.
            pass

    def run(self):
        """Read chunks until EOF or stop(), honoring back-pressure."""
        s = self.source
        t = self.target
        cs = self.chunk_size
        chunk_count_max = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            # source already closed; treat as immediate EOF (falsy chunk).
            b = b''

        timeout_input = 20
        while b and go.is_set():
            if len(t) > chunk_count_max:
                # Buffer full: wait until the consumer drains it.
                keep_reading.clear()
                keep_reading.wait(timeout_input)
                if len(t) > chunk_count_max + timeout_input:
                    log.error("Timed out while waiting for input from subprocess.")
                    os._exit(-1)  # this will cause the worker to recycle itself

            t.append(b)
            da.set()

            try:
                b = s.read(cs)
            except ValueError:
                b = b''

        self.EOF.set()
        da.set()  # for cases when done but there was no input.
154
154
155
155
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The iterator may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready to be sent or by not returning until there is
    some data to send.
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        """
        :param source: blocking readable stream consumed by a worker thread.
        :param buffer_size: total bytes to buffer before pausing the reader.
        :param chunk_size: maximum size of a single read.
        :param starting_values: chunks pre-seeded at the front of the queue.
        :param bottomless: when True the deque is bounded and silently drops
            the oldest chunks once full (used for draining stderr).
        """
        starting_values = starting_values or []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def __next__(self):
        # BUGFIX: Python 3 iteration calls __next__; the class only defined
        # a py2-style next(), so ``for chunk in obj`` raised TypeError on
        # Python 3. ``next`` is kept below as a backward-compatible alias.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    next = __next__

    def throw(self, exc_type, value=None, traceback=None):
        # Only propagate if the worker is still producing data.
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        """Stop the worker and signal exit; safe to call repeatedly."""
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w`` —
        # accessing this property raises AttributeError. Confirm the
        # intended target before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
290
290
291
291
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.



    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    #    try:
    #        answer = SubprocessIOChunker(
    #            cmd,
    #            input,
    #            buffer_size = 65536,
    #            chunk_size = 4096
    #            )
    #    except (EnvironmentError) as e:
    #        print str(e)
    #        raise e
    #
    #    return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        starting_values = starting_values or []
        if inputstream:
            # Feed the input through a pipe in a background thread so we
            # never block on a full stdin buffer.
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        # Default shell=True is kept for backward compatibility with
        # callers passing string commands.
        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                # NOTE(review): chunks yielded by BufferedGenerator are bytes
                # on Python 3, so ''.join() here raises TypeError — confirm
                # whether these error paths need b''.join(...).decode().
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                if not err:
                    # maybe get empty stderr, try stdout instead
                    # in many cases git reports the errors on stdout too
                    err = ''.join(bg_out)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err
        self.inputstream = inputstream

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    # BUGFIX: Python 3 iteration calls __next__; without this alias the
    # documented "for chunk in obj:" usage raises TypeError on Python 3.
    __next__ = next

    def throw(self, type, value=None, traceback=None):
        # Only raise while there is still output pending; mirrors the
        # generator .throw() contract used by WSGI servers.
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        """Terminate the subprocess and release all pipes; idempotent."""
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except Exception:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass
        try:
            os.close(self.inputstream)
        except Exception:
            pass
491
491
492
492
def run_command(arguments, env=None):
    """
    Run the specified command and return the stdout.

    :param arguments: sequence of program arguments (including the program name)
    :type arguments: list[str]
    :param env: optional environment mapping passed through to the subprocess.
    :returns: tuple of (joined stdout, joined stderr).
    """
    cmd = arguments
    log.debug('Running subprocessio command %s', cmd)
    proc = None
    try:
        # stderr is collected rather than treated as fatal here.
        opts = {'shell': False, 'fail_on_stderr': False}
        if env:
            opts['env'] = env
        proc = SubprocessIOChunker(cmd, **opts)
        stdout, stderr = ''.join(proc), ''.join(proc.error)
        return stdout, stderr
    except (EnvironmentError, OSError) as err:
        cmd = ' '.join(cmd)  # human friendly CMD
        tb_err = ("Couldn't run subprocessio command (%s).\n"
                  "Original error was:%s\n" % (cmd, err))
        log.exception(tb_err)
        raise Exception(tb_err)
    finally:
        # Always release pipes/process, even when an exception escaped.
        if proc:
            proc.close()
519
519
General Comments 0
You need to be logged in to leave comments. Login now