From 7b756166b64b6618e0bf08fd8c14b61835468d8b 2013-07-24 01:32:16
From: David Wolever
Date: 2013-07-24 01:32:16
Subject: [PATCH] Add link_or_copy to IPython.utils.path

---

diff --git a/IPython/utils/path.py b/IPython/utils/path.py
index 76e9377..def5301 100644
--- a/IPython/utils/path.py
+++ b/IPython/utils/path.py
@@ -16,6 +16,9 @@ Utilities for path handling.
 
 import os
 import sys
+import errno
+import shutil
+import random
 import tempfile
 import warnings
 from hashlib import md5
@@ -510,3 +513,63 @@ def get_security_file(filename, profile='default'):
         raise IOError("Profile %r not found")
     return filefind(filename, ['.', pd.security_dir])
 
+
+def link(src, dst):
+    """Attempts to hardlink 'src' to 'dst', errno on failure (or 0 on success).
+
+    Note that the special errno ``1998`` will be returned if ``os.link`` isn't
+    supported by the operating system.
+    """
+
+    if not hasattr(os, "link"):
+        return 1998
+    link_errno = 0
+    try:
+        os.link(src, dst)
+    except OSError as e:
+        link_errno = e.errno
+    return link_errno
+
+
+def link_or_copy(src, dst):
+    """Attempts to hardlink ``src`` to ``dst``, copying if the link fails.
+
+    Attempts to maintain the semantics of ``shutil.copy``.
+
+    Because ``os.link`` does not overwrite files, a unique temporary file
+    will be used if the target already exists, then that file will be moved
+    into place.
+
+    If ``dst`` is already a hard link to ``src``, nothing is done.
+    """
+
+    if os.path.isdir(dst):
+        dst = os.path.join(dst, os.path.basename(src))
+
+    link_errno = link(src, dst)
+    if link_errno == errno.EEXIST:
+        if os.path.samefile(src, dst):
+            # 'dst' is already a hard link to 'src'. Renaming one link onto
+            # another link to the same file is a no-op on POSIX, so the
+            # temporary link created below would be left behind.
+            return
+
+        # randint() includes both endpoints; stay within four hex digits.
+        new_dst = dst + "-temp-%04X" % (random.randint(0, 16**4 - 1), )
+        # os.rename() cannot overwrite an existing file on Windows;
+        # os.replace() (Python 3.3+) can, so prefer it when available.
+        replace = getattr(os, "replace", os.rename)
+        try:
+            link_or_copy(src, new_dst)
+            replace(new_dst, dst)
+        except BaseException:
+            # Never leave the temporary file behind, whatever went wrong.
+            try:
+                os.remove(new_dst)
+            except OSError:
+                pass
+            raise
+    elif link_errno != 0:
+        # Either link isn't supported, or the filesystem doesn't support
+        # linking, or 'src' and 'dst' are on different filesystems.
+        shutil.copy(src, dst)
diff --git a/IPython/utils/tests/test_path.py b/IPython/utils/tests/test_path.py
index 0273266..04ea0b8 100644
--- a/IPython/utils/tests/test_path.py
+++ b/IPython/utils/tests/test_path.py
@@ -559,3 +559,83 @@ def test_unescape_glob():
     nt.assert_equals(path.unescape_glob(r'\\\*'), r'\*')
     nt.assert_equals(path.unescape_glob(r'\\a'), r'\a')
     nt.assert_equals(path.unescape_glob(r'\a'), r'\a')
+
+
+class TestLinkOrCopy(object):
+    """Tests for ``path.link_or_copy``."""
+    def setUp(self):
+        self.tempdir = TemporaryDirectory()
+        self.src = self.dst("src")
+        with open(self.src, "w") as f:
+            f.write("Hello, world!")
+
+    def tearDown(self):
+        self.tempdir.cleanup()
+
+    def dst(self, *args):
+        return os.path.join(self.tempdir.name, *args)
+
+    def assert_inode_not_equal(self, a, b):
+        nt.assert_not_equals(os.stat(a).st_ino, os.stat(b).st_ino,
+                             "%r and %r do reference the same inode" %(a, b))
+
+    def assert_inode_equal(self, a, b):
+        nt.assert_equals(os.stat(a).st_ino, os.stat(b).st_ino,
+                         "%r and %r do not reference the same inode" %(a, b))
+
+    def assert_content_equal(self, a, b):
+        # Nested ``with`` blocks instead of ``contextlib.nested``, which was
+        # deprecated in Python 2.7 and removed in Python 3.
+        with open(a) as a_f:
+            with open(b) as b_f:
+                nt.assert_equals(a_f.read(), b_f.read())
+
+    @skip_win32
+    def test_link_successful(self):
+        dst = self.dst("target")
+        path.link_or_copy(self.src, dst)
+        self.assert_inode_equal(self.src, dst)
+
+    @skip_win32
+    def test_link_into_dir(self):
+        dst = self.dst("some_dir")
+        os.mkdir(dst)
+        path.link_or_copy(self.src, dst)
+        expected_dst = self.dst("some_dir", os.path.basename(self.src))
+        self.assert_inode_equal(self.src, expected_dst)
+
+    @skip_win32
+    def test_target_exists(self):
+        dst = self.dst("target")
+        open(dst, "w").close()
+        path.link_or_copy(self.src, dst)
+        self.assert_inode_equal(self.src, dst)
+
+    @skip_win32
+    def test_link_twice(self):
+        # Re-linking onto an existing hard link of the same file must not
+        # leave the temporary link behind.
+        dst = self.dst("target")
+        path.link_or_copy(self.src, dst)
+        path.link_or_copy(self.src, dst)
+        self.assert_inode_equal(self.src, dst)
+        nt.assert_equals(sorted(os.listdir(self.tempdir.name)),
+                         ["src", "target"])
+
+    @skip_win32
+    def test_no_link(self):
+        real_link = os.link
+        try:
+            del os.link
+            dst = self.dst("target")
+            path.link_or_copy(self.src, dst)
+            self.assert_content_equal(self.src, dst)
+            self.assert_inode_not_equal(self.src, dst)
+        finally:
+            os.link = real_link
+
+    @skip_if_not_win32
+    def test_windows(self):
+        dst = self.dst("target")
+        path.link_or_copy(self.src, dst)
+        self.assert_content_equal(self.src, dst)