##// END OF EJS Templates
subprocessio: py27+ compat
marcink -
r341:a40eb400 default
parent child Browse files
Show More
@@ -1,479 +1,477 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningful, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import subprocess32 as subprocess
27 27 from collections import deque
28 28 from threading import Event, Thread
29 29
30 30
class StreamFeeder(Thread):
    """
    Pumps data from a string-like or file-like source into an OS pipe.

    Writing into a pipe blocks once its buffer fills up; doing the writes
    on this daemon thread keeps the caller's thread responsive. The write
    end of the pipe is closed once the source is exhausted, signalling EOF
    to whoever reads the other end.
    """

    def __init__(self, source):
        super(StreamFeeder, self).__init__()
        self.daemon = True
        is_filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # either a file descriptor or a file-like object
            if type(source) in (int, long):  # plain file descriptor
                # wrap the descriptor into a buffered file-like object
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # probe for a read() attribute to confirm file-likeness
            try:
                is_filelike = source.read
            except Exception:
                pass
        if not is_filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        write_fd = self.writeiface
        if self.bytes:
            # in-memory payload: a single write is enough
            os.write(write_fd, self.bytes)
        else:
            src = self.source
            chunk = src.read(4096)
            while chunk:
                os.write(write_fd, chunk)
                chunk = src.read(4096)
        # closing the write end delivers EOF to the reader
        os.close(write_fd)

    @property
    def output(self):
        # read end of the pipe; hand this to the consumer (e.g. Popen stdin)
        return self.readiface
80 80
class InputStreamChunker(Thread):
    """
    Reads fixed-size chunks from a blocking ``source`` stream on a daemon
    thread and appends them to a shared ``target`` container.

    Reading pauses once more than ``chunk_count_max`` chunks are buffered
    (the worker clears ``keep_reading`` and waits for the consumer to set
    it again). ``EOF`` is set when the source is exhausted or the worker
    is stopped; ``data_added`` is set whenever a chunk lands in ``target``.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking file-like object to read from
        :param target: container supporting ``append()`` and ``len()``
            that receives the chunks (shared with the consumer)
        :param buffer_size: total buffer size in bytes; together with
            ``chunk_size`` it bounds how many chunks may be queued
        :param chunk_size: maximum size of a single read
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # set each time a new chunk is appended; consumers wait on it
        self.data_added = Event()
        self.data_added.clear()

        # cleared by the worker when the buffer is full; set by the
        # consumer to let reading resume
        self.keep_reading = Event()
        self.keep_reading.set()

        # set once the source is exhausted (or on stop())
        self.EOF = Event()
        self.EOF.clear()

        # cleared to ask the worker loop to terminate early
        self.go = Event()
        self.go.set()

    def stop(self):
        """Ask the worker to stop and force-close the source stream."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # best effort only -- stop() must never raise
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        keep_reading = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            b = ''

        while b and go.is_set():
            if len(t) > ccm:
                # buffer full: pause until the consumer drains some chunks
                keep_reading.clear()
                keep_reading.wait(2)

                if not keep_reading.wait(10):
                    raise Exception(
                        "Timed out while waiting for input to be read.")

            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
146 144
147 145
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.

    Reads chunks of data (through a thread) from a blocking pipe, and
    attaches these to an array (deque) of chunks. Reading is halted in the
    thread when max chunks is internally buffered. The .next() may operate
    in blocking or non-blocking fashion by yielding '' if no data is ready
    to be sent or by not returning until there is some data to send.
    When we get EOF from the underlying source pipe we raise the marker to
    raise StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        # avoid sharing a mutable default between instances
        starting_values = starting_values or []

        if bottomless:
            # bounded deque: once full, the oldest chunks are silently
            # dropped, so the reader thread never has to pause
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        # worker thread appends chunks to self.data as they arrive
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            # pre-seeded data counts as "data added" for waiting consumers
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Python 2 iterator protocol. Polls in 0.2s steps until either a
        # chunk is available or the worker signals EOF.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # a chunk was drained -- let a paused worker resume reading
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # generator-protocol compatibility: only raise while data may still
        # be pending; a fully-consumed stream swallows the exception
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        # stop the worker, then signal termination through throw();
        # GeneratorExit/StopIteration are the expected shutdown outcomes
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute 'w' in this
        # file, so accessing this property would raise AttributeError --
        # looks like stale code; confirm the intended handle before use.
        return self.worker.w

    @property
    def data_added_event(self):
        # Event the worker sets each time a chunk is appended.
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the worker is waiting for the consumer to drain chunks.
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: A bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
285 283
286 284
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer
    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    # guards close() against running twice (it is also called from __del__)
    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: []) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        :raises EnvironmentError: if the subprocess exits with an error (or
            writes to stderr) while the initial buffer is being primed.
        """

        starting_values = starting_values or []
        if inputstream:
            # feed input through a pipe on a separate thread so that writing
            # to the child process cannot block this constructor
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        # NOTE(review): shell defaults to True; with an untrusted `cmd`
        # string this is shell-injection prone -- callers should prefer
        # shell=False with an argument list.
        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        # non-blocking buffered readers for the child's stdout/stderr;
        # stderr uses a bottomless (dropping) buffer with 1-byte chunks
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            # error path: kill the child, stop both readers, then raise
            # with whatever stderr output was captured
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            # defer StopIteration until after the return-code check below
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, type, value=None, traceback=None):
        # generator-protocol compatibility: only raise while output may
        # still be pending
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        # idempotent teardown: kill the child and release all stream ends
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except:
            pass
        try:
            self.error.close()
        except:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now