##// END OF EJS Templates
More tests for frontends.
Gael Varoquaux -
Show More
@@ -0,0 +1,66 b''
1 # encoding: utf-8
2 """
3 Test process execution and IO redirection.
4 """
5
6 __docformat__ = "restructuredtext en"
7
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is
12 # in the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
14
15 from cStringIO import StringIO
16 from time import sleep
17 import sys
18
19 from IPython.frontend._process import PipedProcess
20
21 def test_capture_out():
22 """ A simple test to see if we can execute a process and get the output.
23 """
24 s = StringIO()
25 p = PipedProcess('echo 1', out_callback=s.write, )
26 p.start()
27 p.join()
28 assert s.getvalue() == '1\n'
29
30
31 def test_io():
32 """ Checks that we can send characters on stdin to the process.
33 """
34 s = StringIO()
35 p = PipedProcess(sys.executable + ' -c "a = raw_input(); print a"',
36 out_callback=s.write, )
37 p.start()
38 test_string = '12345\n'
39 while not hasattr(p, 'process'):
40 sleep(0.1)
41 p.process.stdin.write(test_string)
42 p.join()
43 assert s.getvalue() == test_string
44
45
46 def test_kill():
47 pass
48
49 if True:
50 """ Check that we can kill a process, and its subprocess.
51 """
52 s = StringIO()
53 p = PipedProcess(sys.executable + ' -c "a = raw_input();"',
54 out_callback=s.write, )
55 p.start()
56 while not hasattr(p, 'process'):
57 sleep(0.1)
58 p.process.kill()
59 assert p.process.poll() is not None
60
61
62 if __name__ == '__main__':
63 test_capture_out()
64 test_io()
65 test_kill()
66
@@ -1,55 +1,70 b''
1 1 # encoding: utf-8
2 2 """
3 3 Object for encapsulating process execution by using callbacks for stdout,
4 4 stderr and stdin.
5 5 """
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18 from killableprocess import Popen, PIPE
19 19 from threading import Thread
20 20 from time import sleep
21 21
22 22 class PipedProcess(Thread):
23 """ Class that encapsulates process execution by using callbacks for
24 stdout, stderr and stdin, and providing a reliable way of
25 killing it.
26 """
23 27
24 28 def __init__(self, command_string, out_callback,
25 29 end_callback=None,):
30 """ command_string: the command line executed to start the
31 process.
32
33 out_callback: the python callable called on stdout/stderr.
34
35 end_callback: an optional callable called when the process
36 finishes.
37
38 These callbacks are called from a different thread as the
39 thread from which is started.
40 """
26 41 self.command_string = command_string
27 42 self.out_callback = out_callback
28 43 self.end_callback = end_callback
29 44 Thread.__init__(self)
30 45
31 46
32 47 def run(self):
33 48 """ Start the process and hook up the callbacks.
34 49 """
35 50 process = Popen((self.command_string + ' 2>&1', ), shell=True,
36 51 universal_newlines=True,
37 52 stdout=PIPE, stdin=PIPE, )
38 53 self.process = process
39 54 while True:
40 55 out_char = process.stdout.read(1)
41 56 if out_char == '':
42 57 if process.poll() is not None:
43 58 # The process has finished
44 59 break
45 60 else:
46 61 # The process is not giving any interesting
47 62 # output. No use polling it immediatly.
48 63 sleep(0.1)
49 64 else:
50 65 self.out_callback(out_char)
51 66
52 67 if self.end_callback is not None:
53 68 self.end_callback()
54 69
55 70
@@ -1,151 +1,152 b''
1 1 # encoding: utf-8
2 2
3 3 """This file contains unittests for the frontendbase module."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #---------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #---------------------------------------------------------------------------
13 13
14 14 #---------------------------------------------------------------------------
15 15 # Imports
16 16 #---------------------------------------------------------------------------
17 17
18 18 import unittest
19 from IPython.frontend.asyncfrontendbase import AsyncFrontEndBase
19 20 from IPython.frontend import frontendbase
20 21 from IPython.kernel.engineservice import EngineService
21 22
22 class FrontEndCallbackChecker(frontendbase.AsyncFrontEndBase):
23 class FrontEndCallbackChecker(AsyncFrontEndBase):
23 24 """FrontEndBase subclass for checking callbacks"""
24 25 def __init__(self, engine=None, history=None):
25 26 super(FrontEndCallbackChecker, self).__init__(engine=engine,
26 27 history=history)
27 28 self.updateCalled = False
28 29 self.renderResultCalled = False
29 30 self.renderErrorCalled = False
30 31
31 32 def update_cell_prompt(self, result, blockID=None):
32 33 self.updateCalled = True
33 34 return result
34 35
35 36 def render_result(self, result):
36 37 self.renderResultCalled = True
37 38 return result
38 39
39 40
40 41 def render_error(self, failure):
41 42 self.renderErrorCalled = True
42 43 return failure
43 44
44 45
45 46
46 47
47 48 class TestAsyncFrontendBase(unittest.TestCase):
48 49 def setUp(self):
49 50 """Setup the EngineService and FrontEndBase"""
50 51
51 52 self.fb = FrontEndCallbackChecker(engine=EngineService())
52 53
53 54
54 55 def test_implements_IFrontEnd(self):
55 56 assert(frontendbase.IFrontEnd.implementedBy(
56 frontendbase.AsyncFrontEndBase))
57 AsyncFrontEndBase))
57 58
58 59
59 60 def test_is_complete_returns_False_for_incomplete_block(self):
60 61 """"""
61 62
62 63 block = """def test(a):"""
63 64
64 65 assert(self.fb.is_complete(block) == False)
65 66
66 67 def test_is_complete_returns_True_for_complete_block(self):
67 68 """"""
68 69
69 70 block = """def test(a): pass"""
70 71
71 72 assert(self.fb.is_complete(block))
72 73
73 74 block = """a=3"""
74 75
75 76 assert(self.fb.is_complete(block))
76 77
77 78
78 79 def test_blockID_added_to_result(self):
79 80 block = """3+3"""
80 81
81 82 d = self.fb.execute(block, blockID='TEST_ID')
82 83
83 84 d.addCallback(self.checkBlockID, expected='TEST_ID')
84 85
85 86 def test_blockID_added_to_failure(self):
86 87 block = "raise Exception()"
87 88
88 89 d = self.fb.execute(block,blockID='TEST_ID')
89 90 d.addErrback(self.checkFailureID, expected='TEST_ID')
90 91
91 92 def checkBlockID(self, result, expected=""):
92 93 assert(result['blockID'] == expected)
93 94
94 95
95 96 def checkFailureID(self, failure, expected=""):
96 97 assert(failure.blockID == expected)
97 98
98 99
99 100 def test_callbacks_added_to_execute(self):
100 101 """test that
101 102 update_cell_prompt
102 103 render_result
103 104
104 105 are added to execute request
105 106 """
106 107
107 108 d = self.fb.execute("10+10")
108 109 d.addCallback(self.checkCallbacks)
109 110
110 111
111 112 def checkCallbacks(self, result):
112 113 assert(self.fb.updateCalled)
113 114 assert(self.fb.renderResultCalled)
114 115
115 116
116 117 def test_error_callback_added_to_execute(self):
117 118 """test that render_error called on execution error"""
118 119
119 120 d = self.fb.execute("raise Exception()")
120 121 d.addCallback(self.checkRenderError)
121 122
122 123 def checkRenderError(self, result):
123 124 assert(self.fb.renderErrorCalled)
124 125
125 126 def test_history_returns_expected_block(self):
126 127 """Make sure history browsing doesn't fail"""
127 128
128 129 blocks = ["a=1","a=2","a=3"]
129 130 for b in blocks:
130 131 d = self.fb.execute(b)
131 132
132 133 # d is now the deferred for the last executed block
133 134 d.addCallback(self.historyTests, blocks)
134 135
135 136
136 137 def historyTests(self, result, blocks):
137 138 """historyTests"""
138 139
139 140 assert(len(blocks) >= 3)
140 141 assert(self.fb.get_history_previous("") == blocks[-2])
141 142 assert(self.fb.get_history_previous("") == blocks[-3])
142 143 assert(self.fb.get_history_next() == blocks[-2])
143 144
144 145
145 146 def test_history_returns_none_at_startup(self):
146 147 """test_history_returns_none_at_startup"""
147 148
148 149 assert(self.fb.get_history_previous("")==None)
149 150 assert(self.fb.get_history_next()==None)
150 151
151 152
@@ -1,81 +1,61 b''
1 1 # encoding: utf-8
2
3 2 """
4 3 Test the output capture at the OS level, using file descriptors.
5 4 """
6 5
7 6 __docformat__ = "restructuredtext en"
8 7
9 #---------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
10 9 # Copyright (C) 2008 The IPython Development Team
11 10 #
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
14 #---------------------------------------------------------------------------
11 # Distributed under the terms of the BSD License. The full license is
12 # in the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
15 14
16 15
17 16 import os
18 17 from cStringIO import StringIO
19 import sys
20 18
21 19
22 20 def test_redirector():
23 21 """ Checks that the redirector can be used to do synchronous capture.
24 22 """
25 # Flush the stdout, so as not to have side effects between
26 # tests.
27 sys.stdout.flush()
28 sys.stderr.flush()
29 23 from IPython.kernel.core.fd_redirector import FDRedirector
30 24 r = FDRedirector()
31 25 out = StringIO()
32 26 try:
33 27 r.start()
34 28 for i in range(10):
35 29 os.system('echo %ic' % i)
36 30 print >>out, r.getvalue(),
37 31 print >>out, i
38 32 except:
39 33 r.stop()
40 34 raise
41 35 r.stop()
42 36 assert out.getvalue() == "".join("%ic\n%i\n" %(i, i) for i in range(10))
43 sys.stdout.flush()
44 sys.stderr.flush()
45 37
46 38
47 39 def test_redirector_output_trap():
48 40 """ This test check not only that the redirector_output_trap does
49 41 trap the output, but also that it does it in a gready way, that
50 is by calling the callback ASAP.
42 is by calling the callabck ASAP.
51 43 """
52 # Flush the stdout, so as not to have side effects between
53 # tests.
54 sys.stdout.flush()
55 sys.stderr.flush()
56 44 from IPython.kernel.core.redirector_output_trap import RedirectorOutputTrap
57 45 out = StringIO()
58 46 trap = RedirectorOutputTrap(out.write, out.write)
59 47 try:
60 48 trap.set()
61 49 for i in range(10):
62 50 os.system('echo %ic' % i)
63 51 print "%ip" % i
64 52 print >>out, i
65 53 except:
66 54 trap.unset()
67 55 raise
68 56 trap.unset()
69 sys.stdout.flush()
70 sys.stderr.flush()
71 57 assert out.getvalue() == "".join("%ic\n%ip\n%i\n" %(i, i, i)
72 58 for i in range(10))
73 59
74 60
75 61
76 if __name__ == '__main__':
77 print "Testing redirector...",
78 test_redirector()
79 test_redirector_output_trap()
80 print "Done."
81
General Comments 0
You need to be logged in to leave comments. Login now