##// END OF EJS Templates
small docstring fixes
marcink -
r3895:e39fb661 beta
parent child Browse files
Show More
@@ -1,415 +1,415 b''
1 1 """
2 2 Module provides a class allowing to wrap communication over subprocess.Popen
3 3 input, output, error streams into a meaningful, non-blocking, concurrent
4 4 stream processor exposing the output data as an iterator fitting to be a
5 5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6 6
7 7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8 8
9 9 This file is part of git_http_backend.py Project.
10 10
11 11 git_http_backend.py Project is free software: you can redistribute it and/or
12 12 modify it under the terms of the GNU Lesser General Public License as
13 13 published by the Free Software Foundation, either version 2.1 of the License,
14 14 or (at your option) any later version.
15 15
16 16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 19 GNU Lesser General Public License for more details.
20 20
21 21 You should have received a copy of the GNU Lesser General Public License
22 22 along with git_http_backend.py Project.
23 23 If not, see <http://www.gnu.org/licenses/>.
24 24 """
25 25 import os
26 26 import subprocess
27 27 from rhodecode.lib.vcs.utils.compat import deque, Event, Thread, _bytes, _bytearray
28 28
29 29
class StreamFeeder(Thread):
    """
    Normal writing into pipe-like is blocking once the buffer is filled.
    This thread allows a thread to seep data from a file-like into a pipe
    without blocking the main thread.
    We close inpipe once the end of the source stream is reached.
    """
    def __init__(self, source):
        """
        :param source: string-like data, a readable file-like object, or
            a file descriptor (int) whose contents will be fed into a pipe
        :raises TypeError: if ``source`` is none of the above
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = _bytes()
        if type(source) in (type(''), _bytes, _bytearray):  # string-like
            self.bytes = _bytes(source)
        else:  # can be either file pointer or file-like
            if type(source) in (int, long):  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except Exception:
                    pass
            # let's see if source is file-like by now
            try:
                filelike = source.read
            except Exception:
                pass
            if not filelike and not self.bytes:
                raise TypeError("StreamFeeder's source object must be a readable "
                                "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        t = self.writeiface
        # Close the write end even when reading/writing raises; otherwise the
        # reader on the other end of the pipe never sees EOF and may block
        # forever (the original closed only on the success path).
        try:
            if self.bytes:
                os.write(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    os.write(t, b)
                    b = s.read(4096)
        finally:
            os.close(t)

    @property
    def output(self):
        """Read end (file descriptor) of the pipe this thread feeds."""
        return self.readiface
77 77
78 78
class InputStreamChunker(Thread):
    """
    Background thread that reads fixed-size chunks from a blocking
    ``source`` stream and appends them to a shared ``target`` container.

    Coordination with the consumer happens through Events:

    - ``data_added``   set each time a chunk lands in ``target``
    - ``keep_reading`` cleared here when the internal buffer is full; the
      consumer sets it again to let reading resume
    - ``EOF``          set once the source is exhausted (or after ``stop()``)
    - ``go``           cleared by ``stop()`` to terminate the read loop
    """
    def __init__(self, source, target, buffer_size, chunk_size):
        """
        :param source: blocking file-like object to read from
        :param target: container (e.g. deque) that collects the read chunks
        :param buffer_size: max total bytes buffered before reading pauses
        :param chunk_size: size of a single read
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        # max number of buffered chunks before we pause reading
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        self.data_added = Event()
        self.data_added.clear()

        self.keep_reading = Event()
        self.keep_reading.set()

        self.EOF = Event()
        self.EOF.clear()

        self.go = Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            # best-effort: the source may already be closed or not closable;
            # narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are not swallowed.
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go

        try:
            b = s.read(cs)
        except ValueError:
            # reading from an already-closed source
            b = ''

        while b and go.is_set():
            if len(t) > ccm:
                kr.clear()
                kr.wait(2)
                # # this only works on 2.7.x and up
                # if not kr.wait(10):
                #     raise Exception("Timed out while waiting for input to be read.")
                # instead we'll use this
                if len(t) > ccm + 3:
                    raise IOError("Timed out while waiting for input from subprocess.")
            t.append(b)
            da.set()
            b = s.read(cs)
        self.EOF.set()
        da.set()  # for cases when done but there was no input.
143 143
144 144
class BufferedGenerator(object):
    """
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    """

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        """
        :param source: blocking file-like object to read from (via a thread)
        :param buffer_size: max total bytes buffered before reading pauses
        :param chunk_size: size of a single read
        :param starting_values: optional chunks pre-loaded into the buffer
        :param bottomless: when True the deque is bounded and silently drops
            the oldest chunks once full
        """
        # ``None`` default avoids the shared mutable default-argument pitfall.
        if starting_values is None:
            starting_values = []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Block (in 0.2s slices) until either data shows up or the reader
        # thread signals EOF.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            self.worker.keep_reading.set()
            return _bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    def throw(self, type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute ``w``, so
        # accessing this property raises AttributeError. Looks like a
        # leftover from the upstream git_http_backend implementation --
        # confirm intent before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: An Event class instance.
        """
        return self.worker.EOF

    @property
    def done_reading(self):
        """
        Done_reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        :returns: A Bool value.
        """
        return self.worker.EOF.is_set()

    @property
    def length(self):
        """
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        """
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
282 282
283 283
class SubprocessIOChunker(object):
    """
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up. This
      does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    # try:
    #     answer = SubprocessIOChunker(
    #         cmd,
    #         input,
    #         buffer_size = 65536,
    #         chunk_size = 4096
    #     )
    # except (EnvironmentError) as e:
    #     print str(e)
    #     raise e
    #
    # return answer
    """
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        """
        Initializes SubprocessIOChunker

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None) An array of strings to put in front of output que.
        :raises EnvironmentError: if the subprocess exits with a non-zero
            return code, or writes to stderr before producing any output.
        """
        # ``None`` default avoids the shared mutable default-argument pitfall.
        if starting_values is None:
            starting_values = []

        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # NOTE(review): shell defaults to True and list cmds are joined into
        # a single string, so any untrusted content in ``cmd`` is interpreted
        # by the shell. Callers must ensure ``cmd`` is trusted.
        _shell = kwargs.get('shell', True)
        if isinstance(cmd, (list, tuple)):
            cmd = ' '.join(cmd)

        kwargs['shell'] = _shell
        _p = subprocess.Popen(cmd,
                              bufsize=-1,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs
                              )

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                # process may already be gone
                pass
            bg_out.stop()
            bg_err.stop()
            err = '%s' % ''.join(bg_err)
            if err:
                raise EnvironmentError("Subprocess exited due to an error:\n" + err)
            raise EnvironmentError("Subprocess exited with non 0 ret code:%s" % _returncode)

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # A truthy poll() means the child already died with a non-zero code;
        # surface whatever it wrote to stderr as the error.
        if self.process.poll():
            err = '%s' % ''.join(self.error)
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)
        return self.output.next()

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        # Best-effort teardown: each step may legitimately fail when the
        # process or streams are already gone, so failures are ignored
        # individually (narrowed from bare excepts).
        try:
            self.process.terminate()
        except Exception:
            pass
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()
General Comments 0
You need to be logged in to leave comments. Login now