##// END OF EJS Templates
Add link_or_copy to IPython.utils.path
David Wolever -
Show More
@@ -16,6 +16,9 b' Utilities for path handling.'
16
16
17 import os
17 import os
18 import sys
18 import sys
19 import errno
20 import shutil
21 import random
19 import tempfile
22 import tempfile
20 import warnings
23 import warnings
21 from hashlib import md5
24 from hashlib import md5
@@ -510,3 +513,50 b" def get_security_file(filename, profile='default'):"
510 raise IOError("Profile %r not found")
513 raise IOError("Profile %r not found")
511 return filefind(filename, ['.', pd.security_dir])
514 return filefind(filename, ['.', pd.security_dir])
512
515
516
517 def link(src, dst):
518 """Attempts to hardlink 'src' to 'dst', errno on failure (or 0 on success).
519
520 Note that the special errno ``1998`` will be returned if ``os.link`` isn't
521 supported by the operating system.
522 """
523
524 if not hasattr(os, "link"):
525 return 1998
526 link_errno = 0
527 try:
528 os.link(src, dst)
529 except OSError as e:
530 link_errno = e.errno
531 return link_errno
532
533
534 def link_or_copy(src, dst):
535 """Attempts to hardlink ``src`` to ``dst``, copying if the link fails.
536
537 Attempts to maintain the semantics of ``shutil.copy``.
538
539 Because ``os.link`` does not overwrite files, a unique temporary file
540 will be used if the target already exists, then that file will be moved
541 into place.
542 """
543
544 if os.path.isdir(dst):
545 dst = os.path.join(dst, os.path.basename(src))
546
547 link_errno = link(src, dst)
548 if link_errno == errno.EEXIST:
549 new_dst = dst + "-temp-%04X" %(random.randint(1, 16**4), )
550 try:
551 link_or_copy(src, new_dst)
552 except:
553 try:
554 os.remove(new_dst)
555 except OSError:
556 pass
557 raise
558 os.rename(new_dst, dst)
559 elif link_errno != 0:
560 # Either link isn't supported, or the filesystem doesn't support
561 # linking, or 'src' and 'dst' are on different filesystems.
562 shutil.copy(src, dst)
@@ -18,7 +18,7 b' import os'
18 import shutil
18 import shutil
19 import sys
19 import sys
20 import tempfile
20 import tempfile
21 from contextlib import contextmanager
21 from contextlib import contextmanager, nested
22
22
23 from os.path import join, abspath, split
23 from os.path import join, abspath, split
24
24
@@ -559,3 +559,68 b' def test_unescape_glob():'
559 nt.assert_equals(path.unescape_glob(r'\\\*'), r'\*')
559 nt.assert_equals(path.unescape_glob(r'\\\*'), r'\*')
560 nt.assert_equals(path.unescape_glob(r'\\a'), r'\a')
560 nt.assert_equals(path.unescape_glob(r'\\a'), r'\a')
561 nt.assert_equals(path.unescape_glob(r'\a'), r'\a')
561 nt.assert_equals(path.unescape_glob(r'\a'), r'\a')
562
563
564 class TestLinkOrCopy(object):
565 def setUp(self):
566 self.tempdir = TemporaryDirectory()
567 self.src = self.dst("src")
568 with open(self.src, "w") as f:
569 f.write("Hello, world!")
570
571 def tearDown(self):
572 self.tempdir.cleanup()
573
574 def dst(self, *args):
575 return os.path.join(self.tempdir.name, *args)
576
577 def assert_inode_not_equal(self, a, b):
578 nt.assert_not_equals(os.stat(a).st_ino, os.stat(b).st_ino,
579 "%r and %r do reference the same indoes" %(a, b))
580
581 def assert_inode_equal(self, a, b):
582 nt.assert_equals(os.stat(a).st_ino, os.stat(b).st_ino,
583 "%r and %r do not reference the same indoes" %(a, b))
584
585 def assert_content_eqal(self, a, b):
586 with nested(open(a), open(b)) as (a_f, b_f):
587 nt.assert_equals(a_f.read(), b_f.read())
588
589 @skip_win32
590 def test_link_successful(self):
591 dst = self.dst("target")
592 path.link_or_copy(self.src, dst)
593 self.assert_inode_equal(self.src, dst)
594
595 @skip_win32
596 def test_link_into_dir(self):
597 dst = self.dst("some_dir")
598 os.mkdir(dst)
599 path.link_or_copy(self.src, dst)
600 expected_dst = self.dst("some_dir", os.path.basename(self.src))
601 self.assert_inode_equal(self.src, expected_dst)
602
603 @skip_win32
604 def test_target_exists(self):
605 dst = self.dst("target")
606 open(dst, "w").close()
607 path.link_or_copy(self.src, dst)
608 self.assert_inode_equal(self.src, dst)
609
610 @skip_win32
611 def test_no_link(self):
612 real_link = os.link
613 try:
614 del os.link
615 dst = self.dst("target")
616 path.link_or_copy(self.src, dst)
617 self.assert_content_eqal(self.src, dst)
618 self.assert_inode_not_equal(self.src, dst)
619 finally:
620 os.link = real_link
621
622 @skip_if_not_win32
623 def test_windows(self):
624 dst = self.dst("target")
625 path.link_or_copy(self.src, dst)
626 self.assert_content_eqal(self.src, dst)
General Comments 0
You need to be logged in to leave comments. Login now