test_process.py
67 lines
| 1.7 KiB
| text/x-python
|
PythonLexer
Gael Varoquaux
|
r1457 | # encoding: utf-8 | ||
""" | ||||
Test process execution and IO redirection. | ||||
""" | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is | ||||
# in the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
from cStringIO import StringIO | ||||
from time import sleep | ||||
import sys | ||||
Gael Varoquaux
|
r1947 | from IPython.frontend.process import PipedProcess | ||
Brian Granger
|
r1561 | from IPython.testing import decorators as testdec | ||
Gael Varoquaux
|
r1457 | |||
Brian Granger
|
r1616 | |||
Gael Varoquaux
|
def test_capture_out():
    """ Run ``echo 1`` through a PipedProcess and check the captured output.

        Everything the process emits is handed to ``out_callback`` and
        accumulated in an in-memory buffer; trailing whitespace is
        stripped before the comparison.
    """
    captured = StringIO()
    echo_process = PipedProcess('echo 1', out_callback=captured.write)
    echo_process.start()
    echo_process.join()
    assert captured.getvalue().rstrip() == '1'
Gael Varoquaux
|
r1457 | |||
Brian Granger
|
r1616 | |||
Gael Varoquaux
|
def test_io():
    """ Check that text written to the child's stdin is echoed back.

        The child is a Python interpreter that reads a single line and
        prints it; the captured output must equal the line that was sent.
    """
    test_string = '12345\n'
    command = sys.executable + ' -c "a = raw_input(); print a"'
    captured = StringIO()
    piped = PipedProcess(command, out_callback=captured.write)
    piped.start()
    # Poll until the ``process`` attribute exists on the PipedProcess
    # (presumably it is set once the child has been spawned -- confirm
    # against PipedProcess).
    while not hasattr(piped, 'process'):
        sleep(0.1)
    child_stdin = piped.process.stdin
    child_stdin.write(test_string)
    piped.join()
    assert captured.getvalue() == test_string
Gael Varoquaux
|
r1457 | |||
def test_kill():
    """ Check that a running process can be killed.

        The child blocks on reading stdin, so it is still running when
        ``kill()`` is called; afterwards ``poll()`` must not return None.
    """
    command = sys.executable + ' -c "a = raw_input();"'
    captured = StringIO()
    piped = PipedProcess(command, out_callback=captured.write)
    piped.start()
    # Poll until the ``process`` attribute exists on the PipedProcess
    # before trying to kill it.
    while not hasattr(piped, 'process'):
        sleep(0.1)
    piped.process.kill()
    exit_status = piped.process.poll()
    assert exit_status is not None
if __name__ == '__main__':
    # Run every test, in definition order, when executed as a script.
    for run_test in (test_capture_out, test_io, test_kill):
        run_test()