##// END OF EJS Templates
pep8: fix potential code warnings.
marcink -
r340:d9c81d6a default
parent child Browse files
Show More
@@ -1,476 +1,479 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningful, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa[at]hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import subprocess32 as subprocess
27 27 from collections import deque
28 28 from threading import Event, Thread
29 29
30 30
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """

    def __init__(self, source):
        """
        :param source: a string-like, a readable file-like object, or a
            file descriptor (int) whose contents are fed into the pipe.
        :raises TypeError: if ``source`` is none of the above.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = bytes()
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        # read end is handed to the consumer via .output; we keep the write end
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        t = self.writeiface
        # Always close the write end, even if reading the source or writing
        # to the pipe raises; otherwise the reader never sees EOF and the
        # file descriptor leaks.
        try:
            if self.bytes:
                os.write(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    os.write(t, b)
                    b = s.read(4096)
        finally:
            os.close(t)

    @property
    def output(self):
        """Read end (file descriptor) of the pipe this thread feeds."""
        return self.readiface
79 79
80 80
class InputStreamChunker(Thread):
    """
    Daemon thread that drains a blocking stream ``source`` in fixed-size
    chunks and appends them to ``target`` (a deque shared with a consumer),
    pausing once the consumer's buffer is full.
    """

    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking, readable file-like object to drain.
        :param target: deque-like container chunks are appended to.
        :param buffer_size: max number of bytes to keep buffered.
        :param chunk_size: size of a single read.
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # +1 so at least one chunk is always permitted
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # set each time a chunk lands in ``target`` (and on EOF)
        self.data_added = Event()
        self.data_added.clear()

        # cleared by us when the buffer is full; set by the consumer
        # once it has drained some data
        self.keep_reading = Event()
        self.keep_reading.set()

        # set once the source is exhausted (or the thread terminates)
        self.EOF = Event()
        self.EOF.clear()

        # cleared by stop() to ask the read loop to exit
        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            b = ''

        try:
            while b and go.is_set():
                if len(t) > ccm:
                    kr.clear()
                    kr.wait(2)
                    # # this only works on 2.7.x and up
                    # if not kr.wait(10):
                    #     raise Exception("Timed out while waiting for input to be read.")
                    # instead we'll use this
                    if len(t) > ccm + 3:
                        raise IOError(
                            "Timed out while waiting for input from subprocess.")
                t.append(b)
                da.set()
                b = s.read(cs)
        finally:
            # Always raise the EOF flag -- also when the loop dies with an
            # error -- so a consumer polling data_added/EOF does not spin
            # forever.  Also covers the done-but-no-input case.
            self.EOF.set()
            da.set()
146 146
147 147
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        # default is None, not [], to avoid the mutable-default-argument trap
        starting_values = starting_values or []

        if bottomless:
            # bounded deque: oldest chunks get dropped once it is full
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)
        # background thread that fills self.data from ``source``
        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            # pre-seeded chunks count as "data added" for waiting consumers
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Poll in 0.2s steps until either a chunk is buffered or the worker
        # signalled EOF.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # consumer made room -- allow the worker to resume reading
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, exc_type, value=None, traceback=None):
        # generator-protocol style throw; a throw after EOF is a no-op
        if not self.worker.EOF.is_set():
            raise exc_type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        # stop the worker and unwind via the normal generator-exit path
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w`` in this
        # file, so accessing this property would raise AttributeError --
        # looks like dead or stale code; confirm before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        # Event the worker sets whenever a chunk lands in the buffer.
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        # True while the worker is parked waiting for the buffer to drain.
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
284 285
285 286
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    .. important::

       Watch out for the method `__del__` on this class. If this object
       is deleted, it will kill the subprocess, so avoid to
       return the `output` attribute or usage of it like in the following
       example::

          # `args` expected to run a program that produces a lot of output
          output = ''.join(SubprocessIOChunker(
              args, shell=False, inputstream=inputstream, env=environ).output)

          # `output` will not contain all the data, because the __del__ method
          # has already killed the subprocess in this case before all output
          # has been consumed.



    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #         )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer


    """

    # TODO: johbo: This is used to make sure that the open end of the PIPE
    # is closed in the end. It would be way better to wrap this into an
    # object, so that it is closed automatically once it is consumed or
    # something similar.
    _close_input_fd = None

    _closed = False

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, fail_on_stderr=True,
                 fail_on_return_code=True, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None) An array of strings to put in front of output que.
        :param fail_on_stderr: (Default: True) Whether to raise an exception in
            case something is written to stderr.
        :param fail_on_return_code: (Default: True) Whether to raise an
            exception if the return code is not 0.
        """

        starting_values = starting_values or []
        if inputstream:
            # feed the input through a pipe so the child never blocks us
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output
            self._close_input_fd = inputstream

        self._fail_on_stderr = fail_on_stderr
        self._fail_on_return_code = fail_on_return_code

        _shell = kwargs.get('shell', True)
        kwargs['shell'] = _shell

        _p = subprocess.Popen(cmd, bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs)

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()

        if ((_returncode and fail_on_return_code) or
                (fail_on_stderr and _returncode is None and bg_err.length)):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            if fail_on_stderr:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited due to an error:\n" + err)
            if _returncode and fail_on_return_code:
                err = ''.join(bg_err)
                raise EnvironmentError(
                    "Subprocess exited with non 0 ret code:%s: stderr:%s" % (
                        _returncode, err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # Note: mikhail: We need to be sure that we are checking the return
        # code after the stdout stream is closed. Some processes, e.g. git
        # are doing some magic in between closing stdout and terminating the
        # process and, as a result, we are not getting return code on "slow"
        # systems.
        result = None
        stop_iteration = None
        try:
            result = self.output.next()
        except StopIteration as e:
            stop_iteration = e

        if self.process.poll() and self._fail_on_return_code:
            err = '%s' % ''.join(self.error)
            raise EnvironmentError(
                "Subprocess exited due to an error:\n" + err)

        if stop_iteration:
            raise stop_iteration
        return result

    def throw(self, exc_type, value=None, traceback=None):
        # pep8: parameter renamed from ``type`` to avoid shadowing the
        # builtin, consistent with BufferedGenerator.throw
        if self.output.length or not self.output.done_reading:
            raise exc_type(value)

    def close(self):
        """Terminate the subprocess and release all pipes (idempotent)."""
        if self._closed:
            return
        self._closed = True
        try:
            self.process.terminate()
        except Exception:
            pass
        if self._close_input_fd:
            os.close(self._close_input_fd)
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now