##// END OF EJS Templates
exchange: move code for generating a streaming clone into exchange...
Gregory Szorc -
r25235:41965bf2 default
parent child Browse files
Show More
@@ -8,7 +8,7 b''
8 from i18n import _
8 from i18n import _
9 from node import hex, nullid
9 from node import hex, nullid
10 import errno, urllib
10 import errno, urllib
11 import util, scmutil, changegroup, base85, error
11 import util, scmutil, changegroup, base85, error, store
12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
13 import lock as lockmod
13 import lock as lockmod
14
14
@@ -1332,3 +1332,68 b' def unbundle(repo, cg, heads, source, ur'
1332 if recordout is not None:
1332 if recordout is not None:
1333 recordout(repo.ui.popbuffer())
1333 recordout(repo.ui.popbuffer())
1334 return r
1334 return r
1335
1336 # This is it's own function so extensions can override it.
1337 def _walkstreamfiles(repo):
1338 return repo.store.walk()
1339
1340 def generatestreamclone(repo):
1341 """Emit content for a streaming clone.
1342
1343 This is a generator of raw chunks that constitute a streaming clone.
1344
1345 The stream begins with a line of 2 space-delimited integers containing the
1346 number of entries and total bytes size.
1347
1348 Next, are N entries for each file being transferred. Each file entry starts
1349 as a line with the file name and integer size delimited by a null byte.
1350 The raw file data follows. Following the raw file data is the next file
1351 entry, or EOF.
1352
1353 When used on the wire protocol, an additional line indicating protocol
1354 success will be prepended to the stream. This function is not responsible
1355 for adding it.
1356
1357 This function will obtain a repository lock to ensure a consistent view of
1358 the store is captured. It therefore may raise LockError.
1359 """
1360 entries = []
1361 total_bytes = 0
1362 # Get consistent snapshot of repo, lock during scan.
1363 lock = repo.lock()
1364 try:
1365 repo.ui.debug('scanning\n')
1366 for name, ename, size in _walkstreamfiles(repo):
1367 if size:
1368 entries.append((name, size))
1369 total_bytes += size
1370 finally:
1371 lock.release()
1372
1373 repo.ui.debug('%d files, %d bytes to transfer\n' %
1374 (len(entries), total_bytes))
1375 yield '%d %d\n' % (len(entries), total_bytes)
1376
1377 sopener = repo.svfs
1378 oldaudit = sopener.mustaudit
1379 debugflag = repo.ui.debugflag
1380 sopener.mustaudit = False
1381
1382 try:
1383 for name, size in entries:
1384 if debugflag:
1385 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
1386 # partially encode name over the wire for backwards compat
1387 yield '%s\0%d\n' % (store.encodedir(name), size)
1388 if size <= 65536:
1389 fp = sopener(name)
1390 try:
1391 data = fp.read(size)
1392 finally:
1393 fp.close()
1394 yield data
1395 else:
1396 for chunk in util.filechunkiter(sopener(name), limit=size):
1397 yield chunk
1398 finally:
1399 sopener.mustaudit = oldaudit
@@ -744,73 +744,27 b' def pushkey(repo, proto, namespace, key,'
744 def _allowstream(ui):
744 def _allowstream(ui):
745 return ui.configbool('server', 'uncompressed', True, untrusted=True)
745 return ui.configbool('server', 'uncompressed', True, untrusted=True)
746
746
747 def _walkstreamfiles(repo):
748 # this is it's own function so extensions can override it
749 return repo.store.walk()
750
751 @wireprotocommand('stream_out')
747 @wireprotocommand('stream_out')
752 def stream(repo, proto):
748 def stream(repo, proto):
753 '''If the server supports streaming clone, it advertises the "stream"
749 '''If the server supports streaming clone, it advertises the "stream"
754 capability with a value representing the version and flags of the repo
750 capability with a value representing the version and flags of the repo
755 it is serving. Client checks to see if it understands the format.
751 it is serving. Client checks to see if it understands the format.
756
757 The format is simple: the server writes out a line with the amount
758 of files, then the total amount of bytes to be transferred (separated
759 by a space). Then, for each file, the server first writes the filename
760 and file size (separated by the null character), then the file contents.
761 '''
752 '''
762
763 if not _allowstream(repo.ui):
753 if not _allowstream(repo.ui):
764 return '1\n'
754 return '1\n'
765
755
766 entries = []
756 def getstream(it):
767 total_bytes = 0
757 yield '0\n'
768 try:
758 for chunk in it:
769 # get consistent snapshot of repo, lock during scan
759 yield chunk
770 lock = repo.lock()
771 try:
772 repo.ui.debug('scanning\n')
773 for name, ename, size in _walkstreamfiles(repo):
774 if size:
775 entries.append((name, size))
776 total_bytes += size
777 finally:
778 lock.release()
779 except error.LockError:
780 return '2\n' # error: 2
781
782 def streamer(repo, entries, total):
783 '''stream out all metadata files in repository.'''
784 yield '0\n' # success
785 repo.ui.debug('%d files, %d bytes to transfer\n' %
786 (len(entries), total_bytes))
787 yield '%d %d\n' % (len(entries), total_bytes)
788
760
789 sopener = repo.svfs
761 try:
790 oldaudit = sopener.mustaudit
762 # LockError may be raised before the first result is yielded. Don't
791 debugflag = repo.ui.debugflag
763 # emit output until we're sure we got the lock successfully.
792 sopener.mustaudit = False
764 it = exchange.generatestreamclone(repo)
793
765 return streamres(getstream(it))
794 try:
766 except error.LockError:
795 for name, size in entries:
767 return '2\n'
796 if debugflag:
797 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
798 # partially encode name over the wire for backwards compat
799 yield '%s\0%d\n' % (store.encodedir(name), size)
800 if size <= 65536:
801 fp = sopener(name)
802 try:
803 data = fp.read(size)
804 finally:
805 fp.close()
806 yield data
807 else:
808 for chunk in util.filechunkiter(sopener(name), limit=size):
809 yield chunk
810 finally:
811 sopener.mustaudit = oldaudit
812
813 return streamres(streamer(repo, entries, total_bytes))
814
768
815 @wireprotocommand('unbundle', 'heads')
769 @wireprotocommand('unbundle', 'heads')
816 def unbundle(repo, proto, heads):
770 def unbundle(repo, proto, heads):
General Comments 0
You need to be logged in to leave comments. Login now