diff --git a/hgext/largefiles/proto.py b/hgext/largefiles/proto.py --- a/hgext/largefiles/proto.py +++ b/hgext/largefiles/proto.py @@ -41,7 +41,8 @@ def putlfile(repo, proto, sha): tmpfp = util.atomictempfile(path, createmode=repo.store.createmode) try: - proto.forwardpayload(tmpfp) + for p in proto.getpayload(): + tmpfp.write(p) tmpfp._fp.seek(0) if sha != lfutil.hexsha1(tmpfp._fp): raise IOError(0, _('largefile contents do not match hash')) diff --git a/mercurial/configitems.py b/mercurial/configitems.py --- a/mercurial/configitems.py +++ b/mercurial/configitems.py @@ -917,6 +917,9 @@ coreconfigitem('server', 'concurrent-pus coreconfigitem('server', 'disablefullbundle', default=False, ) +coreconfigitem('server', 'streamunbundle', + default=False, +) coreconfigitem('server', 'maxhttpheaderlen', default=1024, ) diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt --- a/mercurial/help/config.txt +++ b/mercurial/help/config.txt @@ -1791,6 +1791,11 @@ Controls generic server settings. are highly recommended. Partial clones will still be allowed. (default: False) +``streamunbundle`` + When set, servers will apply data sent from the client directly, + otherwise it will be written to a temporary file first. This option + effectively prevents concurrent pushes. + ``concurrent-push-mode`` Level of allowed race condition between two pushing clients. diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py --- a/mercurial/wireproto.py +++ b/mercurial/wireproto.py @@ -1082,14 +1082,33 @@ def unbundle(repo, proto, heads): with proto.mayberedirectstdio() as output: try: exchange.check_heads(repo, their_heads, 'preparing changes') + cleanup = lambda: None + try: + payload = proto.getpayload() + if repo.ui.configbool('server', 'streamunbundle'): + def cleanup(): + # Ensure that the full payload is consumed, so + # that the connection doesn't contain trailing garbage. + for p in payload: + pass + fp = util.chunkbuffer(payload) + else: + # write bundle data to temporary file as it can be big + fp, tempname = None, None + def cleanup(): + if fp: + fp.close() + if tempname: + os.unlink(tempname) + fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') + repo.ui.debug('redirecting incoming bundle to %s\n' % + tempname) + fp = os.fdopen(fd, pycompat.sysstr('wb+')) + r = 0 + for p in payload: + fp.write(p) + fp.seek(0) - # write bundle data to temporary file because it can be big - fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-') - fp = os.fdopen(fd, r'wb+') - r = 0 - try: - proto.forwardpayload(fp) - fp.seek(0) gen = exchange.readbundle(repo.ui, fp, None) if (isinstance(gen, changegroupmod.cg1unpacker) and not bundle1allowed(repo, 'push')): @@ -1112,8 +1131,7 @@ def unbundle(repo, proto, heads): r, output.getvalue() if output else '') finally: - fp.close() - os.unlink(tempname) + cleanup() except (error.BundleValueError, error.Abort, error.PushRaced) as exc: # handle non-bundle2 case first diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py --- a/mercurial/wireprotoserver.py +++ b/mercurial/wireprotoserver.py @@ -106,15 +106,14 @@ class httpv1protocolhandler(object): self._protocaps = set(value.split(' ')) return self._protocaps - def forwardpayload(self, fp): + def getpayload(self): # Existing clients *always* send Content-Length. length = int(self._req.headers[b'Content-Length']) # If httppostargs is used, we need to read Content-Length # minus the amount that was consumed by args. length -= int(self._req.headers.get(b'X-HgArgs-Post', 0)) - for s in util.filechunkiter(self._req.bodyfh, limit=length): - fp.write(s) + return util.filechunkiter(self._req.bodyfh, limit=length) @contextlib.contextmanager def mayberedirectstdio(self): @@ -610,7 +609,7 @@ class httpv2protocolhandler(object): # Protocol capabilities are currently not implemented for HTTP V2. return set() - def forwardpayload(self, fp): + def getpayload(self): raise NotImplementedError @contextlib.contextmanager @@ -783,7 +782,7 @@ class sshv1protocolhandler(object): def getprotocaps(self): return self._protocaps - def forwardpayload(self, fpout): + def getpayload(self): # We initially send an empty response. This tells the client it is # OK to start sending data. If a client sees any other response, it # interprets it as an error. @@ -796,7 +795,7 @@ class sshv1protocolhandler(object): # 0\n count = int(self._fin.readline()) while count: - fpout.write(self._fin.read(count)) + yield self._fin.read(count) count = int(self._fin.readline()) @contextlib.contextmanager diff --git a/mercurial/wireprototypes.py b/mercurial/wireprototypes.py --- a/mercurial/wireprototypes.py +++ b/mercurial/wireprototypes.py @@ -123,10 +123,11 @@ class baseprotocolhandler(zi.Interface): Returns a list of capabilities as declared by the client for the current request (or connection for stateful protocol handlers).""" - def forwardpayload(fp): - """Read the raw payload and forward to a file. + def getpayload(): + """Provide a generator for the raw payload. - The payload is read in full before the function returns. + The caller is responsible for ensuring that the full payload is + processed. """ def mayberedirectstdio(): diff --git a/tests/test-push-http.t b/tests/test-push-http.t --- a/tests/test-push-http.t +++ b/tests/test-push-http.t @@ -23,7 +23,7 @@ $ echo a >> a $ hg ci -mb $ req() { - > hg serve -p $HGPORT -d --pid-file=hg.pid -E errors.log + > hg $1 serve -p $HGPORT -d --pid-file=hg.pid -E errors.log > cat hg.pid >> $DAEMON_PIDS > hg --cwd ../test2 push http://localhost:$HGPORT/ > exitstatus=$? @@ -70,6 +70,58 @@ expect success > echo "phase-move: $HG_NODE: $HG_OLDPHASE -> $HG_PHASE" > EOF +#if bundle1 + $ cat >> .hg/hgrc < allow_push = * + > [hooks] + > changegroup = sh -c "printenv.py changegroup 0" + > pushkey = sh -c "printenv.py pushkey 0" + > txnclose-phase.test = sh $TESTTMP/hook.sh + > EOF + $ req "--debug --config extensions.blackbox=" + listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT) + pushing to http://localhost:$HGPORT/ + searching for changes + remote: redirecting incoming bundle to */hg-unbundle-* (glob) + remote: adding changesets + remote: add changeset ba677d0156c1 + remote: adding manifests + remote: adding file changes + remote: adding a revisions + remote: added 1 changesets with 1 changes to 1 files + remote: updating the branch cache + remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh + remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public + remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh + remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public + remote: running hook changegroup: sh -c "printenv.py changegroup 0" + remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) + % serve errors + $ hg rollback + repository tip rolled back to revision 0 (undo serve) + $ req "--debug --config server.streamunbundle=True --config extensions.blackbox=" + listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT) + pushing to http://localhost:$HGPORT/ + searching for changes + remote: adding changesets + remote: add changeset ba677d0156c1 + remote: adding manifests + remote: adding file changes + remote: adding a revisions + remote: added 1 changesets with 1 changes to 1 files + remote: updating the branch cache + remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh + remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public + remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh + remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public + remote: running hook changegroup: sh -c "printenv.py changegroup 0" + remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) + % serve errors + $ hg rollback + repository tip rolled back to revision 0 (undo serve) +#endif + +#if bundle2 $ cat >> .hg/hgrc < allow_push = * > [hooks] @@ -86,11 +138,11 @@ expect success remote: added 1 changesets with 1 changes to 1 files remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public - remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle1 !) - remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle2 !) + remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) % serve errors $ hg rollback repository tip rolled back to revision 0 (undo serve) +#endif expect success, server lacks the httpheader capability