##// END OF EJS Templates
allow engines to wait for url_files to arrive...
allow engines to wait for url_files to arrive if connection info (url, ip, etc.) is passed explicitly, the engine will not wait for the file. If the file never arrives and the connection info was not specified manually, the app will abort. This is primarily useful in batch-systems where engines might start before the controller has a chance to write connection files. (delay does not help in these cases, because it could be hours before jobs start)

File last commit:

r2661:595fc3b9
r4120:890c6020
Show More
test_process.py
68 lines | 1.8 KiB | text/x-python | PythonLexer
# encoding: utf-8
"""
Test process execution and IO redirection.
"""
__docformat__ = "restructuredtext en"
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is
# in the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
from cStringIO import StringIO
from time import sleep
import sys
from IPython.frontend.process import PipedProcess
from IPython.testing import decorators as testdec
def test_capture_out():
""" A simple test to see if we can execute a process and get the output.
"""
s = StringIO()
p = PipedProcess('echo 1', out_callback=s.write, )
p.start()
p.join()
result = s.getvalue().rstrip()
assert result == '1'
def test_io():
""" Checks that we can send characters on stdin to the process.
"""
s = StringIO()
p = PipedProcess(sys.executable + ' -c "a = raw_input(); print a"',
out_callback=s.write, )
p.start()
test_string = '12345\n'
while not hasattr(p, 'process'):
sleep(0.1)
p.process.stdin.write(test_string)
p.join()
result = s.getvalue()
assert result == test_string
@testdec.skip_win32
def test_kill():
""" Check that we can kill a process, and its subprocess.
"""
s = StringIO()
p = PipedProcess(sys.executable + ' -c "a = raw_input();"',
out_callback=s.write, )
p.start()
while not hasattr(p, 'process'):
sleep(0.1)
p.process.kill()
assert p.process.poll() is not None
if __name__ == '__main__':
test_capture_out()
test_io()
test_kill()