##// END OF EJS Templates
don't format errors to string in subprocessio
marcink -
r2572:71fc1c98 beta
parent child Browse files
Show More
@@ -1,403 +1,405
1 1 '''
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningfull, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 '''
25 25 import os
26 26 import subprocess
27 27 import threading
28 28 from collections import deque
29 29
30 30
class StreamFeeder(threading.Thread):
    """
    Pump data from a string-like, file-like or file-descriptor source into
    one end of an ``os.pipe()`` from a background daemon thread.

    Writing into a pipe blocks once the kernel buffer fills; doing the
    writes in a separate thread keeps the caller unblocked. The write end
    of the pipe is closed when the source runs dry, which delivers EOF to
    whoever reads from ``self.output``.
    """
    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either a file pointer or a file-like
            if type(source) in (int, long):  # file pointer it is
                # convert the raw file descriptor into a file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # duck-type check: a .read attribute is good enough for us
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        wfd = self.writeiface
        if self.bytes:
            # everything is already in memory; write it out in one shot
            os.write(wfd, self.bytes)
        else:
            reader = self.source.read
            while True:
                chunk = reader(4096)
                if not chunk:
                    break
                os.write(wfd, chunk)
        # closing the write end signals EOF to the pipe's reader
        os.close(wfd)

    @property
    def output(self):
        """File descriptor of the pipe's read end."""
        return self.readiface
78 78
79 79
class InputStreamChunker(threading.Thread):
    """
    Daemon thread that reads fixed-size chunks from a blocking file-like
    ``source`` and appends them to ``target`` (a deque shared with the
    consumer).

    Flow-control events:
      - ``data_added``: set whenever a chunk lands in ``target`` (and once
        more on EOF, so waiting consumers always wake up).
      - ``keep_reading``: cleared by this thread when the buffer is full;
        the consumer sets it again after draining chunks.
      - ``EOF``: set when the source is exhausted or ``stop()`` ran.
      - ``go``: cleared by ``stop()`` to make the read loop exit.
    """
    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking file-like object to read from
        :param target: deque (or list-like) that chunks are appended to
        :param buffer_size: max buffered bytes before reading pauses
        :param chunk_size: size of each read request in bytes
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # +1 so at least one chunk is always allowed through
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = threading.Event()
        self.data_added.clear()

        self.keep_reading = threading.Event()
        self.keep_reading.set()

        self.EOF = threading.Event()
        self.EOF.clear()

        self.go = threading.Event()
        self.go.set()

    def stop(self):
        """Ask the reader loop to exit and force-close the source."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # best-effort only: the source may already be closed, or may
            # not support close() at all. Narrowed from a bare except so
            # SystemExit/KeyboardInterrupt are not swallowed.
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go
        b = s.read(cs)
        while b and go.is_set():
            if len(t) > ccm:
                # buffer full: pause and wait for the consumer to drain it
                kr.clear()
                kr.wait(2)
                # # this only works on 2.7.x and up
                # if not kr.wait(10):
                #     raise Exception("Timed out while waiting for input to be read.")
                # instead we'll use this
                if len(t) > ccm + 3:
                    raise IOError("Timed out while waiting for input from subprocess.")
            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
139 139
140 140
class BufferedGenerator(object):
    '''
    Non-blocking, buffered pipe reader.

    A worker thread (InputStreamChunker) reads chunks of data from a
    blocking pipe and appends them to a deque. Reading pauses in the
    worker when the maximum number of chunks is buffered. ``next()``
    waits (in short slices) until data is available, and raises
    StopIteration once the worker hit EOF and the buffer is drained.
    '''

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        # None instead of a mutable [] default: a shared default list is
        # the classic Python pitfall (harmless here since it was never
        # mutated, but the idiom should not be copied around).
        if starting_values is None:
            starting_values = []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # wait in 0.2s slices until data shows up or the worker hit EOF
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # buffer has room again; let the worker resume reading
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w`` in
        # this module, so accessing this property raises AttributeError.
        # Presumably ``self.worker.source`` was meant -- confirm against
        # callers before fixing.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        '''
        Done reading does not mean that the iterator's buffer is empty.
        The worker might be done reading from the underlying source, but
        read chunks may still be served through the .next() method.

        :return: An Event class instance.
        '''
        return self.worker.EOF

    @property
    def done_reading(self):
        '''
        Done reading does not mean that the iterator's buffer is empty.
        The worker might be done reading from the underlying source, but
        read chunks may still be served through the .next() method.

        :return: A bool value.
        '''
        return self.worker.EOF.is_set()

    @property
    def length(self):
        '''
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit of content and
        can only know the length of what it already saw.

        If __len__() on a WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP 3333 servers, we will not implement __len__
        at all.
        '''
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
278 278
279 279
class SubprocessIOChunker(object):
    '''
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring in a parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or is likely to occur and if not, just hands
    the further iteration over subprocess output to the server for completion of
    HTTP response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions.

    Example usage:
    #    try:
    #        answer = SubprocessIOChunker(
    #            cmd,
    #            input,
    #            buffer_size = 65536,
    #            chunk_size = 4096
    #        )
    #    except (EnvironmentError) as e:
    #        print str(e)
    #        raise e
    #
    #    return answer
    '''
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        '''
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None) An array of strings to put in front of output que.

        :raises EnvironmentError: if the subprocess exits with an error, or
            writes anything to stderr during the initial read.
        '''
        # avoid the shared-mutable-default pitfall of starting_values=[]
        if starting_values is None:
            starting_values = []

        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # NOTE(review): shell=True means a string `cmd` is interpreted by
        # the shell -- callers must never pass untrusted input here.
        _p = subprocess.Popen(cmd,
                              bufsize=-1,
                              shell=True,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs
                              )

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        # stderr buffered one byte at a time, bottomless, so any stderr
        # activity is visible immediately via bg_err.length
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused \
                and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()
        # `is None` (identity), not `== None`: poll() returns None while the
        # process is still running
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                # process may already be gone; best-effort cleanup
                pass
            bg_out.stop()
            bg_err.stop()
            err = '%r' % ''.join(bg_err)
            raise EnvironmentError("Subprocess exited due to an error.\n" + err)

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # a truthy (non-zero) returncode means the subprocess failed
        if self.process.poll():
            err = '%r' % ''.join(self.error)
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)
        return self.output.next()

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        # best-effort teardown of the process and both stream readers;
        # narrowed from bare excepts so SystemExit/KeyboardInterrupt pass
        try:
            self.process.terminate()
        except Exception:
            pass
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now