##// END OF EJS Templates
Add optional parameters to subprocessio that allow passing params to Popen
marcink -
r2399:a8635cda beta
parent child Browse files
Show More
@@ -1,401 +1,402 b''
1 1 '''
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningful, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 '''
25 25 import os
26 26 import subprocess
27 27 import threading
28 28 from collections import deque
29 29
30 30
class StreamFeeder(threading.Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close the write end of the pipe once the end of the source stream
    is reached.
    """
    def __init__(self, source):
        """
        @param source a string/bytes-like object, a readable file-like
            object, or an integer file descriptor to feed down the pipe.
        @raise TypeError when source is none of the accepted kinds.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = b''
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either file pointer or file-like
            # `long` exists only on Python 2; fall back to plain int elsewhere
            try:
                int_types = (int, long)
            except NameError:
                int_types = (int,)
            if isinstance(source, int_types):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except (OSError, IOError, ValueError):
                    # bad/unopenable fd: fall through to the file-like
                    # check below, which will raise TypeError for us
                    pass
            # let's see if source is file-like by now
            filelike = getattr(source, 'read', None) is not None
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        # os-level pipe: consumer reads `readiface`, run() writes `writeiface`
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        """Pump all of source into the pipe, then close the write end (EOF)."""
        t = self.writeiface
        if self.bytes:
            os.write(t, self.bytes)
        else:
            s = self.source
            b = s.read(4096)
            while b:
                os.write(t, b)
                b = s.read(4096)
        # closing the write end delivers EOF to whoever reads `output`
        os.close(t)

    @property
    def output(self):
        """Read end (int fd) of the pipe this thread feeds."""
        return self.readiface
77 77
78 78
class InputStreamChunker(threading.Thread):
    """
    Daemon thread draining a blocking file-like `source` into a shared
    `target` container (a deque of chunks) so consumers never block on
    the pipe directly.

    Events exposed to the consumer:
    - data_added: set whenever a chunk was appended (also on EOF).
    - keep_reading: cleared by this thread when the buffer is full;
      the consumer sets it again after draining chunks.
    - EOF: set when the source is exhausted or stop() was called.
    - go: cleared by stop() to ask this thread to terminate.
    """
    def __init__(self, source, target, buffer_size, chunk_size):
        """
        @param source blocking readable file-like object.
        @param target mutable sequence (deque) that receives the chunks.
        @param buffer_size max bytes worth of chunks to hold before pausing.
        @param chunk_size max size of a single read()/chunk.
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # number of chunks that roughly amounts to buffer_size bytes
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = threading.Event()
        self.data_added.clear()

        self.keep_reading = threading.Event()
        self.keep_reading.set()

        self.EOF = threading.Event()
        self.EOF.clear()

        self.go = threading.Event()
        self.go.set()

    def stop(self):
        """Ask the reader thread to terminate and mark the stream as done."""
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # best-effort: source may already be closed or not closable
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go
        b = s.read(cs)
        while b and go.is_set():
            if len(t) > ccm:
                # buffer full: pause and wait for the consumer to drain
                kr.clear()
                kr.wait(2)
                # # this only works on 2.7.x and up
                # if not kr.wait(10):
                #     raise Exception("Timed out while waiting for input to be read.")
                # instead we'll use this
                if len(t) > ccm + 3:
                    raise IOError(
                        "Timed out while waiting for input from subprocess.")
            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
138 138
139 139
class BufferedGenerator(object):
    '''
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    '''

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        '''
        @param source blocking readable file-like (typically a subprocess
            output pipe).
        @param buffer_size total bytes to buffer before pausing the reader.
        @param chunk_size max size of a single buffered chunk.
        @param starting_values optional list of chunks to pre-seed the
            output queue with.
        @param bottomless when True the deque is bounded and silently drops
            the oldest chunks once full (used for stderr capture).
        '''
        # avoid the shared mutable-default-argument trap
        if starting_values is None:
            starting_values = []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        '''
        Return the next buffered chunk as bytes; blocks in 0.2s waits while
        the buffer is empty and the reader has not reached EOF.
        @raise StopIteration once the buffer is drained after EOF.
        '''
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            # consumer made room: allow the reader thread to continue
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    # Python 3 iterator protocol alias (backward-compatible addition)
    __next__ = next

    def throw(self, type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        # NOTE(review): __init__ already start()s the worker; calling this
        # afterwards raises RuntimeError (threads start only once) -- confirm
        # intended usage before calling.
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute `w`, so
        # accessing this property raises AttributeError. Looks like dead
        # code inherited from upstream -- confirm before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        '''
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Event class instance.
        '''
        return self.worker.EOF

    @property
    def done_reading(self):
        '''
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return A Bool value.
        '''
        return self.worker.EOF.is_set()

    @property
    def length(self):
        '''
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit of content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        '''
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
277 277
278 278
class SubprocessIOChunker(object):
    '''
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or is likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer


    '''
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        '''
        Initializes SubprocessIOChunker

        @param cmd A Subprocess.Popen style "cmd". Can be string or array of strings
        @param inputstream (Default: None) A file-like, string, or file pointer.
        @param buffer_size (Default: 65536) A size of total buffer per stream in bytes.
        @param chunk_size (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        @param starting_values (Default: None, meaning []) An array of strings
            to put in front of output queue.
        @param kwargs Additional keyword arguments passed straight through to
            subprocess.Popen (e.g. cwd, env).
        @raise EnvironmentError when the subprocess exits with an error, or
            emits stderr output before producing stdout.
        '''
        # avoid the shared mutable-default-argument trap
        if starting_values is None:
            starting_values = []

        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # NOTE(review): shell=True with a string cmd is a shell-injection
        # hazard if cmd can ever contain untrusted input -- verify callers.
        _p = subprocess.Popen(cmd,
            bufsize=-1,
            shell=True,
            stdin=inputstream,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **kwargs
        )

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
        # stderr is buffered bottomless so it never stalls the child process
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                # process may already be gone; nothing more we can do
                pass
            bg_out.stop()
            bg_err.stop()
            raise EnvironmentError("Subprocess exited due to an error.\n" + "".join(bg_err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        '''
        Return the next chunk of subprocess stdout.
        @raise EnvironmentError if the subprocess has exited with an error.
        '''
        if self.process.poll():
            raise EnvironmentError("Subprocess exited due to an error:\n" + ''.join(self.error))
        return self.output.next()

    # Python 3 iterator protocol alias (backward-compatible addition)
    __next__ = next

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        '''Best-effort teardown of the child process and both readers.'''
        try:
            self.process.terminate()
        except Exception:
            pass
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now