@@ -8,7 +8,7 @@
 from i18n import _
 from node import hex, nullid
 import errno, urllib
-import util, scmutil, changegroup, base85, error
+import util, scmutil, changegroup, base85, error, store
 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
 import lock as lockmod
 
@@ -1332,3 +1332,68 @@ def unbundle(repo, cg, heads, source, ur
         if recordout is not None:
             recordout(repo.ui.popbuffer())
     return r
+
+# This is it's own function so extensions can override it.
+def _walkstreamfiles(repo):
+    return repo.store.walk()
+
+def generatestreamclone(repo):
+    """Emit content for a streaming clone.
+
+    This is a generator of raw chunks that constitute a streaming clone.
+
+    The stream begins with a line of 2 space-delimited integers containing the
+    number of entries and total bytes size.
+
+    Next, are N entries for each file being transferred. Each file entry starts
+    as a line with the file name and integer size delimited by a null byte.
+    The raw file data follows. Following the raw file data is the next file
+    entry, or EOF.
+
+    When used on the wire protocol, an additional line indicating protocol
+    success will be prepended to the stream. This function is not responsible
+    for adding it.
+
+    This function will obtain a repository lock to ensure a consistent view of
+    the store is captured. It therefore may raise LockError.
+    """
+    entries = []
+    total_bytes = 0
+    # Get consistent snapshot of repo, lock during scan.
+    lock = repo.lock()
+    try:
+        repo.ui.debug('scanning\n')
+        for name, ename, size in _walkstreamfiles(repo):
+            if size:
+                entries.append((name, size))
+                total_bytes += size
+    finally:
+        lock.release()
+
+    repo.ui.debug('%d files, %d bytes to transfer\n' %
+                  (len(entries), total_bytes))
+    yield '%d %d\n' % (len(entries), total_bytes)
+
+    sopener = repo.svfs
+    oldaudit = sopener.mustaudit
+    debugflag = repo.ui.debugflag
+    sopener.mustaudit = False
+
+    try:
+        for name, size in entries:
+            if debugflag:
+                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
+            # partially encode name over the wire for backwards compat
+            yield '%s\0%d\n' % (store.encodedir(name), size)
+            if size <= 65536:
+                fp = sopener(name)
+                try:
+                    data = fp.read(size)
+                finally:
+                    fp.close()
+                yield data
+            else:
+                for chunk in util.filechunkiter(sopener(name), limit=size):
+                    yield chunk
+    finally:
+        sopener.mustaudit = oldaudit
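
The docstring above specifies the producer side of the format. For reference, here is a minimal consumer sketch in the same Python 2 idiom the codebase uses. It is not part of this change: readstreamclone, the fh file object, and the write(name, data) callback are assumptions of the sketch, and it presumes any wire-protocol status line has already been consumed.

def readstreamclone(fh, write):
    # Hypothetical helper, not part of this patch: parse the stream
    # emitted by generatestreamclone() from the file-like object `fh`
    # and hand each decoded entry to `write(name, data)`.
    # First line: "<number of entries> <total bytes>".
    expectedfiles, expectedbytes = map(int, fh.readline().split(' ', 1))
    receivedbytes = 0

    for i in xrange(expectedfiles):
        # Each entry starts with "<store-encoded name>\0<size>\n" ...
        name, size = fh.readline().rstrip('\n').split('\0', 1)
        size = int(size)
        # ... followed by exactly `size` bytes of raw file data.
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = fh.read(min(remaining, 65536))
            if not chunk:
                raise ValueError('premature EOF in stream clone')
            chunks.append(chunk)
            remaining -= len(chunk)
        receivedbytes += size
        write(name, ''.join(chunks))

    return expectedfiles, receivedbytes

Note that file names on the wire are store-encoded (store.encodedir), so a real consumer would decode them (store.decodedir) before writing to the local store.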
@@ -744,73 +744,27 @@ def pushkey(repo, proto, namespace, key,
 def _allowstream(ui):
     return ui.configbool('server', 'uncompressed', True, untrusted=True)
 
-def _walkstreamfiles(repo):
-    # this is it's own function so extensions can override it
-    return repo.store.walk()
-
 @wireprotocommand('stream_out')
 def stream(repo, proto):
     '''If the server supports streaming clone, it advertises the "stream"
     capability with a value representing the version and flags of the repo
     it is serving. Client checks to see if it understands the format.
-
-    The format is simple: the server writes out a line with the amount
-    of files, then the total amount of bytes to be transferred (separated
-    by a space). Then, for each file, the server first writes the filename
-    and file size (separated by the null character), then the file contents.
     '''
-
     if not _allowstream(repo.ui):
         return '1\n'
 
-    entries = []
-    total_bytes = 0
-    try:
-        # get consistent snapshot of repo, lock during scan
-        lock = repo.lock()
-        try:
-            repo.ui.debug('scanning\n')
-            for name, ename, size in _walkstreamfiles(repo):
-                if size:
-                    entries.append((name, size))
-                    total_bytes += size
-        finally:
-            lock.release()
-    except error.LockError:
-        return '2\n' # error: 2
-
-    def streamer(repo, entries, total):
-        '''stream out all metadata files in repository.'''
-        yield '0\n' # success
-        repo.ui.debug('%d files, %d bytes to transfer\n' %
-                      (len(entries), total_bytes))
-        yield '%d %d\n' % (len(entries), total_bytes)
+    def getstream(it):
+        yield '0\n'
+        for chunk in it:
+            yield chunk
 
-        sopener = repo.svfs
-        oldaudit = sopener.mustaudit
-        debugflag = repo.ui.debugflag
-        sopener.mustaudit = False
-
-        try:
-            for name, size in entries:
-                if debugflag:
-                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
-                # partially encode name over the wire for backwards compat
-                yield '%s\0%d\n' % (store.encodedir(name), size)
-                if size <= 65536:
-                    fp = sopener(name)
-                    try:
-                        data = fp.read(size)
-                    finally:
-                        fp.close()
-                    yield data
-                else:
-                    for chunk in util.filechunkiter(sopener(name), limit=size):
-                        yield chunk
-        finally:
-            sopener.mustaudit = oldaudit
-
-    return streamres(streamer(repo, entries, total_bytes))
+    try:
+        # LockError may be raised before the first result is yielded. Don't
+        # emit output until we're sure we got the lock successfully.
+        it = exchange.generatestreamclone(repo)
+        return streamres(getstream(it))
+    except error.LockError:
+        return '2\n'
 
 @wireprotocommand('unbundle', 'heads')
 def unbundle(repo, proto, heads):
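
As the comment notes, _walkstreamfiles is kept as a separate module-level function so extensions can override it; after this change they would wrap exchange._walkstreamfiles rather than the old copy in wireproto. A hypothetical sketch of such an extension follows; the wrapper name and the 'mylargestore/' filter are invented for illustration, while extensions.wrapfunction and the uisetup hook are standard extension machinery.

from mercurial import exchange, extensions

def _filteredwalkstreamfiles(orig, repo):
    # Delegate to the original walker, then drop store entries this
    # (hypothetical) extension does not want to serve to stream clients.
    for name, ename, size in orig(repo):
        if name.startswith('mylargestore/'):
            continue
        yield name, ename, size

def uisetup(ui):
    # Install the wrapper once when the extension is loaded.
    extensions.wrapfunction(exchange, '_walkstreamfiles',
                            _filteredwalkstreamfiles)

Because generatestreamclone derives both the entry count and the byte total from whatever the walker yields, a wrapper like this keeps the emitted header consistent with the data that follows.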