diff --git a/mercurial/exchange.py b/mercurial/exchange.py --- a/mercurial/exchange.py +++ b/mercurial/exchange.py @@ -8,7 +8,7 @@ from i18n import _ from node import hex, nullid import errno, urllib -import util, scmutil, changegroup, base85, error +import util, scmutil, changegroup, base85, error, store import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey import lock as lockmod @@ -1332,3 +1332,68 @@ def unbundle(repo, cg, heads, source, ur if recordout is not None: recordout(repo.ui.popbuffer()) return r + +# This is it's own function so extensions can override it. +def _walkstreamfiles(repo): + return repo.store.walk() + +def generatestreamclone(repo): + """Emit content for a streaming clone. + + This is a generator of raw chunks that constitute a streaming clone. + + The stream begins with a line of 2 space-delimited integers containing the + number of entries and total bytes size. + + Next, are N entries for each file being transferred. Each file entry starts + as a line with the file name and integer size delimited by a null byte. + The raw file data follows. Following the raw file data is the next file + entry, or EOF. + + When used on the wire protocol, an additional line indicating protocol + success will be prepended to the stream. This function is not responsible + for adding it. + + This function will obtain a repository lock to ensure a consistent view of + the store is captured. It therefore may raise LockError. + """ + entries = [] + total_bytes = 0 + # Get consistent snapshot of repo, lock during scan. + lock = repo.lock() + try: + repo.ui.debug('scanning\n') + for name, ename, size in _walkstreamfiles(repo): + if size: + entries.append((name, size)) + total_bytes += size + finally: + lock.release() + + repo.ui.debug('%d files, %d bytes to transfer\n' % + (len(entries), total_bytes)) + yield '%d %d\n' % (len(entries), total_bytes) + + sopener = repo.svfs + oldaudit = sopener.mustaudit + debugflag = repo.ui.debugflag + sopener.mustaudit = False + + try: + for name, size in entries: + if debugflag: + repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) + # partially encode name over the wire for backwards compat + yield '%s\0%d\n' % (store.encodedir(name), size) + if size <= 65536: + fp = sopener(name) + try: + data = fp.read(size) + finally: + fp.close() + yield data + else: + for chunk in util.filechunkiter(sopener(name), limit=size): + yield chunk + finally: + sopener.mustaudit = oldaudit diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py --- a/mercurial/wireproto.py +++ b/mercurial/wireproto.py @@ -744,73 +744,27 @@ def pushkey(repo, proto, namespace, key, def _allowstream(ui): return ui.configbool('server', 'uncompressed', True, untrusted=True) -def _walkstreamfiles(repo): - # this is it's own function so extensions can override it - return repo.store.walk() - @wireprotocommand('stream_out') def stream(repo, proto): '''If the server supports streaming clone, it advertises the "stream" capability with a value representing the version and flags of the repo it is serving. Client checks to see if it understands the format. - - The format is simple: the server writes out a line with the amount - of files, then the total amount of bytes to be transferred (separated - by a space). Then, for each file, the server first writes the filename - and file size (separated by the null character), then the file contents. ''' - if not _allowstream(repo.ui): return '1\n' - entries = [] - total_bytes = 0 - try: - # get consistent snapshot of repo, lock during scan - lock = repo.lock() - try: - repo.ui.debug('scanning\n') - for name, ename, size in _walkstreamfiles(repo): - if size: - entries.append((name, size)) - total_bytes += size - finally: - lock.release() - except error.LockError: - return '2\n' # error: 2 - - def streamer(repo, entries, total): - '''stream out all metadata files in repository.''' - yield '0\n' # success - repo.ui.debug('%d files, %d bytes to transfer\n' % - (len(entries), total_bytes)) - yield '%d %d\n' % (len(entries), total_bytes) + def getstream(it): + yield '0\n' + for chunk in it: + yield chunk - sopener = repo.svfs - oldaudit = sopener.mustaudit - debugflag = repo.ui.debugflag - sopener.mustaudit = False - - try: - for name, size in entries: - if debugflag: - repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) - # partially encode name over the wire for backwards compat - yield '%s\0%d\n' % (store.encodedir(name), size) - if size <= 65536: - fp = sopener(name) - try: - data = fp.read(size) - finally: - fp.close() - yield data - else: - for chunk in util.filechunkiter(sopener(name), limit=size): - yield chunk - finally: - sopener.mustaudit = oldaudit - - return streamres(streamer(repo, entries, total_bytes)) + try: + # LockError may be raised before the first result is yielded. Don't + # emit output until we're sure we got the lock successfully. + it = exchange.generatestreamclone(repo) + return streamres(getstream(it)) + except error.LockError: + return '2\n' @wireprotocommand('unbundle', 'heads') def unbundle(repo, proto, heads):