##// END OF EJS Templates
wrap db before host dir is gone in new test
Aleksey Bogdanov -
Show More
@@ -1,295 +1,305 b''
1 # coding: utf-8
1 # coding: utf-8
2 """Tests for the IPython tab-completion machinery.
2 """Tests for the IPython tab-completion machinery.
3 """
3 """
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Module imports
5 # Module imports
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7
7
8 # stdlib
8 # stdlib
9 import io
9 import io
10 import sqlite3
10 import sqlite3
11 import sys
11 import sys
12 import tempfile
12 import tempfile
13 from datetime import datetime
13 from datetime import datetime
14 from pathlib import Path
14 from pathlib import Path
15
15
16 from tempfile import TemporaryDirectory
16 from tempfile import TemporaryDirectory
17 # our own packages
17 # our own packages
18 from traitlets.config.loader import Config
18 from traitlets.config.loader import Config
19
19
20 from IPython.core.history import HistoryAccessor, HistoryManager, extract_hist_ranges
20 from IPython.core.history import HistoryAccessor, HistoryManager, extract_hist_ranges
21
21
22
22
23 def test_proper_default_encoding():
23 def test_proper_default_encoding():
24 assert sys.getdefaultencoding() == "utf-8"
24 assert sys.getdefaultencoding() == "utf-8"
25
25
26 def test_history():
26 def test_history():
27 ip = get_ipython()
27 ip = get_ipython()
28 with TemporaryDirectory() as tmpdir:
28 with TemporaryDirectory() as tmpdir:
29 tmp_path = Path(tmpdir)
29 tmp_path = Path(tmpdir)
30 hist_manager_ori = ip.history_manager
30 hist_manager_ori = ip.history_manager
31 hist_file = tmp_path / "history.sqlite"
31 hist_file = tmp_path / "history.sqlite"
32 try:
32 try:
33 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
33 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
34 hist = ["a=1", "def f():\n test = 1\n return test", "b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
34 hist = ["a=1", "def f():\n test = 1\n return test", "b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
35 for i, h in enumerate(hist, start=1):
35 for i, h in enumerate(hist, start=1):
36 ip.history_manager.store_inputs(i, h)
36 ip.history_manager.store_inputs(i, h)
37
37
38 ip.history_manager.db_log_output = True
38 ip.history_manager.db_log_output = True
39 # Doesn't match the input, but we'll just check it's stored.
39 # Doesn't match the input, but we'll just check it's stored.
40 ip.history_manager.output_hist_reprs[3] = "spam"
40 ip.history_manager.output_hist_reprs[3] = "spam"
41 ip.history_manager.store_output(3)
41 ip.history_manager.store_output(3)
42
42
43 assert ip.history_manager.input_hist_raw == [""] + hist
43 assert ip.history_manager.input_hist_raw == [""] + hist
44
44
45 # Detailed tests for _get_range_session
45 # Detailed tests for _get_range_session
46 grs = ip.history_manager._get_range_session
46 grs = ip.history_manager._get_range_session
47 assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1]))
47 assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1]))
48 assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:]))
48 assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:]))
49 assert list(grs(output=True)) == list(
49 assert list(grs(output=True)) == list(
50 zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"]))
50 zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"]))
51 )
51 )
52
52
53 # Check whether specifying a range beyond the end of the current
53 # Check whether specifying a range beyond the end of the current
54 # session results in an error (gh-804)
54 # session results in an error (gh-804)
55 ip.run_line_magic("hist", "2-500")
55 ip.run_line_magic("hist", "2-500")
56
56
57 # Check that we can write non-ascii characters to a file
57 # Check that we can write non-ascii characters to a file
58 ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1"))
58 ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1"))
59 ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2"))
59 ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2"))
60 ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3"))
60 ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3"))
61 ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4"))
61 ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4"))
62
62
63 # New session
63 # New session
64 ip.history_manager.reset()
64 ip.history_manager.reset()
65 newcmds = ["z=5", "class X(object):\n pass", "k='p'", "z=5"]
65 newcmds = ["z=5", "class X(object):\n pass", "k='p'", "z=5"]
66 for i, cmd in enumerate(newcmds, start=1):
66 for i, cmd in enumerate(newcmds, start=1):
67 ip.history_manager.store_inputs(i, cmd)
67 ip.history_manager.store_inputs(i, cmd)
68 gothist = ip.history_manager.get_range(start=1, stop=4)
68 gothist = ip.history_manager.get_range(start=1, stop=4)
69 assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds))
69 assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds))
70 # Previous session:
70 # Previous session:
71 gothist = ip.history_manager.get_range(-1, 1, 4)
71 gothist = ip.history_manager.get_range(-1, 1, 4)
72 assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist))
72 assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist))
73
73
74 newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]
74 newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]
75
75
76 # Check get_hist_tail
76 # Check get_hist_tail
77 gothist = ip.history_manager.get_tail(5, output=True,
77 gothist = ip.history_manager.get_tail(5, output=True,
78 include_latest=True)
78 include_latest=True)
79 expected = [(1, 3, (hist[-1], "spam"))] \
79 expected = [(1, 3, (hist[-1], "spam"))] \
80 + [(s, n, (c, None)) for (s, n, c) in newhist]
80 + [(s, n, (c, None)) for (s, n, c) in newhist]
81 assert list(gothist) == expected
81 assert list(gothist) == expected
82
82
83 gothist = ip.history_manager.get_tail(2)
83 gothist = ip.history_manager.get_tail(2)
84 expected = newhist[-3:-1]
84 expected = newhist[-3:-1]
85 assert list(gothist) == expected
85 assert list(gothist) == expected
86
86
87 # Check get_hist_search
87 # Check get_hist_search
88
88
89 gothist = ip.history_manager.search("*test*")
89 gothist = ip.history_manager.search("*test*")
90 assert list(gothist) == [(1, 2, hist[1])]
90 assert list(gothist) == [(1, 2, hist[1])]
91
91
92 gothist = ip.history_manager.search("*=*")
92 gothist = ip.history_manager.search("*=*")
93 assert list(gothist) == [
93 assert list(gothist) == [
94 (1, 1, hist[0]),
94 (1, 1, hist[0]),
95 (1, 2, hist[1]),
95 (1, 2, hist[1]),
96 (1, 3, hist[2]),
96 (1, 3, hist[2]),
97 newhist[0],
97 newhist[0],
98 newhist[2],
98 newhist[2],
99 newhist[3],
99 newhist[3],
100 ]
100 ]
101
101
102 gothist = ip.history_manager.search("*=*", n=4)
102 gothist = ip.history_manager.search("*=*", n=4)
103 assert list(gothist) == [
103 assert list(gothist) == [
104 (1, 3, hist[2]),
104 (1, 3, hist[2]),
105 newhist[0],
105 newhist[0],
106 newhist[2],
106 newhist[2],
107 newhist[3],
107 newhist[3],
108 ]
108 ]
109
109
110 gothist = ip.history_manager.search("*=*", unique=True)
110 gothist = ip.history_manager.search("*=*", unique=True)
111 assert list(gothist) == [
111 assert list(gothist) == [
112 (1, 1, hist[0]),
112 (1, 1, hist[0]),
113 (1, 2, hist[1]),
113 (1, 2, hist[1]),
114 (1, 3, hist[2]),
114 (1, 3, hist[2]),
115 newhist[2],
115 newhist[2],
116 newhist[3],
116 newhist[3],
117 ]
117 ]
118
118
119 gothist = ip.history_manager.search("*=*", unique=True, n=3)
119 gothist = ip.history_manager.search("*=*", unique=True, n=3)
120 assert list(gothist) == [(1, 3, hist[2]), newhist[2], newhist[3]]
120 assert list(gothist) == [(1, 3, hist[2]), newhist[2], newhist[3]]
121
121
122 gothist = ip.history_manager.search("b*", output=True)
122 gothist = ip.history_manager.search("b*", output=True)
123 assert list(gothist) == [(1, 3, (hist[2], "spam"))]
123 assert list(gothist) == [(1, 3, (hist[2], "spam"))]
124
124
125 # Cross testing: check that magic %save can get previous session.
125 # Cross testing: check that magic %save can get previous session.
126 testfilename = (tmp_path / "test.py").resolve()
126 testfilename = (tmp_path / "test.py").resolve()
127 ip.run_line_magic("save", str(testfilename) + " ~1/1-3")
127 ip.run_line_magic("save", str(testfilename) + " ~1/1-3")
128 with io.open(testfilename, encoding="utf-8") as testfile:
128 with io.open(testfilename, encoding="utf-8") as testfile:
129 assert testfile.read() == "# coding: utf-8\n" + "\n".join(hist) + "\n"
129 assert testfile.read() == "# coding: utf-8\n" + "\n".join(hist) + "\n"
130
130
131 # Duplicate line numbers - check that it doesn't crash, and
131 # Duplicate line numbers - check that it doesn't crash, and
132 # gets a new session
132 # gets a new session
133 ip.history_manager.store_inputs(1, "rogue")
133 ip.history_manager.store_inputs(1, "rogue")
134 ip.history_manager.writeout_cache()
134 ip.history_manager.writeout_cache()
135 assert ip.history_manager.session_number == 3
135 assert ip.history_manager.session_number == 3
136
136
137 # Check that session and line values are not just max values
137 # Check that session and line values are not just max values
138 sessid, lineno, entry = newhist[-1]
138 sessid, lineno, entry = newhist[-1]
139 assert lineno > 1
139 assert lineno > 1
140 ip.history_manager.reset()
140 ip.history_manager.reset()
141 lineno = 1
141 lineno = 1
142 ip.history_manager.store_inputs(lineno, entry)
142 ip.history_manager.store_inputs(lineno, entry)
143 gothist = ip.history_manager.search("*=*", unique=True)
143 gothist = ip.history_manager.search("*=*", unique=True)
144 hist = list(gothist)[-1]
144 hist = list(gothist)[-1]
145 assert sessid < hist[0]
145 assert sessid < hist[0]
146 assert hist[1:] == (lineno, entry)
146 assert hist[1:] == (lineno, entry)
147 finally:
147 finally:
148 # Ensure saving thread is shut down before we try to clean up the files
148 # Ensure saving thread is shut down before we try to clean up the files
149 ip.history_manager.save_thread.stop()
149 ip.history_manager.save_thread.stop()
150 # Forcibly close database rather than relying on garbage collection
150 # Forcibly close database rather than relying on garbage collection
151 ip.history_manager.db.close()
151 ip.history_manager.db.close()
152 # Restore history manager
152 # Restore history manager
153 ip.history_manager = hist_manager_ori
153 ip.history_manager = hist_manager_ori
154
154
155
155
156 def test_extract_hist_ranges():
156 def test_extract_hist_ranges():
157 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
157 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
158 expected = [(0, 1, 2), # 0 == current session
158 expected = [(0, 1, 2), # 0 == current session
159 (2, 3, 4),
159 (2, 3, 4),
160 (-4, 5, 7),
160 (-4, 5, 7),
161 (-4, 7, 10),
161 (-4, 7, 10),
162 (-9, 2, None), # None == to end
162 (-9, 2, None), # None == to end
163 (-8, 1, None),
163 (-8, 1, None),
164 (-7, 1, 6),
164 (-7, 1, 6),
165 (-10, 1, None)]
165 (-10, 1, None)]
166 actual = list(extract_hist_ranges(instr))
166 actual = list(extract_hist_ranges(instr))
167 assert actual == expected
167 assert actual == expected
168
168
169
169
170 def test_extract_hist_ranges_empty_str():
170 def test_extract_hist_ranges_empty_str():
171 instr = ""
171 instr = ""
172 expected = [(0, 1, None)] # 0 == current session, None == to end
172 expected = [(0, 1, None)] # 0 == current session, None == to end
173 actual = list(extract_hist_ranges(instr))
173 actual = list(extract_hist_ranges(instr))
174 assert actual == expected
174 assert actual == expected
175
175
176
176
177 def test_magic_rerun():
177 def test_magic_rerun():
178 """Simple test for %rerun (no args -> rerun last line)"""
178 """Simple test for %rerun (no args -> rerun last line)"""
179 ip = get_ipython()
179 ip = get_ipython()
180 ip.run_cell("a = 10", store_history=True)
180 ip.run_cell("a = 10", store_history=True)
181 ip.run_cell("a += 1", store_history=True)
181 ip.run_cell("a += 1", store_history=True)
182 assert ip.user_ns["a"] == 11
182 assert ip.user_ns["a"] == 11
183 ip.run_cell("%rerun", store_history=True)
183 ip.run_cell("%rerun", store_history=True)
184 assert ip.user_ns["a"] == 12
184 assert ip.user_ns["a"] == 12
185
185
186 def test_timestamp_type():
186 def test_timestamp_type():
187 ip = get_ipython()
187 ip = get_ipython()
188 info = ip.history_manager.get_session_info()
188 info = ip.history_manager.get_session_info()
189 assert isinstance(info[1], datetime)
189 assert isinstance(info[1], datetime)
190
190
191 def test_hist_file_config():
191 def test_hist_file_config():
192 cfg = Config()
192 cfg = Config()
193 tfile = tempfile.NamedTemporaryFile(delete=False)
193 tfile = tempfile.NamedTemporaryFile(delete=False)
194 cfg.HistoryManager.hist_file = Path(tfile.name)
194 cfg.HistoryManager.hist_file = Path(tfile.name)
195 try:
195 try:
196 hm = HistoryManager(shell=get_ipython(), config=cfg)
196 hm = HistoryManager(shell=get_ipython(), config=cfg)
197 assert hm.hist_file == cfg.HistoryManager.hist_file
197 assert hm.hist_file == cfg.HistoryManager.hist_file
198 finally:
198 finally:
199 try:
199 try:
200 Path(tfile.name).unlink()
200 Path(tfile.name).unlink()
201 except OSError:
201 except OSError:
202 # same catch as in testing.tools.TempFileMixin
202 # same catch as in testing.tools.TempFileMixin
203 # On Windows, even though we close the file, we still can't
203 # On Windows, even though we close the file, we still can't
204 # delete it. I have no clue why
204 # delete it. I have no clue why
205 pass
205 pass
206
206
207 def test_histmanager_disabled():
207 def test_histmanager_disabled():
208 """Ensure that disabling the history manager doesn't create a database."""
208 """Ensure that disabling the history manager doesn't create a database."""
209 cfg = Config()
209 cfg = Config()
210 cfg.HistoryAccessor.enabled = False
210 cfg.HistoryAccessor.enabled = False
211
211
212 ip = get_ipython()
212 ip = get_ipython()
213 with TemporaryDirectory() as tmpdir:
213 with TemporaryDirectory() as tmpdir:
214 hist_manager_ori = ip.history_manager
214 hist_manager_ori = ip.history_manager
215 hist_file = Path(tmpdir) / "history.sqlite"
215 hist_file = Path(tmpdir) / "history.sqlite"
216 cfg.HistoryManager.hist_file = hist_file
216 cfg.HistoryManager.hist_file = hist_file
217 try:
217 try:
218 ip.history_manager = HistoryManager(shell=ip, config=cfg)
218 ip.history_manager = HistoryManager(shell=ip, config=cfg)
219 hist = ["a=1", "def f():\n test = 1\n return test", "b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
219 hist = ["a=1", "def f():\n test = 1\n return test", "b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
220 for i, h in enumerate(hist, start=1):
220 for i, h in enumerate(hist, start=1):
221 ip.history_manager.store_inputs(i, h)
221 ip.history_manager.store_inputs(i, h)
222 assert ip.history_manager.input_hist_raw == [""] + hist
222 assert ip.history_manager.input_hist_raw == [""] + hist
223 ip.history_manager.reset()
223 ip.history_manager.reset()
224 ip.history_manager.end_session()
224 ip.history_manager.end_session()
225 finally:
225 finally:
226 ip.history_manager = hist_manager_ori
226 ip.history_manager = hist_manager_ori
227
227
228 # hist_file should not be created
228 # hist_file should not be created
229 assert hist_file.exists() is False
229 assert hist_file.exists() is False
230
230
231
231
232 def test_get_tail_session_awareness():
232 def test_get_tail_session_awareness():
233 """Test .get_tail() is:
233 """Test .get_tail() is:
234 - session specific in HistoryManager
234 - session specific in HistoryManager
235 - session agnostic in HistoryAccessor
235 - session agnostic in HistoryAccessor
236 same for .get_last_session_id()
236 same for .get_last_session_id()
237 """
237 """
238 ip = get_ipython()
238 ip = get_ipython()
239 with TemporaryDirectory() as tmpdir:
239 with TemporaryDirectory() as tmpdir:
240 tmp_path = Path(tmpdir)
240 tmp_path = Path(tmpdir)
241 hist_file = tmp_path / "history.sqlite"
241 hist_file = tmp_path / "history.sqlite"
242 get_source = lambda x: x[2]
242 get_source = lambda x: x[2]
243
243 hm1 = None
244 hm2 = None
245 try:
244 # hm1 creates a new session and adds history entries,
246 # hm1 creates a new session and adds history entries,
245 # ha catches up
247 # ha catches up
246 hm1 = HistoryManager(shell=ip, hist_file=hist_file)
248 hm1 = HistoryManager(shell=ip, hist_file=hist_file)
247 hm1_last_sid = hm1.get_last_session_id
249 hm1_last_sid = hm1.get_last_session_id
248 ha = HistoryAccessor(hist_file=hist_file)
250 ha = HistoryAccessor(hist_file=hist_file)
249 ha_last_sid = ha.get_last_session_id
251 ha_last_sid = ha.get_last_session_id
250
252
251 hist1 = ["a=1", "b=1", "c=1"]
253 hist1 = ["a=1", "b=1", "c=1"]
252 for i, h in enumerate(hist1 + [""], start=1):
254 for i, h in enumerate(hist1 + [""], start=1):
253 hm1.store_inputs(i, h)
255 hm1.store_inputs(i, h)
254 assert list(map(get_source, hm1.get_tail())) == hist1
256 assert list(map(get_source, hm1.get_tail())) == hist1
255 assert list(map(get_source, ha.get_tail())) == hist1
257 assert list(map(get_source, ha.get_tail())) == hist1
256 sid1 = hm1_last_sid()
258 sid1 = hm1_last_sid()
257 assert sid1 is not None
259 assert sid1 is not None
258 assert ha_last_sid() == sid1
260 assert ha_last_sid() == sid1
259
261
260 # hm2 creates a new session and adds entries,
262 # hm2 creates a new session and adds entries,
261 # ha catches up
263 # ha catches up
262 hm2 = HistoryManager(shell=ip, hist_file=hist_file)
264 hm2 = HistoryManager(shell=ip, hist_file=hist_file)
263 hm2_last_sid = hm2.get_last_session_id
265 hm2_last_sid = hm2.get_last_session_id
264
266
265 hist2 = ["a=2", "b=2", "c=2"]
267 hist2 = ["a=2", "b=2", "c=2"]
266 for i, h in enumerate(hist2 + [""], start=1):
268 for i, h in enumerate(hist2 + [""], start=1):
267 hm2.store_inputs(i, h)
269 hm2.store_inputs(i, h)
268 tail = hm2.get_tail(n=3)
270 tail = hm2.get_tail(n=3)
269 assert list(map(get_source, tail)) == hist2
271 assert list(map(get_source, tail)) == hist2
270 tail = ha.get_tail(n=3)
272 tail = ha.get_tail(n=3)
271 assert list(map(get_source, tail)) == hist2
273 assert list(map(get_source, tail)) == hist2
272 sid2 = hm2_last_sid()
274 sid2 = hm2_last_sid()
273 assert sid2 is not None
275 assert sid2 is not None
274 assert ha_last_sid() == sid2
276 assert ha_last_sid() == sid2
275 assert sid2 != sid1
277 assert sid2 != sid1
276
278
277 # but hm1 still maintains its point of reference
279 # but hm1 still maintains its point of reference
278 # and adding more entries to it doesn't change others
280 # and adding more entries to it doesn't change others
279 # immediate perspective
281 # immediate perspective
280 assert hm1_last_sid() == sid1
282 assert hm1_last_sid() == sid1
281 tail = hm1.get_tail(n=3)
283 tail = hm1.get_tail(n=3)
282 assert list(map(get_source, tail)) == hist1
284 assert list(map(get_source, tail)) == hist1
283
285
284 hist3 = ["a=3", "b=3", "c=3"]
286 hist3 = ["a=3", "b=3", "c=3"]
285 for i, h in enumerate(hist3 + [""], start=5):
287 for i, h in enumerate(hist3 + [""], start=5):
286 hm1.store_inputs(i, h)
288 hm1.store_inputs(i, h)
287 tail = hm1.get_tail(n=7)
289 tail = hm1.get_tail(n=7)
288 assert list(map(get_source, tail)) == hist1 + [""] + hist3
290 assert list(map(get_source, tail)) == hist1 + [""] + hist3
289 tail = hm2.get_tail(n=3)
291 tail = hm2.get_tail(n=3)
290 assert list(map(get_source, tail)) == hist2
292 assert list(map(get_source, tail)) == hist2
291 tail = ha.get_tail(n=3)
293 tail = ha.get_tail(n=3)
292 assert list(map(get_source, tail)) == hist2
294 assert list(map(get_source, tail)) == hist2
293 assert hm1_last_sid() == sid1
295 assert hm1_last_sid() == sid1
294 assert hm2_last_sid() == sid2
296 assert hm2_last_sid() == sid2
295 assert ha_last_sid() == sid2
297 assert ha_last_sid() == sid2
298 finally:
299 if hm1:
300 hm1.save_thread.stop()
301 if hm2:
302 hm2.save_thread.stop()
303 hm = hm1 or hm2
304 if hm:
305 hm.db.close()
General Comments 0
You need to be logged in to leave comments. Login now