test_history.py
306 lines
| 11.4 KiB
| text/x-python
|
PythonLexer
Thomas Kluyver
|
r3449 | # coding: utf-8 | ||
Satrajit Ghosh
|
r3240 | """Tests for the IPython history machinery. | ||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Module imports | ||||
#----------------------------------------------------------------------------- | ||||
# stdlib | ||||
Thomas Kluyver
|
r17299 | import io | ||
Satrajit Ghosh
|
r3240 | import sys | ||
MinRK
|
r5233 | import tempfile | ||
MinRK
|
r4487 | from datetime import datetime | ||
Matthias Bussonnier
|
r27509 | from pathlib import Path | ||
MinRK
|
r5233 | |||
Matthias Bussonnier
|
r27509 | from tempfile import TemporaryDirectory | ||
Satrajit Ghosh
|
r3240 | # our own packages | ||
Min RK
|
r21253 | from traitlets.config.loader import Config | ||
Matthias Bussonnier
|
r27509 | |||
Aleksey Bogdanov
|
r27707 | from IPython.core.history import HistoryAccessor, HistoryManager, extract_hist_ranges | ||
Satrajit Ghosh
|
r3240 | |||
Matthias Bussonnier
|
r27509 | |||
Matthias Bussonnier
|
def test_proper_default_encoding():
    """The interpreter's default string encoding must be UTF-8."""
    encoding = sys.getdefaultencoding()
    assert encoding == "utf-8"
Thomas Kluyver
|
r3458 | |||
def test_history():
    """Exercise storing, ranging, tailing, searching and saving history.

    A throw-away ``HistoryManager`` backed by a database in a temporary
    directory is installed on the running shell for the duration of the
    test; the shell's original manager is always restored afterwards.
    """
    ip = get_ipython()
    with TemporaryDirectory() as tmpdir:
        tmp_path = Path(tmpdir)
        hist_manager_ori = ip.history_manager
        hist_file = tmp_path / "history.sqlite"
        # Keep our own reference to the temporary manager so that cleanup
        # never touches the shell's original manager if construction fails.
        hm = None
        try:
            hm = HistoryManager(shell=ip, hist_file=hist_file)
            ip.history_manager = hm
            hist = ["a=1", "def f():\n test = 1\n return test", "b='€Æ¾÷ß'"]
            for i, h in enumerate(hist, start=1):
                hm.store_inputs(i, h)

            hm.db_log_output = True
            # Doesn't match the input, but we'll just check it's stored.
            hm.output_hist_reprs[3] = "spam"
            hm.store_output(3)

            assert hm.input_hist_raw == [""] + hist

            # Detailed tests for _get_range_session
            grs = hm._get_range_session
            assert list(grs(start=2, stop=-1)) == list(zip([0], [2], hist[1:-1]))
            assert list(grs(start=-2)) == list(zip([0, 0], [2, 3], hist[-2:]))
            assert list(grs(output=True)) == list(
                zip([0, 0, 0], [1, 2, 3], zip(hist, [None, None, "spam"]))
            )

            # Check whether specifying a range beyond the end of the current
            # session results in an error (gh-804)
            ip.run_line_magic("hist", "2-500")

            # Check that we can write non-ascii characters to a file
            ip.run_line_magic("hist", "-f %s" % (tmp_path / "test1"))
            ip.run_line_magic("hist", "-pf %s" % (tmp_path / "test2"))
            ip.run_line_magic("hist", "-nf %s" % (tmp_path / "test3"))
            ip.run_line_magic("save", "%s 1-10" % (tmp_path / "test4"))

            # New session
            hm.reset()
            newcmds = ["z=5", "class X(object):\n pass", "k='p'", "z=5"]
            for i, cmd in enumerate(newcmds, start=1):
                hm.store_inputs(i, cmd)
            gothist = hm.get_range(start=1, stop=4)
            assert list(gothist) == list(zip([0, 0, 0], [1, 2, 3], newcmds))
            # Previous session:
            gothist = hm.get_range(-1, 1, 4)
            assert list(gothist) == list(zip([1, 1, 1], [1, 2, 3], hist))

            newhist = [(2, i, c) for (i, c) in enumerate(newcmds, 1)]

            # Check get_hist_tail
            gothist = hm.get_tail(5, output=True, include_latest=True)
            expected = [(1, 3, (hist[-1], "spam"))] + [
                (s, n, (c, None)) for (s, n, c) in newhist
            ]
            assert list(gothist) == expected

            gothist = hm.get_tail(2)
            expected = newhist[-3:-1]
            assert list(gothist) == expected

            # Check get_hist_search
            gothist = hm.search("*test*")
            assert list(gothist) == [(1, 2, hist[1])]

            gothist = hm.search("*=*")
            assert list(gothist) == [
                (1, 1, hist[0]),
                (1, 2, hist[1]),
                (1, 3, hist[2]),
                newhist[0],
                newhist[2],
                newhist[3],
            ]

            gothist = hm.search("*=*", n=4)
            assert list(gothist) == [
                (1, 3, hist[2]),
                newhist[0],
                newhist[2],
                newhist[3],
            ]

            gothist = hm.search("*=*", unique=True)
            assert list(gothist) == [
                (1, 1, hist[0]),
                (1, 2, hist[1]),
                (1, 3, hist[2]),
                newhist[2],
                newhist[3],
            ]

            gothist = hm.search("*=*", unique=True, n=3)
            assert list(gothist) == [(1, 3, hist[2]), newhist[2], newhist[3]]

            gothist = hm.search("b*", output=True)
            assert list(gothist) == [(1, 3, (hist[2], "spam"))]

            # Cross testing: check that magic %save can get previous session.
            testfilename = (tmp_path / "test.py").resolve()
            ip.run_line_magic("save", str(testfilename) + " ~1/1-3")
            saved = testfilename.read_text(encoding="utf-8")
            assert saved == "# coding: utf-8\n" + "\n".join(hist) + "\n"

            # Duplicate line numbers - check that it doesn't crash, and
            # gets a new session
            hm.store_inputs(1, "rogue")
            hm.writeout_cache()
            assert hm.session_number == 3

            # Check that session and line values are not just max values
            sessid, lineno, entry = newhist[-1]
            assert lineno > 1
            hm.reset()
            lineno = 1
            hm.store_inputs(lineno, entry)
            gothist = hm.search("*=*", unique=True)
            # Use a fresh name: `hist` above is the list of input strings.
            latest = list(gothist)[-1]
            assert sessid < latest[0]
            assert latest[1:] == (lineno, entry)
        finally:
            try:
                if hm is not None:
                    # Ensure saving thread is shut down before we try to
                    # clean up the files
                    hm.save_thread.stop()
                    # Forcibly close database rather than relying on
                    # garbage collection
                    hm.db.close()
            finally:
                # Restore history manager, even if the cleanup above failed
                ip.history_manager = hist_manager_ori
