##// END OF EJS Templates
wireproto: implement command executor interface for version 1 peers...
Gregory Szorc -
r37648:e1b32dc4 default
parent child Browse files
Show More
@@ -11,6 +11,7 b' import errno'
11 11 import hashlib
12 12 import os
13 13 import random
14 import sys
14 15 import time
15 16 import weakref
16 17
@@ -167,6 +168,49 b' class localiterbatcher(wireprotov1peer.i'
167 168 resref.set(getattr(self.local, name)(*args, **opts))
168 169 yield resref.value
169 170
171 @zi.implementer(repository.ipeercommandexecutor)
172 class localcommandexecutor(object):
173 def __init__(self, peer):
174 self._peer = peer
175 self._sent = False
176 self._closed = False
177
178 def __enter__(self):
179 return self
180
181 def __exit__(self, exctype, excvalue, exctb):
182 self.close()
183
184 def callcommand(self, command, args):
185 if self._sent:
186 raise error.ProgrammingError('callcommand() cannot be used after '
187 'sendcommands()')
188
189 if self._closed:
190 raise error.ProgrammingError('callcommand() cannot be used after '
191 'close()')
192
193 # We don't need to support anything fancy. Just call the named
194 # method on the peer and return a resolved future.
195 fn = getattr(self._peer, pycompat.sysstr(command))
196
197 f = pycompat.futures.Future()
198
199 try:
200 result = fn(**args)
201 except Exception:
202 f.set_exception_info(*sys.exc_info()[1:])
203 else:
204 f.set_result(result)
205
206 return f
207
208 def sendcommands(self):
209 self._sent = True
210
211 def close(self):
212 self._closed = True
213
170 214 class localpeer(repository.peer):
171 215 '''peer for a local repo; reflects only the most recent API'''
172 216
@@ -286,6 +330,9 b' class localpeer(repository.peer):'
286 330
287 331 # Begin of peer interface.
288 332
333 def commandexecutor(self):
334 return localcommandexecutor(self)
335
289 336 def iterbatch(self):
290 337 return localiterbatcher(self)
291 338
@@ -278,7 +278,8 b' class ipeerrequests(zi.Interface):'
278 278 being issued.
279 279 """
280 280
281 class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands):
281 class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands,
282 ipeerrequests):
282 283 """Unified interface for peer repositories.
283 284
284 285 All peer instances must conform to this interface.
@@ -228,7 +228,12 b' def findcommonheads(ui, local, remote,'
228 228 % (roundtrips, len(undecided), len(sample)))
229 229 # indices between sample and externalized version must match
230 230 sample = list(sample)
231 yesno = remote.known(dag.externalizeall(sample))
231
232 with remote.commandexecutor() as e:
233 yesno = e.callcommand('known', {
234 'nodes': dag.externalizeall(sample),
235 }).result()
236
232 237 full = True
233 238
234 239 if sample:
@@ -8,12 +8,15 b''
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 import sys
11 12
12 13 from .i18n import _
13 14 from .node import (
14 15 bin,
15 16 )
16
17 from .thirdparty.zope import (
18 interface as zi,
19 )
17 20 from . import (
18 21 bundle2,
19 22 changegroup as changegroupmod,
@@ -177,6 +180,93 b' def encodebatchcmds(req):'
177 180
178 181 return ';'.join(cmds)
179 182
183 @zi.implementer(repository.ipeercommandexecutor)
184 class peerexecutor(object):
185 def __init__(self, peer):
186 self._peer = peer
187 self._sent = False
188 self._closed = False
189 self._calls = []
190
191 def __enter__(self):
192 return self
193
194 def __exit__(self, exctype, excvalee, exctb):
195 self.close()
196
197 def callcommand(self, command, args):
198 if self._sent:
199 raise error.ProgrammingError('callcommand() cannot be used '
200 'after commands are sent')
201
202 if self._closed:
203 raise error.ProgrammingError('callcommand() cannot be used '
204 'after close()')
205
206 # Commands are dispatched through methods on the peer.
207 fn = getattr(self._peer, pycompat.sysstr(command), None)
208
209 if not fn:
210 raise error.ProgrammingError(
211 'cannot call command %s: method of same name not available '
212 'on peer' % command)
213
214 # Commands are either batchable or they aren't. If a command
215 # isn't batchable, we send it immediately because the executor
216 # can no longer accept new commands after a non-batchable command.
217 # If a command is batchable, we queue it for later.
218
219 if getattr(fn, 'batchable', False):
220 pass
221 else:
222 if self._calls:
223 raise error.ProgrammingError(
224 '%s is not batchable and cannot be called on a command '
225 'executor along with other commands' % command)
226
227 # We don't support batching yet. So resolve it immediately.
228 f = pycompat.futures.Future()
229 self._calls.append((command, args, fn, f))
230 self.sendcommands()
231 return f
232
233 def sendcommands(self):
234 if self._sent:
235 return
236
237 if not self._calls:
238 return
239
240 self._sent = True
241
242 calls = self._calls
243 # Mainly to destroy references to futures.
244 self._calls = None
245
246 if len(calls) == 1:
247 command, args, fn, f = calls[0]
248
249 # Future was cancelled. Ignore it.
250 if not f.set_running_or_notify_cancel():
251 return
252
253 try:
254 result = fn(**pycompat.strkwargs(args))
255 except Exception:
256 f.set_exception_info(*sys.exc_info()[1:])
257 else:
258 f.set_result(result)
259
260 return
261
262 raise error.ProgrammingError('support for multiple commands not '
263 'yet implemented')
264
265 def close(self):
266 self.sendcommands()
267
268 self._closed = True
269
180 270 class wirepeer(repository.legacypeer):
181 271 """Client-side interface for communicating with a peer repository.
182 272
@@ -185,6 +275,9 b' class wirepeer(repository.legacypeer):'
185 275 See also httppeer.py and sshpeer.py for protocol-specific
186 276 implementations of this interface.
187 277 """
278 def commandexecutor(self):
279 return peerexecutor(self)
280
188 281 # Begin of ipeercommands interface.
189 282
190 283 def iterbatch(self):
@@ -23,6 +23,7 b' from mercurial import ('
23 23 vfs as vfsmod,
24 24 wireprotoserver,
25 25 wireprototypes,
26 wireprotov1peer,
26 27 wireprotov2server,
27 28 )
28 29
@@ -102,6 +103,14 b' def main():'
102 103 localrepo.localpeer)
103 104 checkzobject(localrepo.localpeer(dummyrepo()))
104 105
106 ziverify.verifyClass(repository.ipeercommandexecutor,
107 localrepo.localcommandexecutor)
108 checkzobject(localrepo.localcommandexecutor(None))
109
110 ziverify.verifyClass(repository.ipeercommandexecutor,
111 wireprotov1peer.peerexecutor)
112 checkzobject(wireprotov1peer.peerexecutor(None))
113
105 114 ziverify.verifyClass(repository.ipeerbaselegacycommands,
106 115 sshpeer.sshv1peer)
107 116 checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),
General Comments 0
You need to be logged in to leave comments. Login now