##// END OF EJS Templates
wrap db before host dir is gone in new test (v2)
Aleksey Bogdanov -
Show More
@@ -1,305 +1,307 b''
1 1 # coding: utf-8
2 2 """Tests for the IPython tab-completion machinery.
3 3 """
4 4 #-----------------------------------------------------------------------------
5 5 # Module imports
6 6 #-----------------------------------------------------------------------------
7 7
8 8 # stdlib
9 9 import io
10 10 import sqlite3
11 11 import sys
12 12 import tempfile
13 13 from datetime import datetime
14 14 from pathlib import Path
15 15
16 16 from tempfile import TemporaryDirectory
17 17 # our own packages
18 18 from traitlets.config.loader import Config
19 19
20 20 from IPython.core.history import HistoryAccessor, HistoryManager, extract_hist_ranges
21 21
22 22
23 23 def test_proper_default_encoding():
24 24 assert sys.getdefaultencoding() == "utf-8"
25 25
26 26 def test_history():
27 27 ip = get_ipython()
28 28 with TemporaryDirectory() as tmpdir:
29 29 tmp_path = Path(tmpdir)
30 30 hist_manager_ori = ip.history_manager
31 31 hist_file = tmp_path / "history.sqlite"
32 32 try:
33 33 ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
34 34 hist = ["a=1", "def f():\n test = 1\n return test", "b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
35 35 for i, h in enumerate(hist, start=1):
36 36 ip.history_manager.store_inputs(i, h)
37 37
38 38 ip.history_manager.db_log_output = True
39 39 # Doesn't match the input, but we'll just check it's stored.
40 40 ip.history_manager.output_hist_reprs[3] = "spam"
41 41 ip.history_manager.store_output(3)
42 42
43 43 assert ip.history_manager.input_hist_raw == [""] + hist
44 44
45 45 # Detailed tests for _get_range_session
46 46 grs = ip.history_manager._get_range_session
47 47 assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1]))
48 48 assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:]))
49 49 assert list(grs(output=True)) == list(
50 50 zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"]))
51 51 )
52 52
53 53 # Check whether specifying a range beyond the end of the current
54 54 # session results in an error (gh-804)
55 55 ip.run_line_magic("hist", "2-500")
56 56
57 57 # Check that we can write non-ascii characters to a file
58 58 ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1"))
59 59 ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2"))
60 60 ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3"))
61 61 ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4"))
62 62
63 63 # New session
64 64 ip.history_manager.reset()
65 65 newcmds = ["z=5", "class X(object):\n pass", "k='p'", "z=5"]
66 66 for i, cmd in enumerate(newcmds, start=1):
67 67 ip.history_manager.store_inputs(i, cmd)
68 68 gothist = ip.history_manager.get_range(start=1, stop=4)
69 69 assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds))
70 70 # Previous session:
71 71 gothist = ip.history_manager.get_range(-1, 1, 4)
72 72 assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist))
73 73
74 74 newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]
75 75
76 76 # Check get_hist_tail
77 77 gothist = ip.history_manager.get_tail(5, output=True,
78 78 include_latest=True)
79 79 expected = [(1, 3, (hist[-1], "spam"))] \
80 80 + [(s, n, (c, None)) for (s, n, c) in newhist]
81 81 assert list(gothist) == expected
82 82
83 83 gothist = ip.history_manager.get_tail(2)
84 84 expected = newhist[-3:-1]
85 85 assert list(gothist) == expected
86 86
87 87 # Check get_hist_search
88 88
89 89 gothist = ip.history_manager.search("*test*")
90 90 assert list(gothist) == [(1, 2, hist[1])]
91 91
92 92 gothist = ip.history_manager.search("*=*")
93 93 assert list(gothist) == [
94 94 (1, 1, hist[0]),
95 95 (1, 2, hist[1]),
96 96 (1, 3, hist[2]),
97 97 newhist[0],
98 98 newhist[2],
99 99 newhist[3],
100 100 ]
101 101
102 102 gothist = ip.history_manager.search("*=*", n=4)
103 103 assert list(gothist) == [
104 104 (1, 3, hist[2]),
105 105 newhist[0],
106 106 newhist[2],
107 107 newhist[3],
108 108 ]
109 109
110 110 gothist = ip.history_manager.search("*=*", unique=True)
111 111 assert list(gothist) == [
112 112 (1, 1, hist[0]),
113 113 (1, 2, hist[1]),
114 114 (1, 3, hist[2]),
115 115 newhist[2],
116 116 newhist[3],
117 117 ]
118 118
119 119 gothist = ip.history_manager.search("*=*", unique=True, n=3)
120 120 assert list(gothist) == [(1, 3, hist[2]), newhist[2], newhist[3]]
121 121
122 122 gothist = ip.history_manager.search("b*", output=True)
123 123 assert list(gothist) == [(1, 3, (hist[2], "spam"))]
124 124
125 125 # Cross testing: check that magic %save can get previous session.
126 126 testfilename = (tmp_path / "test.py").resolve()
127 127 ip.run_line_magic("save", str(testfilename) + " ~1/1-3")
128 128 with io.open(testfilename, encoding="utf-8") as testfile:
129 129 assert testfile.read() == "# coding: utf-8\n" + "\n".join(hist) + "\n"
130 130
131 131 # Duplicate line numbers - check that it doesn't crash, and
132 132 # gets a new session
133 133 ip.history_manager.store_inputs(1, "rogue")
134 134 ip.history_manager.writeout_cache()
135 135 assert ip.history_manager.session_number == 3
136 136
137 137 # Check that session and line values are not just max values
138 138 sessid, lineno, entry = newhist[-1]
139 139 assert lineno > 1
140 140 ip.history_manager.reset()
141 141 lineno = 1
142 142 ip.history_manager.store_inputs(lineno, entry)
143 143 gothist = ip.history_manager.search("*=*", unique=True)
144 144 hist = list(gothist)[-1]
145 145 assert sessid < hist[0]
146 146 assert hist[1:] == (lineno, entry)
147 147 finally:
148 148 # Ensure saving thread is shut down before we try to clean up the files
149 149 ip.history_manager.save_thread.stop()
150 150 # Forcibly close database rather than relying on garbage collection
151 151 ip.history_manager.db.close()
152 152 # Restore history manager
153 153 ip.history_manager = hist_manager_ori
154 154
155 155
156 156 def test_extract_hist_ranges():
157 157 instr = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
158 158 expected = [(0, 1, 2), # 0 == current session
159 159 (2, 3, 4),
160 160 (-4, 5, 7),
161 161 (-4, 7, 10),
162 162 (-9, 2, None), # None == to end
163 163 (-8, 1, None),
164 164 (-7, 1, 6),
165 165 (-10, 1, None)]
166 166 actual = list(extract_hist_ranges(instr))
167 167 assert actual == expected
168 168
169 169
170 170 def test_extract_hist_ranges_empty_str():
171 171 instr = ""
172 172 expected = [(0, 1, None)] # 0 == current session, None == to end
173 173 actual = list(extract_hist_ranges(instr))
174 174 assert actual == expected
175 175
176 176
177 177 def test_magic_rerun():
178 178 """Simple test for %rerun (no args -> rerun last line)"""
179 179 ip = get_ipython()
180 180 ip.run_cell("a = 10", store_history=True)
181 181 ip.run_cell("a += 1", store_history=True)
182 182 assert ip.user_ns["a"] == 11
183 183 ip.run_cell("%rerun", store_history=True)
184 184 assert ip.user_ns["a"] == 12
185 185
186 186 def test_timestamp_type():
187 187 ip = get_ipython()
188 188 info = ip.history_manager.get_session_info()
189 189 assert isinstance(info[1], datetime)
190 190
191 191 def test_hist_file_config():
192 192 cfg = Config()
193 193 tfile = tempfile.NamedTemporaryFile(delete=False)
194 194 cfg.HistoryManager.hist_file = Path(tfile.name)
195 195 try:
196 196 hm = HistoryManager(shell=get_ipython(), config=cfg)
197 197 assert hm.hist_file == cfg.HistoryManager.hist_file
198 198 finally:
199 199 try:
200 200 Path(tfile.name).unlink()
201 201 except OSError:
202 202 # same catch as in testing.tools.TempFileMixin
203 203 # On Windows, even though we close the file, we still can't
204 204 # delete it. I have no clue why
205 205 pass
206 206
207 207 def test_histmanager_disabled():
208 208 """Ensure that disabling the history manager doesn't create a database."""
209 209 cfg = Config()
210 210 cfg.HistoryAccessor.enabled = False
211 211
212 212 ip = get_ipython()
213 213 with TemporaryDirectory() as tmpdir:
214 214 hist_manager_ori = ip.history_manager
215 215 hist_file = Path(tmpdir) / "history.sqlite"
216 216 cfg.HistoryManager.hist_file = hist_file
217 217 try:
218 218 ip.history_manager = HistoryManager(shell=ip, config=cfg)
219 219 hist = ["a=1", "def f():\n test = 1\n return test", "b='β‚¬Γ†ΒΎΓ·ΓŸ'"]
220 220 for i, h in enumerate(hist, start=1):
221 221 ip.history_manager.store_inputs(i, h)
222 222 assert ip.history_manager.input_hist_raw == [""] + hist
223 223 ip.history_manager.reset()
224 224 ip.history_manager.end_session()
225 225 finally:
226 226 ip.history_manager = hist_manager_ori
227 227
228 228 # hist_file should not be created
229 229 assert hist_file.exists() is False
230 230
231 231
232 232 def test_get_tail_session_awareness():
233 233 """Test .get_tail() is:
234 234 - session specific in HistoryManager
235 235 - session agnostic in HistoryAccessor
236 236 same for .get_last_session_id()
237 237 """
238 238 ip = get_ipython()
239 239 with TemporaryDirectory() as tmpdir:
240 240 tmp_path = Path(tmpdir)
241 241 hist_file = tmp_path / "history.sqlite"
242 242 get_source = lambda x: x[2]
243 243 hm1 = None
244 244 hm2 = None
245 ha = None
245 246 try:
246 247 # hm1 creates a new session and adds history entries,
247 248 # ha catches up
248 249 hm1 = HistoryManager(shell=ip, hist_file=hist_file)
249 250 hm1_last_sid = hm1.get_last_session_id
250 251 ha = HistoryAccessor(hist_file=hist_file)
251 252 ha_last_sid = ha.get_last_session_id
252 253
253 254 hist1 = ["a=1", "b=1", "c=1"]
254 255 for i, h in enumerate(hist1 + [""], start=1):
255 256 hm1.store_inputs(i, h)
256 257 assert list(map(get_source, hm1.get_tail())) == hist1
257 258 assert list(map(get_source, ha.get_tail())) == hist1
258 259 sid1 = hm1_last_sid()
259 260 assert sid1 is not None
260 261 assert ha_last_sid() == sid1
261 262
262 263 # hm2 creates a new session and adds entries,
263 264 # ha catches up
264 265 hm2 = HistoryManager(shell=ip, hist_file=hist_file)
265 266 hm2_last_sid = hm2.get_last_session_id
266 267
267 268 hist2 = ["a=2", "b=2", "c=2"]
268 269 for i, h in enumerate(hist2 + [""], start=1):
269 270 hm2.store_inputs(i, h)
270 271 tail = hm2.get_tail(n=3)
271 272 assert list(map(get_source, tail)) == hist2
272 273 tail = ha.get_tail(n=3)
273 274 assert list(map(get_source, tail)) == hist2
274 275 sid2 = hm2_last_sid()
275 276 assert sid2 is not None
276 277 assert ha_last_sid() == sid2
277 278 assert sid2 != sid1
278 279
279 280 # but hm1 still maintains its point of reference
280 281 # and adding more entries to it doesn't change others
281 282 # immediate perspective
282 283 assert hm1_last_sid() == sid1
283 284 tail = hm1.get_tail(n=3)
284 285 assert list(map(get_source, tail)) == hist1
285 286
286 287 hist3 = ["a=3", "b=3", "c=3"]
287 288 for i, h in enumerate(hist3 + [""], start=5):
288 289 hm1.store_inputs(i, h)
289 290 tail = hm1.get_tail(n=7)
290 291 assert list(map(get_source, tail)) == hist1 + [""] + hist3
291 292 tail = hm2.get_tail(n=3)
292 293 assert list(map(get_source, tail)) == hist2
293 294 tail = ha.get_tail(n=3)
294 295 assert list(map(get_source, tail)) == hist2
295 296 assert hm1_last_sid() == sid1
296 297 assert hm2_last_sid() == sid2
297 298 assert ha_last_sid() == sid2
298 299 finally:
299 300 if hm1:
300 301 hm1.save_thread.stop()
302 hm1.db.close()
301 303 if hm2:
302 304 hm2.save_thread.stop()
303 hm = hm1 or hm2
304 if hm:
305 hm.db.close()
305 hm2.db.close()
306 if ha:
307 ha.db.close()
General Comments 0
You need to be logged in to leave comments. Login now