##// END OF EJS Templates
wrap db before host dir is gone in new test
Aleksey Bogdanov -
Show More
@@ -1,295 +1,305 b''
1 1 # coding: utf-8
2 2 """Tests for the IPython tab-completion machinery.
3 3 """
4 4 #-----------------------------------------------------------------------------
5 5 # Module imports
6 6 #-----------------------------------------------------------------------------
7 7
8 8 # stdlib
9 9 import io
10 10 import sqlite3
11 11 import sys
12 12 import tempfile
13 13 from datetime import datetime
14 14 from pathlib import Path
15 15
16 16 from tempfile import TemporaryDirectory
17 17 # our own packages
18 18 from traitlets.config.loader import Config
19 19
20 20 from IPython.core.history import HistoryAccessor, HistoryManager, extract_hist_ranges
21 21
22 22
23 23 def test_proper_default_encoding():
24 24 assert sys.getdefaultencoding() == "utf-8"
25 25
26 26 def test_history():
27 27 ip = get_ipython()
28 28 with TemporaryDirectory() as tmpdir:
29 29 tmp_path = Path(tmpdir)
30 30 hist_manager_ori = ip.history_manager
31 31 hist_file = tmp_path / "history.sqlite"
32 32 try:
33 33 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
34 34 hist = ["a=1", "def f():\n test = 1\n return test", "b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
35 35 for i, h in enumerate(hist, start=1):
36 36 ip.history_manager.store_inputs(i, h)
37 37
38 38 ip.history_manager.db_log_output = True
39 39 # Doesn't match the input, but we'll just check it's stored.
40 40 ip.history_manager.output_hist_reprs[3] = "spam"
41 41 ip.history_manager.store_output(3)
42 42
43 43 assert ip.history_manager.input_hist_raw == [""] + hist
44 44
45 45 # Detailed tests for _get_range_session
46 46 grs = ip.history_manager._get_range_session
47 47 assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1]))
48 48 assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:]))
49 49 assert list(grs(output=True)) == list(
50 50 zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"]))
51 51 )
52 52
53 53 # Check whether specifying a range beyond the end of the current
54 54 # session results in an error (gh-804)
55 55 ip.run_line_magic("hist", "2-500")
56 56
57 57 # Check that we can write non-ascii characters to a file
58 58 ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1"))
59 59 ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2"))
60 60 ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3"))
61 61 ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4"))
62 62
63 63 # New session
64 64 ip.history_manager.reset()
65 65 newcmds = ["z=5", "class X(object):\n pass", "k='p'", "z=5"]
66 66 for i, cmd in enumerate(newcmds, start=1):
67 67 ip.history_manager.store_inputs(i, cmd)
68 68 gothist = ip.history_manager.get_range(start=1, stop=4)
69 69 assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds))
70 70 # Previous session:
71 71 gothist = ip.history_manager.get_range(-1, 1, 4)
72 72 assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist))
73 73
74 74 newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]
75 75
76 76 # Check get_hist_tail
77 77 gothist = ip.history_manager.get_tail(5, output=True,
78 78 include_latest=True)
79 79 expected = [(1, 3, (hist[-1], "spam"))] \
80 80 + [(s, n, (c, None)) for (s, n, c) in newhist]
81 81 assert list(gothist) == expected
82 82
83 83 gothist = ip.history_manager.get_tail(2)
84 84 expected = newhist[-3:-1]
85 85 assert list(gothist) == expected
86 86
87 87 # Check get_hist_search
88 88
89 89 gothist = ip.history_manager.search("*test*")
90 90 assert list(gothist) == [(1, 2, hist[1])]
91 91
92 92 gothist = ip.history_manager.search("*=*")
93 93 assert list(gothist) == [
94 94 (1, 1, hist[0]),
95 95 (1, 2, hist[1]),
96 96 (1, 3, hist[2]),
97 97 newhist[0],
98 98 newhist[2],
99 99 newhist[3],
100 100 ]
101 101
102 102 gothist = ip.history_manager.search("*=*", n=4)
103 103 assert list(gothist) == [
104 104 (1, 3, hist[2]),
105 105 newhist[0],
106 106 newhist[2],
107 107 newhist[3],
108 108 ]
109 109
110 110 gothist = ip.history_manager.search("*=*", unique=True)
111 111 assert list(gothist) == [
112 112 (1, 1, hist[0]),
113 113 (1, 2, hist[1]),
114 114 (1, 3, hist[2]),
115 115 newhist[2],
116 116 newhist[3],
117 117 ]
118 118
119 119 gothist = ip.history_manager.search("*=*", unique=True, n=3)
120 120 assert list(gothist) == [(1, 3, hist[2]), newhist[2], newhist[3]]
121 121
122 122 gothist = ip.history_manager.search("b*", output=True)
123 123 assert list(gothist) == [(1, 3, (hist[2], "spam"))]
124 124
125 125 # Cross testing: check that magic %save can get previous session.
126 126 testfilename = (tmp_path / "test.py").resolve()
127 127 ip.run_line_magic("save", str(testfilename) + " ~1/1-3")
128 128 with io.open(testfilename, encoding="utf-8") as testfile:
129 129 assert testfile.read() == "# coding: utf-8\n" + "\n".join(hist) + "\n"
130 130
131 131 # Duplicate line numbers - check that it doesn't crash, and
132 132 # gets a new session
133 133 ip.history_manager.store_inputs(1, "rogue")
134 134 ip.history_manager.writeout_cache()
135 135 assert ip.history_manager.session_number == 3
136 136
137 137 # Check that session and line values are not just max values
138 138 sessid, lineno, entry = newhist[-1]
139 139 assert lineno > 1
140 140 ip.history_manager.reset()
141 141 lineno = 1
142 142 ip.history_manager.store_inputs(lineno, entry)
143 143 gothist = ip.history_manager.search("*=*", unique=True)
144 144 hist = list(gothist)[-1]
145 145 assert sessid < hist[0]
146 146 assert hist[1:] == (lineno, entry)
147 147 finally:
148 148 # Ensure saving thread is shut down before we try to clean up the files
149 149 ip.history_manager.save_thread.stop()
150 150 # Forcibly close database rather than relying on garbage collection
151 151 ip.history_manager.db.close()
152 152 # Restore history manager
153 153 ip.history_manager = hist_manager_ori
154 154
155 155
156 156 def test_extract_hist_ranges():
157 157 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
158 158 expected = [(0, 1, 2), # 0 == current session
159 159 (2, 3, 4),
160 160 (-4, 5, 7),
161 161 (-4, 7, 10),
162 162 (-9, 2, None), # None == to end
163 163 (-8, 1, None),
164 164 (-7, 1, 6),
165 165 (-10, 1, None)]
166 166 actual = list(extract_hist_ranges(instr))
167 167 assert actual == expected
168 168
169 169
170 170 def test_extract_hist_ranges_empty_str():
171 171 instr = ""
172 172 expected = [(0, 1, None)] # 0 == current session, None == to end
173 173 actual = list(extract_hist_ranges(instr))
174 174 assert actual == expected
175 175
176 176
177 177 def test_magic_rerun():
178 178 """Simple test for %rerun (no args -> rerun last line)"""
179 179 ip = get_ipython()
180 180 ip.run_cell("a = 10", store_history=True)
181 181 ip.run_cell("a += 1", store_history=True)
182 182 assert ip.user_ns["a"] == 11
183 183 ip.run_cell("%rerun", store_history=True)
184 184 assert ip.user_ns["a"] == 12
185 185
186 186 def test_timestamp_type():
187 187 ip = get_ipython()
188 188 info = ip.history_manager.get_session_info()
189 189 assert isinstance(info[1], datetime)
190 190
191 191 def test_hist_file_config():
192 192 cfg = Config()
193 193 tfile = tempfile.NamedTemporaryFile(delete=False)
194 194 cfg.HistoryManager.hist_file = Path(tfile.name)
195 195 try:
196 196 hm = HistoryManager(shell=get_ipython(), config=cfg)
197 197 assert hm.hist_file == cfg.HistoryManager.hist_file
198 198 finally:
199 199 try:
200 200 Path(tfile.name).unlink()
201 201 except OSError:
202 202 # same catch as in testing.tools.TempFileMixin
203 203 # On Windows, even though we close the file, we still can't
204 204 # delete it. I have no clue why
205 205 pass
206 206
207 207 def test_histmanager_disabled():
208 208 """Ensure that disabling the history manager doesn't create a database."""
209 209 cfg = Config()
210 210 cfg.HistoryAccessor.enabled = False
211 211
212 212 ip = get_ipython()
213 213 with TemporaryDirectory() as tmpdir:
214 214 hist_manager_ori = ip.history_manager
215 215 hist_file = Path(tmpdir) / "history.sqlite"
216 216 cfg.HistoryManager.hist_file = hist_file
217 217 try:
218 218 ip.history_manager = HistoryManager(shell=ip, config=cfg)
219 219 hist = ["a=1", "def f():\n test = 1\n return test", "b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
220 220 for i, h in enumerate(hist, start=1):
221 221 ip.history_manager.store_inputs(i, h)
222 222 assert ip.history_manager.input_hist_raw == [""] + hist
223 223 ip.history_manager.reset()
224 224 ip.history_manager.end_session()
225 225 finally:
226 226 ip.history_manager = hist_manager_ori
227 227
228 228 # hist_file should not be created
229 229 assert hist_file.exists() is False
230 230
231 231
232 232 def test_get_tail_session_awareness():
233 233 """Test .get_tail() is:
234 234 - session specific in HistoryManager
235 235 - session agnostic in HistoryAccessor
236 236 same for .get_last_session_id()
237 237 """
238 238 ip = get_ipython()
239 239 with TemporaryDirectory() as tmpdir:
240 240 tmp_path = Path(tmpdir)
241 241 hist_file = tmp_path / "history.sqlite"
242 242 get_source = lambda x: x[2]
243
244 # hm1 creates a new session and adds history entries,
245 # ha catches up
246 hm1 = HistoryManager(shell=ip, hist_file=hist_file)
247 hm1_last_sid = hm1.get_last_session_id
248 ha = HistoryAccessor(hist_file=hist_file)
249 ha_last_sid = ha.get_last_session_id
250
251 hist1 = ["a=1", "b=1", "c=1"]
252 for i, h in enumerate(hist1 + [""], start=1):
253 hm1.store_inputs(i, h)
254 assert list(map(get_source, hm1.get_tail())) == hist1
255 assert list(map(get_source, ha.get_tail())) == hist1
256 sid1 = hm1_last_sid()
257 assert sid1 is not None
258 assert ha_last_sid() == sid1
259
260 # hm2 creates a new session and adds entries,
261 # ha catches up
262 hm2 = HistoryManager(shell=ip, hist_file=hist_file)
263 hm2_last_sid = hm2.get_last_session_id
264
265 hist2 = ["a=2", "b=2", "c=2"]
266 for i, h in enumerate(hist2 + [""], start=1):
267 hm2.store_inputs(i, h)
268 tail = hm2.get_tail(n=3)
269 assert list(map(get_source, tail)) == hist2
270 tail = ha.get_tail(n=3)
271 assert list(map(get_source, tail)) == hist2
272 sid2 = hm2_last_sid()
273 assert sid2 is not None
274 assert ha_last_sid() == sid2
275 assert sid2 != sid1
276
277 # but hm1 still maintains its point of reference
278 # and adding more entries to it doesn't change others
279 # immediate perspective
280 assert hm1_last_sid() == sid1
281 tail = hm1.get_tail(n=3)
282 assert list(map(get_source, tail)) == hist1
283
284 hist3 = ["a=3", "b=3", "c=3"]
285 for i, h in enumerate(hist3 + [""], start=5):
286 hm1.store_inputs(i, h)
287 tail = hm1.get_tail(n=7)
288 assert list(map(get_source, tail)) == hist1 + [""] + hist3
289 tail = hm2.get_tail(n=3)
290 assert list(map(get_source, tail)) == hist2
291 tail = ha.get_tail(n=3)
292 assert list(map(get_source, tail)) == hist2
293 assert hm1_last_sid() == sid1
294 assert hm2_last_sid() == sid2
295 assert ha_last_sid() == sid2
243 hm1 = None
244 hm2 = None
245 try:
246 # hm1 creates a new session and adds history entries,
247 # ha catches up
248 hm1 = HistoryManager(shell=ip, hist_file=hist_file)
249 hm1_last_sid = hm1.get_last_session_id
250 ha = HistoryAccessor(hist_file=hist_file)
251 ha_last_sid = ha.get_last_session_id
252
253 hist1 = ["a=1", "b=1", "c=1"]
254 for i, h in enumerate(hist1 + [""], start=1):
255 hm1.store_inputs(i, h)
256 assert list(map(get_source, hm1.get_tail())) == hist1
257 assert list(map(get_source, ha.get_tail())) == hist1
258 sid1 = hm1_last_sid()
259 assert sid1 is not None
260 assert ha_last_sid() == sid1
261
262 # hm2 creates a new session and adds entries,
263 # ha catches up
264 hm2 = HistoryManager(shell=ip, hist_file=hist_file)
265 hm2_last_sid = hm2.get_last_session_id
266
267 hist2 = ["a=2", "b=2", "c=2"]
268 for i, h in enumerate(hist2 + [""], start=1):
269 hm2.store_inputs(i, h)
270 tail = hm2.get_tail(n=3)
271 assert list(map(get_source, tail)) == hist2
272 tail = ha.get_tail(n=3)
273 assert list(map(get_source, tail)) == hist2
274 sid2 = hm2_last_sid()
275 assert sid2 is not None
276 assert ha_last_sid() == sid2
277 assert sid2 != sid1
278
279 # but hm1 still maintains its point of reference
280 # and adding more entries to it doesn't change others
281 # immediate perspective
282 assert hm1_last_sid() == sid1
283 tail = hm1.get_tail(n=3)
284 assert list(map(get_source, tail)) == hist1
285
286 hist3 = ["a=3", "b=3", "c=3"]
287 for i, h in enumerate(hist3 + [""], start=5):
288 hm1.store_inputs(i, h)
289 tail = hm1.get_tail(n=7)
290 assert list(map(get_source, tail)) == hist1 + [""] + hist3
291 tail = hm2.get_tail(n=3)
292 assert list(map(get_source, tail)) == hist2
293 tail = ha.get_tail(n=3)
294 assert list(map(get_source, tail)) == hist2
295 assert hm1_last_sid() == sid1
296 assert hm2_last_sid() == sid2
297 assert ha_last_sid() == sid2
298 finally:
299 if hm1:
300 hm1.save_thread.stop()
301 if hm2:
302 hm2.save_thread.stop()
303 hm = hm1 or hm2
304 if hm:
305 hm.db.close()
General Comments 0
You need to be logged in to leave comments. Login now