diff --git a/mercurial/exchange.py b/mercurial/exchange.py --- a/mercurial/exchange.py +++ b/mercurial/exchange.py @@ -5,11 +5,10 @@ # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. -import time from i18n import _ from node import hex, nullid import errno, urllib -import util, scmutil, changegroup, base85, error, store +import util, scmutil, changegroup, base85, error import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey import lock as lockmod import tags @@ -1468,131 +1467,3 @@ def unbundle(repo, cg, heads, source, ur if recordout is not None: recordout(repo.ui.popbuffer()) return r - -# This is it's own function so extensions can override it. -def _walkstreamfiles(repo): - return repo.store.walk() - -def generatestreamclone(repo): - """Emit content for a streaming clone. - - This is a generator of raw chunks that constitute a streaming clone. - - The stream begins with a line of 2 space-delimited integers containing the - number of entries and total bytes size. - - Next, are N entries for each file being transferred. Each file entry starts - as a line with the file name and integer size delimited by a null byte. - The raw file data follows. Following the raw file data is the next file - entry, or EOF. - - When used on the wire protocol, an additional line indicating protocol - success will be prepended to the stream. This function is not responsible - for adding it. - - This function will obtain a repository lock to ensure a consistent view of - the store is captured. It therefore may raise LockError. - """ - entries = [] - total_bytes = 0 - # Get consistent snapshot of repo, lock during scan. 
- lock = repo.lock() - try: - repo.ui.debug('scanning\n') - for name, ename, size in _walkstreamfiles(repo): - if size: - entries.append((name, size)) - total_bytes += size - finally: - lock.release() - - repo.ui.debug('%d files, %d bytes to transfer\n' % - (len(entries), total_bytes)) - yield '%d %d\n' % (len(entries), total_bytes) - - svfs = repo.svfs - oldaudit = svfs.mustaudit - debugflag = repo.ui.debugflag - svfs.mustaudit = False - - try: - for name, size in entries: - if debugflag: - repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) - # partially encode name over the wire for backwards compat - yield '%s\0%d\n' % (store.encodedir(name), size) - if size <= 65536: - fp = svfs(name) - try: - data = fp.read(size) - finally: - fp.close() - yield data - else: - for chunk in util.filechunkiter(svfs(name), limit=size): - yield chunk - finally: - svfs.mustaudit = oldaudit - -def consumestreamclone(repo, fp): - """Apply the contents from a streaming clone file. - - This takes the output from "streamout" and applies it to the specified - repository. - - Like "streamout," the status line added by the wire protocol is not handled - by this function. 
 - """ - lock = repo.lock() - try: - repo.ui.status(_('streaming all changes\n')) - l = fp.readline() - try: - total_files, total_bytes = map(int, l.split(' ', 1)) - except (ValueError, TypeError): - raise error.ResponseError( - _('unexpected response from remote server:'), l) - repo.ui.status(_('%d files to transfer, %s of data\n') % - (total_files, util.bytecount(total_bytes))) - handled_bytes = 0 - repo.ui.progress(_('clone'), 0, total=total_bytes) - start = time.time() - - tr = repo.transaction(_('clone')) - try: - for i in xrange(total_files): - # XXX doesn't support '\n' or '\r' in filenames - l = fp.readline() - try: - name, size = l.split('\0', 1) - size = int(size) - except (ValueError, TypeError): - raise error.ResponseError( - _('unexpected response from remote server:'), l) - if repo.ui.debugflag: - repo.ui.debug('adding %s (%s)\n' % - (name, util.bytecount(size))) - # for backwards compat, name was partially encoded - ofp = repo.svfs(store.decodedir(name), 'w') - for chunk in util.filechunkiter(fp, limit=size): - handled_bytes += len(chunk) - repo.ui.progress(_('clone'), handled_bytes, - total=total_bytes) - ofp.write(chunk) - ofp.close() - tr.close() - finally: - tr.release() - - # Writing straight to files circumvented the inmemory caches - repo.invalidate() - - elapsed = time.time() - start - if elapsed <= 0: - elapsed = 0.001 - repo.ui.progress(_('clone'), None) - repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % - (util.bytecount(total_bytes), elapsed, - util.bytecount(total_bytes / elapsed))) - finally: - lock.release() diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py --- a/mercurial/streamclone.py +++ b/mercurial/streamclone.py @@ -7,14 +7,144 @@ from __future__ import absolute_import +import time + from .i18n import _ from . import ( branchmap, error, - exchange, + store, util, ) +# This is its own function so extensions can override it. 
+def _walkstreamfiles(repo): + return repo.store.walk() + +def generatev1(repo): + """Emit content for version 1 of a streaming clone. + + This is a generator of raw chunks that constitute a streaming clone. + + The stream begins with a line of 2 space-delimited integers containing the + number of entries and total bytes size. + + Next, are N entries for each file being transferred. Each file entry starts + as a line with the file name and integer size delimited by a null byte. + The raw file data follows. Following the raw file data is the next file + entry, or EOF. + + When used on the wire protocol, an additional line indicating protocol + success will be prepended to the stream. This function is not responsible + for adding it. + + This function will obtain a repository lock to ensure a consistent view of + the store is captured. It therefore may raise LockError. + """ + entries = [] + total_bytes = 0 + # Get consistent snapshot of repo, lock during scan. + lock = repo.lock() + try: + repo.ui.debug('scanning\n') + for name, ename, size in _walkstreamfiles(repo): + if size: + entries.append((name, size)) + total_bytes += size + finally: + lock.release() + + repo.ui.debug('%d files, %d bytes to transfer\n' % + (len(entries), total_bytes)) + yield '%d %d\n' % (len(entries), total_bytes) + + svfs = repo.svfs + oldaudit = svfs.mustaudit + debugflag = repo.ui.debugflag + svfs.mustaudit = False + + try: + for name, size in entries: + if debugflag: + repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) + # partially encode name over the wire for backwards compat + yield '%s\0%d\n' % (store.encodedir(name), size) + if size <= 65536: + fp = svfs(name) + try: + data = fp.read(size) + finally: + fp.close() + yield data + else: + for chunk in util.filechunkiter(svfs(name), limit=size): + yield chunk + finally: + svfs.mustaudit = oldaudit + +def consumev1(repo, fp): + """Apply the contents from version 1 of a streaming clone file handle. 
+ + This takes the output from "streamout" and applies it to the specified + repository. + + Like "streamout," the status line added by the wire protocol is not handled + by this function. + """ + lock = repo.lock() + try: + repo.ui.status(_('streaming all changes\n')) + l = fp.readline() + try: + total_files, total_bytes = map(int, l.split(' ', 1)) + except (ValueError, TypeError): + raise error.ResponseError( + _('unexpected response from remote server:'), l) + repo.ui.status(_('%d files to transfer, %s of data\n') % + (total_files, util.bytecount(total_bytes))) + handled_bytes = 0 + repo.ui.progress(_('clone'), 0, total=total_bytes) + start = time.time() + + tr = repo.transaction(_('clone')) + try: + for i in xrange(total_files): + # XXX doesn't support '\n' or '\r' in filenames + l = fp.readline() + try: + name, size = l.split('\0', 1) + size = int(size) + except (ValueError, TypeError): + raise error.ResponseError( + _('unexpected response from remote server:'), l) + if repo.ui.debugflag: + repo.ui.debug('adding %s (%s)\n' % + (name, util.bytecount(size))) + # for backwards compat, name was partially encoded + ofp = repo.svfs(store.decodedir(name), 'w') + for chunk in util.filechunkiter(fp, limit=size): + handled_bytes += len(chunk) + repo.ui.progress(_('clone'), handled_bytes, + total=total_bytes) + ofp.write(chunk) + ofp.close() + tr.close() + finally: + tr.release() + + # Writing straight to files circumvented the inmemory caches + repo.invalidate() + + elapsed = time.time() - start + if elapsed <= 0: + elapsed = 0.001 + repo.ui.progress(_('clone'), None) + repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % + (util.bytecount(total_bytes), elapsed, + util.bytecount(total_bytes / elapsed))) + finally: + lock.release() + def streamin(repo, remote, remotereqs): # Save remote branchmap. 
We will use it later # to speed up branchcache creation @@ -46,11 +176,11 @@ def applyremotedata(repo, remotereqs, re "remotebranchmap" is the result of a branchmap lookup on the remote. It can be None. "fp" is a file object containing the raw stream data, suitable for - feeding into exchange.consumestreamclone. + feeding into consumev1(). """ lock = repo.lock() try: - exchange.consumestreamclone(repo, fp) + consumev1(repo, fp) # new requirements = old non-format requirements + # new format-related remote requirements diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py --- a/mercurial/wireproto.py +++ b/mercurial/wireproto.py @@ -26,6 +26,7 @@ from . import ( exchange, peer, pushkey as pushkeymod, + streamclone, util, ) @@ -720,7 +721,7 @@ def stream(repo, proto): try: # LockError may be raised before the first result is yielded. Don't # emit output until we're sure we got the lock successfully. - it = exchange.generatestreamclone(repo) + it = streamclone.generatev1(repo) return streamres(getstream(it)) except error.LockError: return '2\n'