Show More
@@ -8,7 +8,7 | |||
|
8 | 8 | from i18n import _ |
|
9 | 9 | from node import hex, nullid |
|
10 | 10 | import errno, urllib |
|
11 | import util, scmutil, changegroup, base85, error | |
|
11 | import util, scmutil, changegroup, base85, error, store | |
|
12 | 12 | import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey |
|
13 | 13 | import lock as lockmod |
|
14 | 14 | |
@@ -1332,3 +1332,68 def unbundle(repo, cg, heads, source, ur | |||
|
1332 | 1332 | if recordout is not None: |
|
1333 | 1333 | recordout(repo.ui.popbuffer()) |
|
1334 | 1334 | return r |
|
1335 | ||
|
1336 | # This is it's own function so extensions can override it. | |
|
1337 | def _walkstreamfiles(repo): | |
|
1338 | return repo.store.walk() | |
|
1339 | ||
|
1340 | def generatestreamclone(repo): | |
|
1341 | """Emit content for a streaming clone. | |
|
1342 | ||
|
1343 | This is a generator of raw chunks that constitute a streaming clone. | |
|
1344 | ||
|
1345 | The stream begins with a line of 2 space-delimited integers containing the | |
|
1346 | number of entries and total bytes size. | |
|
1347 | ||
|
1348 | Next, are N entries for each file being transferred. Each file entry starts | |
|
1349 | as a line with the file name and integer size delimited by a null byte. | |
|
1350 | The raw file data follows. Following the raw file data is the next file | |
|
1351 | entry, or EOF. | |
|
1352 | ||
|
1353 | When used on the wire protocol, an additional line indicating protocol | |
|
1354 | success will be prepended to the stream. This function is not responsible | |
|
1355 | for adding it. | |
|
1356 | ||
|
1357 | This function will obtain a repository lock to ensure a consistent view of | |
|
1358 | the store is captured. It therefore may raise LockError. | |
|
1359 | """ | |
|
1360 | entries = [] | |
|
1361 | total_bytes = 0 | |
|
1362 | # Get consistent snapshot of repo, lock during scan. | |
|
1363 | lock = repo.lock() | |
|
1364 | try: | |
|
1365 | repo.ui.debug('scanning\n') | |
|
1366 | for name, ename, size in _walkstreamfiles(repo): | |
|
1367 | if size: | |
|
1368 | entries.append((name, size)) | |
|
1369 | total_bytes += size | |
|
1370 | finally: | |
|
1371 | lock.release() | |
|
1372 | ||
|
1373 | repo.ui.debug('%d files, %d bytes to transfer\n' % | |
|
1374 | (len(entries), total_bytes)) | |
|
1375 | yield '%d %d\n' % (len(entries), total_bytes) | |
|
1376 | ||
|
1377 | sopener = repo.svfs | |
|
1378 | oldaudit = sopener.mustaudit | |
|
1379 | debugflag = repo.ui.debugflag | |
|
1380 | sopener.mustaudit = False | |
|
1381 | ||
|
1382 | try: | |
|
1383 | for name, size in entries: | |
|
1384 | if debugflag: | |
|
1385 | repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) | |
|
1386 | # partially encode name over the wire for backwards compat | |
|
1387 | yield '%s\0%d\n' % (store.encodedir(name), size) | |
|
1388 | if size <= 65536: | |
|
1389 | fp = sopener(name) | |
|
1390 | try: | |
|
1391 | data = fp.read(size) | |
|
1392 | finally: | |
|
1393 | fp.close() | |
|
1394 | yield data | |
|
1395 | else: | |
|
1396 | for chunk in util.filechunkiter(sopener(name), limit=size): | |
|
1397 | yield chunk | |
|
1398 | finally: | |
|
1399 | sopener.mustaudit = oldaudit |
@@ -744,73 +744,27 def pushkey(repo, proto, namespace, key, | |||
|
744 | 744 | def _allowstream(ui): |
|
745 | 745 | return ui.configbool('server', 'uncompressed', True, untrusted=True) |
|
746 | 746 | |
|
747 | def _walkstreamfiles(repo): | |
|
748 | # this is it's own function so extensions can override it | |
|
749 | return repo.store.walk() | |
|
750 | ||
|
751 | 747 | @wireprotocommand('stream_out') |
|
752 | 748 | def stream(repo, proto): |
|
753 | 749 | '''If the server supports streaming clone, it advertises the "stream" |
|
754 | 750 | capability with a value representing the version and flags of the repo |
|
755 | 751 | it is serving. Client checks to see if it understands the format. |
|
756 | ||
|
757 | The format is simple: the server writes out a line with the amount | |
|
758 | of files, then the total amount of bytes to be transferred (separated | |
|
759 | by a space). Then, for each file, the server first writes the filename | |
|
760 | and file size (separated by the null character), then the file contents. | |
|
761 | 752 | ''' |
|
762 | ||
|
763 | 753 | if not _allowstream(repo.ui): |
|
764 | 754 | return '1\n' |
|
765 | 755 | |
|
766 | entries = [] | |
|
767 | total_bytes = 0 | |
|
768 | try: | |
|
769 | # get consistent snapshot of repo, lock during scan | |
|
770 | lock = repo.lock() | |
|
771 | try: | |
|
772 | repo.ui.debug('scanning\n') | |
|
773 | for name, ename, size in _walkstreamfiles(repo): | |
|
774 | if size: | |
|
775 | entries.append((name, size)) | |
|
776 | total_bytes += size | |
|
777 | finally: | |
|
778 | lock.release() | |
|
779 | except error.LockError: | |
|
780 | return '2\n' # error: 2 | |
|
781 | ||
|
782 | def streamer(repo, entries, total): | |
|
783 | '''stream out all metadata files in repository.''' | |
|
784 | yield '0\n' # success | |
|
785 | repo.ui.debug('%d files, %d bytes to transfer\n' % | |
|
786 | (len(entries), total_bytes)) | |
|
787 | yield '%d %d\n' % (len(entries), total_bytes) | |
|
788 | ||
|
789 | sopener = repo.svfs | |
|
790 | oldaudit = sopener.mustaudit | |
|
791 | debugflag = repo.ui.debugflag | |
|
792 | sopener.mustaudit = False | |
|
756 | def getstream(it): | |
|
757 | yield '0\n' | |
|
758 | for chunk in it: | |
|
759 | yield chunk | |
|
793 | 760 | |
|
794 | 761 |
|
|
795 | for name, size in entries: | |
|
796 | if debugflag: | |
|
797 | repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) | |
|
798 | # partially encode name over the wire for backwards compat | |
|
799 | yield '%s\0%d\n' % (store.encodedir(name), size) | |
|
800 | if size <= 65536: | |
|
801 | fp = sopener(name) | |
|
802 | try: | |
|
803 | data = fp.read(size) | |
|
804 | finally: | |
|
805 | fp.close() | |
|
806 | yield data | |
|
807 | else: | |
|
808 | for chunk in util.filechunkiter(sopener(name), limit=size): | |
|
809 | yield chunk | |
|
810 | finally: | |
|
811 | sopener.mustaudit = oldaudit | |
|
812 | ||
|
813 | return streamres(streamer(repo, entries, total_bytes)) | |
|
762 | # LockError may be raised before the first result is yielded. Don't | |
|
763 | # emit output until we're sure we got the lock successfully. | |
|
764 | it = exchange.generatestreamclone(repo) | |
|
765 | return streamres(getstream(it)) | |
|
766 | except error.LockError: | |
|
767 | return '2\n' | |
|
814 | 768 | |
|
815 | 769 | @wireprotocommand('unbundle', 'heads') |
|
816 | 770 | def unbundle(repo, proto, heads): |
General Comments 0
You need to be logged in to leave comments.
Login now