Thomas Kluyver
|
r3396 | |||
Thomas Kluyver
|
r3447 | |||
Thomas Kluyver
|
def test_extract_hist_ranges():
    """Parse a string holding several range specifications at once."""
    spec = "1 2/3 ~4/5-6 ~4/7-~4/9 ~9/2-~7/5 ~10/"
    wanted = [
        (0, 1, 2),  # 0 == current session
        (2, 3, 4),
        (-4, 5, 7),
        (-4, 7, 10),
        (-9, 2, None),  # None == to end
        (-8, 1, None),
        (-7, 1, 6),
        (-10, 1, None),
    ]
    assert list(extract_hist_ranges(spec)) == wanted
Bernardo B. Marques
|
r4872 | |||
Blazej Michalik
|
r26630 | |||
def test_extract_hist_ranges_empty_str():
    """An empty specification yields one range covering the current session."""
    # 0 == current session, None == to end
    wanted = [(0, 1, None)]
    assert list(extract_hist_ranges("")) == wanted
Blazej Michalik
|
r26630 | |||
Thomas Kluyver
|
def test_magic_rerun():
    """Simple test for %rerun (no args -> rerun last line)"""
    shell = get_ipython()
    # Seed the history with an assignment followed by an increment.
    for source in ("a = 10", "a += 1"):
        shell.run_cell(source, store_history=True)
    assert shell.user_ns["a"] == 11
    # A bare %rerun should execute the increment once more.
    shell.run_cell("%rerun", store_history=True)
    assert shell.user_ns["a"] == 12
MinRK
|
r4487 | |||
def test_timestamp_type():
    """The second field of the session info must be a ``datetime``."""
    session_info = get_ipython().history_manager.get_session_info()
    assert isinstance(session_info[1], datetime)
MinRK
|
r5233 | |||
def test_hist_file_config():
    """Check that ``HistoryManager.hist_file`` follows the configured path."""
    cfg = Config()
    # delete=False: the file has to outlive our handle, because only its
    # path is handed to the history manager.
    tfile = tempfile.NamedTemporaryFile(delete=False)
    # Release our handle right away; a file that is still open cannot be
    # deleted on Windows.
    tfile.close()
    hist_path = Path(tfile.name)
    cfg.HistoryManager.hist_file = hist_path
    hm = None
    try:
        hm = HistoryManager(shell=get_ipython(), config=cfg)
        assert hm.hist_file == cfg.HistoryManager.hist_file
    finally:
        try:
            if hm is not None:
                # Shut the manager down like the other tests in this file
                # do, so nothing keeps the database file open.
                hm.save_thread.stop()
                hm.db.close()
        finally:
            try:
                hist_path.unlink()
            except OSError:
                # same catch as in testing.tools.TempFileMixin
                # Best effort only: deletion may still fail on Windows.
                pass
