##// END OF EJS Templates
Simplify StreamCapturer for subprocess testing...
Thomas Kluyver -
Show More
@@ -43,11 +43,10 b' KC = None'
43 43 def start_new_kernel(argv=None):
44 44 """start a new kernel, and return its Manager and Client"""
45 45 km = KernelManager()
46 kwargs = dict(stdout=PIPE, stderr=STDOUT)
46 kwargs = dict(stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT)
47 47 if argv:
48 48 kwargs['extra_arguments'] = argv
49 49 km.start_kernel(**kwargs)
50 nose.ipy_stream_capturer.add_stream(km.kernel.stdout.fileno())
51 50 nose.ipy_stream_capturer.ensure_started()
52 51 kc = km.client()
53 52 kc.start_channels()
@@ -38,7 +38,7 b' class TestProcessLauncher(LocalProcessLauncher):'
38 38 def start(self):
39 39 if self.state == 'before':
40 40 self.process = Popen(self.args,
41 stdout=PIPE, stderr=STDOUT,
41 stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT,
42 42 env=os.environ,
43 43 cwd=self.work_dir
44 44 )
@@ -46,7 +46,6 b' class TestProcessLauncher(LocalProcessLauncher):'
46 46 self.poll = self.process.poll
47 47 # Store stdout & stderr to show with failing tests.
48 48 # This is defined in IPython.testing.iptest
49 nose.ipy_stream_capturer.add_stream(self.process.stdout.fileno())
50 49 nose.ipy_stream_capturer.ensure_started()
51 50 else:
52 51 s = 'The process was already started and has state: %r' % self.state
@@ -114,7 +113,6 b' def teardown():'
114 113 time.sleep(1)
115 114 while launchers:
116 115 p = launchers.pop()
117 nose.ipy_stream_capturer.remove_stream(p.process.stdout.fileno())
118 116 if p.poll() is None:
119 117 try:
120 118 p.stop()
@@ -365,47 +365,22 b' class StreamCapturer(Thread):'
365 365 super(StreamCapturer, self).__init__()
366 366 self.streams = []
367 367 self.buffer = BytesIO()
368 self.streams_lock = Lock()
368 self.readfd, self.writefd = os.pipe()
369 369 self.buffer_lock = Lock()
370 self.stream_added = Event()
371 370 self.stop = Event()
372 371
373 372 def run(self):
374 373 self.started = True
375 while not self.stop.is_set():
376 with self.streams_lock:
377 streams = self.streams
378 374
379 if not streams:
380 self.stream_added.wait(timeout=1)
381 self.stream_added.clear()
382 continue
375 while not self.stop.is_set():
376 ready = select([self.readfd], [], [], 1)[0]
383 377
384 ready = select(streams, [], [], 0.5)[0]
385 dead = []
378 if ready:
386 379 with self.buffer_lock:
387 for fd in ready:
388 try:
389 self.buffer.write(os.read(fd, 1024))
390 except OSError as e:
391 import errno
392 if e.errno == errno.EBADF:
393 dead.append(fd)
394 else:
395 raise
396
397 with self.streams_lock:
398 for fd in dead:
399 self.streams.remove(fd)
400
401 def add_stream(self, fd):
402 with self.streams_lock:
403 self.streams.append(fd)
404 self.stream_added.set()
380 self.buffer.write(os.read(self.readfd, 1024))
405 381
406 def remove_stream(self, fd):
407 with self.streams_lock:
408 self.streams.remove(fd)
382 os.close(self.readfd)
383 os.close(self.writefd)
409 384
410 385 def reset_buffer(self):
411 386 with self.buffer_lock:
@@ -426,7 +401,7 b' class SubprocessStreamCapturePlugin(Plugin):'
426 401 Plugin.__init__(self)
427 402 self.stream_capturer = StreamCapturer()
428 403 # This is ugly, but distant parts of the test machinery need to be able
429 # to add streams, so we make the object globally accessible.
404 # to redirect streams, so we make the object globally accessible.
430 405 nose.ipy_stream_capturer = self.stream_capturer
431 406
432 407 def configure(self, options, config):
General Comments 0
You need to be logged in to leave comments. Login now