##// END OF EJS Templates
wireproto: allow direct stream processing for unbundle...
Joerg Sonnenberger -
r37432:2d965bfe default
parent child Browse files
Show More
@@ -41,7 +41,8 b' def putlfile(repo, proto, sha):'
41 41 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
42 42
43 43 try:
44 proto.forwardpayload(tmpfp)
44 for p in proto.getpayload():
45 tmpfp.write(p)
45 46 tmpfp._fp.seek(0)
46 47 if sha != lfutil.hexsha1(tmpfp._fp):
47 48 raise IOError(0, _('largefile contents do not match hash'))
@@ -917,6 +917,9 b" coreconfigitem('server', 'concurrent-pus"
917 917 coreconfigitem('server', 'disablefullbundle',
918 918 default=False,
919 919 )
920 coreconfigitem('server', 'streamunbundle',
921 default=False,
922 )
920 923 coreconfigitem('server', 'maxhttpheaderlen',
921 924 default=1024,
922 925 )
@@ -1791,6 +1791,11 b' Controls generic server settings.'
1791 1791 are highly recommended. Partial clones will still be allowed.
1792 1792 (default: False)
1793 1793
1794 ``streamunbundle``
1795 When set, servers will apply data sent from the client directly,
1796 otherwise it will be written to a temporary file first. This option
1797 effectively prevents concurrent pushes.
1798
1794 1799 ``concurrent-push-mode``
1795 1800 Level of allowed race condition between two pushing clients.
1796 1801
@@ -1082,14 +1082,33 b' def unbundle(repo, proto, heads):'
1082 1082 with proto.mayberedirectstdio() as output:
1083 1083 try:
1084 1084 exchange.check_heads(repo, their_heads, 'preparing changes')
1085 cleanup = lambda: None
1086 try:
1087 payload = proto.getpayload()
1088 if repo.ui.configbool('server', 'streamunbundle'):
1089 def cleanup():
1090 # Ensure that the full payload is consumed, so
1091 # that the connection doesn't contain trailing garbage.
1092 for p in payload:
1093 pass
1094 fp = util.chunkbuffer(payload)
1095 else:
1096 # write bundle data to temporary file as it can be big
1097 fp, tempname = None, None
1098 def cleanup():
1099 if fp:
1100 fp.close()
1101 if tempname:
1102 os.unlink(tempname)
1103 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1104 repo.ui.debug('redirecting incoming bundle to %s\n' %
1105 tempname)
1106 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1107 r = 0
1108 for p in payload:
1109 fp.write(p)
1110 fp.seek(0)
1085 1111
1086 # write bundle data to temporary file because it can be big
1087 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1088 fp = os.fdopen(fd, r'wb+')
1089 r = 0
1090 try:
1091 proto.forwardpayload(fp)
1092 fp.seek(0)
1093 1112 gen = exchange.readbundle(repo.ui, fp, None)
1094 1113 if (isinstance(gen, changegroupmod.cg1unpacker)
1095 1114 and not bundle1allowed(repo, 'push')):
@@ -1112,8 +1131,7 b' def unbundle(repo, proto, heads):'
1112 1131 r, output.getvalue() if output else '')
1113 1132
1114 1133 finally:
1115 fp.close()
1116 os.unlink(tempname)
1134 cleanup()
1117 1135
1118 1136 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1119 1137 # handle non-bundle2 case first
@@ -106,15 +106,14 b' class httpv1protocolhandler(object):'
106 106 self._protocaps = set(value.split(' '))
107 107 return self._protocaps
108 108
109 def forwardpayload(self, fp):
109 def getpayload(self):
110 110 # Existing clients *always* send Content-Length.
111 111 length = int(self._req.headers[b'Content-Length'])
112 112
113 113 # If httppostargs is used, we need to read Content-Length
114 114 # minus the amount that was consumed by args.
115 115 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
116 for s in util.filechunkiter(self._req.bodyfh, limit=length):
117 fp.write(s)
116 return util.filechunkiter(self._req.bodyfh, limit=length)
118 117
119 118 @contextlib.contextmanager
120 119 def mayberedirectstdio(self):
@@ -610,7 +609,7 b' class httpv2protocolhandler(object):'
610 609 # Protocol capabilities are currently not implemented for HTTP V2.
611 610 return set()
612 611
613 def forwardpayload(self, fp):
612 def getpayload(self):
614 613 raise NotImplementedError
615 614
616 615 @contextlib.contextmanager
@@ -783,7 +782,7 b' class sshv1protocolhandler(object):'
783 782 def getprotocaps(self):
784 783 return self._protocaps
785 784
786 def forwardpayload(self, fpout):
785 def getpayload(self):
787 786 # We initially send an empty response. This tells the client it is
788 787 # OK to start sending data. If a client sees any other response, it
789 788 # interprets it as an error.
@@ -796,7 +795,7 b' class sshv1protocolhandler(object):'
796 795 # 0\n
797 796 count = int(self._fin.readline())
798 797 while count:
799 fpout.write(self._fin.read(count))
798 yield self._fin.read(count)
800 799 count = int(self._fin.readline())
801 800
802 801 @contextlib.contextmanager
@@ -123,10 +123,11 b' class baseprotocolhandler(zi.Interface):'
123 123 Returns a list of capabilities as declared by the client for
124 124 the current request (or connection for stateful protocol handlers)."""
125 125
126 def forwardpayload(fp):
127 """Read the raw payload and forward to a file.
126 def getpayload():
127 """Provide a generator for the raw payload.
128 128
129 The payload is read in full before the function returns.
129 The caller is responsible for ensuring that the full payload is
130 processed.
130 131 """
131 132
132 133 def mayberedirectstdio():
@@ -23,7 +23,7 b''
23 23 $ echo a >> a
24 24 $ hg ci -mb
25 25 $ req() {
26 > hg serve -p $HGPORT -d --pid-file=hg.pid -E errors.log
26 > hg $1 serve -p $HGPORT -d --pid-file=hg.pid -E errors.log
27 27 > cat hg.pid >> $DAEMON_PIDS
28 28 > hg --cwd ../test2 push http://localhost:$HGPORT/
29 29 > exitstatus=$?
@@ -70,6 +70,58 b' expect success'
70 70 > echo "phase-move: $HG_NODE: $HG_OLDPHASE -> $HG_PHASE"
71 71 > EOF
72 72
73 #if bundle1
74 $ cat >> .hg/hgrc <<EOF
75 > allow_push = *
76 > [hooks]
77 > changegroup = sh -c "printenv.py changegroup 0"
78 > pushkey = sh -c "printenv.py pushkey 0"
79 > txnclose-phase.test = sh $TESTTMP/hook.sh
80 > EOF
81 $ req "--debug --config extensions.blackbox="
82 listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT)
83 pushing to http://localhost:$HGPORT/
84 searching for changes
85 remote: redirecting incoming bundle to */hg-unbundle-* (glob)
86 remote: adding changesets
87 remote: add changeset ba677d0156c1
88 remote: adding manifests
89 remote: adding file changes
90 remote: adding a revisions
91 remote: added 1 changesets with 1 changes to 1 files
92 remote: updating the branch cache
93 remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
94 remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
95 remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
96 remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
97 remote: running hook changegroup: sh -c "printenv.py changegroup 0"
98 remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
99 % serve errors
100 $ hg rollback
101 repository tip rolled back to revision 0 (undo serve)
102 $ req "--debug --config server.streamunbundle=True --config extensions.blackbox="
103 listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT)
104 pushing to http://localhost:$HGPORT/
105 searching for changes
106 remote: adding changesets
107 remote: add changeset ba677d0156c1
108 remote: adding manifests
109 remote: adding file changes
110 remote: adding a revisions
111 remote: added 1 changesets with 1 changes to 1 files
112 remote: updating the branch cache
113 remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
114 remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
115 remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
116 remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
117 remote: running hook changegroup: sh -c "printenv.py changegroup 0"
118 remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
119 % serve errors
120 $ hg rollback
121 repository tip rolled back to revision 0 (undo serve)
122 #endif
123
124 #if bundle2
73 125 $ cat >> .hg/hgrc <<EOF
74 126 > allow_push = *
75 127 > [hooks]
@@ -86,11 +138,11 b' expect success'
86 138 remote: added 1 changesets with 1 changes to 1 files
87 139 remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
88 140 remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
89 remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle1 !)
90 remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle2 !)
141 remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
91 142 % serve errors
92 143 $ hg rollback
93 144 repository tip rolled back to revision 0 (undo serve)
145 #endif
94 146
95 147 expect success, server lacks the httpheader capability
96 148
General Comments 0
You need to be logged in to leave comments. Login now