test_process.py
63 lines
| 1.6 KiB
| text/x-python
|
PythonLexer
Gael Varoquaux
|
r1457 | # encoding: utf-8 | ||
""" | ||||
Test process execution and IO redirection. | ||||
""" | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is | ||||
# in the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
from cStringIO import StringIO | ||||
from time import sleep | ||||
import sys | ||||
from IPython.frontend._process import PipedProcess | ||||
def test_capture_out(): | ||||
""" A simple test to see if we can execute a process and get the output. | ||||
""" | ||||
s = StringIO() | ||||
p = PipedProcess('echo 1', out_callback=s.write, ) | ||||
p.start() | ||||
p.join() | ||||
assert s.getvalue() == '1\n' | ||||
def test_io(): | ||||
""" Checks that we can send characters on stdin to the process. | ||||
""" | ||||
s = StringIO() | ||||
p = PipedProcess(sys.executable + ' -c "a = raw_input(); print a"', | ||||
out_callback=s.write, ) | ||||
p.start() | ||||
test_string = '12345\n' | ||||
while not hasattr(p, 'process'): | ||||
sleep(0.1) | ||||
p.process.stdin.write(test_string) | ||||
p.join() | ||||
assert s.getvalue() == test_string | ||||
def test_kill(): | ||||
""" Check that we can kill a process, and its subprocess. | ||||
""" | ||||
s = StringIO() | ||||
p = PipedProcess(sys.executable + ' -c "a = raw_input();"', | ||||
out_callback=s.write, ) | ||||
p.start() | ||||
while not hasattr(p, 'process'): | ||||
sleep(0.1) | ||||
p.process.kill() | ||||
assert p.process.poll() is not None | ||||
if __name__ == '__main__': | ||||
test_capture_out() | ||||
test_io() | ||||
test_kill() | ||||