##// END OF EJS Templates
don't format errors to strings in subprocessio
marcink -
r2572:71fc1c98 beta
parent child Browse files
Show More
@@ -1,403 +1,405
1 '''
1 '''
2 Module provides a class allowing to wrap communication over subprocess.Popen
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8
8
9 This file is part of git_http_backend.py Project.
9 This file is part of git_http_backend.py Project.
10
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
14 or (at your option) any later version.
15
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
19 GNU Lesser General Public License for more details.
20
20
21 You should have received a copy of the GNU Lesser General Public License
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
23 If not, see <http://www.gnu.org/licenses/>.
24 '''
24 '''
25 import os
25 import os
26 import subprocess
26 import subprocess
27 import threading
27 import threading
28 from collections import deque
28 from collections import deque
29
29
30
30
class StreamFeeder(threading.Thread):
    """
    Pump data from a string-like, a file descriptor, or a file-like source
    into an os-level pipe from a background thread, so the producer never
    blocks the main thread once the pipe buffer fills up.

    The write end of the pipe is closed when the source is exhausted,
    which delivers EOF to whatever reads from ``output``.
    """

    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        self.bytes = bytes()
        has_read = False
        if type(source) in (type(''), bytes, bytearray):  # string-like
            # Small payloads are held in memory and written in one shot.
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # raw file descriptor
                # Wrap the descriptor in a buffered file object; on failure
                # fall through and let the duck-type probe below decide.
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # Duck-type probe: anything exposing .read counts as file-like.
            try:
                has_read = source.read
            except Exception:
                pass
        if not has_read and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        wfd = self.writeiface
        if self.bytes:
            os.write(wfd, self.bytes)
        else:
            stream = self.source
            chunk = stream.read(4096)
            while chunk:
                os.write(wfd, chunk)
                chunk = stream.read(4096)
        # Closing the write end signals EOF to the pipe's reader.
        os.close(wfd)

    @property
    def output(self):
        # Read end of the pipe; suitable as e.g. a subprocess stdin.
        return self.readiface
78
78
79
79
class InputStreamChunker(threading.Thread):
    """
    Background thread that reads a blocking stream in fixed-size chunks
    and appends them to a shared target container (typically a deque).

    Flow control is done with threading.Event objects:

    - ``data_added`` is set every time a chunk lands in the target.
    - ``keep_reading`` is cleared by this thread when the target holds more
      than ``chunk_count_max`` chunks; the consumer sets it again after
      draining, which resumes reading.
    - ``EOF`` is set when the source is exhausted (or on ``stop()``).
    - ``go`` is cleared by ``stop()`` so the read loop exits early.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking file-like object to read from
        :param target: container with an ``append`` method (e.g. a deque)
        :param buffer_size: total buffered bytes allowed before pausing
        :param chunk_size: maximum size of a single read
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = threading.Event()
        self.data_added.clear()

        self.keep_reading = threading.Event()
        self.keep_reading.set()

        self.EOF = threading.Event()
        self.EOF.clear()

        self.go = threading.Event()
        self.go.set()

    def stop(self):
        """Signal the reader loop to stop and mark the stream as done."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # Best-effort: the source may already be closed or may not
            # support close(). Was a bare `except:`; narrowed so that
            # SystemExit/KeyboardInterrupt are not swallowed.
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go
        b = s.read(cs)
        while b and go.is_set():
            if len(t) > ccm:
                # Buffer full: pause until the consumer drains the target
                # and re-sets keep_reading, or the grace period elapses.
                kr.clear()
                kr.wait(2)
                # # this only works on 2.7.x and up
                # if not kr.wait(10):
                #     raise Exception("Timed out while waiting for input to be read.")
                # instead we'll use this
                if len(t) > ccm + 3:
                    raise IOError("Timed out while waiting for input from subprocess.")
            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
139
139
140
140
class BufferedGenerator(object):
    '''
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    '''

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        '''
        :param source: blocking file-like object to read from
        :param buffer_size: total buffered bytes allowed before the reader
            thread pauses
        :param chunk_size: maximum size of a single read
        :param starting_values: optional list of chunks to pre-seed the queue
        :param bottomless: if True, the internal deque is bounded and silently
            discards the oldest chunks when full
        '''
        # starting_values used to be a mutable default argument ([]), which
        # is shared across calls; normalize None to a fresh list instead.
        if starting_values is None:
            starting_values = []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Python 2 iterator protocol. Blocks (in 0.2s slices) until data
        # shows up or the reader thread signals EOF.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # Consumer made room; let the reader thread resume.
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute 'w', so
        # accessing this property raises AttributeError. Possibly meant
        # self.worker.source -- confirm against callers before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        # Event set by the reader thread whenever a chunk is appended.
        return self.worker.data_added

    @property
    def data_added(self):
        # Bool snapshot of data_added_event.
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the reader thread is paused on a full buffer.
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        '''
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Event class instance.
        '''
        return self.worker.EOF

    @property
    def done_reading(self):
        '''
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Bool value.
        '''
        return self.worker.EOF.is_set()

    @property
    def length(self):
        '''
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        '''
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
278
278
279
279
class SubprocessIOChunker(object):
    '''
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #         )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer
    '''
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        '''
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None) An array of strings to put in front of output que.
        :raises EnvironmentError: if the subprocess exits with an error or
            writes to stderr before producing output.
        '''
        # starting_values used to be a mutable default argument ([]), which
        # is shared across calls; normalize None to a fresh list instead.
        if starting_values is None:
            starting_values = []

        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # NOTE(review): shell=True runs `cmd` through the shell -- callers
        # must never pass untrusted input as `cmd`.
        _p = subprocess.Popen(cmd,
                              bufsize=-1,
                              shell=True,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs
                              )

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()
        # `is None` (was `== None`): identity test is the correct form.
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                # best-effort; the process may already be gone.
                # Was a bare `except:`; narrowed so SystemExit/
                # KeyboardInterrupt are not swallowed.
                pass
            bg_out.stop()
            bg_err.stop()
            err = '%r' % ''.join(bg_err)
            raise EnvironmentError("Subprocess exited due to an error.\n" + err)

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Python 2 iterator protocol: yield the next stdout chunk, or raise
        # if the subprocess has exited with a non-zero code in the meantime.
        if self.process.poll():
            err = '%r' % ''.join(self.error)
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)
        return self.output.next()

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        # Tear everything down best-effort; each step may legitimately fail
        # if the process/streams are already gone. Narrowed from bare
        # `except:` so SystemExit/KeyboardInterrupt propagate.
        try:
            self.process.terminate()
        except Exception:
            pass
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now