Show More
@@ -16,6 +16,9 b' Utilities for path handling.' | |||||
16 |
|
16 | |||
17 | import os |
|
17 | import os | |
18 | import sys |
|
18 | import sys | |
|
19 | import errno | |||
|
20 | import shutil | |||
|
21 | import random | |||
19 | import tempfile |
|
22 | import tempfile | |
20 | import warnings |
|
23 | import warnings | |
21 | from hashlib import md5 |
|
24 | from hashlib import md5 | |
@@ -510,3 +513,50 b" def get_security_file(filename, profile='default'):" | |||||
510 | raise IOError("Profile %r not found") |
|
513 | raise IOError("Profile %r not found") | |
511 | return filefind(filename, ['.', pd.security_dir]) |
|
514 | return filefind(filename, ['.', pd.security_dir]) | |
512 |
|
515 | |||
|
516 | ||||
|
517 | def link(src, dst): | |||
|
518 | """Attempts to hardlink 'src' to 'dst', errno on failure (or 0 on success). | |||
|
519 | ||||
|
520 | Note that the special errno ``1998`` will be returned if ``os.link`` isn't | |||
|
521 | supported by the operating system. | |||
|
522 | """ | |||
|
523 | ||||
|
524 | if not hasattr(os, "link"): | |||
|
525 | return 1998 | |||
|
526 | link_errno = 0 | |||
|
527 | try: | |||
|
528 | os.link(src, dst) | |||
|
529 | except OSError as e: | |||
|
530 | link_errno = e.errno | |||
|
531 | return link_errno | |||
|
532 | ||||
|
533 | ||||
|
534 | def link_or_copy(src, dst): | |||
|
535 | """Attempts to hardlink ``src`` to ``dst``, copying if the link fails. | |||
|
536 | ||||
|
537 | Attempts to maintain the semantics of ``shutil.copy``. | |||
|
538 | ||||
|
539 | Because ``os.link`` does not overwrite files, a unique temporary file | |||
|
540 | will be used if the target already exists, then that file will be moved | |||
|
541 | into place. | |||
|
542 | """ | |||
|
543 | ||||
|
544 | if os.path.isdir(dst): | |||
|
545 | dst = os.path.join(dst, os.path.basename(src)) | |||
|
546 | ||||
|
547 | link_errno = link(src, dst) | |||
|
548 | if link_errno == errno.EEXIST: | |||
|
549 | new_dst = dst + "-temp-%04X" %(random.randint(1, 16**4), ) | |||
|
550 | try: | |||
|
551 | link_or_copy(src, new_dst) | |||
|
552 | except: | |||
|
553 | try: | |||
|
554 | os.remove(new_dst) | |||
|
555 | except OSError: | |||
|
556 | pass | |||
|
557 | raise | |||
|
558 | os.rename(new_dst, dst) | |||
|
559 | elif link_errno != 0: | |||
|
560 | # Either link isn't supported, or the filesystem doesn't support | |||
|
561 | # linking, or 'src' and 'dst' are on different filesystems. | |||
|
562 | shutil.copy(src, dst) |
@@ -18,7 +18,7 b' import os' | |||||
18 | import shutil |
|
18 | import shutil | |
19 | import sys |
|
19 | import sys | |
20 | import tempfile |
|
20 | import tempfile | |
21 | from contextlib import contextmanager |
|
21 | from contextlib import contextmanager, nested | |
22 |
|
22 | |||
23 | from os.path import join, abspath, split |
|
23 | from os.path import join, abspath, split | |
24 |
|
24 | |||
@@ -559,3 +559,68 b' def test_unescape_glob():' | |||||
559 | nt.assert_equals(path.unescape_glob(r'\\\*'), r'\*') |
|
559 | nt.assert_equals(path.unescape_glob(r'\\\*'), r'\*') | |
560 | nt.assert_equals(path.unescape_glob(r'\\a'), r'\a') |
|
560 | nt.assert_equals(path.unescape_glob(r'\\a'), r'\a') | |
561 | nt.assert_equals(path.unescape_glob(r'\a'), r'\a') |
|
561 | nt.assert_equals(path.unescape_glob(r'\a'), r'\a') | |
|
562 | ||||
|
563 | ||||
|
564 | class TestLinkOrCopy(object): | |||
|
565 | def setUp(self): | |||
|
566 | self.tempdir = TemporaryDirectory() | |||
|
567 | self.src = self.dst("src") | |||
|
568 | with open(self.src, "w") as f: | |||
|
569 | f.write("Hello, world!") | |||
|
570 | ||||
|
571 | def tearDown(self): | |||
|
572 | self.tempdir.cleanup() | |||
|
573 | ||||
|
574 | def dst(self, *args): | |||
|
575 | return os.path.join(self.tempdir.name, *args) | |||
|
576 | ||||
|
577 | def assert_inode_not_equal(self, a, b): | |||
|
578 | nt.assert_not_equals(os.stat(a).st_ino, os.stat(b).st_ino, | |||
|
579 | "%r and %r do reference the same indoes" %(a, b)) | |||
|
580 | ||||
|
581 | def assert_inode_equal(self, a, b): | |||
|
582 | nt.assert_equals(os.stat(a).st_ino, os.stat(b).st_ino, | |||
|
583 | "%r and %r do not reference the same indoes" %(a, b)) | |||
|
584 | ||||
|
585 | def assert_content_eqal(self, a, b): | |||
|
586 | with nested(open(a), open(b)) as (a_f, b_f): | |||
|
587 | nt.assert_equals(a_f.read(), b_f.read()) | |||
|
588 | ||||
|
589 | @skip_win32 | |||
|
590 | def test_link_successful(self): | |||
|
591 | dst = self.dst("target") | |||
|
592 | path.link_or_copy(self.src, dst) | |||
|
593 | self.assert_inode_equal(self.src, dst) | |||
|
594 | ||||
|
595 | @skip_win32 | |||
|
596 | def test_link_into_dir(self): | |||
|
597 | dst = self.dst("some_dir") | |||
|
598 | os.mkdir(dst) | |||
|
599 | path.link_or_copy(self.src, dst) | |||
|
600 | expected_dst = self.dst("some_dir", os.path.basename(self.src)) | |||
|
601 | self.assert_inode_equal(self.src, expected_dst) | |||
|
602 | ||||
|
603 | @skip_win32 | |||
|
604 | def test_target_exists(self): | |||
|
605 | dst = self.dst("target") | |||
|
606 | open(dst, "w").close() | |||
|
607 | path.link_or_copy(self.src, dst) | |||
|
608 | self.assert_inode_equal(self.src, dst) | |||
|
609 | ||||
|
610 | @skip_win32 | |||
|
611 | def test_no_link(self): | |||
|
612 | real_link = os.link | |||
|
613 | try: | |||
|
614 | del os.link | |||
|
615 | dst = self.dst("target") | |||
|
616 | path.link_or_copy(self.src, dst) | |||
|
617 | self.assert_content_eqal(self.src, dst) | |||
|
618 | self.assert_inode_not_equal(self.src, dst) | |||
|
619 | finally: | |||
|
620 | os.link = real_link | |||
|
621 | ||||
|
622 | @skip_if_not_win32 | |||
|
623 | def test_windows(self): | |||
|
624 | dst = self.dst("target") | |||
|
625 | path.link_or_copy(self.src, dst) | |||
|
626 | self.assert_content_eqal(self.src, dst) |
General Comments 0
You need to be logged in to leave comments.
Login now