##// END OF EJS Templates
Fix timeout test
Jessica B. Hamrick -
Show More
@@ -1,149 +1,151 b''
1 """
1 """
2 Module with tests for the execute preprocessor.
2 Module with tests for the execute preprocessor.
3 """
3 """
4
4
5 # Copyright (c) IPython Development Team.
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
6 # Distributed under the terms of the Modified BSD License.
7
7
8 import copy
8 import copy
9 import glob
9 import glob
10 import io
10 import io
11 import os
11 import os
12 import re
12 import re
13
13
14 try:
14 try:
15 from queue import Empty # Py 3
15 from queue import Empty # Py 3
16 except ImportError:
16 except ImportError:
17 from Queue import Empty # Py 2
17 from Queue import Empty # Py 2
18
18
19 from IPython import nbformat
19 from IPython import nbformat
20
20
21 from .base import PreprocessorTestsBase
21 from .base import PreprocessorTestsBase
22 from ..execute import ExecutePreprocessor
22 from ..execute import ExecutePreprocessor
23
23
24 from IPython.nbconvert.filters import strip_ansi
24 from IPython.nbconvert.filters import strip_ansi
25 from nose.tools import assert_raises
25 from nose.tools import assert_raises
26
26
27 addr_pat = re.compile(r'0x[0-9a-f]{7,9}')
27 addr_pat = re.compile(r'0x[0-9a-f]{7,9}')
28
28
29 class TestExecute(PreprocessorTestsBase):
29 class TestExecute(PreprocessorTestsBase):
30 """Contains test functions for execute.py"""
30 """Contains test functions for execute.py"""
31
31
32 @staticmethod
32 @staticmethod
33 def normalize_output(output):
33 def normalize_output(output):
34 """
34 """
35 Normalizes outputs for comparison.
35 Normalizes outputs for comparison.
36 """
36 """
37 output = dict(output)
37 output = dict(output)
38 if 'metadata' in output:
38 if 'metadata' in output:
39 del output['metadata']
39 del output['metadata']
40 if 'text' in output:
40 if 'text' in output:
41 output['text'] = re.sub(addr_pat, '<HEXADDR>', output['text'])
41 output['text'] = re.sub(addr_pat, '<HEXADDR>', output['text'])
42 if 'text/plain' in output.get('data', {}):
42 if 'text/plain' in output.get('data', {}):
43 output['data']['text/plain'] = \
43 output['data']['text/plain'] = \
44 re.sub(addr_pat, '<HEXADDR>', output['data']['text/plain'])
44 re.sub(addr_pat, '<HEXADDR>', output['data']['text/plain'])
45 if 'traceback' in output:
45 if 'traceback' in output:
46 tb = []
46 tb = []
47 for line in output['traceback']:
47 for line in output['traceback']:
48 tb.append(strip_ansi(line))
48 tb.append(strip_ansi(line))
49 output['traceback'] = tb
49 output['traceback'] = tb
50
50
51 return output
51 return output
52
52
53
53
54 def assert_notebooks_equal(self, expected, actual):
54 def assert_notebooks_equal(self, expected, actual):
55 expected_cells = expected['cells']
55 expected_cells = expected['cells']
56 actual_cells = actual['cells']
56 actual_cells = actual['cells']
57 self.assertEqual(len(expected_cells), len(actual_cells))
57 self.assertEqual(len(expected_cells), len(actual_cells))
58
58
59 for expected_cell, actual_cell in zip(expected_cells, actual_cells):
59 for expected_cell, actual_cell in zip(expected_cells, actual_cells):
60 expected_outputs = expected_cell.get('outputs', [])
60 expected_outputs = expected_cell.get('outputs', [])
61 actual_outputs = actual_cell.get('outputs', [])
61 actual_outputs = actual_cell.get('outputs', [])
62 normalized_expected_outputs = list(map(self.normalize_output, expected_outputs))
62 normalized_expected_outputs = list(map(self.normalize_output, expected_outputs))
63 normalized_actual_outputs = list(map(self.normalize_output, actual_outputs))
63 normalized_actual_outputs = list(map(self.normalize_output, actual_outputs))
64 self.assertEqual(normalized_expected_outputs, normalized_actual_outputs)
64 self.assertEqual(normalized_expected_outputs, normalized_actual_outputs)
65
65
66 expected_execution_count = expected_cell.get('execution_count', None)
66 expected_execution_count = expected_cell.get('execution_count', None)
67 actual_execution_count = actual_cell.get('execution_count', None)
67 actual_execution_count = actual_cell.get('execution_count', None)
68 self.assertEqual(expected_execution_count, actual_execution_count)
68 self.assertEqual(expected_execution_count, actual_execution_count)
69
69
70
70
71 def build_preprocessor(self, opts):
71 def build_preprocessor(self, opts):
72 """Make an instance of a preprocessor"""
72 """Make an instance of a preprocessor"""
73 preprocessor = ExecutePreprocessor()
73 preprocessor = ExecutePreprocessor()
74 preprocessor.enabled = True
74 preprocessor.enabled = True
75 for opt in opts:
75 for opt in opts:
76 setattr(preprocessor, opt, opts[opt])
76 setattr(preprocessor, opt, opts[opt])
77 return preprocessor
77 return preprocessor
78
78
79
79
80 def test_constructor(self):
80 def test_constructor(self):
81 """Can a ExecutePreprocessor be constructed?"""
81 """Can a ExecutePreprocessor be constructed?"""
82 self.build_preprocessor({})
82 self.build_preprocessor({})
83
83
84
84
85 def run_notebook(self, filename, opts, resources):
85 def run_notebook(self, filename, opts, resources):
86 """Loads and runs a notebook, returning both the version prior to
86 """Loads and runs a notebook, returning both the version prior to
87 running it and the version after running it.
87 running it and the version after running it.
88
88
89 """
89 """
90 with io.open(filename) as f:
90 with io.open(filename) as f:
91 input_nb = nbformat.read(f, 4)
91 input_nb = nbformat.read(f, 4)
92 preprocessor = self.build_preprocessor(opts)
92 preprocessor = self.build_preprocessor(opts)
93 cleaned_input_nb = copy.deepcopy(input_nb)
93 cleaned_input_nb = copy.deepcopy(input_nb)
94 for cell in cleaned_input_nb.cells:
94 for cell in cleaned_input_nb.cells:
95 if 'execution_count' in cell:
95 if 'execution_count' in cell:
96 del cell['execution_count']
96 del cell['execution_count']
97 cell['outputs'] = []
97 cell['outputs'] = []
98 output_nb, _ = preprocessor(cleaned_input_nb, resources)
98 output_nb, _ = preprocessor(cleaned_input_nb, resources)
99 return input_nb, output_nb
99 return input_nb, output_nb
100
100
101 def test_run_notebooks(self):
101 def test_run_notebooks(self):
102 """Runs a series of test notebooks and compares them to their actual output"""
102 """Runs a series of test notebooks and compares them to their actual output"""
103 current_dir = os.path.dirname(__file__)
103 current_dir = os.path.dirname(__file__)
104 input_files = glob.glob(os.path.join(current_dir, 'files', '*.ipynb'))
104 input_files = glob.glob(os.path.join(current_dir, 'files', '*.ipynb'))
105 for filename in input_files:
105 for filename in input_files:
106 if os.path.basename(filename) == "Disable Stdin.ipynb":
106 if os.path.basename(filename) == "Disable Stdin.ipynb":
107 continue
107 continue
108 elif os.path.basename(filename) == "Interrupt.ipynb":
108 elif os.path.basename(filename) == "Interrupt.ipynb":
109 opts = dict(timeout=1, interrupt_on_timeout=True)
109 opts = dict(timeout=1, interrupt_on_timeout=True)
110 else:
110 else:
111 opts = {}
111 opts = {}
112 res = self.build_resources()
112 res = self.build_resources()
113 res['metadata']['path'] = os.path.dirname(filename)
113 res['metadata']['path'] = os.path.dirname(filename)
114 input_nb, output_nb = self.run_notebook(filename, opts, res)
114 input_nb, output_nb = self.run_notebook(filename, opts, res)
115 self.assert_notebooks_equal(input_nb, output_nb)
115 self.assert_notebooks_equal(input_nb, output_nb)
116
116
117 def test_empty_path(self):
117 def test_empty_path(self):
118 """Can the kernel be started when the path is empty?"""
118 """Can the kernel be started when the path is empty?"""
119 current_dir = os.path.dirname(__file__)
119 current_dir = os.path.dirname(__file__)
120 filename = os.path.join(current_dir, 'files', 'HelloWorld.ipynb')
120 filename = os.path.join(current_dir, 'files', 'HelloWorld.ipynb')
121 res = self.build_resources()
121 res = self.build_resources()
122 res['metadata']['path'] = ''
122 res['metadata']['path'] = ''
123 input_nb, output_nb = self.run_notebook(filename, {}, res)
123 input_nb, output_nb = self.run_notebook(filename, {}, res)
124 self.assert_notebooks_equal(input_nb, output_nb)
124 self.assert_notebooks_equal(input_nb, output_nb)
125
125
126 def test_disable_stdin(self):
126 def test_disable_stdin(self):
127 """Test disabling standard input"""
127 """Test disabling standard input"""
128 current_dir = os.path.dirname(__file__)
128 current_dir = os.path.dirname(__file__)
129 filename = os.path.join(current_dir, 'files', 'Disable Stdin.ipynb')
129 filename = os.path.join(current_dir, 'files', 'Disable Stdin.ipynb')
130 res = self.build_resources()
130 res = self.build_resources()
131 res['metadata']['path'] = os.path.dirname(filename)
131 res['metadata']['path'] = os.path.dirname(filename)
132 input_nb, output_nb = self.run_notebook(filename, {}, res)
132 input_nb, output_nb = self.run_notebook(filename, {}, res)
133
133
134 # We need to special-case this particular notebook, because the
134 # We need to special-case this particular notebook, because the
135 # traceback contains machine-specific stuff like where IPython
135 # traceback contains machine-specific stuff like where IPython
136 # is installed. It is sufficient here to just check that an error
136 # is installed. It is sufficient here to just check that an error
137 # was thrown, and that it was a StdinNotImplementedError
137 # was thrown, and that it was a StdinNotImplementedError
138 self.assertEqual(len(output_nb['cells']), 1)
138 self.assertEqual(len(output_nb['cells']), 1)
139 self.assertEqual(len(output_nb['cells'][0]['outputs']), 1)
139 self.assertEqual(len(output_nb['cells'][0]['outputs']), 1)
140 output = output_nb['cells'][0]['outputs'][0]
140 output = output_nb['cells'][0]['outputs'][0]
141 self.assertEqual(output['output_type'], 'error')
141 self.assertEqual(output['output_type'], 'error')
142 self.assertEqual(output['ename'], 'StdinNotImplementedError')
142 self.assertEqual(output['ename'], 'StdinNotImplementedError')
143 self.assertEqual(output['evalue'], 'raw_input was called, but this frontend does not support input requests.')
143 self.assertEqual(output['evalue'], 'raw_input was called, but this frontend does not support input requests.')
144
144
145 def test_timeout(self):
145 def test_timeout(self):
146 """Check that an error is raised when a computation times out"""
146 """Check that an error is raised when a computation times out"""
147 current_dir = os.path.dirname(__file__)
147 current_dir = os.path.dirname(__file__)
148 filename = os.path.join(current_dir, 'files', 'Interrupt.ipynb')
148 filename = os.path.join(current_dir, 'files', 'Interrupt.ipynb')
149 assert_raises(Empty, self.run_notebook, filename, dict(timeout=1))
149 res = self.build_resources()
150 res['metadata']['path'] = os.path.dirname(filename)
151 assert_raises(Empty, self.run_notebook, filename, dict(timeout=1), res)
General Comments 0
You need to be logged in to leave comments. Login now