##// END OF EJS Templates
Simplify StreamCapturer for subprocess testing...
Thomas Kluyver -
Show More
@@ -43,11 +43,10 b' KC = None'
43 def start_new_kernel(argv=None):
43 def start_new_kernel(argv=None):
44 """start a new kernel, and return its Manager and Client"""
44 """start a new kernel, and return its Manager and Client"""
45 km = KernelManager()
45 km = KernelManager()
46 kwargs = dict(stdout=PIPE, stderr=STDOUT)
46 kwargs = dict(stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT)
47 if argv:
47 if argv:
48 kwargs['extra_arguments'] = argv
48 kwargs['extra_arguments'] = argv
49 km.start_kernel(**kwargs)
49 km.start_kernel(**kwargs)
50 nose.ipy_stream_capturer.add_stream(km.kernel.stdout.fileno())
51 nose.ipy_stream_capturer.ensure_started()
50 nose.ipy_stream_capturer.ensure_started()
52 kc = km.client()
51 kc = km.client()
53 kc.start_channels()
52 kc.start_channels()
@@ -38,7 +38,7 b' class TestProcessLauncher(LocalProcessLauncher):'
38 def start(self):
38 def start(self):
39 if self.state == 'before':
39 if self.state == 'before':
40 self.process = Popen(self.args,
40 self.process = Popen(self.args,
41 stdout=PIPE, stderr=STDOUT,
41 stdout=nose.ipy_stream_capturer.writefd, stderr=STDOUT,
42 env=os.environ,
42 env=os.environ,
43 cwd=self.work_dir
43 cwd=self.work_dir
44 )
44 )
@@ -46,7 +46,6 b' class TestProcessLauncher(LocalProcessLauncher):'
46 self.poll = self.process.poll
46 self.poll = self.process.poll
47 # Store stdout & stderr to show with failing tests.
47 # Store stdout & stderr to show with failing tests.
48 # This is defined in IPython.testing.iptest
48 # This is defined in IPython.testing.iptest
49 nose.ipy_stream_capturer.add_stream(self.process.stdout.fileno())
50 nose.ipy_stream_capturer.ensure_started()
49 nose.ipy_stream_capturer.ensure_started()
51 else:
50 else:
52 s = 'The process was already started and has state: %r' % self.state
51 s = 'The process was already started and has state: %r' % self.state
@@ -114,7 +113,6 b' def teardown():'
114 time.sleep(1)
113 time.sleep(1)
115 while launchers:
114 while launchers:
116 p = launchers.pop()
115 p = launchers.pop()
117 nose.ipy_stream_capturer.remove_stream(p.process.stdout.fileno())
118 if p.poll() is None:
116 if p.poll() is None:
119 try:
117 try:
120 p.stop()
118 p.stop()
@@ -365,47 +365,22 b' class StreamCapturer(Thread):'
365 super(StreamCapturer, self).__init__()
365 super(StreamCapturer, self).__init__()
366 self.streams = []
366 self.streams = []
367 self.buffer = BytesIO()
367 self.buffer = BytesIO()
368 self.streams_lock = Lock()
368 self.readfd, self.writefd = os.pipe()
369 self.buffer_lock = Lock()
369 self.buffer_lock = Lock()
370 self.stream_added = Event()
371 self.stop = Event()
370 self.stop = Event()
372
371
373 def run(self):
372 def run(self):
374 self.started = True
373 self.started = True
374
375 while not self.stop.is_set():
375 while not self.stop.is_set():
376 with self.streams_lock:
376 ready = select([self.readfd], [], [], 1)[0]
377 streams = self.streams
377
378
378 if ready:
379 if not streams:
379 with self.buffer_lock:
380 self.stream_added.wait(timeout=1)
380 self.buffer.write(os.read(self.readfd, 1024))
381 self.stream_added.clear()
382 continue
383
384 ready = select(streams, [], [], 0.5)[0]
385 dead = []
386 with self.buffer_lock:
387 for fd in ready:
388 try:
389 self.buffer.write(os.read(fd, 1024))
390 except OSError as e:
391 import errno
392 if e.errno == errno.EBADF:
393 dead.append(fd)
394 else:
395 raise
396
397 with self.streams_lock:
398 for fd in dead:
399 self.streams.remove(fd)
400
401 def add_stream(self, fd):
402 with self.streams_lock:
403 self.streams.append(fd)
404 self.stream_added.set()
405
381
406 def remove_stream(self, fd):
382 os.close(self.readfd)
407 with self.streams_lock:
383 os.close(self.writefd)
408 self.streams.remove(fd)
409
384
410 def reset_buffer(self):
385 def reset_buffer(self):
411 with self.buffer_lock:
386 with self.buffer_lock:
@@ -426,7 +401,7 b' class SubprocessStreamCapturePlugin(Plugin):'
426 Plugin.__init__(self)
401 Plugin.__init__(self)
427 self.stream_capturer = StreamCapturer()
402 self.stream_capturer = StreamCapturer()
428 # This is ugly, but distant parts of the test machinery need to be able
403 # This is ugly, but distant parts of the test machinery need to be able
429 # to add streams, so we make the object globally accessible.
404 # to redirect streams, so we make the object globally accessible.
430 nose.ipy_stream_capturer = self.stream_capturer
405 nose.ipy_stream_capturer = self.stream_capturer
431
406
432 def configure(self, options, config):
407 def configure(self, options, config):
General Comments 0
You need to be logged in to leave comments. Login now