##// END OF EJS Templates
wireproto: allow direct stream processing for unbundle...
Joerg Sonnenberger -
r37432:2d965bfe default
parent child Browse files
Show More
@@ -41,7 +41,8 b' def putlfile(repo, proto, sha):'
41 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
41 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
42
42
43 try:
43 try:
44 proto.forwardpayload(tmpfp)
44 for p in proto.getpayload():
45 tmpfp.write(p)
45 tmpfp._fp.seek(0)
46 tmpfp._fp.seek(0)
46 if sha != lfutil.hexsha1(tmpfp._fp):
47 if sha != lfutil.hexsha1(tmpfp._fp):
47 raise IOError(0, _('largefile contents do not match hash'))
48 raise IOError(0, _('largefile contents do not match hash'))
@@ -917,6 +917,9 b" coreconfigitem('server', 'concurrent-pus"
917 coreconfigitem('server', 'disablefullbundle',
917 coreconfigitem('server', 'disablefullbundle',
918 default=False,
918 default=False,
919 )
919 )
920 coreconfigitem('server', 'streamunbundle',
921 default=False,
922 )
920 coreconfigitem('server', 'maxhttpheaderlen',
923 coreconfigitem('server', 'maxhttpheaderlen',
921 default=1024,
924 default=1024,
922 )
925 )
@@ -1791,6 +1791,11 b' Controls generic server settings.'
1791 are highly recommended. Partial clones will still be allowed.
1791 are highly recommended. Partial clones will still be allowed.
1792 (default: False)
1792 (default: False)
1793
1793
1794 ``streamunbundle``
1795 When set, servers will apply data sent from the client directly,
1796 otherwise it will be written to a temporary file first. This option
1797 effectively prevents concurrent pushes.
1798
1794 ``concurrent-push-mode``
1799 ``concurrent-push-mode``
1795 Level of allowed race condition between two pushing clients.
1800 Level of allowed race condition between two pushing clients.
1796
1801
@@ -1082,14 +1082,33 b' def unbundle(repo, proto, heads):'
1082 with proto.mayberedirectstdio() as output:
1082 with proto.mayberedirectstdio() as output:
1083 try:
1083 try:
1084 exchange.check_heads(repo, their_heads, 'preparing changes')
1084 exchange.check_heads(repo, their_heads, 'preparing changes')
1085
1085 cleanup = lambda: None
1086 # write bundle data to temporary file because it can be big
1086 try:
1087 payload = proto.getpayload()
1088 if repo.ui.configbool('server', 'streamunbundle'):
1089 def cleanup():
1090 # Ensure that the full payload is consumed, so
1091 # that the connection doesn't contain trailing garbage.
1092 for p in payload:
1093 pass
1094 fp = util.chunkbuffer(payload)
1095 else:
1096 # write bundle data to temporary file as it can be big
1097 fp, tempname = None, None
1098 def cleanup():
1099 if fp:
1100 fp.close()
1101 if tempname:
1102 os.unlink(tempname)
1087 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1103 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1088 fp = os.fdopen(fd, r'wb+')
1104 repo.ui.debug('redirecting incoming bundle to %s\n' %
1105 tempname)
1106 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1089 r = 0
1107 r = 0
1090 try:
1108 for p in payload:
1091 proto.forwardpayload(fp)
1109 fp.write(p)
1092 fp.seek(0)
1110 fp.seek(0)
1111
1093 gen = exchange.readbundle(repo.ui, fp, None)
1112 gen = exchange.readbundle(repo.ui, fp, None)
1094 if (isinstance(gen, changegroupmod.cg1unpacker)
1113 if (isinstance(gen, changegroupmod.cg1unpacker)
1095 and not bundle1allowed(repo, 'push')):
1114 and not bundle1allowed(repo, 'push')):
@@ -1112,8 +1131,7 b' def unbundle(repo, proto, heads):'
1112 r, output.getvalue() if output else '')
1131 r, output.getvalue() if output else '')
1113
1132
1114 finally:
1133 finally:
1115 fp.close()
1134 cleanup()
1116 os.unlink(tempname)
1117
1135
1118 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1136 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1119 # handle non-bundle2 case first
1137 # handle non-bundle2 case first
@@ -106,15 +106,14 b' class httpv1protocolhandler(object):'
106 self._protocaps = set(value.split(' '))
106 self._protocaps = set(value.split(' '))
107 return self._protocaps
107 return self._protocaps
108
108
109 def forwardpayload(self, fp):
109 def getpayload(self):
110 # Existing clients *always* send Content-Length.
110 # Existing clients *always* send Content-Length.
111 length = int(self._req.headers[b'Content-Length'])
111 length = int(self._req.headers[b'Content-Length'])
112
112
113 # If httppostargs is used, we need to read Content-Length
113 # If httppostargs is used, we need to read Content-Length
114 # minus the amount that was consumed by args.
114 # minus the amount that was consumed by args.
115 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
115 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
116 for s in util.filechunkiter(self._req.bodyfh, limit=length):
116 return util.filechunkiter(self._req.bodyfh, limit=length)
117 fp.write(s)
118
117
119 @contextlib.contextmanager
118 @contextlib.contextmanager
120 def mayberedirectstdio(self):
119 def mayberedirectstdio(self):
@@ -610,7 +609,7 b' class httpv2protocolhandler(object):'
610 # Protocol capabilities are currently not implemented for HTTP V2.
609 # Protocol capabilities are currently not implemented for HTTP V2.
611 return set()
610 return set()
612
611
613 def forwardpayload(self, fp):
612 def getpayload(self):
614 raise NotImplementedError
613 raise NotImplementedError
615
614
616 @contextlib.contextmanager
615 @contextlib.contextmanager
@@ -783,7 +782,7 b' class sshv1protocolhandler(object):'
783 def getprotocaps(self):
782 def getprotocaps(self):
784 return self._protocaps
783 return self._protocaps
785
784
786 def forwardpayload(self, fpout):
785 def getpayload(self):
787 # We initially send an empty response. This tells the client it is
786 # We initially send an empty response. This tells the client it is
788 # OK to start sending data. If a client sees any other response, it
787 # OK to start sending data. If a client sees any other response, it
789 # interprets it as an error.
788 # interprets it as an error.
@@ -796,7 +795,7 b' class sshv1protocolhandler(object):'
796 # 0\n
795 # 0\n
797 count = int(self._fin.readline())
796 count = int(self._fin.readline())
798 while count:
797 while count:
799 fpout.write(self._fin.read(count))
798 yield self._fin.read(count)
800 count = int(self._fin.readline())
799 count = int(self._fin.readline())
801
800
802 @contextlib.contextmanager
801 @contextlib.contextmanager
@@ -123,10 +123,11 b' class baseprotocolhandler(zi.Interface):'
123 Returns a list of capabilities as declared by the client for
123 Returns a list of capabilities as declared by the client for
124 the current request (or connection for stateful protocol handlers)."""
124 the current request (or connection for stateful protocol handlers)."""
125
125
126 def forwardpayload(fp):
126 def getpayload():
127 """Read the raw payload and forward to a file.
127 """Provide a generator for the raw payload.
128
128
129 The payload is read in full before the function returns.
129 The caller is responsible for ensuring that the full payload is
130 processed.
130 """
131 """
131
132
132 def mayberedirectstdio():
133 def mayberedirectstdio():
@@ -23,7 +23,7 b''
23 $ echo a >> a
23 $ echo a >> a
24 $ hg ci -mb
24 $ hg ci -mb
25 $ req() {
25 $ req() {
26 > hg serve -p $HGPORT -d --pid-file=hg.pid -E errors.log
26 > hg $1 serve -p $HGPORT -d --pid-file=hg.pid -E errors.log
27 > cat hg.pid >> $DAEMON_PIDS
27 > cat hg.pid >> $DAEMON_PIDS
28 > hg --cwd ../test2 push http://localhost:$HGPORT/
28 > hg --cwd ../test2 push http://localhost:$HGPORT/
29 > exitstatus=$?
29 > exitstatus=$?
@@ -70,6 +70,58 b' expect success'
70 > echo "phase-move: $HG_NODE: $HG_OLDPHASE -> $HG_PHASE"
70 > echo "phase-move: $HG_NODE: $HG_OLDPHASE -> $HG_PHASE"
71 > EOF
71 > EOF
72
72
73 #if bundle1
74 $ cat >> .hg/hgrc <<EOF
75 > allow_push = *
76 > [hooks]
77 > changegroup = sh -c "printenv.py changegroup 0"
78 > pushkey = sh -c "printenv.py pushkey 0"
79 > txnclose-phase.test = sh $TESTTMP/hook.sh
80 > EOF
81 $ req "--debug --config extensions.blackbox="
82 listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT)
83 pushing to http://localhost:$HGPORT/
84 searching for changes
85 remote: redirecting incoming bundle to */hg-unbundle-* (glob)
86 remote: adding changesets
87 remote: add changeset ba677d0156c1
88 remote: adding manifests
89 remote: adding file changes
90 remote: adding a revisions
91 remote: added 1 changesets with 1 changes to 1 files
92 remote: updating the branch cache
93 remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
94 remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
95 remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
96 remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
97 remote: running hook changegroup: sh -c "printenv.py changegroup 0"
98 remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
99 % serve errors
100 $ hg rollback
101 repository tip rolled back to revision 0 (undo serve)
102 $ req "--debug --config server.streamunbundle=True --config extensions.blackbox="
103 listening at http://localhost:$HGPORT/ (bound to $LOCALIP:$HGPORT)
104 pushing to http://localhost:$HGPORT/
105 searching for changes
106 remote: adding changesets
107 remote: add changeset ba677d0156c1
108 remote: adding manifests
109 remote: adding file changes
110 remote: adding a revisions
111 remote: added 1 changesets with 1 changes to 1 files
112 remote: updating the branch cache
113 remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
114 remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
115 remote: running hook txnclose-phase.test: sh $TESTTMP/hook.sh
116 remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
117 remote: running hook changegroup: sh -c "printenv.py changegroup 0"
118 remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
119 % serve errors
120 $ hg rollback
121 repository tip rolled back to revision 0 (undo serve)
122 #endif
123
124 #if bundle2
73 $ cat >> .hg/hgrc <<EOF
125 $ cat >> .hg/hgrc <<EOF
74 > allow_push = *
126 > allow_push = *
75 > [hooks]
127 > [hooks]
@@ -86,11 +138,11 b' expect success'
86 remote: added 1 changesets with 1 changes to 1 files
138 remote: added 1 changesets with 1 changes to 1 files
87 remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
139 remote: phase-move: cb9a9f314b8b07ba71012fcdbc544b5a4d82ff5b: draft -> public
88 remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
140 remote: phase-move: ba677d0156c1196c1a699fa53f390dcfc3ce3872: -> public
89 remote: changegroup hook: HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle1 !)
141 remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob)
90 remote: changegroup hook: HG_BUNDLE2=1 HG_HOOKNAME=changegroup HG_HOOKTYPE=changegroup HG_NODE=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_NODE_LAST=ba677d0156c1196c1a699fa53f390dcfc3ce3872 HG_SOURCE=serve HG_TXNID=TXN:$ID$ HG_URL=remote:http:$LOCALIP: (glob) (bundle2 !)
91 % serve errors
142 % serve errors
92 $ hg rollback
143 $ hg rollback
93 repository tip rolled back to revision 0 (undo serve)
144 repository tip rolled back to revision 0 (undo serve)
145 #endif
94
146
95 expect success, server lacks the httpheader capability
147 expect success, server lacks the httpheader capability
96
148
General Comments 0
You need to be logged in to leave comments. Login now