##// END OF EJS Templates
wireproto: make iterbatcher behave streamily over http(s)...
Augie Fackler -
r28438:48fd02da default
parent child Browse files
Show More
@@ -231,6 +231,31 b' class sshpeer(wireproto.wirepeer):'
231
231
232 __del__ = cleanup
232 __del__ = cleanup
233
233
234 def _submitbatch(self, req):
235 cmds = []
236 for op, argsdict in req:
237 args = ','.join('%s=%s' % (wireproto.escapearg(k),
238 wireproto.escapearg(v))
239 for k, v in argsdict.iteritems())
240 cmds.append('%s %s' % (op, args))
241 rsp = self._callstream("batch", cmds=';'.join(cmds))
242 available = self._getamount()
243 # TODO this response parsing is probably suboptimal for large
244 # batches with large responses.
245 toread = min(available, 1024)
246 work = rsp.read(toread)
247 available -= toread
248 chunk = work
249 while chunk:
250 while ';' in work:
251 one, work = work.split(';', 1)
252 yield wireproto.unescapearg(one)
253 toread = min(available, 1024)
254 chunk = rsp.read(toread)
255 available -= toread
256 work += chunk
257 yield wireproto.unescapearg(work)
258
234 def _callstream(self, cmd, **args):
259 def _callstream(self, cmd, **args):
235 self.ui.debug("sending %s command\n" % cmd)
260 self.ui.debug("sending %s command\n" % cmd)
236 self.pipeo.write("%s\n" % cmd)
261 self.pipeo.write("%s\n" % cmd)
@@ -291,7 +316,7 b' class sshpeer(wireproto.wirepeer):'
291 self._send("", flush=True)
316 self._send("", flush=True)
292 return self.pipei
317 return self.pipei
293
318
294 def _recv(self):
319 def _getamount(self):
295 l = self.pipei.readline()
320 l = self.pipei.readline()
296 if l == '\n':
321 if l == '\n':
297 self.readerr()
322 self.readerr()
@@ -299,10 +324,12 b' class sshpeer(wireproto.wirepeer):'
299 self._abort(error.OutOfBandError(hint=msg))
324 self._abort(error.OutOfBandError(hint=msg))
300 self.readerr()
325 self.readerr()
301 try:
326 try:
302 l = int(l)
327 return int(l)
303 except ValueError:
328 except ValueError:
304 self._abort(error.ResponseError(_("unexpected response:"), l))
329 self._abort(error.ResponseError(_("unexpected response:"), l))
305 return self.pipei.read(l)
330
331 def _recv(self):
332 return self.pipei.read(self._getamount())
306
333
307 def _send(self, data, flush=False):
334 def _send(self, data, flush=False):
308 self.pipeo.write("%d\n" % len(data))
335 self.pipeo.write("%d\n" % len(data))
@@ -7,6 +7,7 b''
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import itertools
10 import os
11 import os
11 import sys
12 import sys
12 import tempfile
13 import tempfile
@@ -119,19 +120,35 b' class remoteiterbatcher(peer.iterbatcher'
119 super(remoteiterbatcher, self).__init__()
120 super(remoteiterbatcher, self).__init__()
120 self._remote = remote
121 self._remote = remote
121
122
123 def __getattr__(self, name):
124 if not getattr(self._remote, name, False):
125 raise AttributeError(
126 'Attempted to iterbatch non-batchable call to %r' % name)
127 return super(remoteiterbatcher, self).__getattr__(name)
128
122 def submit(self):
129 def submit(self):
123 """Break the batch request into many patch calls and pipeline them.
130 """Break the batch request into many patch calls and pipeline them.
124
131
125 This is mostly valuable over http where request sizes can be
132 This is mostly valuable over http where request sizes can be
126 limited, but can be used in other places as well.
133 limited, but can be used in other places as well.
127 """
134 """
128 rb = self._remote.batch()
135 req, rsp = [], []
129 rb.calls = self.calls
136 for name, args, opts, resref in self.calls:
130 rb.submit()
137 mtd = getattr(self._remote, name)
138 batchable = mtd.batchable(mtd.im_self, *args, **opts)
139 encargsorres, encresref = batchable.next()
140 assert encresref
141 req.append((name, encargsorres))
142 rsp.append((batchable, encresref))
143 if req:
144 self._resultiter = self._remote._submitbatch(req)
145 self._rsp = rsp
131
146
132 def results(self):
147 def results(self):
133 for name, args, opts, resref in self.calls:
148 for (batchable, encresref), encres in itertools.izip(
134 yield resref.value
149 self._rsp, self._resultiter):
150 encresref.set(encres)
151 yield batchable.next()
135
152
136 # Forward a couple of names from peer to make wireproto interactions
153 # Forward a couple of names from peer to make wireproto interactions
137 # slightly more sensible.
154 # slightly more sensible.
@@ -202,13 +219,28 b' class wirepeer(peer.peerrepository):'
202 else:
219 else:
203 return peer.localbatch(self)
220 return peer.localbatch(self)
204 def _submitbatch(self, req):
221 def _submitbatch(self, req):
222 """run batch request <req> on the server
223
224 Returns an iterator of the raw responses from the server.
225 """
205 cmds = []
226 cmds = []
206 for op, argsdict in req:
227 for op, argsdict in req:
207 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
228 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
208 for k, v in argsdict.iteritems())
229 for k, v in argsdict.iteritems())
209 cmds.append('%s %s' % (op, args))
230 cmds.append('%s %s' % (op, args))
210 rsp = self._call("batch", cmds=';'.join(cmds))
231 rsp = self._callstream("batch", cmds=';'.join(cmds))
211 return [unescapearg(r) for r in rsp.split(';')]
232 # TODO this response parsing is probably suboptimal for large
233 # batches with large responses.
234 work = rsp.read(1024)
235 chunk = work
236 while chunk:
237 while ';' in work:
238 one, work = work.split(';', 1)
239 yield unescapearg(one)
240 chunk = rsp.read(1024)
241 work += chunk
242 yield unescapearg(work)
243
212 def _submitone(self, op, args):
244 def _submitone(self, op, args):
213 return self._call(op, **args)
245 return self._call(op, **args)
214
246
@@ -1,5 +1,7 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import StringIO
4
3 from mercurial import wireproto
5 from mercurial import wireproto
4
6
5 class proto(object):
7 class proto(object):
@@ -21,6 +23,9 b' class clientpeer(wireproto.wirepeer):'
21 def _call(self, cmd, **args):
23 def _call(self, cmd, **args):
22 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
24 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
23
25
26 def _callstream(self, cmd, **args):
27 return StringIO.StringIO(self._call(cmd, **args))
28
24 @wireproto.batchable
29 @wireproto.batchable
25 def greet(self, name):
30 def greet(self, name):
26 f = wireproto.future()
31 f = wireproto.future()
General Comments 0
You need to be logged in to leave comments. Login now