Show More
@@ -155,11 +155,14 b' def findcommonheads(ui, local, remote,' | |||||
155 | sample = _limitsample(ownheads, initialsamplesize) |
|
155 | sample = _limitsample(ownheads, initialsamplesize) | |
156 | # indices between sample and externalized version must match |
|
156 | # indices between sample and externalized version must match | |
157 | sample = list(sample) |
|
157 | sample = list(sample) | |
158 | batch = remote.iterbatch() |
|
158 | ||
159 | batch.heads() |
|
159 | with remote.commandexecutor() as e: | |
160 | batch.known(dag.externalizeall(sample)) |
|
160 | fheads = e.callcommand('heads', {}) | |
161 | batch.submit() |
|
161 | fknown = e.callcommand('known', { | |
162 | srvheadhashes, yesno = batch.results() |
|
162 | 'nodes': dag.externalizeall(sample), | |
|
163 | }) | |||
|
164 | ||||
|
165 | srvheadhashes, yesno = fheads.result(), fknown.result() | |||
163 |
|
166 | |||
164 | if cl.tip() == nullid: |
|
167 | if cl.tip() == nullid: | |
165 | if srvheadhashes != [nullid]: |
|
168 | if srvheadhashes != [nullid]: |
@@ -9,6 +9,7 b' from __future__ import absolute_import' | |||||
9 |
|
9 | |||
10 | import hashlib |
|
10 | import hashlib | |
11 | import sys |
|
11 | import sys | |
|
12 | import weakref | |||
12 |
|
13 | |||
13 | from .i18n import _ |
|
14 | from .i18n import _ | |
14 | from .node import ( |
|
15 | from .node import ( | |
@@ -180,6 +181,26 b' def encodebatchcmds(req):' | |||||
180 |
|
181 | |||
181 | return ';'.join(cmds) |
|
182 | return ';'.join(cmds) | |
182 |
|
183 | |||
|
184 | class unsentfuture(pycompat.futures.Future): | |||
|
185 | """A Future variation to represent an unsent command. | |||
|
186 | ||||
|
187 | Because we buffer commands and don't submit them immediately, calling | |||
|
188 | ``result()`` on an unsent future could deadlock. Futures for buffered | |||
|
189 | commands are represented by this type, which wraps ``result()`` to | |||
|
190 | call ``sendcommands()``. | |||
|
191 | """ | |||
|
192 | ||||
|
193 | def result(self, timeout=None): | |||
|
194 | if self.done(): | |||
|
195 | return pycompat.futures.Future.result(self, timeout) | |||
|
196 | ||||
|
197 | self._peerexecutor.sendcommands() | |||
|
198 | ||||
|
199 | # This looks like it will infinitely recurse. However, | |||
|
200 | # sendcommands() should modify __class__. This call serves as a check | |||
|
201 | # on that. | |||
|
202 | return self.result(timeout) | |||
|
203 | ||||
183 | @zi.implementer(repository.ipeercommandexecutor) |
|
204 | @zi.implementer(repository.ipeercommandexecutor) | |
184 | class peerexecutor(object): |
|
205 | class peerexecutor(object): | |
185 | def __init__(self, peer): |
|
206 | def __init__(self, peer): | |
@@ -187,6 +208,9 b' class peerexecutor(object):' | |||||
187 | self._sent = False |
|
208 | self._sent = False | |
188 | self._closed = False |
|
209 | self._closed = False | |
189 | self._calls = [] |
|
210 | self._calls = [] | |
|
211 | self._futures = weakref.WeakSet() | |||
|
212 | self._responseexecutor = None | |||
|
213 | self._responsef = None | |||
190 |
|
214 | |||
191 | def __enter__(self): |
|
215 | def __enter__(self): | |
192 | return self |
|
216 | return self | |
@@ -214,20 +238,35 b' class peerexecutor(object):' | |||||
214 | # Commands are either batchable or they aren't. If a command |
|
238 | # Commands are either batchable or they aren't. If a command | |
215 | # isn't batchable, we send it immediately because the executor |
|
239 | # isn't batchable, we send it immediately because the executor | |
216 | # can no longer accept new commands after a non-batchable command. |
|
240 | # can no longer accept new commands after a non-batchable command. | |
217 | # If a command is batchable, we queue it for later. |
|
241 | # If a command is batchable, we queue it for later. But we have | |
|
242 | # to account for the case of a non-batchable command arriving after | |||
|
243 | # a batchable one and refuse to service it. | |||
|
244 | ||||
|
245 | def addcall(): | |||
|
246 | f = pycompat.futures.Future() | |||
|
247 | self._futures.add(f) | |||
|
248 | self._calls.append((command, args, fn, f)) | |||
|
249 | return f | |||
218 |
|
250 | |||
219 | if getattr(fn, 'batchable', False): |
|
251 | if getattr(fn, 'batchable', False): | |
220 |
|
|
252 | f = addcall() | |
|
253 | ||||
|
254 | # But since we don't issue it immediately, we wrap its result() | |||
|
255 | # to trigger sending so we avoid deadlocks. | |||
|
256 | f.__class__ = unsentfuture | |||
|
257 | f._peerexecutor = self | |||
221 | else: |
|
258 | else: | |
222 | if self._calls: |
|
259 | if self._calls: | |
223 | raise error.ProgrammingError( |
|
260 | raise error.ProgrammingError( | |
224 | '%s is not batchable and cannot be called on a command ' |
|
261 | '%s is not batchable and cannot be called on a command ' | |
225 | 'executor along with other commands' % command) |
|
262 | 'executor along with other commands' % command) | |
226 |
|
263 | |||
227 | # We don't support batching yet. So resolve it immediately. |
|
264 | f = addcall() | |
228 | f = pycompat.futures.Future() |
|
265 | ||
229 | self._calls.append((command, args, fn, f)) |
|
266 | # Non-batchable commands can never coexist with another command | |
|
267 | # in this executor. So send the command immediately. | |||
230 | self.sendcommands() |
|
268 | self.sendcommands() | |
|
269 | ||||
231 | return f |
|
270 | return f | |
232 |
|
271 | |||
233 | def sendcommands(self): |
|
272 | def sendcommands(self): | |
@@ -239,10 +278,18 b' class peerexecutor(object):' | |||||
239 |
|
278 | |||
240 | self._sent = True |
|
279 | self._sent = True | |
241 |
|
280 | |||
|
281 | # Unhack any future types so caller seens a clean type and to break | |||
|
282 | # cycle between us and futures. | |||
|
283 | for f in self._futures: | |||
|
284 | if isinstance(f, unsentfuture): | |||
|
285 | f.__class__ = pycompat.futures.Future | |||
|
286 | f._peerexecutor = None | |||
|
287 | ||||
242 | calls = self._calls |
|
288 | calls = self._calls | |
243 | # Mainly to destroy references to futures. |
|
289 | # Mainly to destroy references to futures. | |
244 | self._calls = None |
|
290 | self._calls = None | |
245 |
|
291 | |||
|
292 | # Simple case of a single command. We call it synchronously. | |||
246 | if len(calls) == 1: |
|
293 | if len(calls) == 1: | |
247 | command, args, fn, f = calls[0] |
|
294 | command, args, fn, f = calls[0] | |
248 |
|
295 | |||
@@ -259,14 +306,99 b' class peerexecutor(object):' | |||||
259 |
|
306 | |||
260 | return |
|
307 | return | |
261 |
|
308 | |||
262 | raise error.ProgrammingError('support for multiple commands not ' |
|
309 | # Batch commands are a bit harder. First, we have to deal with the | |
263 | 'yet implemented') |
|
310 | # @batchable coroutine. That's a bit annoying. Furthermore, we also | |
|
311 | # need to preserve streaming. i.e. it should be possible for the | |||
|
312 | # futures to resolve as data is coming in off the wire without having | |||
|
313 | # to wait for the final byte of the final response. We do this by | |||
|
314 | # spinning up a thread to read the responses. | |||
|
315 | ||||
|
316 | requests = [] | |||
|
317 | states = [] | |||
|
318 | ||||
|
319 | for command, args, fn, f in calls: | |||
|
320 | # Future was cancelled. Ignore it. | |||
|
321 | if not f.set_running_or_notify_cancel(): | |||
|
322 | continue | |||
|
323 | ||||
|
324 | try: | |||
|
325 | batchable = fn.batchable(fn.__self__, | |||
|
326 | **pycompat.strkwargs(args)) | |||
|
327 | except Exception: | |||
|
328 | f.set_exception_info(*sys.exc_info()[1:]) | |||
|
329 | return | |||
|
330 | ||||
|
331 | # Encoded arguments and future holding remote result. | |||
|
332 | try: | |||
|
333 | encodedargs, fremote = next(batchable) | |||
|
334 | except Exception: | |||
|
335 | f.set_exception_info(*sys.exc_info()[1:]) | |||
|
336 | return | |||
|
337 | ||||
|
338 | requests.append((command, encodedargs)) | |||
|
339 | states.append((command, f, batchable, fremote)) | |||
|
340 | ||||
|
341 | if not requests: | |||
|
342 | return | |||
|
343 | ||||
|
344 | # This will emit responses in order they were executed. | |||
|
345 | wireresults = self._peer._submitbatch(requests) | |||
|
346 | ||||
|
347 | # The use of a thread pool executor here is a bit weird for something | |||
|
348 | # that only spins up a single thread. However, thread management is | |||
|
349 | # hard and it is easy to encounter race conditions, deadlocks, etc. | |||
|
350 | # concurrent.futures already solves these problems and its thread pool | |||
|
351 | # executor has minimal overhead. So we use it. | |||
|
352 | self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | |||
|
353 | self._responsef = self._responseexecutor.submit(self._readbatchresponse, | |||
|
354 | states, wireresults) | |||
264 |
|
355 | |||
265 | def close(self): |
|
356 | def close(self): | |
266 | self.sendcommands() |
|
357 | self.sendcommands() | |
267 |
|
358 | |||
|
359 | if self._closed: | |||
|
360 | return | |||
|
361 | ||||
268 | self._closed = True |
|
362 | self._closed = True | |
269 |
|
363 | |||
|
364 | if not self._responsef: | |||
|
365 | return | |||
|
366 | ||||
|
367 | # We need to wait on our in-flight response and then shut down the | |||
|
368 | # executor once we have a result. | |||
|
369 | try: | |||
|
370 | self._responsef.result() | |||
|
371 | finally: | |||
|
372 | self._responseexecutor.shutdown(wait=True) | |||
|
373 | self._responsef = None | |||
|
374 | self._responseexecutor = None | |||
|
375 | ||||
|
376 | # If any of our futures are still in progress, mark them as | |||
|
377 | # errored. Otherwise a result() could wait indefinitely. | |||
|
378 | for f in self._futures: | |||
|
379 | if not f.done(): | |||
|
380 | f.set_exception(error.ResponseError( | |||
|
381 | _('unfulfilled batch command response'))) | |||
|
382 | ||||
|
383 | self._futures = None | |||
|
384 | ||||
|
385 | def _readbatchresponse(self, states, wireresults): | |||
|
386 | # Executes in a thread to read data off the wire. | |||
|
387 | ||||
|
388 | for command, f, batchable, fremote in states: | |||
|
389 | # Grab raw result off the wire and teach the internal future | |||
|
390 | # about it. | |||
|
391 | remoteresult = next(wireresults) | |||
|
392 | fremote.set(remoteresult) | |||
|
393 | ||||
|
394 | # And ask the coroutine to decode that value. | |||
|
395 | try: | |||
|
396 | result = next(batchable) | |||
|
397 | except Exception: | |||
|
398 | f.set_exception_info(*sys.exc_info()[1:]) | |||
|
399 | else: | |||
|
400 | f.set_result(result) | |||
|
401 | ||||
270 | class wirepeer(repository.legacypeer): |
|
402 | class wirepeer(repository.legacypeer): | |
271 | """Client-side interface for communicating with a peer repository. |
|
403 | """Client-side interface for communicating with a peer repository. | |
272 |
|
404 |
General Comments 0
You need to be logged in to leave comments.
Login now