Show More
@@ -155,11 +155,14 b' def findcommonheads(ui, local, remote,' | |||
|
155 | 155 | sample = _limitsample(ownheads, initialsamplesize) |
|
156 | 156 | # indices between sample and externalized version must match |
|
157 | 157 | sample = list(sample) |
|
158 | batch = remote.iterbatch() | |
|
159 | batch.heads() | |
|
160 | batch.known(dag.externalizeall(sample)) | |
|
161 | batch.submit() | |
|
162 | srvheadhashes, yesno = batch.results() | |
|
158 | ||
|
159 | with remote.commandexecutor() as e: | |
|
160 | fheads = e.callcommand('heads', {}) | |
|
161 | fknown = e.callcommand('known', { | |
|
162 | 'nodes': dag.externalizeall(sample), | |
|
163 | }) | |
|
164 | ||
|
165 | srvheadhashes, yesno = fheads.result(), fknown.result() | |
|
163 | 166 | |
|
164 | 167 | if cl.tip() == nullid: |
|
165 | 168 | if srvheadhashes != [nullid]: |
@@ -9,6 +9,7 b' from __future__ import absolute_import' | |||
|
9 | 9 | |
|
10 | 10 | import hashlib |
|
11 | 11 | import sys |
|
12 | import weakref | |
|
12 | 13 | |
|
13 | 14 | from .i18n import _ |
|
14 | 15 | from .node import ( |
@@ -180,6 +181,26 b' def encodebatchcmds(req):' | |||
|
180 | 181 | |
|
181 | 182 | return ';'.join(cmds) |
|
182 | 183 | |
|
184 | class unsentfuture(pycompat.futures.Future): | |
|
185 | """A Future variation to represent an unsent command. | |
|
186 | ||
|
187 | Because we buffer commands and don't submit them immediately, calling | |
|
188 | ``result()`` on an unsent future could deadlock. Futures for buffered | |
|
189 | commands are represented by this type, which wraps ``result()`` to | |
|
190 | call ``sendcommands()``. | |
|
191 | """ | |
|
192 | ||
|
193 | def result(self, timeout=None): | |
|
194 | if self.done(): | |
|
195 | return pycompat.futures.Future.result(self, timeout) | |
|
196 | ||
|
197 | self._peerexecutor.sendcommands() | |
|
198 | ||
|
199 | # This looks like it will infinitely recurse. However, | |
|
200 | # sendcommands() should modify __class__. This call serves as a check | |
|
201 | # on that. | |
|
202 | return self.result(timeout) | |
|
203 | ||
|
183 | 204 | @zi.implementer(repository.ipeercommandexecutor) |
|
184 | 205 | class peerexecutor(object): |
|
185 | 206 | def __init__(self, peer): |
@@ -187,6 +208,9 b' class peerexecutor(object):' | |||
|
187 | 208 | self._sent = False |
|
188 | 209 | self._closed = False |
|
189 | 210 | self._calls = [] |
|
211 | self._futures = weakref.WeakSet() | |
|
212 | self._responseexecutor = None | |
|
213 | self._responsef = None | |
|
190 | 214 | |
|
191 | 215 | def __enter__(self): |
|
192 | 216 | return self |
@@ -214,20 +238,35 b' class peerexecutor(object):' | |||
|
214 | 238 | # Commands are either batchable or they aren't. If a command |
|
215 | 239 | # isn't batchable, we send it immediately because the executor |
|
216 | 240 | # can no longer accept new commands after a non-batchable command. |
|
217 | # If a command is batchable, we queue it for later. | |
|
241 | # If a command is batchable, we queue it for later. But we have | |
|
242 | # to account for the case of a non-batchable command arriving after | |
|
243 | # a batchable one and refuse to service it. | |
|
244 | ||
|
245 | def addcall(): | |
|
246 | f = pycompat.futures.Future() | |
|
247 | self._futures.add(f) | |
|
248 | self._calls.append((command, args, fn, f)) | |
|
249 | return f | |
|
218 | 250 | |
|
219 | 251 | if getattr(fn, 'batchable', False): |
|
220 |
|
|
|
252 | f = addcall() | |
|
253 | ||
|
254 | # But since we don't issue it immediately, we wrap its result() | |
|
255 | # to trigger sending so we avoid deadlocks. | |
|
256 | f.__class__ = unsentfuture | |
|
257 | f._peerexecutor = self | |
|
221 | 258 | else: |
|
222 | 259 | if self._calls: |
|
223 | 260 | raise error.ProgrammingError( |
|
224 | 261 | '%s is not batchable and cannot be called on a command ' |
|
225 | 262 | 'executor along with other commands' % command) |
|
226 | 263 | |
|
227 | # We don't support batching yet. So resolve it immediately. | |
|
228 | f = pycompat.futures.Future() | |
|
229 | self._calls.append((command, args, fn, f)) | |
|
230 | self.sendcommands() | |
|
264 | f = addcall() | |
|
265 | ||
|
266 | # Non-batchable commands can never coexist with another command | |
|
267 | # in this executor. So send the command immediately. | |
|
268 | self.sendcommands() | |
|
269 | ||
|
231 | 270 | return f |
|
232 | 271 | |
|
233 | 272 | def sendcommands(self): |
@@ -239,10 +278,18 b' class peerexecutor(object):' | |||
|
239 | 278 | |
|
240 | 279 | self._sent = True |
|
241 | 280 | |
|
281 | # Unhack any future types so caller seens a clean type and to break | |
|
282 | # cycle between us and futures. | |
|
283 | for f in self._futures: | |
|
284 | if isinstance(f, unsentfuture): | |
|
285 | f.__class__ = pycompat.futures.Future | |
|
286 | f._peerexecutor = None | |
|
287 | ||
|
242 | 288 | calls = self._calls |
|
243 | 289 | # Mainly to destroy references to futures. |
|
244 | 290 | self._calls = None |
|
245 | 291 | |
|
292 | # Simple case of a single command. We call it synchronously. | |
|
246 | 293 | if len(calls) == 1: |
|
247 | 294 | command, args, fn, f = calls[0] |
|
248 | 295 | |
@@ -259,14 +306,99 b' class peerexecutor(object):' | |||
|
259 | 306 | |
|
260 | 307 | return |
|
261 | 308 | |
|
262 | raise error.ProgrammingError('support for multiple commands not ' | |
|
263 | 'yet implemented') | |
|
309 | # Batch commands are a bit harder. First, we have to deal with the | |
|
310 | # @batchable coroutine. That's a bit annoying. Furthermore, we also | |
|
311 | # need to preserve streaming. i.e. it should be possible for the | |
|
312 | # futures to resolve as data is coming in off the wire without having | |
|
313 | # to wait for the final byte of the final response. We do this by | |
|
314 | # spinning up a thread to read the responses. | |
|
315 | ||
|
316 | requests = [] | |
|
317 | states = [] | |
|
318 | ||
|
319 | for command, args, fn, f in calls: | |
|
320 | # Future was cancelled. Ignore it. | |
|
321 | if not f.set_running_or_notify_cancel(): | |
|
322 | continue | |
|
323 | ||
|
324 | try: | |
|
325 | batchable = fn.batchable(fn.__self__, | |
|
326 | **pycompat.strkwargs(args)) | |
|
327 | except Exception: | |
|
328 | f.set_exception_info(*sys.exc_info()[1:]) | |
|
329 | return | |
|
330 | ||
|
331 | # Encoded arguments and future holding remote result. | |
|
332 | try: | |
|
333 | encodedargs, fremote = next(batchable) | |
|
334 | except Exception: | |
|
335 | f.set_exception_info(*sys.exc_info()[1:]) | |
|
336 | return | |
|
337 | ||
|
338 | requests.append((command, encodedargs)) | |
|
339 | states.append((command, f, batchable, fremote)) | |
|
340 | ||
|
341 | if not requests: | |
|
342 | return | |
|
343 | ||
|
344 | # This will emit responses in order they were executed. | |
|
345 | wireresults = self._peer._submitbatch(requests) | |
|
346 | ||
|
347 | # The use of a thread pool executor here is a bit weird for something | |
|
348 | # that only spins up a single thread. However, thread management is | |
|
349 | # hard and it is easy to encounter race conditions, deadlocks, etc. | |
|
350 | # concurrent.futures already solves these problems and its thread pool | |
|
351 | # executor has minimal overhead. So we use it. | |
|
352 | self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | |
|
353 | self._responsef = self._responseexecutor.submit(self._readbatchresponse, | |
|
354 | states, wireresults) | |
|
264 | 355 | |
|
265 | 356 | def close(self): |
|
266 | 357 | self.sendcommands() |
|
267 | 358 | |
|
359 | if self._closed: | |
|
360 | return | |
|
361 | ||
|
268 | 362 | self._closed = True |
|
269 | 363 | |
|
364 | if not self._responsef: | |
|
365 | return | |
|
366 | ||
|
367 | # We need to wait on our in-flight response and then shut down the | |
|
368 | # executor once we have a result. | |
|
369 | try: | |
|
370 | self._responsef.result() | |
|
371 | finally: | |
|
372 | self._responseexecutor.shutdown(wait=True) | |
|
373 | self._responsef = None | |
|
374 | self._responseexecutor = None | |
|
375 | ||
|
376 | # If any of our futures are still in progress, mark them as | |
|
377 | # errored. Otherwise a result() could wait indefinitely. | |
|
378 | for f in self._futures: | |
|
379 | if not f.done(): | |
|
380 | f.set_exception(error.ResponseError( | |
|
381 | _('unfulfilled batch command response'))) | |
|
382 | ||
|
383 | self._futures = None | |
|
384 | ||
|
385 | def _readbatchresponse(self, states, wireresults): | |
|
386 | # Executes in a thread to read data off the wire. | |
|
387 | ||
|
388 | for command, f, batchable, fremote in states: | |
|
389 | # Grab raw result off the wire and teach the internal future | |
|
390 | # about it. | |
|
391 | remoteresult = next(wireresults) | |
|
392 | fremote.set(remoteresult) | |
|
393 | ||
|
394 | # And ask the coroutine to decode that value. | |
|
395 | try: | |
|
396 | result = next(batchable) | |
|
397 | except Exception: | |
|
398 | f.set_exception_info(*sys.exc_info()[1:]) | |
|
399 | else: | |
|
400 | f.set_result(result) | |
|
401 | ||
|
270 | 402 | class wirepeer(repository.legacypeer): |
|
271 | 403 | """Client-side interface for communicating with a peer repository. |
|
272 | 404 |
General Comments 0
You need to be logged in to leave comments.
Login now