##// END OF EJS Templates
wireproto: implement batching on peer executor interface...
Gregory Szorc -
r37649:2f626233 default
parent child Browse files
Show More
@@ -155,11 +155,14 b' def findcommonheads(ui, local, remote,'
155 155 sample = _limitsample(ownheads, initialsamplesize)
156 156 # indices between sample and externalized version must match
157 157 sample = list(sample)
158 batch = remote.iterbatch()
159 batch.heads()
160 batch.known(dag.externalizeall(sample))
161 batch.submit()
162 srvheadhashes, yesno = batch.results()
158
159 with remote.commandexecutor() as e:
160 fheads = e.callcommand('heads', {})
161 fknown = e.callcommand('known', {
162 'nodes': dag.externalizeall(sample),
163 })
164
165 srvheadhashes, yesno = fheads.result(), fknown.result()
163 166
164 167 if cl.tip() == nullid:
165 168 if srvheadhashes != [nullid]:
@@ -9,6 +9,7 b' from __future__ import absolute_import'
9 9
10 10 import hashlib
11 11 import sys
12 import weakref
12 13
13 14 from .i18n import _
14 15 from .node import (
@@ -180,6 +181,26 b' def encodebatchcmds(req):'
180 181
181 182 return ';'.join(cmds)
182 183
184 class unsentfuture(pycompat.futures.Future):
185 """A Future variation to represent an unsent command.
186
187 Because we buffer commands and don't submit them immediately, calling
188 ``result()`` on an unsent future could deadlock. Futures for buffered
189 commands are represented by this type, which wraps ``result()`` to
190 call ``sendcommands()``.
191 """
192
193 def result(self, timeout=None):
194 if self.done():
195 return pycompat.futures.Future.result(self, timeout)
196
197 self._peerexecutor.sendcommands()
198
199 # This looks like it will infinitely recurse. However,
200 # sendcommands() should modify __class__. This call serves as a check
201 # on that.
202 return self.result(timeout)
203
183 204 @zi.implementer(repository.ipeercommandexecutor)
184 205 class peerexecutor(object):
185 206 def __init__(self, peer):
@@ -187,6 +208,9 b' class peerexecutor(object):'
187 208 self._sent = False
188 209 self._closed = False
189 210 self._calls = []
211 self._futures = weakref.WeakSet()
212 self._responseexecutor = None
213 self._responsef = None
190 214
191 215 def __enter__(self):
192 216 return self
@@ -214,20 +238,35 b' class peerexecutor(object):'
214 238 # Commands are either batchable or they aren't. If a command
215 239 # isn't batchable, we send it immediately because the executor
216 240 # can no longer accept new commands after a non-batchable command.
217 # If a command is batchable, we queue it for later.
241 # If a command is batchable, we queue it for later. But we have
242 # to account for the case of a non-batchable command arriving after
243 # a batchable one and refuse to service it.
244
245 def addcall():
246 f = pycompat.futures.Future()
247 self._futures.add(f)
248 self._calls.append((command, args, fn, f))
249 return f
218 250
219 251 if getattr(fn, 'batchable', False):
220 pass
252 f = addcall()
253
254 # But since we don't issue it immediately, we wrap its result()
255 # to trigger sending so we avoid deadlocks.
256 f.__class__ = unsentfuture
257 f._peerexecutor = self
221 258 else:
222 259 if self._calls:
223 260 raise error.ProgrammingError(
224 261 '%s is not batchable and cannot be called on a command '
225 262 'executor along with other commands' % command)
226 263
227 # We don't support batching yet. So resolve it immediately.
228 f = pycompat.futures.Future()
229 self._calls.append((command, args, fn, f))
230 self.sendcommands()
264 f = addcall()
265
266 # Non-batchable commands can never coexist with another command
267 # in this executor. So send the command immediately.
268 self.sendcommands()
269
231 270 return f
232 271
233 272 def sendcommands(self):
@@ -239,10 +278,18 b' class peerexecutor(object):'
239 278
240 279 self._sent = True
241 280
281 # Unhack any future types so caller seens a clean type and to break
282 # cycle between us and futures.
283 for f in self._futures:
284 if isinstance(f, unsentfuture):
285 f.__class__ = pycompat.futures.Future
286 f._peerexecutor = None
287
242 288 calls = self._calls
243 289 # Mainly to destroy references to futures.
244 290 self._calls = None
245 291
292 # Simple case of a single command. We call it synchronously.
246 293 if len(calls) == 1:
247 294 command, args, fn, f = calls[0]
248 295
@@ -259,14 +306,99 b' class peerexecutor(object):'
259 306
260 307 return
261 308
262 raise error.ProgrammingError('support for multiple commands not '
263 'yet implemented')
309 # Batch commands are a bit harder. First, we have to deal with the
310 # @batchable coroutine. That's a bit annoying. Furthermore, we also
311 # need to preserve streaming. i.e. it should be possible for the
312 # futures to resolve as data is coming in off the wire without having
313 # to wait for the final byte of the final response. We do this by
314 # spinning up a thread to read the responses.
315
316 requests = []
317 states = []
318
319 for command, args, fn, f in calls:
320 # Future was cancelled. Ignore it.
321 if not f.set_running_or_notify_cancel():
322 continue
323
324 try:
325 batchable = fn.batchable(fn.__self__,
326 **pycompat.strkwargs(args))
327 except Exception:
328 f.set_exception_info(*sys.exc_info()[1:])
329 return
330
331 # Encoded arguments and future holding remote result.
332 try:
333 encodedargs, fremote = next(batchable)
334 except Exception:
335 f.set_exception_info(*sys.exc_info()[1:])
336 return
337
338 requests.append((command, encodedargs))
339 states.append((command, f, batchable, fremote))
340
341 if not requests:
342 return
343
344 # This will emit responses in order they were executed.
345 wireresults = self._peer._submitbatch(requests)
346
347 # The use of a thread pool executor here is a bit weird for something
348 # that only spins up a single thread. However, thread management is
349 # hard and it is easy to encounter race conditions, deadlocks, etc.
350 # concurrent.futures already solves these problems and its thread pool
351 # executor has minimal overhead. So we use it.
352 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
353 self._responsef = self._responseexecutor.submit(self._readbatchresponse,
354 states, wireresults)
264 355
265 356 def close(self):
266 357 self.sendcommands()
267 358
359 if self._closed:
360 return
361
268 362 self._closed = True
269 363
364 if not self._responsef:
365 return
366
367 # We need to wait on our in-flight response and then shut down the
368 # executor once we have a result.
369 try:
370 self._responsef.result()
371 finally:
372 self._responseexecutor.shutdown(wait=True)
373 self._responsef = None
374 self._responseexecutor = None
375
376 # If any of our futures are still in progress, mark them as
377 # errored. Otherwise a result() could wait indefinitely.
378 for f in self._futures:
379 if not f.done():
380 f.set_exception(error.ResponseError(
381 _('unfulfilled batch command response')))
382
383 self._futures = None
384
385 def _readbatchresponse(self, states, wireresults):
386 # Executes in a thread to read data off the wire.
387
388 for command, f, batchable, fremote in states:
389 # Grab raw result off the wire and teach the internal future
390 # about it.
391 remoteresult = next(wireresults)
392 fremote.set(remoteresult)
393
394 # And ask the coroutine to decode that value.
395 try:
396 result = next(batchable)
397 except Exception:
398 f.set_exception_info(*sys.exc_info()[1:])
399 else:
400 f.set_result(result)
401
270 402 class wirepeer(repository.legacypeer):
271 403 """Client-side interface for communicating with a peer repository.
272 404
General Comments 0
You need to be logged in to leave comments. Login now