##// END OF EJS Templates
Add optional parameters to subprocessio that allow passing params to Popen
marcink -
r2399:a8635cda beta
parent child Browse files
Show More
@@ -1,401 +1,402 b''
1 '''
1 '''
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 '''
24 '''
25 import os
25 import os
26 import subprocess
26 import subprocess
27 import threading
27 import threading
28 from collections import deque
28 from collections import deque
29
29
30
30
class StreamFeeder(threading.Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """
    def __init__(self, source):
        """
        :param source: a string-like (str/bytes/bytearray), a file
            descriptor (int), or a readable file-like object whose contents
            will be fed into the write end of an ``os.pipe()``.
        :raises TypeError: if ``source`` is none of the accepted kinds.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True  # never keep the interpreter alive on our account
        filelike = False
        self.bytes = b''
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    # not a usable fd; the file-like probe below decides
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except AttributeError:
                pass
            if not filelike and not self.bytes:
                raise TypeError("StreamFeeder's source object must be a readable file-like, a file descriptor, or a string-like.")
        self.source = source
        # read end is handed to the consumer (see .output); write end is ours
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        """Pump the source into the pipe, then close the write end (EOF)."""
        t = self.writeiface
        if self.bytes:
            os.write(t, self.bytes)
        else:
            s = self.source
            b = s.read(4096)
            while b:
                os.write(t, b)
                b = s.read(4096)
        # closing the write end signals EOF to whoever reads self.readiface
        os.close(t)

    @property
    def output(self):
        """Read end (fd) of the pipe this thread feeds."""
        return self.readiface
77
77
78
78
class InputStreamChunker(threading.Thread):
    """
    Thread that drains a blocking source stream into a shared chunk
    container (typically a deque), pausing itself when the consumer
    falls behind, and flagging EOF when the source runs dry.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: readable, blocking stream (``.read(n)``)
        :param target: mutable sequence chunks are appended to
        :param buffer_size: total buffered bytes before reading pauses
        :param chunk_size: size of each individual read
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # never block interpreter shutdown

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # signalled every time a chunk lands in the target container
        self.data_added = threading.Event()
        self.data_added.clear()

        # cleared by this thread when the buffer is full; re-set by the
        # consumer to let reading resume
        self.keep_reading = threading.Event()
        self.keep_reading.set()

        # raised once the source is exhausted (or on stop())
        self.EOF = threading.Event()
        self.EOF.clear()

        # master on/off switch; cleared by stop() to end the read loop
        self.go = threading.Event()
        self.go.set()

    def stop(self):
        """Ask the reader loop to quit and mark the stream as ended."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except:
            pass

    def run(self):
        stream = self.source
        sink = self.target
        read_size = self.chunk_size
        max_chunks = self.chunk_count_max
        resume = self.keep_reading
        notify = self.data_added
        running = self.go

        chunk = stream.read(read_size)
        while chunk and running.is_set():
            if len(sink) > max_chunks:
                # buffer is full: pause until the consumer drains chunks;
                # a bounded wait stands in for Event.wait()'s missing
                # return value on older interpreters
                resume.clear()
                resume.wait(2)
                if len(sink) > max_chunks + 3:
                    raise IOError("Timed out while waiting for input from subprocess.")
            sink.append(chunk)
            notify.set()
            chunk = stream.read(read_size)
        self.EOF.set()
        notify.set()  # for cases when done but there was no input.
138
138
139
139
class BufferedGenerator(object):
    '''
    Class behaves as a non-blocking, buffered pipe reader.

    Reads chunks of data (through a thread) from a blocking pipe, and
    attaches these to an array (deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready to be sent, or by not returning until there is
    some data to send.
    When we get EOF from the underlying source pipe we raise the marker to
    raise StopIteration after the last chunk of data is yielded.
    '''

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        '''
        :param source: readable stream handed to an InputStreamChunker thread
        :param buffer_size: (Default: 65536) total buffered bytes before the
            reader thread pauses
        :param chunk_size: (Default: 4096) max size of a single chunk
        :param starting_values: (Default: None, meaning []) chunks pre-seeded
            at the front of the output queue
        :param bottomless: when True, bound the deque so old chunks are
            silently dropped instead of pausing the reader
        '''
        # ``None`` sentinel avoids the shared-mutable-default pitfall the
        # original ``starting_values=[]`` signature had
        if starting_values is None:
            starting_values = []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        '''
        Return the next buffered chunk as bytes, waiting (in 0.2s slices)
        until either data arrives or the worker signals EOF.

        :raises StopIteration: once the buffer is empty and EOF is set.
        '''
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    # Python 3 iterator-protocol alias; harmless no-op addition on Python 2.
    __next__ = next

    def throw(self, type, value=None, traceback=None):
        # generator.throw()-style interface; only raises while data may
        # still be flowing
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): the original returned ``self.worker.w``, which is not
        # an attribute of InputStreamChunker and always raised AttributeError;
        # the worker's source stream appears to be what was intended — confirm
        # against callers.
        return self.worker.source

    @property
    def data_added_event(self):
        '''Event set whenever the worker appends a chunk (or hits EOF).'''
        return self.worker.data_added

    @property
    def data_added(self):
        '''Bool: whether the data-added event is currently set.'''
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        '''Bool: True while the worker waits for the buffer to drain.'''
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        '''
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :return: An Event class instance.
        '''
        return self.worker.EOF

    @property
    def done_reading(self):
        '''
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :return: A Bool value.
        '''
        return self.worker.EOF.is_set()

    @property
    def length(self):
        '''
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit of content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        '''
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
277
277
278
278
class SubprocessIOChunker(object):
    '''
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions
    into an iterable that can be passed to a WSGI server as the application's
    return value. Because of stream-processing-ability, WSGI does not have to
    read ALL of the subprocess's output and buffer it, before handing it to
    WSGI server for HTTP response. Instead, the class initializer reads just
    a bit of the stream to figure out if error occurred or is likely to occur
    and if not, just hands the further iteration over subprocess output to
    the server for completion of HTTP response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions.

    Example usage::

        try:
            answer = SubprocessIOChunker(
                cmd,
                input,
                buffer_size=65536,
                chunk_size=4096
            )
        except (EnvironmentError) as e:
            print str(e)
            raise e

        return answer
    '''
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        '''
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None, meaning []) An array of strings to put in front of output que.
        :param kwargs: any extra keyword arguments are passed through to subprocess.Popen
        :raises EnvironmentError: if the subprocess exits with an error, or
            produces stderr output before any stdout is available.
        '''
        # ``None`` sentinel avoids the shared-mutable-default pitfall the
        # original ``starting_values=[]`` signature had
        if starting_values is None:
            starting_values = []

        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # NOTE(review): shell=True means string commands are interpreted by
        # the shell -- callers must never build ``cmd`` from untrusted input.
        _p = subprocess.Popen(cmd,
            bufsize=-1,
            shell=True,
            stdin=inputstream,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **kwargs
            )

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                # process may already be gone; best-effort cleanup
                pass
            bg_out.stop()
            bg_err.stop()
            raise EnvironmentError("Subprocess exited due to an error.\n" + "".join(bg_err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        '''Yield the next stdout chunk; raise if the process has failed.'''
        if self.process.poll():
            raise EnvironmentError("Subprocess exited due to an error:\n" + ''.join(self.error))
        return self.output.next()

    # Python 3 iterator-protocol alias; harmless no-op addition on Python 2.
    __next__ = next

    def throw(self, type, value=None, traceback=None):
        # generator.throw()-style interface for WSGI servers
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        '''Best-effort termination of the child and both stream readers.'''
        try:
            self.process.terminate()
        except Exception:
            pass
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now