From 4967ec2a4c1f30b5e8f79d145a290094b79d3523 2020-10-26 21:15:21
From: Matthias Bussonnier <bussonniermatthias@gmail.com>
Date: 2020-10-26 21:15:21
Subject: [PATCH] Merge pull request #12644 from deep-jkl/fix-pathlib-in-tests


---

diff --git a/IPython/core/history.py b/IPython/core/history.py
index 82d4d6e..08db18f 100644
--- a/IPython/core/history.py
+++ b/IPython/core/history.py
@@ -6,7 +6,7 @@
 
 import atexit
 import datetime
-import os
+from pathlib import Path
 import re
 import sqlite3
 import threading
@@ -16,8 +16,17 @@ from decorator import decorator
 from IPython.utils.decorators import undoc
 from IPython.paths import locate_profile
 from traitlets import (
-    Any, Bool, Dict, Instance, Integer, List, Unicode, TraitError,
-    default, observe,
+    Any,
+    Bool,
+    Dict,
+    Instance,
+    Integer,
+    List,
+    Unicode,
+    Union,
+    TraitError,
+    default,
+    observe,
 )
 
 #-----------------------------------------------------------------------------
@@ -27,17 +36,17 @@ from traitlets import (
 @undoc
 class DummyDB(object):
     """Dummy DB that will act as a black hole for history.
-    
+
     Only used in the absence of sqlite"""
     def execute(*args, **kwargs):
         return []
-    
+
     def commit(self, *args, **kwargs):
         pass
-    
+
     def __enter__(self, *args, **kwargs):
         pass
-    
+
     def __exit__(self, *args, **kwargs):
         pass
 
@@ -73,17 +82,18 @@ def catch_corrupt_db(f, self, *a, **kw):
             if self._corrupt_db_counter > self._corrupt_db_limit:
                 self.hist_file = ':memory:'
                 self.log.error("Failed to load history too many times, history will not be saved.")
-            elif os.path.isfile(self.hist_file):
+            elif Path(self.hist_file).is_file():
                 # move the file out of the way
-                base, ext = os.path.splitext(self.hist_file)
-                size = os.stat(self.hist_file).st_size
+                hist_path = Path(self.hist_file)  # hist_file may be a plain str (Union trait)
+                base, ext = str(hist_path.with_suffix("")), hist_path.suffix
+                size = hist_path.stat().st_size
                 if size >= _SAVE_DB_SIZE:
                     # if there's significant content, avoid clobbering
                     now = datetime.datetime.now().isoformat().replace(':', '.')
                     newpath = base + '-corrupt-' + now + ext
                     # don't clobber previous corrupt backups
                     for i in range(100):
-                        if not os.path.isfile(newpath):
+                        if not Path(newpath).exists():
                             break
                         else:
                             newpath = base + '-corrupt-' + now + (u'-%i' % i) + ext
@@ -91,14 +101,15 @@ def catch_corrupt_db(f, self, *a, **kw):
                     # not much content, possibly empty; don't worry about clobbering
                     # maybe we should just delete it?
                     newpath = base + '-corrupt' + ext
-                os.rename(self.hist_file, newpath)
+                hist_path.rename(newpath)
                 self.log.error("History file was moved to %s and a new file created.", newpath)
             self.init_db()
             return []
         else:
             # Failed with :memory:, something serious is wrong
             raise
-        
+
+
 class HistoryAccessorBase(LoggingConfigurable):
     """An abstract class for History Accessors """
 
@@ -118,7 +129,7 @@ class HistoryAccessorBase(LoggingConfigurable):
 
 class HistoryAccessor(HistoryAccessorBase):
     """Access the history database without adding to it.
-    
+
     This is intended for use by standalone history tools. IPython shells use
     HistoryManager, below, which is a subclass of this."""
 
@@ -128,37 +139,39 @@ class HistoryAccessor(HistoryAccessorBase):
     _corrupt_db_limit = 2
 
     # String holding the path to the history file
-    hist_file = Unicode(
+    hist_file = Union(
+        [Instance(Path), Unicode()],
         help="""Path to file to use for SQLite history database.
-        
+
         By default, IPython will put the history database in the IPython
         profile directory.  If you would rather share one history among
         profiles, you can set this value in each, so that they are consistent.
-        
+
         Due to an issue with fcntl, SQLite is known to misbehave on some NFS
         mounts.  If you see IPython hanging, try setting this to something on a
         local disk, e.g::
-        
+
             ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite
 
         you can also use the specific value `:memory:` (including the colon
         at both end but not the back ticks), to avoid creating an history file.
-        
-        """).tag(config=True)
-    
+
+        """,
+    ).tag(config=True)
+
     enabled = Bool(True,
         help="""enable the SQLite history
-        
+
         set enabled=False to disable the SQLite history,
         in which case there will be no stored history, no SQLite connection,
         and no background saving thread.  This may be necessary in some
         threaded environments where IPython is embedded.
         """
     ).tag(config=True)
-    
+
     connection_options = Dict(
         help="""Options for configuring the SQLite connection
-        
+
         These options are passed as keyword args to sqlite3.connect
         when establishing database connections.
         """
@@ -175,10 +188,10 @@ class HistoryAccessor(HistoryAccessorBase):
             msg = "%s.db must be sqlite3 Connection or DummyDB, not %r" % \
                     (self.__class__.__name__, new)
             raise TraitError(msg)
-    
-    def __init__(self, profile='default', hist_file=u'', **traits):
+
+    def __init__(self, profile="default", hist_file="", **traits):
         """Create a new history accessor.
-        
+
         Parameters
         ----------
         profile : str
@@ -196,37 +209,39 @@ class HistoryAccessor(HistoryAccessorBase):
         # set by config
         if hist_file:
             self.hist_file = hist_file
-        
-        if self.hist_file == u'':
+
+        try:
+            self.hist_file
+        except TraitError:
             # No one has set the hist_file, yet.
             self.hist_file = self._get_hist_file_name(profile)
-        
+
         self.init_db()
-    
+
     def _get_hist_file_name(self, profile='default'):
         """Find the history file for the given profile name.
-        
+
         This is overridden by the HistoryManager subclass, to use the shell's
         active profile.
-        
+
         Parameters
         ----------
         profile : str
           The name of a profile which has a history file.
         """
-        return os.path.join(locate_profile(profile), 'history.sqlite')
-    
+        return Path(locate_profile(profile)) / "history.sqlite"
+
     @catch_corrupt_db
     def init_db(self):
         """Connect to the database, and create tables if necessary."""
         if not self.enabled:
             self.db = DummyDB()
             return
-        
+
         # use detect_types so that timestamps return datetime objects
         kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
         kwargs.update(self.connection_options)
-        self.db = sqlite3.connect(self.hist_file, **kwargs)
+        self.db = sqlite3.connect(str(self.hist_file), **kwargs)
         self.db.execute("""CREATE TABLE IF NOT EXISTS sessions (session integer
                         primary key autoincrement, start timestamp,
                         end timestamp, num_cmds integer, remark text)""")
@@ -290,7 +305,7 @@ class HistoryAccessor(HistoryAccessorBase):
 
         Returns
         -------
-        
+
         session_id : int
            Session ID number
         start : datetime
@@ -308,7 +323,7 @@ class HistoryAccessor(HistoryAccessorBase):
     @catch_corrupt_db
     def get_last_session_id(self):
         """Get the last session ID currently in the database.
-        
+
         Within IPython, this should be the same as the value stored in
         :attr:`HistoryManager.session_number`.
         """
@@ -384,7 +399,7 @@ class HistoryAccessor(HistoryAccessorBase):
         if n is not None:
             return reversed(list(cur))
         return cur
-    
+
     @catch_corrupt_db
     def get_range(self, session, start=1, stop=None, raw=True,output=False):
         """Retrieve input by session.
@@ -461,7 +476,7 @@ class HistoryManager(HistoryAccessor):
     @default('dir_hist')
     def _dir_hist_default(self):
         try:
-            return [os.getcwd()]
+            return [Path.cwd()]
         except OSError:
             return []
 
@@ -473,7 +488,7 @@ class HistoryManager(HistoryAccessor):
 
     # The number of the current session in the history database
     session_number = Integer()
-    
+
     db_log_output = Bool(False,
         help="Should the history database include output? (default: no)"
     ).tag(config=True)
@@ -484,12 +499,12 @@ class HistoryManager(HistoryAccessor):
     # The input and output caches
     db_input_cache = List()
     db_output_cache = List()
-    
+
     # History saving in separate thread
     save_thread = Instance('IPython.core.history.HistorySavingThread',
                            allow_none=True)
     save_flag = Instance(threading.Event, allow_none=True)
-    
+
     # Private interface
     # Variables used to store the three last inputs from the user.  On each new
     # history update, we populate the user's namespace with these, shifted as
@@ -513,37 +528,37 @@ class HistoryManager(HistoryAccessor):
         self.save_flag = threading.Event()
         self.db_input_cache_lock = threading.Lock()
         self.db_output_cache_lock = threading.Lock()
-        
+
         try:
             self.new_session()
         except sqlite3.OperationalError:
             self.log.error("Failed to create history session in %s. History will not be saved.",
                 self.hist_file, exc_info=True)
             self.hist_file = ':memory:'
-        
+
         if self.enabled and self.hist_file != ':memory:':
             self.save_thread = HistorySavingThread(self)
             self.save_thread.start()
 
     def _get_hist_file_name(self, profile=None):
         """Get default history file name based on the Shell's profile.
-        
+
         The profile parameter is ignored, but must exist for compatibility with
         the parent class."""
         profile_dir = self.shell.profile_dir.location
-        return os.path.join(profile_dir, 'history.sqlite')
-    
+        return Path(profile_dir) / "history.sqlite"
+
     @only_when_enabled
     def new_session(self, conn=None):
         """Get a new session number."""
         if conn is None:
             conn = self.db
-        
+
         with conn:
             cur = conn.execute("""INSERT INTO sessions VALUES (NULL, ?, NULL,
                             NULL, "") """, (datetime.datetime.now(),))
             self.session_number = cur.lastrowid
-            
+
     def end_session(self):
         """Close the database session, filling in the end time and line count."""
         self.writeout_cache()
@@ -552,20 +567,20 @@ class HistoryManager(HistoryAccessor):
                             session==?""", (datetime.datetime.now(),
                             len(self.input_hist_parsed)-1, self.session_number))
         self.session_number = 0
-                            
+
     def name_session(self, name):
         """Give the current session a name in the history database."""
         with self.db:
             self.db.execute("UPDATE sessions SET remark=? WHERE session==?",
                             (name, self.session_number))
-                            
+
     def reset(self, new_session=True):
         """Clear the session history, releasing all object references, and
         optionally open a new session."""
         self.output_hist.clear()
         # The directory history can't be completely empty
-        self.dir_hist[:] = [os.getcwd()]
-        
+        self.dir_hist[:] = [Path.cwd()]
+
         if new_session:
             if self.session_number:
                 self.end_session()
@@ -588,7 +603,7 @@ class HistoryManager(HistoryAccessor):
 
         Returns
         -------
-        
+
         session_id : int
            Session ID number
         start : datetime
@@ -609,7 +624,7 @@ class HistoryManager(HistoryAccessor):
         """Get input and output history from the current session. Called by
         get_range, and takes similar parameters."""
         input_hist = self.input_hist_raw if raw else self.input_hist_parsed
-            
+
         n = len(input_hist)
         if start < 0:
             start += n
@@ -617,17 +632,17 @@ class HistoryManager(HistoryAccessor):
             stop = n
         elif stop < 0:
             stop += n
-        
+
         for i in range(start, stop):
             if output:
                 line = (input_hist[i], self.output_hist_reprs.get(i))
             else:
                 line = input_hist[i]
             yield (0, i, line)
-    
+
     def get_range(self, session=0, start=1, stop=None, raw=True,output=False):
         """Retrieve input by session.
-        
+
         Parameters
         ----------
         session : int
@@ -645,7 +660,7 @@ class HistoryManager(HistoryAccessor):
             objects for the current session, or text reprs from previous
             sessions if db_log_output was enabled at the time. Where no output
             is found, None is used.
-            
+
         Returns
         -------
         entries
@@ -709,7 +724,7 @@ class HistoryManager(HistoryAccessor):
                    '_ii': self._ii,
                    '_iii': self._iii,
                    new_i : self._i00 }
-        
+
         if self.shell is not None:
             self.shell.push(to_main, interactive=False)
 
@@ -797,8 +812,9 @@ class HistorySavingThread(threading.Thread):
     def run(self):
         # We need a separate db connection per thread:
         try:
-            self.db = sqlite3.connect(self.history_manager.hist_file,
-                            **self.history_manager.connection_options
+            self.db = sqlite3.connect(
+                str(self.history_manager.hist_file),
+                **self.history_manager.connection_options
             )
             while True:
                 self.history_manager.save_flag.wait()
diff --git a/IPython/core/tests/test_history.py b/IPython/core/tests/test_history.py
index f4f080d..57266e5 100644
--- a/IPython/core/tests/test_history.py
+++ b/IPython/core/tests/test_history.py
@@ -7,7 +7,7 @@
 
 # stdlib
 import io
-import os
+from pathlib import Path
 import sys
 import tempfile
 from datetime import datetime
@@ -29,8 +29,9 @@ def test_proper_default_encoding():
 def test_history():
     ip = get_ipython()
     with TemporaryDirectory() as tmpdir:
+        tmp_path = Path(tmpdir)
         hist_manager_ori = ip.history_manager
-        hist_file = os.path.join(tmpdir, 'history.sqlite')
+        hist_file = tmp_path / "history.sqlite"
         try:
             ip.history_manager = HistoryManager(shell=ip, hist_file=hist_file)
             hist = [u'a=1', u'def f():\n    test = 1\n    return test', u"b='€Æ¾÷ß'"]
@@ -55,10 +56,10 @@ def test_history():
             ip.magic('%hist 2-500')
 
             # Check that we can write non-ascii characters to a file
-            ip.magic("%%hist -f %s" % os.path.join(tmpdir, "test1"))
-            ip.magic("%%hist -pf %s" % os.path.join(tmpdir, "test2"))
-            ip.magic("%%hist -nf %s" % os.path.join(tmpdir, "test3"))
-            ip.magic("%%save %s 1-10" % os.path.join(tmpdir, "test4"))
+            ip.magic("%%hist -f %s" % (tmp_path / "test1"))
+            ip.magic("%%hist -pf %s" % (tmp_path / "test2"))
+            ip.magic("%%hist -nf %s" % (tmp_path / "test3"))
+            ip.magic("%%save %s 1-10" % (tmp_path / "test4"))
 
             # New session
             ip.history_manager.reset()
@@ -126,8 +127,8 @@ def test_history():
             nt.assert_equal(list(gothist), [(1,3,(hist[2],"spam"))] )
 
             # Cross testing: check that magic %save can get previous session.
-            testfilename = os.path.realpath(os.path.join(tmpdir, "test.py"))
-            ip.magic("save " + testfilename + " ~1/1-3")
+            testfilename = (tmp_path / "test.py").resolve()
+            ip.magic("save " + str(testfilename) + " ~1/1-3")
             with io.open(testfilename, encoding='utf-8') as testfile:
                 nt.assert_equal(testfile.read(),
                                         u"# coding: utf-8\n" + u"\n".join(hist)+u"\n")
@@ -176,13 +177,13 @@ def test_timestamp_type():
 def test_hist_file_config():
     cfg = Config()
     tfile = tempfile.NamedTemporaryFile(delete=False)
-    cfg.HistoryManager.hist_file = tfile.name
+    cfg.HistoryManager.hist_file = Path(tfile.name)
     try:
         hm = HistoryManager(shell=get_ipython(), config=cfg)
         nt.assert_equal(hm.hist_file, cfg.HistoryManager.hist_file)
     finally:
         try:
-            os.remove(tfile.name)
+            Path(tfile.name).unlink()
         except OSError:
             # same catch as in testing.tools.TempFileMixin
             # On Windows, even though we close the file, we still can't
@@ -197,7 +198,7 @@ def test_histmanager_disabled():
     ip = get_ipython()
     with TemporaryDirectory() as tmpdir:
         hist_manager_ori = ip.history_manager
-        hist_file = os.path.join(tmpdir, 'history.sqlite')
+        hist_file = Path(tmpdir) / "history.sqlite"
         cfg.HistoryManager.hist_file = hist_file
         try:
             ip.history_manager = HistoryManager(shell=ip, config=cfg)
@@ -211,4 +212,4 @@ def test_histmanager_disabled():
             ip.history_manager = hist_manager_ori
 
     # hist_file should not be created
-    nt.assert_false(os.path.exists(hist_file))
+    nt.assert_false(hist_file.exists())
diff --git a/IPython/testing/tools.py b/IPython/testing/tools.py
index 498ae48..b4301f8 100644
--- a/IPython/testing/tools.py
+++ b/IPython/testing/tools.py
@@ -10,6 +10,7 @@ Authors
 # Distributed under the terms of the Modified BSD License.
 
 import os
+from pathlib import Path
 import re
 import sys
 import tempfile
@@ -142,7 +143,7 @@ def default_config():
     config.TerminalTerminalInteractiveShell.term_title = False,
     config.TerminalInteractiveShell.autocall = 0
     f = tempfile.NamedTemporaryFile(suffix=u'test_hist.sqlite', delete=False)
-    config.HistoryManager.hist_file = f.name
+    config.HistoryManager.hist_file = Path(f.name)
     f.close()
     config.HistoryManager.db_cache_size = 10000
     return config