mercurial/exchange.py

@@ -5,11 +5,10 @@
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
-import time
 from i18n import _
 from node import hex, nullid
 import errno, urllib
-import util, scmutil, changegroup, base85, error
+import util, scmutil, changegroup, base85, error
 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
 import lock as lockmod
 import tags
@@ -1468,131 +1467,3 @@ def unbundle(repo, cg, heads, source, ur
     if recordout is not None:
         recordout(repo.ui.popbuffer())
     return r
-
-# This is it's own function so extensions can override it.
-def _walkstreamfiles(repo):
-    return repo.store.walk()
-
-def generatestreamclone(repo):
-    """Emit content for a streaming clone.
-
-    This is a generator of raw chunks that constitute a streaming clone.
-
-    The stream begins with a line of 2 space-delimited integers containing the
-    number of entries and total bytes size.
-
-    Next, are N entries for each file being transferred. Each file entry starts
-    as a line with the file name and integer size delimited by a null byte.
-    The raw file data follows. Following the raw file data is the next file
-    entry, or EOF.
-
-    When used on the wire protocol, an additional line indicating protocol
-    success will be prepended to the stream. This function is not responsible
-    for adding it.
-
-    This function will obtain a repository lock to ensure a consistent view of
-    the store is captured. It therefore may raise LockError.
-    """
-    entries = []
-    total_bytes = 0
-    # Get consistent snapshot of repo, lock during scan.
-    lock = repo.lock()
-    try:
-        repo.ui.debug('scanning\n')
-        for name, ename, size in _walkstreamfiles(repo):
-            if size:
-                entries.append((name, size))
-                total_bytes += size
-    finally:
-        lock.release()
-
-    repo.ui.debug('%d files, %d bytes to transfer\n' %
-                  (len(entries), total_bytes))
-    yield '%d %d\n' % (len(entries), total_bytes)
-
-    svfs = repo.svfs
-    oldaudit = svfs.mustaudit
-    debugflag = repo.ui.debugflag
-    svfs.mustaudit = False
-
-    try:
-        for name, size in entries:
-            if debugflag:
-                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
-            # partially encode name over the wire for backwards compat
-            yield '%s\0%d\n' % (store.encodedir(name), size)
-            if size <= 65536:
-                fp = svfs(name)
-                try:
-                    data = fp.read(size)
-                finally:
-                    fp.close()
-                yield data
-            else:
-                for chunk in util.filechunkiter(svfs(name), limit=size):
-                    yield chunk
-    finally:
-        svfs.mustaudit = oldaudit
-
-def consumestreamclone(repo, fp):
-    """Apply the contents from a streaming clone file.
-
-    This takes the output from "streamout" and applies it to the specified
-    repository.
-
-    Like "streamout," the status line added by the wire protocol is not handled
-    by this function.
-    """
-    lock = repo.lock()
-    try:
-        repo.ui.status(_('streaming all changes\n'))
-        l = fp.readline()
-        try:
-            total_files, total_bytes = map(int, l.split(' ', 1))
-        except (ValueError, TypeError):
-            raise error.ResponseError(
-                _('unexpected response from remote server:'), l)
-        repo.ui.status(_('%d files to transfer, %s of data\n') %
-                       (total_files, util.bytecount(total_bytes)))
-        handled_bytes = 0
-        repo.ui.progress(_('clone'), 0, total=total_bytes)
-        start = time.time()
-
-        tr = repo.transaction(_('clone'))
-        try:
-            for i in xrange(total_files):
-                # XXX doesn't support '\n' or '\r' in filenames
-                l = fp.readline()
-                try:
-                    name, size = l.split('\0', 1)
-                    size = int(size)
-                except (ValueError, TypeError):
-                    raise error.ResponseError(
-                        _('unexpected response from remote server:'), l)
-                if repo.ui.debugflag:
-                    repo.ui.debug('adding %s (%s)\n' %
-                                  (name, util.bytecount(size)))
-                # for backwards compat, name was partially encoded
-                ofp = repo.svfs(store.decodedir(name), 'w')
-                for chunk in util.filechunkiter(fp, limit=size):
-                    handled_bytes += len(chunk)
-                    repo.ui.progress(_('clone'), handled_bytes,
-                                     total=total_bytes)
-                    ofp.write(chunk)
-                ofp.close()
-            tr.close()
-        finally:
-            tr.release()
-
-        # Writing straight to files circumvented the inmemory caches
-        repo.invalidate()
-
-        elapsed = time.time() - start
-        if elapsed <= 0:
-            elapsed = 0.001
-        repo.ui.progress(_('clone'), None)
-        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
-                       (util.bytecount(total_bytes), elapsed,
-                        util.bytecount(total_bytes / elapsed)))
-    finally:
-        lock.release()
mercurial/streamclone.py

@@ -7,14 +7,144 @@
 
 from __future__ import absolute_import
 
+import time
+
 from .i18n import _
 from . import (
     branchmap,
     error,
-    exchange,
+    store,
     util,
 )
 
+# This is it's own function so extensions can override it.
+def _walkstreamfiles(repo):
+    return repo.store.walk()
+
+def generatev1(repo):
+    """Emit content for version 1 of a streaming clone.
+
+    This is a generator of raw chunks that constitute a streaming clone.
+
+    The stream begins with a line of 2 space-delimited integers containing the
+    number of entries and total bytes size.
+
+    Next, are N entries for each file being transferred. Each file entry starts
+    as a line with the file name and integer size delimited by a null byte.
+    The raw file data follows. Following the raw file data is the next file
+    entry, or EOF.
+
+    When used on the wire protocol, an additional line indicating protocol
+    success will be prepended to the stream. This function is not responsible
+    for adding it.
+
+    This function will obtain a repository lock to ensure a consistent view of
+    the store is captured. It therefore may raise LockError.
+    """
+    entries = []
+    total_bytes = 0
+    # Get consistent snapshot of repo, lock during scan.
+    lock = repo.lock()
+    try:
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                total_bytes += size
+    finally:
+        lock.release()
+
+    repo.ui.debug('%d files, %d bytes to transfer\n' %
+                  (len(entries), total_bytes))
+    yield '%d %d\n' % (len(entries), total_bytes)
+
+    svfs = repo.svfs
+    oldaudit = svfs.mustaudit
+    debugflag = repo.ui.debugflag
+    svfs.mustaudit = False
+
+    try:
+        for name, size in entries:
+            if debugflag:
+                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
+            # partially encode name over the wire for backwards compat
+            yield '%s\0%d\n' % (store.encodedir(name), size)
+            if size <= 65536:
+                fp = svfs(name)
+                try:
+                    data = fp.read(size)
+                finally:
+                    fp.close()
+                yield data
+            else:
+                for chunk in util.filechunkiter(svfs(name), limit=size):
+                    yield chunk
+    finally:
+        svfs.mustaudit = oldaudit
+
+def consumev1(repo, fp):
+    """Apply the contents from version 1 of a streaming clone file handle.
+
+    This takes the output from "streamout" and applies it to the specified
+    repository.
+
+    Like "streamout," the status line added by the wire protocol is not handled
+    by this function.
+    """
+    lock = repo.lock()
+    try:
+        repo.ui.status(_('streaming all changes\n'))
+        l = fp.readline()
+        try:
+            total_files, total_bytes = map(int, l.split(' ', 1))
+        except (ValueError, TypeError):
+            raise error.ResponseError(
+                _('unexpected response from remote server:'), l)
+        repo.ui.status(_('%d files to transfer, %s of data\n') %
+                       (total_files, util.bytecount(total_bytes)))
+        handled_bytes = 0
+        repo.ui.progress(_('clone'), 0, total=total_bytes)
+        start = time.time()
+
+        tr = repo.transaction(_('clone'))
+        try:
+            for i in xrange(total_files):
+                # XXX doesn't support '\n' or '\r' in filenames
+                l = fp.readline()
+                try:
+                    name, size = l.split('\0', 1)
+                    size = int(size)
+                except (ValueError, TypeError):
+                    raise error.ResponseError(
+                        _('unexpected response from remote server:'), l)
+                if repo.ui.debugflag:
+                    repo.ui.debug('adding %s (%s)\n' %
+                                  (name, util.bytecount(size)))
+                # for backwards compat, name was partially encoded
+                ofp = repo.svfs(store.decodedir(name), 'w')
+                for chunk in util.filechunkiter(fp, limit=size):
+                    handled_bytes += len(chunk)
+                    repo.ui.progress(_('clone'), handled_bytes,
+                                     total=total_bytes)
+                    ofp.write(chunk)
+                ofp.close()
+            tr.close()
+        finally:
+            tr.release()
+
+        # Writing straight to files circumvented the inmemory caches
+        repo.invalidate()
+
+        elapsed = time.time() - start
+        if elapsed <= 0:
+            elapsed = 0.001
+        repo.ui.progress(_('clone'), None)
+        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+                       (util.bytecount(total_bytes), elapsed,
+                        util.bytecount(total_bytes / elapsed)))
+    finally:
+        lock.release()
+
 def streamin(repo, remote, remotereqs):
     # Save remote branchmap. We will use it later
     # to speed up branchcache creation
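
The container format that generatev1() emits, as described in its docstring, can be walked without any Mercurial machinery. The snippet below is only an illustrative sketch, not part of this patch: the helper name describev1 is invented, and it assumes fp is a file object positioned after any wire-protocol status line. It lists the entries rather than writing them to repo.svfs the way consumev1() does, and it is written in the same Python 2 style as the surrounding code.

def describev1(fp):
    # Header: a single line "<number of entries> <total bytes>".
    filecount, bytecount = map(int, fp.readline().split(' ', 1))
    entries = []
    for i in xrange(filecount):
        # Each entry: "<store-encoded name>\0<size>\n" followed by exactly
        # <size> bytes of raw file data. int() tolerates the trailing newline,
        # which is the same trick consumev1() relies on.
        name, size = fp.readline().split('\0', 1)
        size = int(size)
        fp.read(size)  # skip the payload; consumev1() streams it in chunks
        entries.append((name, size))
    return filecount, bytecount, entries

consumev1() in this patch performs the same walk, but additionally takes the repository lock, opens a transaction, writes each payload through repo.svfs via util.filechunkiter, and reports progress.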
@@ -46,11 +176,11 @@ def applyremotedata(repo, remotereqs, re
     "remotebranchmap" is the result of a branchmap lookup on the remote. It
     can be None.
     "fp" is a file object containing the raw stream data, suitable for
-    feeding into exchange.consumestreamclone().
+    feeding into consumev1().
     """
     lock = repo.lock()
    try:
-        exchange.consumestreamclone(repo, fp)
+        consumev1(repo, fp)
 
         # new requirements = old non-format requirements +
         # new format-related remote requirements
mercurial/wireproto.py

@@ -26,6 +26,7 @@ from . import (
     exchange,
     peer,
     pushkey as pushkeymod,
+    streamclone,
     util,
 )
 
@@ -720,7 +721,7 @@ def stream(repo, proto):
     try:
         # LockError may be raised before the first result is yielded. Don't
         # emit output until we're sure we got the lock successfully.
-        it = exchange.generatestreamclone(repo)
+        it = streamclone.generatev1(repo)
         return streamres(getstream(it))
     except error.LockError:
         return '2\n'
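
The generatev1() docstring notes that, on the wire, a line indicating protocol success is prepended to the stream and is not produced by generatev1() itself, and the hunk above shows the server answering '2\n' when the repository lock cannot be acquired. The pulling side therefore has to strip that status line before handing the rest of the response to streamclone.consumev1(). The sketch below only illustrates that framing and is not part of the patch; applystreamresponse is an invented name, and it assumes the success indicator is the line '0\n', which is not visible in this diff.

from mercurial import error, streamclone
from mercurial.i18n import _

def applystreamresponse(repo, fp):
    # Consume the wire-protocol status line before feeding the remaining
    # stream to consumev1(). Treating '0\n' as success is an assumption;
    # the diff above only shows '2\n' being returned on LockError.
    status = fp.readline()
    if status != '0\n':
        raise error.ResponseError(
            _('unexpected response from remote server:'), status)
    streamclone.consumev1(repo, fp)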