Show More
@@ -5,11 +5,10 b'' | |||||
5 | # This software may be used and distributed according to the terms of the |
|
5 | # This software may be used and distributed according to the terms of the | |
6 | # GNU General Public License version 2 or any later version. |
|
6 | # GNU General Public License version 2 or any later version. | |
7 |
|
7 | |||
8 | import time |
|
|||
9 | from i18n import _ |
|
8 | from i18n import _ | |
10 | from node import hex, nullid |
|
9 | from node import hex, nullid | |
11 | import errno, urllib |
|
10 | import errno, urllib | |
12 |
import util, scmutil, changegroup, base85, error |
|
11 | import util, scmutil, changegroup, base85, error | |
13 | import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey |
|
12 | import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey | |
14 | import lock as lockmod |
|
13 | import lock as lockmod | |
15 | import tags |
|
14 | import tags | |
@@ -1468,131 +1467,3 b' def unbundle(repo, cg, heads, source, ur' | |||||
1468 | if recordout is not None: |
|
1467 | if recordout is not None: | |
1469 | recordout(repo.ui.popbuffer()) |
|
1468 | recordout(repo.ui.popbuffer()) | |
1470 | return r |
|
1469 | return r | |
1471 |
|
||||
1472 | # This is it's own function so extensions can override it. |
|
|||
1473 | def _walkstreamfiles(repo): |
|
|||
1474 | return repo.store.walk() |
|
|||
1475 |
|
||||
1476 | def generatestreamclone(repo): |
|
|||
1477 | """Emit content for a streaming clone. |
|
|||
1478 |
|
||||
1479 | This is a generator of raw chunks that constitute a streaming clone. |
|
|||
1480 |
|
||||
1481 | The stream begins with a line of 2 space-delimited integers containing the |
|
|||
1482 | number of entries and total bytes size. |
|
|||
1483 |
|
||||
1484 | Next, are N entries for each file being transferred. Each file entry starts |
|
|||
1485 | as a line with the file name and integer size delimited by a null byte. |
|
|||
1486 | The raw file data follows. Following the raw file data is the next file |
|
|||
1487 | entry, or EOF. |
|
|||
1488 |
|
||||
1489 | When used on the wire protocol, an additional line indicating protocol |
|
|||
1490 | success will be prepended to the stream. This function is not responsible |
|
|||
1491 | for adding it. |
|
|||
1492 |
|
||||
1493 | This function will obtain a repository lock to ensure a consistent view of |
|
|||
1494 | the store is captured. It therefore may raise LockError. |
|
|||
1495 | """ |
|
|||
1496 | entries = [] |
|
|||
1497 | total_bytes = 0 |
|
|||
1498 | # Get consistent snapshot of repo, lock during scan. |
|
|||
1499 | lock = repo.lock() |
|
|||
1500 | try: |
|
|||
1501 | repo.ui.debug('scanning\n') |
|
|||
1502 | for name, ename, size in _walkstreamfiles(repo): |
|
|||
1503 | if size: |
|
|||
1504 | entries.append((name, size)) |
|
|||
1505 | total_bytes += size |
|
|||
1506 | finally: |
|
|||
1507 | lock.release() |
|
|||
1508 |
|
||||
1509 | repo.ui.debug('%d files, %d bytes to transfer\n' % |
|
|||
1510 | (len(entries), total_bytes)) |
|
|||
1511 | yield '%d %d\n' % (len(entries), total_bytes) |
|
|||
1512 |
|
||||
1513 | svfs = repo.svfs |
|
|||
1514 | oldaudit = svfs.mustaudit |
|
|||
1515 | debugflag = repo.ui.debugflag |
|
|||
1516 | svfs.mustaudit = False |
|
|||
1517 |
|
||||
1518 | try: |
|
|||
1519 | for name, size in entries: |
|
|||
1520 | if debugflag: |
|
|||
1521 | repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) |
|
|||
1522 | # partially encode name over the wire for backwards compat |
|
|||
1523 | yield '%s\0%d\n' % (store.encodedir(name), size) |
|
|||
1524 | if size <= 65536: |
|
|||
1525 | fp = svfs(name) |
|
|||
1526 | try: |
|
|||
1527 | data = fp.read(size) |
|
|||
1528 | finally: |
|
|||
1529 | fp.close() |
|
|||
1530 | yield data |
|
|||
1531 | else: |
|
|||
1532 | for chunk in util.filechunkiter(svfs(name), limit=size): |
|
|||
1533 | yield chunk |
|
|||
1534 | finally: |
|
|||
1535 | svfs.mustaudit = oldaudit |
|
|||
1536 |
|
||||
1537 | def consumestreamclone(repo, fp): |
|
|||
1538 | """Apply the contents from a streaming clone file. |
|
|||
1539 |
|
||||
1540 | This takes the output from "streamout" and applies it to the specified |
|
|||
1541 | repository. |
|
|||
1542 |
|
||||
1543 | Like "streamout," the status line added by the wire protocol is not handled |
|
|||
1544 | by this function. |
|
|||
1545 | """ |
|
|||
1546 | lock = repo.lock() |
|
|||
1547 | try: |
|
|||
1548 | repo.ui.status(_('streaming all changes\n')) |
|
|||
1549 | l = fp.readline() |
|
|||
1550 | try: |
|
|||
1551 | total_files, total_bytes = map(int, l.split(' ', 1)) |
|
|||
1552 | except (ValueError, TypeError): |
|
|||
1553 | raise error.ResponseError( |
|
|||
1554 | _('unexpected response from remote server:'), l) |
|
|||
1555 | repo.ui.status(_('%d files to transfer, %s of data\n') % |
|
|||
1556 | (total_files, util.bytecount(total_bytes))) |
|
|||
1557 | handled_bytes = 0 |
|
|||
1558 | repo.ui.progress(_('clone'), 0, total=total_bytes) |
|
|||
1559 | start = time.time() |
|
|||
1560 |
|
||||
1561 | tr = repo.transaction(_('clone')) |
|
|||
1562 | try: |
|
|||
1563 | for i in xrange(total_files): |
|
|||
1564 | # XXX doesn't support '\n' or '\r' in filenames |
|
|||
1565 | l = fp.readline() |
|
|||
1566 | try: |
|
|||
1567 | name, size = l.split('\0', 1) |
|
|||
1568 | size = int(size) |
|
|||
1569 | except (ValueError, TypeError): |
|
|||
1570 | raise error.ResponseError( |
|
|||
1571 | _('unexpected response from remote server:'), l) |
|
|||
1572 | if repo.ui.debugflag: |
|
|||
1573 | repo.ui.debug('adding %s (%s)\n' % |
|
|||
1574 | (name, util.bytecount(size))) |
|
|||
1575 | # for backwards compat, name was partially encoded |
|
|||
1576 | ofp = repo.svfs(store.decodedir(name), 'w') |
|
|||
1577 | for chunk in util.filechunkiter(fp, limit=size): |
|
|||
1578 | handled_bytes += len(chunk) |
|
|||
1579 | repo.ui.progress(_('clone'), handled_bytes, |
|
|||
1580 | total=total_bytes) |
|
|||
1581 | ofp.write(chunk) |
|
|||
1582 | ofp.close() |
|
|||
1583 | tr.close() |
|
|||
1584 | finally: |
|
|||
1585 | tr.release() |
|
|||
1586 |
|
||||
1587 | # Writing straight to files circumvented the inmemory caches |
|
|||
1588 | repo.invalidate() |
|
|||
1589 |
|
||||
1590 | elapsed = time.time() - start |
|
|||
1591 | if elapsed <= 0: |
|
|||
1592 | elapsed = 0.001 |
|
|||
1593 | repo.ui.progress(_('clone'), None) |
|
|||
1594 | repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % |
|
|||
1595 | (util.bytecount(total_bytes), elapsed, |
|
|||
1596 | util.bytecount(total_bytes / elapsed))) |
|
|||
1597 | finally: |
|
|||
1598 | lock.release() |
|
@@ -7,14 +7,144 b'' | |||||
7 |
|
7 | |||
8 | from __future__ import absolute_import |
|
8 | from __future__ import absolute_import | |
9 |
|
9 | |||
|
10 | import time | |||
|
11 | ||||
10 | from .i18n import _ |
|
12 | from .i18n import _ | |
11 | from . import ( |
|
13 | from . import ( | |
12 | branchmap, |
|
14 | branchmap, | |
13 | error, |
|
15 | error, | |
14 | exchange, |
|
16 | store, | |
15 | util, |
|
17 | util, | |
16 | ) |
|
18 | ) | |
17 |
|
19 | |||
|
20 | # This is it's own function so extensions can override it. | |||
|
21 | def _walkstreamfiles(repo): | |||
|
22 | return repo.store.walk() | |||
|
23 | ||||
|
24 | def generatev1(repo): | |||
|
25 | """Emit content for version 1 of a streaming clone. | |||
|
26 | ||||
|
27 | This is a generator of raw chunks that constitute a streaming clone. | |||
|
28 | ||||
|
29 | The stream begins with a line of 2 space-delimited integers containing the | |||
|
30 | number of entries and total bytes size. | |||
|
31 | ||||
|
32 | Next, are N entries for each file being transferred. Each file entry starts | |||
|
33 | as a line with the file name and integer size delimited by a null byte. | |||
|
34 | The raw file data follows. Following the raw file data is the next file | |||
|
35 | entry, or EOF. | |||
|
36 | ||||
|
37 | When used on the wire protocol, an additional line indicating protocol | |||
|
38 | success will be prepended to the stream. This function is not responsible | |||
|
39 | for adding it. | |||
|
40 | ||||
|
41 | This function will obtain a repository lock to ensure a consistent view of | |||
|
42 | the store is captured. It therefore may raise LockError. | |||
|
43 | """ | |||
|
44 | entries = [] | |||
|
45 | total_bytes = 0 | |||
|
46 | # Get consistent snapshot of repo, lock during scan. | |||
|
47 | lock = repo.lock() | |||
|
48 | try: | |||
|
49 | repo.ui.debug('scanning\n') | |||
|
50 | for name, ename, size in _walkstreamfiles(repo): | |||
|
51 | if size: | |||
|
52 | entries.append((name, size)) | |||
|
53 | total_bytes += size | |||
|
54 | finally: | |||
|
55 | lock.release() | |||
|
56 | ||||
|
57 | repo.ui.debug('%d files, %d bytes to transfer\n' % | |||
|
58 | (len(entries), total_bytes)) | |||
|
59 | yield '%d %d\n' % (len(entries), total_bytes) | |||
|
60 | ||||
|
61 | svfs = repo.svfs | |||
|
62 | oldaudit = svfs.mustaudit | |||
|
63 | debugflag = repo.ui.debugflag | |||
|
64 | svfs.mustaudit = False | |||
|
65 | ||||
|
66 | try: | |||
|
67 | for name, size in entries: | |||
|
68 | if debugflag: | |||
|
69 | repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) | |||
|
70 | # partially encode name over the wire for backwards compat | |||
|
71 | yield '%s\0%d\n' % (store.encodedir(name), size) | |||
|
72 | if size <= 65536: | |||
|
73 | fp = svfs(name) | |||
|
74 | try: | |||
|
75 | data = fp.read(size) | |||
|
76 | finally: | |||
|
77 | fp.close() | |||
|
78 | yield data | |||
|
79 | else: | |||
|
80 | for chunk in util.filechunkiter(svfs(name), limit=size): | |||
|
81 | yield chunk | |||
|
82 | finally: | |||
|
83 | svfs.mustaudit = oldaudit | |||
|
84 | ||||
|
85 | def consumev1(repo, fp): | |||
|
86 | """Apply the contents from version 1 of a streaming clone file handle. | |||
|
87 | ||||
|
88 | This takes the output from "streamout" and applies it to the specified | |||
|
89 | repository. | |||
|
90 | ||||
|
91 | Like "streamout," the status line added by the wire protocol is not handled | |||
|
92 | by this function. | |||
|
93 | """ | |||
|
94 | lock = repo.lock() | |||
|
95 | try: | |||
|
96 | repo.ui.status(_('streaming all changes\n')) | |||
|
97 | l = fp.readline() | |||
|
98 | try: | |||
|
99 | total_files, total_bytes = map(int, l.split(' ', 1)) | |||
|
100 | except (ValueError, TypeError): | |||
|
101 | raise error.ResponseError( | |||
|
102 | _('unexpected response from remote server:'), l) | |||
|
103 | repo.ui.status(_('%d files to transfer, %s of data\n') % | |||
|
104 | (total_files, util.bytecount(total_bytes))) | |||
|
105 | handled_bytes = 0 | |||
|
106 | repo.ui.progress(_('clone'), 0, total=total_bytes) | |||
|
107 | start = time.time() | |||
|
108 | ||||
|
109 | tr = repo.transaction(_('clone')) | |||
|
110 | try: | |||
|
111 | for i in xrange(total_files): | |||
|
112 | # XXX doesn't support '\n' or '\r' in filenames | |||
|
113 | l = fp.readline() | |||
|
114 | try: | |||
|
115 | name, size = l.split('\0', 1) | |||
|
116 | size = int(size) | |||
|
117 | except (ValueError, TypeError): | |||
|
118 | raise error.ResponseError( | |||
|
119 | _('unexpected response from remote server:'), l) | |||
|
120 | if repo.ui.debugflag: | |||
|
121 | repo.ui.debug('adding %s (%s)\n' % | |||
|
122 | (name, util.bytecount(size))) | |||
|
123 | # for backwards compat, name was partially encoded | |||
|
124 | ofp = repo.svfs(store.decodedir(name), 'w') | |||
|
125 | for chunk in util.filechunkiter(fp, limit=size): | |||
|
126 | handled_bytes += len(chunk) | |||
|
127 | repo.ui.progress(_('clone'), handled_bytes, | |||
|
128 | total=total_bytes) | |||
|
129 | ofp.write(chunk) | |||
|
130 | ofp.close() | |||
|
131 | tr.close() | |||
|
132 | finally: | |||
|
133 | tr.release() | |||
|
134 | ||||
|
135 | # Writing straight to files circumvented the inmemory caches | |||
|
136 | repo.invalidate() | |||
|
137 | ||||
|
138 | elapsed = time.time() - start | |||
|
139 | if elapsed <= 0: | |||
|
140 | elapsed = 0.001 | |||
|
141 | repo.ui.progress(_('clone'), None) | |||
|
142 | repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | |||
|
143 | (util.bytecount(total_bytes), elapsed, | |||
|
144 | util.bytecount(total_bytes / elapsed))) | |||
|
145 | finally: | |||
|
146 | lock.release() | |||
|
147 | ||||
18 | def streamin(repo, remote, remotereqs): |
|
148 | def streamin(repo, remote, remotereqs): | |
19 | # Save remote branchmap. We will use it later |
|
149 | # Save remote branchmap. We will use it later | |
20 | # to speed up branchcache creation |
|
150 | # to speed up branchcache creation | |
@@ -46,11 +176,11 b' def applyremotedata(repo, remotereqs, re' | |||||
46 | "remotebranchmap" is the result of a branchmap lookup on the remote. It |
|
176 | "remotebranchmap" is the result of a branchmap lookup on the remote. It | |
47 | can be None. |
|
177 | can be None. | |
48 | "fp" is a file object containing the raw stream data, suitable for |
|
178 | "fp" is a file object containing the raw stream data, suitable for | |
49 |
feeding into |
|
179 | feeding into consumev1(). | |
50 | """ |
|
180 | """ | |
51 | lock = repo.lock() |
|
181 | lock = repo.lock() | |
52 | try: |
|
182 | try: | |
53 |
|
|
183 | consumev1(repo, fp) | |
54 |
|
184 | |||
55 | # new requirements = old non-format requirements + |
|
185 | # new requirements = old non-format requirements + | |
56 | # new format-related remote requirements |
|
186 | # new format-related remote requirements |
@@ -26,6 +26,7 b' from . import (' | |||||
26 | exchange, |
|
26 | exchange, | |
27 | peer, |
|
27 | peer, | |
28 | pushkey as pushkeymod, |
|
28 | pushkey as pushkeymod, | |
|
29 | streamclone, | |||
29 | util, |
|
30 | util, | |
30 | ) |
|
31 | ) | |
31 |
|
32 | |||
@@ -720,7 +721,7 b' def stream(repo, proto):' | |||||
720 | try: |
|
721 | try: | |
721 | # LockError may be raised before the first result is yielded. Don't |
|
722 | # LockError may be raised before the first result is yielded. Don't | |
722 | # emit output until we're sure we got the lock successfully. |
|
723 | # emit output until we're sure we got the lock successfully. | |
723 |
it = |
|
724 | it = streamclone.generatev1(repo) | |
724 | return streamres(getstream(it)) |
|
725 | return streamres(getstream(it)) | |
725 | except error.LockError: |
|
726 | except error.LockError: | |
726 | return '2\n' |
|
727 | return '2\n' |
General Comments 0
You need to be logged in to leave comments.
Login now