MinRK
|
r5233 | |||
Trevor Bekolay
|
def test_histmanager_disabled():
    """Ensure that disabling the history manager doesn't create a database."""
    cfg = Config()
    cfg.HistoryAccessor.enabled = False
    ip = get_ipython()
    with TemporaryDirectory() as tmpdir:
        hist_manager_ori = ip.history_manager
        hist_file = Path(tmpdir) / "history.sqlite"
        cfg.HistoryManager.hist_file = hist_file
        try:
            ip.history_manager = HistoryManager(shell=ip, config=cfg)
            hist = ["a=1", "def f():\n test = 1\n return test", "b='€Æ¾÷ß'"]
            for i, h in enumerate(hist, start=1):
                ip.history_manager.store_inputs(i, h)
            # Inputs are still tracked in memory while the manager is disabled.
            assert ip.history_manager.input_hist_raw == [""] + hist
            ip.history_manager.reset()
            ip.history_manager.end_session()
        finally:
            ip.history_manager = hist_manager_ori
        # hist_file should not be created.  Check while the temporary
        # directory still exists: once it has been removed, the file could
        # not exist anyway and the assertion would prove nothing.
        assert not hist_file.exists()
Aleksey Bogdanov
|
r27707 | |||
Aleksey Bogdanov
|
r27708 | |||
Aleksey Bogdanov
|
def test_get_tail_session_awareness():
    """Check session handling of ``.get_tail()`` and ``.get_last_session_id()``.

    Expectations exercised below:
    - a HistoryManager keeps answering for its own session
    - a HistoryAccessor follows whichever session was created last
    """

    def sources(entries):
        # Keep only the third field of every history tuple.
        return [entry[2] for entry in entries]

    ip = get_ipython()
    with TemporaryDirectory() as tmpdir:
        hist_file = Path(tmpdir) / "history.sqlite"
        hm1 = hm2 = ha = None
        try:
            # hm1 creates a new session and adds history entries,
            # ha catches up
            hm1 = HistoryManager(shell=ip, hist_file=hist_file)
            ha = HistoryAccessor(hist_file=hist_file)
            hist1 = ["a=1", "b=1", "c=1"]
            for lineno, line in enumerate(hist1 + [""], start=1):
                hm1.store_inputs(lineno, line)
            assert sources(hm1.get_tail()) == hist1
            assert sources(ha.get_tail()) == hist1
            sid1 = hm1.get_last_session_id()
            assert sid1 is not None
            assert ha.get_last_session_id() == sid1

            # hm2 creates a new session and adds entries,
            # ha catches up
            hm2 = HistoryManager(shell=ip, hist_file=hist_file)
            hist2 = ["a=2", "b=2", "c=2"]
            for lineno, line in enumerate(hist2 + [""], start=1):
                hm2.store_inputs(lineno, line)
            assert sources(hm2.get_tail(n=3)) == hist2
            assert sources(ha.get_tail(n=3)) == hist2
            sid2 = hm2.get_last_session_id()
            assert sid2 is not None
            assert ha.get_last_session_id() == sid2
            assert sid2 != sid1

            # but hm1 still maintains its point of reference
            # and adding more entries to it doesn't change others
            # immediate perspective
            assert hm1.get_last_session_id() == sid1
            assert sources(hm1.get_tail(n=3)) == hist1
            hist3 = ["a=3", "b=3", "c=3"]
            for lineno, line in enumerate(hist3 + [""], start=5):
                hm1.store_inputs(lineno, line)
            assert sources(hm1.get_tail(n=7)) == hist1 + [""] + hist3
            assert sources(hm2.get_tail(n=3)) == hist2
            assert sources(ha.get_tail(n=3)) == hist2
            assert hm1.get_last_session_id() == sid1
            assert hm2.get_last_session_id() == sid2
            assert ha.get_last_session_id() == sid2
        finally:
            # Stop the saving threads and close the connections of whatever
            # got created.
            if hm1:
                hm1.save_thread.stop()
                hm1.db.close()
            if hm2:
                hm2.save_thread.stop()
                hm2.db.close()
            if ha:
                ha.db.close()