##// END OF EJS Templates
Ensure history database is closed in test...
Thomas Kluyver -
Show More
@@ -1,185 +1,187 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the IPython tab-completion machinery.
2 """Tests for the IPython tab-completion machinery.
3 """
3 """
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Module imports
5 # Module imports
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7
7
8 # stdlib
8 # stdlib
9 import os
9 import os
10 import sys
10 import sys
11 import tempfile
11 import tempfile
12 from datetime import datetime
12 from datetime import datetime
13
13
14 # third party
14 # third party
15 import nose.tools as nt
15 import nose.tools as nt
16
16
17 # our own packages
17 # our own packages
18 from IPython.config.loader import Config
18 from IPython.config.loader import Config
19 from IPython.utils.tempdir import TemporaryDirectory
19 from IPython.utils.tempdir import TemporaryDirectory
20 from IPython.core.history import HistoryManager, extract_hist_ranges
20 from IPython.core.history import HistoryManager, extract_hist_ranges
21 from IPython.utils import py3compat
21 from IPython.utils import py3compat
22
22
23 def setUp():
23 def setUp():
24 nt.assert_equal(sys.getdefaultencoding(), "utf-8" if py3compat.PY3 else "ascii")
24 nt.assert_equal(sys.getdefaultencoding(), "utf-8" if py3compat.PY3 else "ascii")
25
25
26 def test_history():
26 def test_history():
27 ip = get_ipython()
27 ip = get_ipython()
28 with TemporaryDirectory() as tmpdir:
28 with TemporaryDirectory() as tmpdir:
29 hist_manager_ori = ip.history_manager
29 hist_manager_ori = ip.history_manager
30 hist_file = os.path.join(tmpdir, 'history.sqlite')
30 hist_file = os.path.join(tmpdir, 'history.sqlite')
31 try:
31 try:
32 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
32 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
33 hist = [u'a=1', u'def f():\n test = 1\n return test', u"b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
33 hist = [u'a=1', u'def f():\n test = 1\n return test', u"b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
34 for i, h in enumerate(hist, start=1):
34 for i, h in enumerate(hist, start=1):
35 ip.history_manager.store_inputs(i, h)
35 ip.history_manager.store_inputs(i, h)
36
36
37 ip.history_manager.db_log_output = True
37 ip.history_manager.db_log_output = True
38 # Doesn't match the input, but we'll just check it's stored.
38 # Doesn't match the input, but we'll just check it's stored.
39 ip.history_manager.output_hist_reprs[3] = "spam"
39 ip.history_manager.output_hist_reprs[3] = "spam"
40 ip.history_manager.store_output(3)
40 ip.history_manager.store_output(3)
41
41
42 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
42 nt.assert_equal(ip.history_manager.input_hist_raw, [''] + hist)
43
43
44 # Detailed tests for _get_range_session
44 # Detailed tests for _get_range_session
45 grs = ip.history_manager._get_range_session
45 grs = ip.history_manager._get_range_session
46 nt.assert_equal(list(grs(start=2,stop=-1)), list(zip([0], [2], hist[1:-1])))
46 nt.assert_equal(list(grs(start=2,stop=-1)), list(zip([0], [2], hist[1:-1])))
47 nt.assert_equal(list(grs(start=-2)), list(zip([0,0], [2,3], hist[-2:])))
47 nt.assert_equal(list(grs(start=-2)), list(zip([0,0], [2,3], hist[-2:])))
48 nt.assert_equal(list(grs(output=True)), list(zip([0,0,0], [1,2,3], zip(hist, [None,None,'spam']))))
48 nt.assert_equal(list(grs(output=True)), list(zip([0,0,0], [1,2,3], zip(hist, [None,None,'spam']))))
49
49
50 # Check whether specifying a range beyond the end of the current
50 # Check whether specifying a range beyond the end of the current
51 # session results in an error (gh-804)
51 # session results in an error (gh-804)
52 ip.magic('%hist 2-500')
52 ip.magic('%hist 2-500')
53
53
54 # Check that we can write non-ascii characters to a file
54 # Check that we can write non-ascii characters to a file
55 ip.magic("%%hist -f %s" % os.path.join(tmpdir, "test1"))
55 ip.magic("%%hist -f %s" % os.path.join(tmpdir, "test1"))
56 ip.magic("%%hist -pf %s" % os.path.join(tmpdir, "test2"))
56 ip.magic("%%hist -pf %s" % os.path.join(tmpdir, "test2"))
57 ip.magic("%%hist -nf %s" % os.path.join(tmpdir, "test3"))
57 ip.magic("%%hist -nf %s" % os.path.join(tmpdir, "test3"))
58 ip.magic("%%save %s 1-10" % os.path.join(tmpdir, "test4"))
58 ip.magic("%%save %s 1-10" % os.path.join(tmpdir, "test4"))
59
59
60 # New session
60 # New session
61 ip.history_manager.reset()
61 ip.history_manager.reset()
62 newcmds = [u"z=5",
62 newcmds = [u"z=5",
63 u"class X(object):\n pass",
63 u"class X(object):\n pass",
64 u"k='p'",
64 u"k='p'",
65 u"z=5"]
65 u"z=5"]
66 for i, cmd in enumerate(newcmds, start=1):
66 for i, cmd in enumerate(newcmds, start=1):
67 ip.history_manager.store_inputs(i, cmd)
67 ip.history_manager.store_inputs(i, cmd)
68 gothist = ip.history_manager.get_range(start=1, stop=4)
68 gothist = ip.history_manager.get_range(start=1, stop=4)
69 nt.assert_equal(list(gothist), list(zip([0,0,0],[1,2,3], newcmds)))
69 nt.assert_equal(list(gothist), list(zip([0,0,0],[1,2,3], newcmds)))
70 # Previous session:
70 # Previous session:
71 gothist = ip.history_manager.get_range(-1, 1, 4)
71 gothist = ip.history_manager.get_range(-1, 1, 4)
72 nt.assert_equal(list(gothist), list(zip([1,1,1],[1,2,3], hist)))
72 nt.assert_equal(list(gothist), list(zip([1,1,1],[1,2,3], hist)))
73
73
74 newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]
74 newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]
75
75
76 # Check get_hist_tail
76 # Check get_hist_tail
77 gothist = ip.history_manager.get_tail(5, output=True,
77 gothist = ip.history_manager.get_tail(5, output=True,
78 include_latest=True)
78 include_latest=True)
79 expected = [(1, 3, (hist[-1], "spam"))] \
79 expected = [(1, 3, (hist[-1], "spam"))] \
80 + [(s, n, (c, None)) for (s, n, c) in newhist]
80 + [(s, n, (c, None)) for (s, n, c) in newhist]
81 nt.assert_equal(list(gothist), expected)
81 nt.assert_equal(list(gothist), expected)
82
82
83 gothist = ip.history_manager.get_tail(2)
83 gothist = ip.history_manager.get_tail(2)
84 expected = newhist[-3:-1]
84 expected = newhist[-3:-1]
85 nt.assert_equal(list(gothist), expected)
85 nt.assert_equal(list(gothist), expected)
86
86
87 # Check get_hist_search
87 # Check get_hist_search
88 gothist = ip.history_manager.search("*test*")
88 gothist = ip.history_manager.search("*test*")
89 nt.assert_equal(list(gothist), [(1,2,hist[1])] )
89 nt.assert_equal(list(gothist), [(1,2,hist[1])] )
90
90
91 gothist = ip.history_manager.search("*=*")
91 gothist = ip.history_manager.search("*=*")
92 nt.assert_equal(list(gothist),
92 nt.assert_equal(list(gothist),
93 [(1, 1, hist[0]),
93 [(1, 1, hist[0]),
94 (1, 2, hist[1]),
94 (1, 2, hist[1]),
95 (1, 3, hist[2]),
95 (1, 3, hist[2]),
96 newhist[0],
96 newhist[0],
97 newhist[2],
97 newhist[2],
98 newhist[3]])
98 newhist[3]])
99
99
100 gothist = ip.history_manager.search("*=*", n=4)
100 gothist = ip.history_manager.search("*=*", n=4)
101 nt.assert_equal(list(gothist),
101 nt.assert_equal(list(gothist),
102 [(1, 3, hist[2]),
102 [(1, 3, hist[2]),
103 newhist[0],
103 newhist[0],
104 newhist[2],
104 newhist[2],
105 newhist[3]])
105 newhist[3]])
106
106
107 gothist = ip.history_manager.search("*=*", unique=True)
107 gothist = ip.history_manager.search("*=*", unique=True)
108 nt.assert_equal(list(gothist),
108 nt.assert_equal(list(gothist),
109 [(1, 1, hist[0]),
109 [(1, 1, hist[0]),
110 (1, 2, hist[1]),
110 (1, 2, hist[1]),
111 (1, 3, hist[2]),
111 (1, 3, hist[2]),
112 newhist[2],
112 newhist[2],
113 newhist[3]])
113 newhist[3]])
114
114
115 gothist = ip.history_manager.search("*=*", unique=True, n=3)
115 gothist = ip.history_manager.search("*=*", unique=True, n=3)
116 nt.assert_equal(list(gothist),
116 nt.assert_equal(list(gothist),
117 [(1, 3, hist[2]),
117 [(1, 3, hist[2]),
118 newhist[2],
118 newhist[2],
119 newhist[3]])
119 newhist[3]])
120
120
121 gothist = ip.history_manager.search("b*", output=True)
121 gothist = ip.history_manager.search("b*", output=True)
122 nt.assert_equal(list(gothist), [(1,3,(hist[2],"spam"))] )
122 nt.assert_equal(list(gothist), [(1,3,(hist[2],"spam"))] )
123
123
124 # Cross testing: check that magic %save can get previous session.
124 # Cross testing: check that magic %save can get previous session.
125 testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
125 testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
126 ip.magic("save " + testfilename + " ~1/1-3")
126 ip.magic("save " + testfilename + " ~1/1-3")
127 with py3compat.open(testfilename, encoding='utf-8') as testfile:
127 with py3compat.open(testfilename, encoding='utf-8') as testfile:
128 nt.assert_equal(testfile.read(),
128 nt.assert_equal(testfile.read(),
129 u"# coding: utf-8\n" + u"\n".join(hist)+u"\n")
129 u"# coding: utf-8\n" + u"\n".join(hist)+u"\n")
130
130
131 # Duplicate line numbers - check that it doesn't crash, and
131 # Duplicate line numbers - check that it doesn't crash, and
132 # gets a new session
132 # gets a new session
133 ip.history_manager.store_inputs(1, "rogue")
133 ip.history_manager.store_inputs(1, "rogue")
134 ip.history_manager.writeout_cache()
134 ip.history_manager.writeout_cache()
135 nt.assert_equal(ip.history_manager.session_number, 3)
135 nt.assert_equal(ip.history_manager.session_number, 3)
136 finally:
136 finally:
137 # Ensure saving thread is shut down before we try to clean up the files
137 # Ensure saving thread is shut down before we try to clean up the files
138 ip.history_manager.save_thread.stop()
138 ip.history_manager.save_thread.stop()
139 # Forcibly close database rather than relying on garbage collection
140 ip.history_manager.db.close()
139 # Restore history manager
141 # Restore history manager
140 ip.history_manager = hist_manager_ori
142 ip.history_manager = hist_manager_ori
141
143
142
144
143 def test_extract_hist_ranges():
145 def test_extract_hist_ranges():
144 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
146 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
145 expected = [(0, 1, 2), # 0 == current session
147 expected = [(0, 1, 2), # 0 == current session
146 (2, 3, 4),
148 (2, 3, 4),
147 (-4, 5, 7),
149 (-4, 5, 7),
148 (-4, 7, 10),
150 (-4, 7, 10),
149 (-9, 2, None), # None == to end
151 (-9, 2, None), # None == to end
150 (-8, 1, None),
152 (-8, 1, None),
151 (-7, 1, 6),
153 (-7, 1, 6),
152 (-10, 1, None)]
154 (-10, 1, None)]
153 actual = list(extract_hist_ranges(instr))
155 actual = list(extract_hist_ranges(instr))
154 nt.assert_equal(actual, expected)
156 nt.assert_equal(actual, expected)
155
157
156 def test_magic_rerun():
158 def test_magic_rerun():
157 """Simple test for %rerun (no args -> rerun last line)"""
159 """Simple test for %rerun (no args -> rerun last line)"""
158 ip = get_ipython()
160 ip = get_ipython()
159 ip.run_cell("a = 10", store_history=True)
161 ip.run_cell("a = 10", store_history=True)
160 ip.run_cell("a += 1", store_history=True)
162 ip.run_cell("a += 1", store_history=True)
161 nt.assert_equal(ip.user_ns["a"], 11)
163 nt.assert_equal(ip.user_ns["a"], 11)
162 ip.run_cell("%rerun", store_history=True)
164 ip.run_cell("%rerun", store_history=True)
163 nt.assert_equal(ip.user_ns["a"], 12)
165 nt.assert_equal(ip.user_ns["a"], 12)
164
166
165 def test_timestamp_type():
167 def test_timestamp_type():
166 ip = get_ipython()
168 ip = get_ipython()
167 info = ip.history_manager.get_session_info()
169 info = ip.history_manager.get_session_info()
168 nt.assert_true(isinstance(info[1], datetime))
170 nt.assert_true(isinstance(info[1], datetime))
169
171
170 def test_hist_file_config():
172 def test_hist_file_config():
171 cfg = Config()
173 cfg = Config()
172 tfile = tempfile.NamedTemporaryFile(delete=False)
174 tfile = tempfile.NamedTemporaryFile(delete=False)
173 cfg.HistoryManager.hist_file = tfile.name
175 cfg.HistoryManager.hist_file = tfile.name
174 try:
176 try:
175 hm = HistoryManager(shell=get_ipython(), config=cfg)
177 hm = HistoryManager(shell=get_ipython(), config=cfg)
176 nt.assert_equal(hm.hist_file, cfg.HistoryManager.hist_file)
178 nt.assert_equal(hm.hist_file, cfg.HistoryManager.hist_file)
177 finally:
179 finally:
178 try:
180 try:
179 os.remove(tfile.name)
181 os.remove(tfile.name)
180 except OSError:
182 except OSError:
181 # same catch as in testing.tools.TempFileMixin
183 # same catch as in testing.tools.TempFileMixin
182 # On Windows, even though we close the file, we still can't
184 # On Windows, even though we close the file, we still can't
183 # delete it. I have no clue why
185 # delete it. I have no clue why
184 pass
186 pass
185
187
General Comments 0
You need to be logged in to leave comments. Login now