##// END OF EJS Templates
exchange: move code for generating a streaming clone into exchange...
Gregory Szorc -
r25235:41965bf2 default
parent child Browse files
Show More
@@ -8,7 +8,7 b''
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 import util, scmutil, changegroup, base85, error
11 import util, scmutil, changegroup, base85, error, store
12 12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
13 13 import lock as lockmod
14 14
@@ -1332,3 +1332,68 b' def unbundle(repo, cg, heads, source, ur'
1332 1332 if recordout is not None:
1333 1333 recordout(repo.ui.popbuffer())
1334 1334 return r
1335
1336 # This is it's own function so extensions can override it.
1337 def _walkstreamfiles(repo):
1338 return repo.store.walk()
1339
1340 def generatestreamclone(repo):
1341 """Emit content for a streaming clone.
1342
1343 This is a generator of raw chunks that constitute a streaming clone.
1344
1345 The stream begins with a line of 2 space-delimited integers containing the
1346 number of entries and total bytes size.
1347
1348 Next, are N entries for each file being transferred. Each file entry starts
1349 as a line with the file name and integer size delimited by a null byte.
1350 The raw file data follows. Following the raw file data is the next file
1351 entry, or EOF.
1352
1353 When used on the wire protocol, an additional line indicating protocol
1354 success will be prepended to the stream. This function is not responsible
1355 for adding it.
1356
1357 This function will obtain a repository lock to ensure a consistent view of
1358 the store is captured. It therefore may raise LockError.
1359 """
1360 entries = []
1361 total_bytes = 0
1362 # Get consistent snapshot of repo, lock during scan.
1363 lock = repo.lock()
1364 try:
1365 repo.ui.debug('scanning\n')
1366 for name, ename, size in _walkstreamfiles(repo):
1367 if size:
1368 entries.append((name, size))
1369 total_bytes += size
1370 finally:
1371 lock.release()
1372
1373 repo.ui.debug('%d files, %d bytes to transfer\n' %
1374 (len(entries), total_bytes))
1375 yield '%d %d\n' % (len(entries), total_bytes)
1376
1377 sopener = repo.svfs
1378 oldaudit = sopener.mustaudit
1379 debugflag = repo.ui.debugflag
1380 sopener.mustaudit = False
1381
1382 try:
1383 for name, size in entries:
1384 if debugflag:
1385 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
1386 # partially encode name over the wire for backwards compat
1387 yield '%s\0%d\n' % (store.encodedir(name), size)
1388 if size <= 65536:
1389 fp = sopener(name)
1390 try:
1391 data = fp.read(size)
1392 finally:
1393 fp.close()
1394 yield data
1395 else:
1396 for chunk in util.filechunkiter(sopener(name), limit=size):
1397 yield chunk
1398 finally:
1399 sopener.mustaudit = oldaudit
@@ -744,73 +744,27 b' def pushkey(repo, proto, namespace, key,'
744 744 def _allowstream(ui):
745 745 return ui.configbool('server', 'uncompressed', True, untrusted=True)
746 746
747 def _walkstreamfiles(repo):
748 # this is it's own function so extensions can override it
749 return repo.store.walk()
750
751 747 @wireprotocommand('stream_out')
752 748 def stream(repo, proto):
753 749 '''If the server supports streaming clone, it advertises the "stream"
754 750 capability with a value representing the version and flags of the repo
755 751 it is serving. Client checks to see if it understands the format.
756
757 The format is simple: the server writes out a line with the amount
758 of files, then the total amount of bytes to be transferred (separated
759 by a space). Then, for each file, the server first writes the filename
760 and file size (separated by the null character), then the file contents.
761 752 '''
762
763 753 if not _allowstream(repo.ui):
764 754 return '1\n'
765 755
766 entries = []
767 total_bytes = 0
768 try:
769 # get consistent snapshot of repo, lock during scan
770 lock = repo.lock()
771 try:
772 repo.ui.debug('scanning\n')
773 for name, ename, size in _walkstreamfiles(repo):
774 if size:
775 entries.append((name, size))
776 total_bytes += size
777 finally:
778 lock.release()
779 except error.LockError:
780 return '2\n' # error: 2
781
782 def streamer(repo, entries, total):
783 '''stream out all metadata files in repository.'''
784 yield '0\n' # success
785 repo.ui.debug('%d files, %d bytes to transfer\n' %
786 (len(entries), total_bytes))
787 yield '%d %d\n' % (len(entries), total_bytes)
756 def getstream(it):
757 yield '0\n'
758 for chunk in it:
759 yield chunk
788 760
789 sopener = repo.svfs
790 oldaudit = sopener.mustaudit
791 debugflag = repo.ui.debugflag
792 sopener.mustaudit = False
793
794 try:
795 for name, size in entries:
796 if debugflag:
797 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
798 # partially encode name over the wire for backwards compat
799 yield '%s\0%d\n' % (store.encodedir(name), size)
800 if size <= 65536:
801 fp = sopener(name)
802 try:
803 data = fp.read(size)
804 finally:
805 fp.close()
806 yield data
807 else:
808 for chunk in util.filechunkiter(sopener(name), limit=size):
809 yield chunk
810 finally:
811 sopener.mustaudit = oldaudit
812
813 return streamres(streamer(repo, entries, total_bytes))
761 try:
762 # LockError may be raised before the first result is yielded. Don't
763 # emit output until we're sure we got the lock successfully.
764 it = exchange.generatestreamclone(repo)
765 return streamres(getstream(it))
766 except error.LockError:
767 return '2\n'
814 768
815 769 @wireprotocommand('unbundle', 'heads')
816 770 def unbundle(repo, proto, heads):
General Comments 0
You need to be logged in to leave comments. Login now